From 8f11ffd921246f8b35cd2e7b3bed8a0311d32670 Mon Sep 17 00:00:00 2001 From: Dane Springmeyer Date: Fri, 14 Oct 2011 20:28:23 -0700 Subject: [PATCH] add boost::spirit for speed boost, add strict/quiet modes, expose quote/sep/escape chars, allow user supplied headers --- plugins/input/csv/csv_datasource.cpp | 767 ++++++++++++++++++++------- plugins/input/csv/csv_datasource.hpp | 12 +- 2 files changed, 596 insertions(+), 183 deletions(-) diff --git a/plugins/input/csv/csv_datasource.cpp b/plugins/input/csv/csv_datasource.cpp index 26c7e60c3..5ec700473 100644 --- a/plugins/input/csv/csv_datasource.cpp +++ b/plugins/input/csv/csv_datasource.cpp @@ -6,6 +6,7 @@ #include #include #include +#include // mapnik #include @@ -13,6 +14,7 @@ #include #include #include +#include // mapnik::boolean // stl #include @@ -20,32 +22,54 @@ #include #include // ostream_operator -// clib +// std lib #include #include -//#include using mapnik::datasource; using mapnik::parameters; +using namespace boost::spirit; DATASOURCE_PLUGIN(csv_datasource) csv_datasource::csv_datasource(parameters const& params, bool bind) - : datasource(params), - desc_(*params_.get("type"), *params_.get("encoding","utf-8")), - extent_(), - filename_(), - inline_string_(), - features_(), - separator_(*params_.get("separator",",")), - escape_(*params_.get("escape","\\")), - quote_(*params_.get("quote","\"")) + : datasource(params), + desc_(*params_.get("type"), *params_.get("encoding","utf-8")), + extent_(), + filename_(), + inline_string_(), + file_length_(0), + row_limit_(*params_.get("row_limit",0)), + features_(), + escape_(*params_.get("escape","")), + separator_(*params_.get("separator","")), + quote_(*params_.get("quote","")), + headers_(), + manual_headers_(boost::trim_copy(*params_.get("headers",""))), + strict_(*params_.get("strict",false)), + quiet_(*params_.get("quiet",false)) { /* TODO: - build up features lazily, and filter cols using query - support for newlines other than \n - https://docs.google.com/a/dbsgeo.com/spreadsheet/pub?key=0AqV4OJpywingdFBCV1o3SXp3OU94U3VJWTRoLWRPbGc&output=csv - spatial index + general: + - refactor parser into generic class + - tests + - clean up double usage of Tokenizer types + alternate large file pipeline: + - stat file, detect > 15 MB + - build up csv line-by-line iterator + - creates opportunity to filter attributes by map query + speed: + - add properties for wkt/lon/lat at parse time + - remove boost::lexical_cast + - add ability to pass 'filter' keyword to drop attributes at layer init + - create quad tree on the fly for small/med size files + - memory map large files for reading + - smaller features (less memory overhead) + usability: + - enforce column names without leading digit + - better error messages (add filepath) if not reading from string + - move to spirit to tokenize and add character level error feedback: + http://boost-spirit.com/home/articles/qi-example/tracking-the-input-position-while-parsing/ */ boost::optional inline_string = params_.get("inline"); @@ -64,7 +88,7 @@ csv_datasource::csv_datasource(parameters const& params, bool bind) else filename_ = *file; } - + if (bind) { this->bind(); @@ -81,206 +105,539 @@ void csv_datasource::bind() const if (!inline_string_.empty()) { std::istringstream in(inline_string_); - parse_csv(in); + parse_csv(in,escape_, separator_, quote_); } else { - std::ifstream in(filename_.c_str()); + std::ifstream in(filename_.c_str(),std::ios_base::in | std::ios_base::binary); if (!in.is_open()) throw mapnik::datasource_exception("CSV Plugin: could not open: '" + filename_ + "'"); - parse_csv(in); + parse_csv(in,escape_, separator_, quote_); in.close(); } + is_bound_ = true; } template -void csv_datasource::parse_csv(T& stream) const +void csv_datasource::parse_csv(T& stream, + std::string const& escape, + std::string const& separator, + std::string const& quote) const { - typedef boost::escaped_list_separator separator_type; - typedef boost::tokenizer< separator_type > Tokenizer; + // TODO - throw if file is to big to read into memory + //stream.seekg (0, std::ios::end); + //file_length_ = stream.tellg(); + // set back to start + //stream.seekg (0, std::ios::beg); + + char newline; std::string csv_line; - boost::escaped_list_separator grammer(escape_, separator_, quote_); - mapnik::transcoder tr(desc_.get_encoding()); - int line_no(1); - int feature_count(0); - while (std::getline(stream,csv_line)) + // autodetect newlines + bool found_break = false; + if (std::getline(stream,csv_line,'\n')) { - Tokenizer tok(csv_line, grammer); - - Tokenizer::iterator beg = tok.begin(); - std::string val = boost::trim_copy(*beg); - - // skip lines with leading blanks (assume whole line is empty) - if (val.empty()) continue; - - // handle headers - if (line_no == 1) + found_break = true; + newline = '\n'; + } + else if (std::getline(stream,csv_line,'\r')) + { + found_break = true; + newline = '\r'; + } + else + { + throw mapnik::datasource_exception("CSV Plugin: could not detect any line breaks in this csv (http://en.wikipedia.org/wiki/Newline)\n"); + } + + // set back to start + stream.seekg (0, std::ios::beg); + + // if user has not passed separator manuall + // then attempt to detect by reading first line + std::string sep = boost::trim_copy(separator); + if (sep.empty()) + { + // default to ',' + sep = ","; + // detect tabs + int num_tabs = std::count(csv_line.begin(), csv_line.end(), '\t'); + if (num_tabs > 0) { - unsigned i = 0; - for (; beg != tok.end(); ++beg) + int num_commas = std::count(csv_line.begin(), csv_line.end(), ','); + if (num_tabs > num_commas) { - std::string value = boost::trim_copy(*beg); - // todo - ensure col names do not start with digit - try + sep = "\t"; +#ifdef MAPNIK_DEBUG + std::clog << "CSV Plugin: auto detected tab separator\n"; +#endif + } + } + } + + typedef boost::escaped_list_separator escape_type; + typedef boost::char_separator separator_type; + + std::string esc = boost::trim_copy(escape); + if (esc.empty()) esc = "\\"; + + std::string quo = boost::trim_copy(quote); + if (quo.empty()) quo = "\""; + +#ifdef MAPNIK_DEBUG + std::clog << "CSV Plugin: csv grammer: sep: '" << sep << "' quo: '" << quo << "' esc: '" << esc << "'\n"; +#endif + + boost::escaped_list_separator grammer; + try + { + //grammer = boost::escaped_list_separator('\\', ',', '\"'); + grammer = boost::escaped_list_separator(esc, sep, quo); + } + catch (const std::exception & ex ) + { + std::ostringstream s; + s << "CSV Plugin: " << ex.what(); + throw mapnik::datasource_exception(s.str()); + } + + typedef boost::tokenizer< separator_type > Tokenizer; + typedef boost::tokenizer< escape_type > ETokenizer; + + int line_number(1); + bool has_wkt_field = false; + bool has_lat_field = false; + bool has_lon_field = false; + unsigned wkt_idx; + unsigned lat_idx; + unsigned lon_idx; + + if (!manual_headers_.empty()) + { + //escape_type grammer2(esc, ",", quo); + separator_type sep(","); + Tokenizer tok(manual_headers_, sep); + Tokenizer::iterator beg = tok.begin(); + unsigned idx(0); + for (; beg != tok.end(); ++beg) + { + std::string val = boost::trim_copy(*beg); + std::string lower_val = boost::algorithm::to_lower_copy(val); + if (lower_val == "wkt") + { + wkt_idx = idx; + has_wkt_field = true; + } + if (lower_val == "x" || (lower_val.find("longitude") != std::string::npos)) + { + lon_idx = idx; + has_lon_field = true; + } + if (lower_val == "y" || (lower_val.find("latitude") != std::string::npos)) + { + lat_idx = idx; + has_lat_field = true; + } + ++idx; + headers_.push_back(val); + } + } + else // parse first line as headers + { + while (std::getline(stream,csv_line,newline)) + { + try + { + separator_type sep(",","",boost::keep_empty_tokens); + Tokenizer tok(csv_line, sep); + Tokenizer::iterator beg = tok.begin(); + std::string val = boost::trim_copy(*beg); + + // skip blank lines + if (val.empty()) { - headers_.push_back(boost::lexical_cast(value)); + // do nothing + ++line_number; } - catch (boost::bad_lexical_cast & ex) + else + { + int idx = -1; + for (; beg != tok.end(); ++beg) + { + ++idx; + val = boost::trim_copy(*beg); + if (val.empty()) + { + std::ostringstream s; + s << "CSV Plugin: expected a column header at line " + << line_number << ", column " << idx + << " - ensure this row contains valid header fields: '" + << csv_line << "'\n"; + throw mapnik::datasource_exception(s.str()); + } + else + { + std::string lower_val = boost::algorithm::to_lower_copy(val); + if (lower_val == "wkt") + { + wkt_idx = idx; + has_wkt_field = true; + } + if (lower_val == "x" || (lower_val.find("longitude") != std::string::npos)) + { + lon_idx = idx; + has_lon_field = true; + } + if (lower_val == "y" || (lower_val.find("latitude") != std::string::npos)) + { + lat_idx = idx; + has_lat_field = true; + } + headers_.push_back(val); + } + } + ++line_number; + break; + } + } + catch (const std::exception & ex ) + { + std::ostringstream s; + s << "CSV Plugin: error parsing headers: " << ex.what(); + throw mapnik::datasource_exception(s.str()); + } + } + } + + if (!has_wkt_field && (!has_lon_field || !has_lat_field) ) + { + std::ostringstream s; + s << "CSV Plugin: could not detect column headers with the name of 'wkt' or lat/lon - this is required for reading geometry data"; + throw mapnik::datasource_exception(s.str()); + } + + int feature_count(0); + bool extent_initialized = false; + int num_headers = headers_.size(); + mapnik::transcoder tr(desc_.get_encoding()); + + while (std::getline(stream,csv_line,newline)) + { + if ((row_limit_ > 0) && (line_number > row_limit_)) + { +#ifdef MAPNIK_DEBUG + std::clog << "CSV Plugin: row limit hit, exiting at feature: " << feature_count << "\n"; +#endif + break; + } + + try + { + ETokenizer tok(csv_line, grammer); + ETokenizer::iterator beg = tok.begin(); + + // early return for strict mode + if (strict_) + { + int num_fields = std::distance(beg,tok.end()); + if (num_fields != num_headers) { std::ostringstream s; - s << "CSV Plugin: expected string type column header - could not parse column " - << i << " - found: '" - << value << "'"; + s << "CSV Plugin: # of headers != # of values parsed for row " << line_number << "\n"; throw mapnik::datasource_exception(s.str()); } } - ++i; - } - else - { - double x; - double y; + + std::string val = boost::trim_copy(*beg); + + // skip lines with leading blanks (assume whole line is empty) + // TODO - test this more! + if (val.empty()){ + ++line_number; + continue; + #ifdef MAPNIK_DEBUG + std::clog << "CSV Plugin: empty row encountered at line: " << line_number << "\n"; + #endif + } + + mapnik::feature_ptr feature(mapnik::feature_factory::create(feature_count)); + double x(0); + double y(0); bool parsed_x = false; bool parsed_y = false; - bool has_wkt_field = false; bool parsed_wkt = false; - bool extent_initialized = false; - // look for wkt field - if (std::find(headers_.begin(), headers_.end(), "wkt") != headers_.end()) - { - has_wkt_field = true; - } - - mapnik::feature_ptr feature(mapnik::feature_factory::create(feature_count)); - ++feature_count; - - unsigned i = 0; + bool first_feature = true; + bool skip = false; + bool null_geom = false; + std::vector collected; + + int i = -1; for (;beg != tok.end(); ++beg) { + ++i; std::string value = boost::trim_copy(*beg); - // avoid range error if trailing separator on last col - // TODO - should we throw instead? - if (i >= headers_.size()) + // avoid range error if trailing separator + if (i >= num_headers) + { + #ifdef MAPNIK_DEBUG + std::clog << "CSV Plugin: messed up line encountered where # values > # column headers at: " << line_number << "\n"; + #endif + skip = true; break; + } std::string fld_name(headers_.at(i)); + collected.push_back(fld_name); + int value_length = value.length(); // parse wkt - if (has_wkt_field && fld_name == "wkt" && !parsed_wkt) + if (has_wkt_field) { - // skip empty geoms - if (value.empty()) - break; - bool result = mapnik::from_wkt(value, feature->paths()); - if (!result) + if (i == wkt_idx) { - std::ostringstream s; - s << "CSV Plugin: expected well known text geometry: could not parse row " - << line_no - << ",column " - << i << " - found: '" - << value << "'"; - throw mapnik::datasource_exception(s.str()); + // skip empty geoms + if (value.empty()) + { + null_geom = true; + break; + } + + // optimize simple "POINT (x y)" + // using this shaved 2 seconds off csv that took 8 seconds total to parse + if (value.find("POINT") == 0) + { + using boost::phoenix::ref; + using boost::spirit::qi::_1; + std::string::const_iterator str_beg = value.begin(); + std::string::const_iterator str_end = value.end(); + bool r = qi::phrase_parse(str_beg,str_end, + ( + qi::lit("POINT") >> '(' >> double_[ref(x) = _1] >> double_[ref(y) = _1] >> ')' + ), + ascii::space); + + if (r /*&& (str_beg != str_end)*/) + { + mapnik::geometry_type * pt = new mapnik::geometry_type(mapnik::Point); + pt->move_to(x,y); + feature->add_geometry(pt); + parsed_wkt = true; + } + else + { + std::clog << "could not parse: " << value << "\n"; + } + } + else + { + if (mapnik::from_wkt(value, feature->paths())) + { + parsed_wkt = true; + } + else + { + std::ostringstream s; + s << "CSV Plugin: expected well known text geometry: could not parse row " + << line_number + << ",column " + << i << " - found: '" + << value << "'"; + if (strict_) + { + throw mapnik::datasource_exception(s.str()); + } + else + { + if (!quiet_) std::clog << s.str() << "\n"; + } + } + } } - parsed_wkt = true; } - // longitude - else if ( !parsed_x && (fld_name == "x" || fld_name == "lon" || fld_name == "longitude") ) + else { - try + // longitude + if (i == lon_idx) { - x = boost::lexical_cast(value); - parsed_x = true; - } - catch (boost::bad_lexical_cast & ex) - { - std::ostringstream s; - s << "CSV Plugin: expected longitude: could not parse row " - << line_no - << ", column " - << i << " - found: '" - << value << "'"; - throw mapnik::datasource_exception(s.str()); - } - } - // latitude - else if ( !parsed_y && (fld_name == "y" || fld_name == "lat" || fld_name == "latitude") ) - { - try - { - y = boost::lexical_cast(value); - parsed_y = true; - } - catch (boost::bad_lexical_cast & ex) - { - std::ostringstream s; - s << "CSV Plugin: expected latitude: could not parse row " - << line_no - << ", column " - << i << " - found: '" - << value << "'"; - throw mapnik::datasource_exception(s.str()); - } - } - // add all values as attributes - try - { - if (value.find(".") != std::string::npos) - { - double val = boost::lexical_cast(value); - boost::put(*feature,fld_name,val); - if (line_no == 2) - desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Double)); - } - else - { - int val = boost::lexical_cast(value); - boost::put(*feature,fld_name,val); - if (line_no == 2) - desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Integer)); - } - } - catch (boost::bad_lexical_cast & ex) - { - std::string val = boost::lexical_cast(value); - if (!val.empty()) - { - if (val == "true") + // skip empty geoms + if (value.empty()) { - boost::put(*feature,fld_name,true); - if (line_no == 2) - desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Boolean)); + null_geom = true; + break; } - else if(val == "false") + + try { - boost::put(*feature,fld_name,false); - if (line_no == 2) - desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Boolean)); + x = boost::lexical_cast(value); + parsed_x = true; + } + catch (boost::bad_lexical_cast & ex) + { + std::ostringstream s; + s << "CSV Plugin: expected a float value for longitude: could not parse row " + << line_number + << ", column " + << i << " - found: '" + << value << "'"; + if (strict_) + { + throw mapnik::datasource_exception(s.str()); + } + else + { + if (!quiet_) std::clog << s.str() << "\n"; + } + } + } + // latitude + else if (i == lat_idx) + { + // skip empty geoms + if (value.empty()) + { + null_geom = true; + break; + } + + try + { + y = boost::lexical_cast(value); + parsed_y = true; + } + catch (boost::bad_lexical_cast & ex) + { + std::ostringstream s; + s << "CSV Plugin: expected a float value for latitude: could not parse row " + << line_number + << ", column " + << i << " - found: '" + << value << "'"; + if (strict_) + { + throw mapnik::datasource_exception(s.str()); + } + else + { + if (!quiet_) std::clog << s.str() << "\n"; + } + } + } + } + + // add all values as attributes + if (value.empty()) + { + boost::put(*feature,fld_name,mapnik::value_null()); + } + // only true strings are this long + else if (value_length > 20) + { + UnicodeString ustr = tr.transcode(value.c_str()); + boost::put(*feature,fld_name,ustr); + if (first_feature) + desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::String)); + + } + else if ((value[0] >= '0' && value[0] <= '9') || value[0] == '-') + { + double float_val = 0.0; + std::string::const_iterator str_beg = value.begin(); + std::string::const_iterator str_end = value.end(); + bool r = qi::phrase_parse(str_beg,str_end,qi::double_,ascii::space,float_val); + if (r) + { + if (value.find(".") != std::string::npos) + { + boost::put(*feature,fld_name,float_val); + if (first_feature) + desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Double)); } else { - UnicodeString ustr = tr.transcode(val.c_str()); - boost::put(*feature,fld_name,ustr); - if (line_no == 2) - desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::String)); + int val = static_cast(float_val); + boost::put(*feature,fld_name,val); + if (first_feature) + desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Integer)); } } else { - boost::put(*feature,headers_.at(i),mapnik::value_null()); + // fallback to normal string + UnicodeString ustr = tr.transcode(value.c_str()); + boost::put(*feature,fld_name,ustr); + if (first_feature) + desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::String)); + } + } + else + { + if (value == "true") + { + boost::put(*feature,fld_name,true); + if (first_feature) + desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Boolean)); + } + else if(value == "false") + { + boost::put(*feature,fld_name,false); + if (first_feature) + desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Boolean)); + } + else + { + // fallback to normal string + UnicodeString ustr = tr.transcode(value.c_str()); + boost::put(*feature,fld_name,ustr); + if (first_feature) + desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::String)); } } - ++i; } - + + first_feature = false; + + if (skip) + { + ++line_number; + std::ostringstream s; + s << "CSV Plugin: # values > # column headers" + << "for line " << line_number << " - found " << headers_.size() + << " with values like: " << csv_line << "\n"; + //<< "for: " << boost::algorithm::join(collected, ",") << "\n"; + if (strict_) + { + throw mapnik::datasource_exception(s.str()); + } + else + { + if (!quiet_) std::clog << s.str() << "\n"; + continue; + } + } + else if (null_geom) + { + ++line_number; + std::ostringstream s; + s << "CSV Plugin: null geometry encountered for line " + << line_number; + if (strict_) + { + throw mapnik::datasource_exception(s.str()); + } + else + { + if (!quiet_) std::clog << s.str() << "\n"; + continue; + } + } + if (has_wkt_field) { if (parsed_wkt) { - if (line_no >= 2 && !extent_initialized) + if (!extent_initialized) { extent_initialized = true; extent_ = feature->envelope(); @@ -291,43 +648,91 @@ void csv_datasource::parse_csv(T& stream) const } features_.push_back(feature); } - } - else - { - if (!parsed_x) - { - std::ostringstream s; - s << "CSV Plugin: could not detect or parse any rows named 'x', 'lon' or 'longitude' " - << "does your csv have headers?"; - throw mapnik::datasource_exception(s.str()); - } - else if (!parsed_y) - { - std::ostringstream s; - s << "CSV Plugin: could not detect or parse rows named 'y', 'lat' or 'latitude' " - << "does your csv have headers?"; - throw mapnik::datasource_exception(s.str()); - } else { - if (line_no >= 2 && !extent_initialized) + std::ostringstream s; + s << "CSV Plugin: could not read WKT geometry " + << "for line " << line_number << " - found " << headers_.size() + << " with values like: " << csv_line << "\n"; + if (strict_) { - extent_initialized = true; - extent_.init(x, y, x, y); + throw mapnik::datasource_exception(s.str()); } else { - extent_.expand_to_include(x,y); + if (!quiet_) std::clog << s.str() << "\n"; + continue; } - + } + } + else + { + if (parsed_x && parsed_y) + { mapnik::geometry_type * pt = new mapnik::geometry_type(mapnik::Point); pt->move_to(x,y); feature->add_geometry(pt); - features_.push_back(feature); + features_.push_back(feature); + ++feature_count; + + if (!extent_initialized) + { + extent_initialized = true; + extent_ = feature->envelope(); + + } + else + { + extent_.expand_to_include(feature->envelope()); + } + } + else + { + std::ostringstream s; + if (!parsed_x) + { + s << "CSV Plugin: does your csv have valid headers?\n" + << "Could not detect or parse any rows named 'x' or 'longitude' " + << "for line " << line_number << " but found " << headers_.size() + << " with values like: " << csv_line << "\n" + << "for: " << boost::algorithm::join(collected, ",") << "\n"; + } + if (!parsed_y) + { + s << "CSV Plugin: does your csv have valid headers?\n" + << "Could not detect or parse any rows named 'y' or 'latitude' " + << "for line " << line_number << " but found " << headers_.size() + << " with values like: " << csv_line << "\n" + << "for: " << boost::algorithm::join(collected, ",") << "\n"; + } + if (strict_) + { + throw mapnik::datasource_exception(s.str()); + } + else + { + if (!quiet_) std::clog << s.str() << "\n"; + continue; + } } } + ++line_number; + } + catch (const std::exception & ex ) + { + std::ostringstream s; + s << "CSV Plugin: unexpected error parsing line: " << line_number + << " - found " << headers_.size() << " with values like: " << csv_line << "\n" + << " and got error like: " << ex.what(); + if (strict_) + { + throw mapnik::datasource_exception(s.str()); + } + else + { + if (!quiet_) std::clog << s.str() << "\n"; + } } - ++line_no; } } diff --git a/plugins/input/csv/csv_datasource.hpp b/plugins/input/csv/csv_datasource.hpp index 296a6b3c7..d1806e626 100644 --- a/plugins/input/csv/csv_datasource.hpp +++ b/plugins/input/csv/csv_datasource.hpp @@ -20,17 +20,25 @@ class csv_datasource : public mapnik::datasource mapnik::layer_descriptor get_descriptor() const; void bind() const; template - void parse_csv(T& stream) const; + void parse_csv(T& stream, + std::string const& escape, + std::string const& separator, + std::string const& quote) const; private: mutable mapnik::layer_descriptor desc_; mutable mapnik::box2d extent_; mutable std::string filename_; mutable std::string inline_string_; + mutable int file_length_; + mutable int row_limit_; mutable std::vector features_; - mutable std::string separator_; mutable std::string escape_; + mutable std::string separator_; mutable std::string quote_; mutable std::vector headers_; + mutable std::string manual_headers_; + mutable bool strict_; + mutable bool quiet_; };