diff --git a/deps/agg/include/agg_pixfmt_rgba.h b/deps/agg/include/agg_pixfmt_rgba.h index 4f20fc1d2..e314b2531 100644 --- a/deps/agg/include/agg_pixfmt_rgba.h +++ b/deps/agg/include/agg_pixfmt_rgba.h @@ -1444,68 +1444,8 @@ namespace agg } }; - // colorize alpha values - // TODO - consider moving to image-filters: - // https://github.com/mapnik/mapnik/issues/1371 - /* - template - struct comp_op_rgba_colorize_alpha - { - typedef ColorT color_type; - typedef Order order_type; - typedef typename color_type::value_type value_type; - typedef typename color_type::calc_type calc_type; - typedef typename color_type::long_type long_type; - enum base_scale_e - { - base_shift = color_type::base_shift, - base_mask = color_type::base_mask - }; - - static AGG_INLINE void blend_pix(value_type* p, - // source rgb - unsigned sr, unsigned sg, unsigned sb, - // source alpha and opacity - unsigned sa, unsigned cover) - { - if(cover < 255) - { - sr = (sr * cover + 255) >> 8; - sg = (sg * cover + 255) >> 8; - sb = (sb * cover + 255) >> 8; - sa = (sa * cover + 255) >> 8; - } - if (sa > 0) - { - p[Order::R] = (value_type)(((0 + base_mask) >> base_shift)); - p[Order::G] = (value_type)(((0 + base_mask) >> base_shift)); - p[Order::B] = (value_type)(((0 + base_mask) >> base_shift)); - p[Order::A] = (value_type)(sa + p[Order::A] - ((sa * p[Order::A] + base_mask) >> base_shift)); - // http://en.wikipedia.org/wiki/File:HSV-RGB-comparison.svg - if (p[Order::A] < 64) { - p[Order::G] = ((p[Order::A] - 64) * 4); - p[Order::B] = 255; - } - if (p[Order::A] >= 64 && p[Order::A] < 128) { - p[Order::G] = 255; - p[Order::B] = 255 - ((p[Order::A] - 64) * 4); - } - if (p[Order::A] >= 128 && p[Order::A] < 192) { - p[Order::R] = ((p[Order::A] - 128) * 4); - p[Order::G] = 255; - } - if (p[Order::A] >= 192) { - p[Order::R] = 255; - p[Order::G] = 255 - ((p[Order::A] - 192) * 4); - } - } - } - }; - */ - // grain extract (GIMP) // E = I - M + 128 - template struct comp_op_rgba_grain_extract { diff --git a/include/mapnik/datasource.hpp b/include/mapnik/datasource.hpp index b2edb91d4..947fc056a 100644 --- a/include/mapnik/datasource.hpp +++ b/include/mapnik/datasource.hpp @@ -30,6 +30,7 @@ #include #include #include +#include // boost #include @@ -109,6 +110,13 @@ public: * @return The type of the datasource (Vector or Raster) */ virtual datasource_t type() const = 0; + + virtual processor_context_ptr get_context(feature_style_context_map&) const { return processor_context_ptr(); } + virtual featureset_ptr features_with_context(const query& q,processor_context_ptr ctx= processor_context_ptr()) const + { + // default implementation without context use features method + return features(q); + } virtual featureset_ptr features(query const& q) const = 0; virtual featureset_ptr features_at_point(coord2d const& pt, double tol = 0) const = 0; virtual box2d envelope() const = 0; diff --git a/include/mapnik/feature_style_processor.hpp b/include/mapnik/feature_style_processor.hpp index 93a3114bb..6a652150c 100644 --- a/include/mapnik/feature_style_processor.hpp +++ b/include/mapnik/feature_style_processor.hpp @@ -27,6 +27,7 @@ #include // for featureset_ptr #include + // stl #include #include @@ -41,6 +42,7 @@ class projection; class proj_transform; class feature_type_style; class rule_cache; +struct layer_rendering_material; enum eAttributeCollectionPolicy { @@ -67,6 +69,7 @@ public: void apply(mapnik::layer const& lyr, std::set& names, double scale_denom_override=0.0); + /*! * \brief render a layer given a projection and scale. */ @@ -91,6 +94,26 @@ private: featureset_ptr features, proj_transform const& prj_trans); + /*! + * \brief prepare features for rendering asynchronously. + */ + void prepare_layer(layer_rendering_material & mat, + feature_style_context_map & ctx_map, + Processor & p, + projection const& proj0, + double scale, + double scale_denom, + unsigned width, + unsigned height, + box2d const& extent, + int buffer_size, + std::set& names); + + /*! + * \brief render features list queued when they are available. + */ + void render_material(layer_rendering_material & mat, Processor & p ); + Map const& m_; }; } diff --git a/include/mapnik/feature_style_processor_context.hpp b/include/mapnik/feature_style_processor_context.hpp new file mode 100644 index 000000000..376886b9d --- /dev/null +++ b/include/mapnik/feature_style_processor_context.hpp @@ -0,0 +1,47 @@ +/***************************************************************************** + * + * This file is part of Mapnik (c++ mapping toolkit) + * + * Copyright (C) 2013 Artem Pavlenko + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + *****************************************************************************/ + +#ifndef FEATURE_STYLE_PROCESSOR_CONTEXT_HPP +#define FEATURE_STYLE_PROCESSOR_CONTEXT_HPP + +// boost +#include +#include + +// stl +#include +#include + +namespace mapnik { + + +class IProcessorContext { +public: + virtual ~IProcessorContext() {} +}; + +typedef boost::shared_ptr processor_context_ptr; +typedef std::map feature_style_context_map; + +} + +#endif /* FEATURE_STYLE_PROCESSOR_CONTEXT_HPP */ diff --git a/include/mapnik/feature_style_processor_impl.hpp b/include/mapnik/feature_style_processor_impl.hpp index 76f4f190c..2bd7fda97 100644 --- a/include/mapnik/feature_style_processor_impl.hpp +++ b/include/mapnik/feature_style_processor_impl.hpp @@ -45,6 +45,7 @@ #include #include + // boost #include #include @@ -126,6 +127,25 @@ struct has_process ); }; +// Store material for layer rendering in a two step process +struct layer_rendering_material { + layer const& lay_; + projection const& proj0_; + projection proj1_; + box2d layer_ext2_; + std::vector active_styles_; + std::vector featureset_ptr_list_; + boost::ptr_vector rule_caches_; + + layer_rendering_material(layer const& lay, projection const& dest) : + lay_(lay), + proj0_(dest), + proj1_(lay.srs(),true) {} +}; + +typedef boost::shared_ptr layer_rendering_material_ptr; + + template feature_style_processor::feature_style_processor(Map const& m, double scale_factor) : m_(m) @@ -148,27 +168,54 @@ void feature_style_processor::apply(double scale_denom) scale_denom = mapnik::scale_denominator(m_.scale(),proj.is_geographic()); scale_denom *= p.scale_factor(); + // Asynchronous query supports: + // This is a two steps process, + // first we setup all queries at layer level + // in a second time, we fetch the results and + // do the actual rendering + + std::vector mat_list; + + // Define processing context map used by datasources + // implementing asynchronous queries + feature_style_context_map ctx_map; + BOOST_FOREACH ( layer const& lyr, m_.layers() ) { if (lyr.visible(scale_denom)) { std::set names; - apply_to_layer(lyr, - p, - proj, - m_.scale(), - scale_denom, - m_.width(), - m_.height(), - m_.get_current_extent(), - m_.buffer_size(), - names); + layer_rendering_material_ptr mat = boost::make_shared(lyr, proj); + prepare_layer(*mat, + ctx_map, + p, + proj, + m_.scale(), + scale_denom, + m_.width(), + m_.height(), + m_.get_current_extent(), + m_.buffer_size(), + names); + + // Store active material + if (!mat->active_styles_.empty()) + { + mat_list.push_back(mat); + } + } + } + + BOOST_FOREACH ( layer_rendering_material_ptr mat, mat_list ) + { + if (!mat->active_styles_.empty()) + { + render_material(*mat,p); } } p.end_map_processing(m_); - } template @@ -199,8 +246,12 @@ void feature_style_processor::apply(mapnik::layer const& lyr, p.end_map_processing(m_); } +/*! + * \brief render a layer given a projection and scale. + */ template -void feature_style_processor::apply_to_layer(layer const& lay, Processor & p, +void feature_style_processor::apply_to_layer(layer const& lay, + Processor & p, projection const& proj0, double scale, double scale_denom, @@ -210,6 +261,42 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces int buffer_size, std::set& names) { + feature_style_context_map ctx_map; + layer_rendering_material mat(lay, proj0); + + prepare_layer(mat, + ctx_map, + p, + proj0, + m_.scale(), + scale_denom, + m_.width(), + m_.height(), + m_.get_current_extent(), + m_.buffer_size(), + names); + + if (!mat.active_styles_.empty()) + { + render_material(mat,p); + } +} + +template +void feature_style_processor::prepare_layer(layer_rendering_material & mat, + feature_style_context_map & ctx_map, + Processor & p, + projection const& proj0, + double scale, + double scale_denom, + unsigned width, + unsigned height, + box2d const& extent, + int buffer_size, + std::set& names) +{ + layer const& lay = mat.lay_; + std::vector const& style_names = lay.styles(); unsigned int num_styles = style_names.size(); @@ -228,8 +315,8 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces return; } - projection proj1(lay.srs(),true); - proj_transform prj_trans(proj0,proj1); + processor_context_ptr current_ctx = ds->get_context(ctx_map); + proj_transform prj_trans(mat.proj0_,mat.proj1_); box2d query_ext = extent; // unbuffered box2d buffered_query_ext(query_ext); // buffered @@ -288,6 +375,8 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces early_return = true; } + std::vector & active_styles = mat.active_styles_; + if (early_return) { // check for styles needing compositing operations applied @@ -299,13 +388,13 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces { continue; } + if (style->comp_op() || style->image_filters().size() > 0) { if (style->active(scale_denom)) { - // trigger any needed compositing ops - p.start_style_processing(*style); - p.end_style_processing(*style); + // we'll have to handle compositing ops + active_styles.push_back(&(*style)); } } } @@ -319,7 +408,9 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces query_ext.clip(*maximum_extent); } - box2d layer_ext2 = lay.envelope(); + box2d & layer_ext2 = mat.layer_ext2_; + + layer_ext2 = lay.envelope(); if (fw_success) { if (prj_trans.forward(query_ext, PROJ_ENVELOPE_POINTS)) @@ -344,9 +435,8 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces height/qh); query q(layer_ext,res,scale_denom,extent); - std::vector active_styles; + boost::ptr_vector & rule_caches = mat.rule_caches_; attribute_collector collector(names); - boost::ptr_vector rule_caches; // iterate through all named styles collecting active styles and attribute names BOOST_FOREACH(std::string const& style_name, style_names) @@ -406,57 +496,90 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces { q.add_property_name(group_by); } + } - bool cache_features = lay.cache_features() && active_styles.size() > 1; + bool cache_features = lay.cache_features() && active_styles.size() > 1; + std::string group_by = lay.group_by(); - // Render incrementally when the column that we group by changes value. - if (!group_by.empty()) + std::vector & featureset_ptr_list = mat.featureset_ptr_list_; + if ( (group_by != "") || cache_features) + { + featureset_ptr_list.push_back(ds->features_with_context(q,current_ctx)); + } + else + { + for(size_t i = 0 ; i < active_styles.size(); i++) { - featureset_ptr features = ds->features(q); - if (features) - { - boost::shared_ptr cache = boost::make_shared(); - feature_ptr feature, prev; - while ((feature = features->next())) - { - if (prev && prev->get(group_by) != feature->get(group_by)) - { - // We're at a value boundary, so render what we have - // up to this point. - int i = 0; - BOOST_FOREACH (feature_type_style const* style, active_styles) - { - cache->prepare(); - render_style(p, style, rule_caches[i], cache, prj_trans); - i++; - } - cache->clear(); - } - cache->push(feature); - prev = feature; - } - - int i = 0; - BOOST_FOREACH (feature_type_style const* style, active_styles) - { - cache->prepare(); - render_style(p, style, rule_caches[i], cache, prj_trans); - i++; - } - } + featureset_ptr_list.push_back(ds->features_with_context(q,current_ctx)); } - else if (cache_features) + } +} + + +template +void feature_style_processor::render_material(layer_rendering_material & mat, Processor & p ) +{ + std::vector & active_styles = mat.active_styles_; + std::vector & featureset_ptr_list = mat.featureset_ptr_list_; + if (featureset_ptr_list.empty()) + { + // The datasource wasn't querried because of early return + // but we have to apply compositing operations on styles + BOOST_FOREACH (feature_type_style const* style, active_styles) { - featureset_ptr features = ds->features(q); + p.start_style_processing(*style); + p.end_style_processing(*style); + } + + return; + } + + p.start_layer_processing(mat.lay_, mat.layer_ext2_); + + layer const& lay = mat.lay_; + + boost::ptr_vector & rule_caches = mat.rule_caches_; + + proj_transform prj_trans(mat.proj0_,mat.proj1_); + + bool cache_features = lay.cache_features() && active_styles.size() > 1; + + datasource_ptr ds = lay.datasource(); + std::string group_by = lay.group_by(); + + // Render incrementally when the column that we group by + // changes value. + if (group_by != "") + { + featureset_ptr features = *featureset_ptr_list.begin(); + if (features) { + // Cache all features into the memory_datasource before rendering. boost::shared_ptr cache = boost::make_shared(); - if (features) + feature_ptr feature, prev; + + while ((feature = features->next())) { - feature_ptr feature; - while ((feature = features->next())) + if (prev && prev->get(group_by) != feature->get(group_by)) { - cache->push(feature); + // We're at a value boundary, so render what we have + // up to this point. + int i = 0; + BOOST_FOREACH (feature_type_style const* style, active_styles) + { + + cache->prepare(); + render_style(p, style, + rule_caches[i], + cache, + prj_trans); + i++; + } + cache->clear(); } + cache->push(feature); + prev = feature; } + int i = 0; BOOST_FOREACH (feature_type_style const* style, active_styles) { @@ -466,20 +589,48 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces } cache->clear(); } - // We only have a single style and no grouping. - else - { - int i = 0; - BOOST_FOREACH (feature_type_style const* style, active_styles) + } + else if (cache_features) + { + boost::shared_ptr cache = boost::make_shared(); + featureset_ptr features = *featureset_ptr_list.begin(); + if (features) { + // Cache all features into the memory_datasource before rendering. + feature_ptr feature; + while ((feature = features->next())) { - render_style(p, style, rule_caches[i], ds->features(q), prj_trans); - i++; + + cache->push(feature); } } + int i = 0; + BOOST_FOREACH (feature_type_style const* style, active_styles) + { + cache->prepare(); + render_style(p, style, + rule_caches[i], + cache, prj_trans); + i++; + } + } + // We only have a single style and no grouping. + else + { + int i = 0; + std::vector::iterator featuresets = featureset_ptr_list.begin(); + BOOST_FOREACH (feature_type_style const* style, active_styles) + { + featureset_ptr features = *featuresets++; + render_style(p, style, + rule_caches[i], + features, + prj_trans); + i++; + } } - p.end_layer_processing(lay); -} + p.end_layer_processing(mat.lay_); +} template void feature_style_processor::render_style( diff --git a/include/mapnik/image_compositing.hpp b/include/mapnik/image_compositing.hpp index 6d5d05fab..f0e39a901 100644 --- a/include/mapnik/image_compositing.hpp +++ b/include/mapnik/image_compositing.hpp @@ -75,7 +75,6 @@ enum composite_mode_e saturation, _color, _value - //colorize_alpha }; MAPNIK_DECL boost::optional comp_op_from_string(std::string const& name); diff --git a/include/mapnik/json/geometry_generator_grammar.hpp b/include/mapnik/json/geometry_generator_grammar.hpp index c094daed0..dd6950ca5 100644 --- a/include/mapnik/json/geometry_generator_grammar.hpp +++ b/include/mapnik/json/geometry_generator_grammar.hpp @@ -143,7 +143,7 @@ struct multi_geometry_type for ( ; itr != end; ++itr) { - if (type != 0u && itr->type() != type) + if (type != 0u && static_cast(itr->type()) != type) { collection = true; break; diff --git a/plugins/input/postgis/asyncresultset.hpp b/plugins/input/postgis/asyncresultset.hpp new file mode 100644 index 000000000..a5f838ad8 --- /dev/null +++ b/plugins/input/postgis/asyncresultset.hpp @@ -0,0 +1,213 @@ +/***************************************************************************** + * + * This file is part of Mapnik (c++ mapping toolkit) + * + * Copyright (C) 2013 Artem Pavlenko + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + *****************************************************************************/ + +#ifndef POSTGIS_ASYNCRESULTSET_HPP +#define POSTGIS_ASYNCRESULTSET_HPP + +#include +#include + +#include "connection_manager.hpp" +#include "resultset.hpp" +#include + +class postgis_processor_context; +typedef boost::shared_ptr postgis_processor_context_ptr; + +class AsyncResultSet : public IResultSet, private mapnik::noncopyable +{ +public: + AsyncResultSet(postgis_processor_context_ptr const& ctx, + boost::shared_ptr< Pool > const& pool, + boost::shared_ptr const& conn, std::string const& sql ) + : ctx_(ctx), + pool_(pool), + conn_(conn), + sql_(sql), + is_closed_(false) + { + } + + virtual bool use_connection() { return true; } + + virtual ~AsyncResultSet() + { + close(); + } + + virtual void close() + { + if (!is_closed_) + { + rs_.reset(); + is_closed_ = true; + if (conn_) + { + conn_.reset(); + } + } + } + + virtual int getNumFields() const + { + return rs_->getNumFields(); + } + + virtual bool next() + { + bool next_res = false; + if (!rs_) + { + // Ensure connection is valid + if (conn_ && conn_->isOK()) + { + rs_ = conn_->getAsyncResult(); + } + else + { + throw mapnik::datasource_exception("invalid connection in AsyncResultSet::next"); + } + } + + next_res = rs_->next(); + if (!next_res) + { + rs_.reset(); + rs_ = conn_->getNextAsyncResult(); + if (rs_ && rs_->next()) + { + return true; + } + close(); + prepare_next(); + } + return next_res; + } + + virtual const char* getFieldName(int index) const + { + return rs_->getFieldName(index); + } + + virtual int getFieldLength(int index) const + { + return rs_->getFieldLength(index); + } + + virtual int getFieldLength(const char* name) const + { + return rs_->getFieldLength(name); + } + + virtual int getTypeOID(int index) const + { + return rs_->getTypeOID(index); + } + + virtual int getTypeOID(const char* name) const + { + return rs_->getTypeOID(name); + } + + virtual bool isNull(int index) const + { + return rs_->isNull(index); + } + + virtual const char* getValue(int index) const + { + return rs_->getValue(index); + } + + virtual const char* getValue(const char* name) const + { + return rs_->getValue(name); + } + +private: + postgis_processor_context_ptr ctx_; + boost::shared_ptr< Pool > pool_; + boost::shared_ptr conn_; + std::string sql_; + boost::shared_ptr rs_; + bool is_closed_; + + void prepare() + { + conn_ = pool_->borrowObject(); + if (conn_ && conn_->isOK()) + { + conn_->executeAsyncQuery(sql_, 1); + } + else + { + throw mapnik::datasource_exception("Postgis Plugin: bad connection"); + } + } + + void prepare_next(); + +}; + + +class postgis_processor_context : public mapnik::IProcessorContext +{ +public: + postgis_processor_context() + : num_async_requests_(0) {} + ~postgis_processor_context() {} + + void add_request(boost::shared_ptr const& req) + { + q_.push(req); + } + + boost::shared_ptr pop_next_request() + { + boost::shared_ptr r; + if (!q_.empty()) + { + r = q_.front(); + q_.pop(); + } + return r; + } + + int num_async_requests_; + +private: + typedef std::queue > async_queue; + async_queue q_; + +}; + +inline void AsyncResultSet::prepare_next() +{ + // ensure cnx pool has unused cnx + boost::shared_ptr next = ctx_->pop_next_request(); + if (next) + { + next->prepare(); + } +} + +#endif // POSTGIS_ASYNCRESULTSET_HPP diff --git a/plugins/input/postgis/connection.hpp b/plugins/input/postgis/connection.hpp index d8726aef0..8c764225f 100644 --- a/plugins/input/postgis/connection.hpp +++ b/plugins/input/postgis/connection.hpp @@ -70,9 +70,7 @@ public: if (! closed_) { PQfinish(conn_); - MAPNIK_LOG_DEBUG(postgis) << "postgis_connection: postgresql connection closed - " << conn_; - closed_ = true; } } @@ -107,15 +105,15 @@ public: if (! result || (PQresultStatus(result) != PGRES_TUPLES_OK)) { - std::string err_msg = status(); - err_msg += "\nFull sql was: '"; + std::string err_msg = "Postgis Plugin: "; + err_msg += status(); + err_msg += "\nin executeQuery Full sql was: '"; err_msg += sql; err_msg += "'\n"; if (result) { PQclear(result); } - throw mapnik::datasource_exception(err_msg); } @@ -136,6 +134,66 @@ public: return status; } + bool executeAsyncQuery(std::string const& sql, int type = 0) + { + int result = 0; + if (type == 1) + { + result = PQsendQueryParams(conn_,sql.c_str(), 0, 0, 0, 0, 0, 1); + } + else + { + result = PQsendQuery(conn_, sql.c_str()); + } + if (result != 1) + { + std::string err_msg = "Postgis Plugin: "; + err_msg += status(); + err_msg += "\nin executeAsyncQuery Full sql was: '"; + err_msg += sql; + err_msg += "'\n"; + clearAsyncResult(PQgetResult(conn_)); + close(); + throw mapnik::datasource_exception(err_msg); + } + return result; + } + + + boost::shared_ptr getNextAsyncResult() + { + PGresult *result = PQgetResult(conn_); + if( result && (PQresultStatus(result) != PGRES_TUPLES_OK)) + { + std::string err_msg = "Postgis Plugin: "; + err_msg += status(); + err_msg += "\nin getNextAsyncResult"; + clearAsyncResult(result); + // We need to guarde against losing the connection + // (i.e db restart) so here we invalidate the full connection + close(); + throw mapnik::datasource_exception(err_msg); + } + return boost::make_shared(result); + } + + boost::shared_ptr getAsyncResult() + { + PGresult *result = PQgetResult(conn_); + if ( !result || (PQresultStatus(result) != PGRES_TUPLES_OK)) + { + std::string err_msg = "Postgis Plugin: "; + err_msg += status(); + err_msg += "\nin getAsyncResult"; + clearAsyncResult(result); + // We need to be guarded against losing the connection + // (i.e db restart), we invalidate the full connection + close(); + throw mapnik::datasource_exception(err_msg); + } + return boost::make_shared(result); + } + std::string client_encoding() const { return PQparameterStatus(conn_, "client_encoding"); @@ -151,9 +209,7 @@ public: if (! closed_) { PQfinish(conn_); - MAPNIK_LOG_DEBUG(postgis) << "postgis_connection: datasource closed, also closing connection - " << conn_; - closed_ = true; } } @@ -169,6 +225,16 @@ private: PGconn *conn_; int cursorId; bool closed_; + + void clearAsyncResult(PGresult *result) const + { + // Clear all pending results + while(result) + { + PQclear(result); + result = PQgetResult(conn_); + } + } }; #endif //CONNECTION_HPP diff --git a/plugins/input/postgis/connection_manager.hpp b/plugins/input/postgis/connection_manager.hpp index 4fbcce2d6..d457da393 100644 --- a/plugins/input/postgis/connection_manager.hpp +++ b/plugins/input/postgis/connection_manager.hpp @@ -101,8 +101,12 @@ private: class ConnectionManager : public singleton { - friend class CreateStatic; +public: typedef Pool PoolType; + +private: + friend class CreateStatic; + typedef std::map > ContType; typedef boost::shared_ptr HolderType; ContType pools_; diff --git a/plugins/input/postgis/cursorresultset.hpp b/plugins/input/postgis/cursorresultset.hpp index 611e394f8..59b4e2dfc 100644 --- a/plugins/input/postgis/cursorresultset.hpp +++ b/plugins/input/postgis/cursorresultset.hpp @@ -28,56 +28,23 @@ #include "connection.hpp" #include "resultset.hpp" -class CursorResultSet : public IResultSet +class CursorResultSet : public IResultSet, private mapnik::noncopyable { public: CursorResultSet(boost::shared_ptr const &conn, std::string cursorName, int fetch_count) : conn_(conn), cursorName_(cursorName), fetch_size_(fetch_count), - is_closed_(false), - refCount_(new int(1)) + is_closed_(false) { getNextResultSet(); } - CursorResultSet(const CursorResultSet& rhs) - : conn_(rhs.conn_), - cursorName_(rhs.cursorName_), - rs_(rhs.rs_), - fetch_size_(rhs.fetch_size_), - is_closed_(rhs.is_closed_), - refCount_(rhs.refCount_) - { - (*refCount_)++; - } - virtual ~CursorResultSet() { - if (--(*refCount_)==0) - { - close(); - delete refCount_,refCount_=0; - } + close(); } - CursorResultSet& operator=(const CursorResultSet& rhs) - { - if (this==&rhs) return *this; - if (--(refCount_)==0) - { - close(); - delete refCount_,refCount_=0; - } - conn_=rhs.conn_; - cursorName_=rhs.cursorName_; - rs_=rhs.rs_; - refCount_=rhs.refCount_; - fetch_size_=rhs.fetch_size_; - is_closed_ = false; - (*refCount_)++; - return *this; - } virtual void close() { @@ -92,6 +59,7 @@ public: conn_->execute(s.str()); is_closed_ = true; + conn_.reset(); } } @@ -105,6 +73,7 @@ public: if (rs_->next()) { return true; } else if (rs_->size() == 0) { + close(); return false; } else { getNextResultSet(); @@ -171,7 +140,8 @@ private: boost::shared_ptr rs_; int fetch_size_; bool is_closed_; - int *refCount_; + + }; #endif // POSTGIS_CURSORRESULTSET_HPP diff --git a/plugins/input/postgis/postgis_datasource.cpp b/plugins/input/postgis/postgis_datasource.cpp index 4b3be44ce..bc858d7b0 100644 --- a/plugins/input/postgis/postgis_datasource.cpp +++ b/plugins/input/postgis/postgis_datasource.cpp @@ -23,6 +23,8 @@ #include "connection_manager.hpp" #include "postgis_datasource.hpp" #include "postgis_featureset.hpp" +#include "asyncresultset.hpp" + // mapnik #include @@ -67,22 +69,25 @@ postgis_datasource::postgis_datasource(parameters const& params) srid_(*params.get("srid", 0)), extent_initialized_(false), simplify_geometries_(false), - desc_(*params.get("type"), "utf-8"), - creator_(params.get("host"), + desc_(*params.get("type"), "utf-8"), + creator_(params.get("host"), params.get("port"), params.get("dbname"), params.get("user"), params.get("password"), params.get("connect_timeout", "4")), - bbox_token_("!bbox!"), - scale_denom_token_("!scale_denominator!"), - pixel_width_token_("!pixel_width!"), - pixel_height_token_("!pixel_height!"), - persist_connection_(*params.get("persist_connection", true)), - extent_from_subquery_(*params.get("extent_from_subquery", false)), -// params below are for testing purposes only (will likely be removed at any time) - intersect_min_scale_(*params.get("intersect_min_scale", 0)), - intersect_max_scale_(*params.get("intersect_max_scale", 0)) + bbox_token_("!bbox!"), + scale_denom_token_("!scale_denominator!"), + pixel_width_token_("!pixel_width!"), + pixel_height_token_("!pixel_height!"), + pool_max_size_(*params_.get("max_size", 10)), + persist_connection_(*params.get("persist_connection", true)), + extent_from_subquery_(*params.get("extent_from_subquery", false)), + max_async_connections_(*params_.get("max_async_connection", 1)), + asynchronous_request_(false), + // params below are for testing purposes only and may be removed at any time + intersect_min_scale_(*params.get("intersect_min_scale", 0)), + intersect_max_scale_(*params.get("intersect_max_scale", 0)) { #ifdef MAPNIK_STATS mapnik::progress_timer __stats__(std::clog, "postgis_datasource::init"); @@ -98,16 +103,29 @@ postgis_datasource::postgis_datasource(parameters const& params) extent_initialized_ = extent_.from_string(*ext); } + // NOTE: In multithread environment, pool_max_size_ should be + // max_async_connections_ * num_threads + if(max_async_connections_ > 1) + { + if(max_async_connections_ > pool_max_size_) + { + std::ostringstream err; + err << "PostGIS Plugin: Error: 'max_async_connections (" + << max_async_connections_ << ") must be <= max_size(" << pool_max_size_ << ")"; + throw mapnik::datasource_exception(err.str()); + } + asynchronous_request_ = true; + } + boost::optional initial_size = params.get("initial_size", 1); - boost::optional max_size = params.get("max_size", 10); boost::optional autodetect_key_field = params.get("autodetect_key_field", false); boost::optional estimate_extent = params.get("estimate_extent", false); estimate_extent_ = estimate_extent && *estimate_extent; boost::optional simplify_opt = params.get("simplify_geometries", false); simplify_geometries_ = simplify_opt && *simplify_opt; - ConnectionManager::instance().registerPool(creator_, *initial_size, *max_size); - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + ConnectionManager::instance().registerPool(creator_, *initial_size, pool_max_size_); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); if (pool) { shared_ptr conn = pool->borrowObject(); @@ -413,6 +431,10 @@ postgis_datasource::postgis_datasource(parameters const& params) rs->close(); } + + // Close explicitly the connection so we can 'fork()' without sharing open connections + conn->close(); + } } @@ -420,7 +442,7 @@ postgis_datasource::~postgis_datasource() { if (! persist_connection_) { - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); if (pool) { shared_ptr conn = pool->borrowObject(); @@ -557,156 +579,222 @@ std::string postgis_datasource::populate_tokens(std::string const& sql, double s } -boost::shared_ptr postgis_datasource::get_resultset(boost::shared_ptr const &conn, std::string const& sql) const +boost::shared_ptr postgis_datasource::get_resultset(boost::shared_ptr &conn, std::string const& sql, CnxPool_ptr const& pool, processor_context_ptr ctx) const { - if (cursor_fetch_size_ > 0) + + if (!ctx) { - // cursor - std::ostringstream csql; - std::string cursor_name = conn->new_cursor_name(); - - csql << "DECLARE " << cursor_name << " BINARY INSENSITIVE NO SCROLL CURSOR WITH HOLD FOR " << sql << " FOR READ ONLY"; - - if (! conn->execute(csql.str())) + // ! asynchronous_request_ + if (cursor_fetch_size_ > 0) { - // TODO - better error - throw mapnik::datasource_exception("Postgis Plugin: error creating cursor for data select." ); + // cursor + std::ostringstream csql; + std::string cursor_name = conn->new_cursor_name(); + + csql << "DECLARE " << cursor_name << " BINARY INSENSITIVE NO SCROLL CURSOR WITH HOLD FOR " << sql << " FOR READ ONLY"; + + if (! conn->execute(csql.str())) + { + // TODO - better error + throw mapnik::datasource_exception("Postgis Plugin: error creating cursor for data select." ); + } + + return boost::make_shared(conn, cursor_name, cursor_fetch_size_); + + } + else + { + // no cursor + return conn->executeQuery(sql, 1); } - - return boost::make_shared(conn, cursor_name, cursor_fetch_size_); - } else - { - // no cursor - return conn->executeQuery(sql, 1); + { // asynchronous requests + + boost::shared_ptr pgis_ctxt = boost::static_pointer_cast(ctx); + if (conn) + { + // lauch async req & create asyncresult with conn + conn->executeAsyncQuery(sql, 1); + return boost::make_shared(pgis_ctxt, pool, conn, sql); + } + else + { + // create asyncresult with null connection + boost::shared_ptr res = boost::make_shared(pgis_ctxt, pool, conn, sql); + pgis_ctxt->add_request(res); + return res; + } } } -featureset_ptr postgis_datasource::features(const query& q) const +processor_context_ptr postgis_datasource::get_context(feature_style_context_map & ctx) const { + if (!asynchronous_request_) + { + return processor_context_ptr(); + } + + std::string ds_name(name()); + feature_style_context_map::const_iterator itr = ctx.find(ds_name); + if (itr != ctx.end()) + { + return itr->second; + } + else + { + return ctx.insert(std::make_pair(ds_name,boost::make_shared())).first->second; + } +} + +featureset_ptr postgis_datasource::features(query const& q) const +{ + // if the driver is in asynchronous mode, return the appropriate fetaures + if (asynchronous_request_ ) + { + return features_with_context(q,boost::make_shared()); + } + else + { + return features_with_context(q); + } +} + +featureset_ptr postgis_datasource::features_with_context(query const& q,processor_context_ptr proc_ctx) const +{ + #ifdef MAPNIK_STATS - mapnik::progress_timer __stats__(std::clog, "postgis_datasource::features"); + mapnik::progress_timer __stats__(std::clog, "postgis_datasource::features_with_context"); #endif + box2d const& box = q.get_bbox(); double scale_denom = q.scale_denominator(); - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); + if (pool) { - shared_ptr conn = pool->borrowObject(); - if (conn && conn->isOK()) + shared_ptr conn; + + if ( asynchronous_request_ ) { - if (geometryColumn_.empty()) + // limit use to num_async_request_ => if reached don't borrow the last connexion object + boost::shared_ptr pgis_ctxt = boost::static_pointer_cast(proc_ctx); + if ( pgis_ctxt->num_async_requests_ < max_async_connections_ ) { - std::ostringstream s_error; - s_error << "PostGIS: geometry name lookup failed for table '"; - - if (! schema_.empty()) - { - s_error << schema_ << "."; - } - s_error << geometry_table_ - << "'. Please manually provide the 'geometry_field' parameter or add an entry " - << "in the geometry_columns for '"; - - if (! schema_.empty()) - { - s_error << schema_ << "."; - } - s_error << geometry_table_ << "'."; - - throw mapnik::datasource_exception(s_error.str()); + conn = pool->borrowObject(); + pgis_ctxt->num_async_requests_++; } - - std::ostringstream s; - - const double px_gw = 1.0 / boost::get<0>(q.resolution()); - const double px_gh = 1.0 / boost::get<1>(q.resolution()); - - s << "SELECT ST_AsBinary("; - - if (simplify_geometries_) { - s << "ST_Simplify("; - } - - s << "\"" << geometryColumn_ << "\""; - - if (simplify_geometries_) { - // 1/20 of pixel seems to be a good compromise to avoid - // drop of collapsed polygons. - // See https://github.com/mapnik/mapnik/issues/1639 - const double tolerance = std::min(px_gw, px_gh) / 20.0; - s << ", " << tolerance << ")"; - } - - s << ") AS geom"; - - mapnik::context_ptr ctx = boost::make_shared(); - std::set const& props = q.property_names(); - std::set::const_iterator pos = props.begin(); - std::set::const_iterator end = props.end(); - - if (! key_field_.empty()) + } + else + { + // Always get a connection in synchronous mode + conn = pool->borrowObject(); + if(!conn ) { - mapnik::sql_utils::quote_attr(s, key_field_); - ctx->push(key_field_); - - for (; pos != end; ++pos) - { - if (*pos != key_field_) - { - mapnik::sql_utils::quote_attr(s, *pos); - ctx->push(*pos); - } - } + throw mapnik::datasource_exception("Postgis Plugin: Null connection"); } - else + } + + + if (geometryColumn_.empty()) + { + std::ostringstream s_error; + s_error << "PostGIS: geometry name lookup failed for table '"; + + if (! schema_.empty()) { - for (; pos != end; ++pos) + s_error << schema_ << "."; + } + s_error << geometry_table_ + << "'. Please manually provide the 'geometry_field' parameter or add an entry " + << "in the geometry_columns for '"; + + if (! schema_.empty()) + { + s_error << schema_ << "."; + } + s_error << geometry_table_ << "'."; + + throw mapnik::datasource_exception(s_error.str()); + } + + std::ostringstream s; + + const double px_gw = 1.0 / boost::get<0>(q.resolution()); + const double px_gh = 1.0 / boost::get<1>(q.resolution()); + + s << "SELECT ST_AsBinary("; + + if (simplify_geometries_) { + s << "ST_Simplify("; + } + + s << "\"" << geometryColumn_ << "\""; + + if (simplify_geometries_) { + // 1/20 of pixel seems to be a good compromise to avoid + // drop of collapsed polygons. + // See https://github.com/mapnik/mapnik/issues/1639 + const double tolerance = std::min(px_gw, px_gh) / 2.0; + s << ", " << tolerance << ")"; + } + + s << ") AS geom"; + + mapnik::context_ptr ctx = boost::make_shared(); + std::set const& props = q.property_names(); + std::set::const_iterator pos = props.begin(); + std::set::const_iterator end = props.end(); + + if (! key_field_.empty()) + { + mapnik::sql_utils::quote_attr(s, key_field_); + ctx->push(key_field_); + + for (; pos != end; ++pos) + { + if (*pos != key_field_) { mapnik::sql_utils::quote_attr(s, *pos); ctx->push(*pos); } } - - std::string table_with_bbox = populate_tokens(table_, scale_denom, box, px_gw, px_gh); - - s << " FROM " << table_with_bbox; - - if (row_limit_ > 0) - { - s << " LIMIT " << row_limit_; - } - - boost::shared_ptr rs = get_resultset(conn, s.str()); - return boost::make_shared(rs, ctx, desc_.get_encoding(), !key_field_.empty()); } else { - std::string err_msg = "Postgis Plugin:"; - if (conn) + for (; pos != end; ++pos) { - err_msg += conn->status(); + mapnik::sql_utils::quote_attr(s, *pos); + ctx->push(*pos); } - else - { - err_msg += " Null connection"; - } - throw mapnik::datasource_exception(err_msg); } + + std::string table_with_bbox = populate_tokens(table_, scale_denom, box, px_gw, px_gh); + + s << " FROM " << table_with_bbox; + + if (row_limit_ > 0) + { + s << " LIMIT " << row_limit_; + } + + boost::shared_ptr rs = get_resultset(conn, s.str(), pool, proc_ctx); + return boost::make_shared(rs, ctx, desc_.get_encoding(), !key_field_.empty()); + } return featureset_ptr(); } + featureset_ptr postgis_datasource::features_at_point(coord2d const& pt, double tol) const { #ifdef MAPNIK_STATS mapnik::progress_timer __stats__(std::clog, "postgis_datasource::features_at_point"); #endif - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); if (pool) { shared_ptr conn = pool->borrowObject(); @@ -775,7 +863,7 @@ featureset_ptr postgis_datasource::features_at_point(coord2d const& pt, double t s << " LIMIT " << row_limit_; } - boost::shared_ptr rs = get_resultset(conn, s.str()); + boost::shared_ptr rs = get_resultset(conn, s.str(), pool); return boost::make_shared(rs, ctx, desc_.get_encoding(), !key_field_.empty()); } } @@ -790,7 +878,7 @@ box2d postgis_datasource::envelope() const return extent_; } - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); if (pool) { shared_ptr conn = pool->borrowObject(); @@ -881,7 +969,7 @@ boost::optional postgis_datasource::get_geometry { boost::optional result; - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); if (pool) { shared_ptr conn = pool->borrowObject(); diff --git a/plugins/input/postgis/postgis_datasource.hpp b/plugins/input/postgis/postgis_datasource.hpp index d0f5a2beb..c55af9812 100644 --- a/plugins/input/postgis/postgis_datasource.hpp +++ b/plugins/input/postgis/postgis_datasource.hpp @@ -49,6 +49,8 @@ using mapnik::transcoder; using mapnik::datasource; +using mapnik::feature_style_context_map; +using mapnik::processor_context_ptr; using mapnik::box2d; using mapnik::layer_descriptor; using mapnik::featureset_ptr; @@ -57,6 +59,8 @@ using mapnik::query; using mapnik::parameters; using mapnik::coord2d; +typedef boost::shared_ptr< ConnectionManager::PoolType> CnxPool_ptr; + class postgis_datasource : public datasource { public: @@ -64,7 +68,9 @@ public: ~postgis_datasource(); mapnik::datasource::datasource_t type() const; static const char * name(); - featureset_ptr features(const query& q) const; + processor_context_ptr get_context(feature_style_context_map &) const; + featureset_ptr features_with_context(query const& q, processor_context_ptr ctx= processor_context_ptr()) const; + featureset_ptr features(query const& q) const; featureset_ptr features_at_point(coord2d const& pt, double tol = 0) const; mapnik::box2d envelope() const; boost::optional get_geometry_type() const; @@ -74,8 +80,7 @@ private: std::string sql_bbox(box2d const& env) const; std::string populate_tokens(std::string const& sql, double scale_denom, box2d const& env, double pixel_width, double pixel_height) const; std::string populate_tokens(std::string const& sql) const; - boost::shared_ptr get_resultset(boost::shared_ptr const &conn, std::string const& sql) const; - + boost::shared_ptr get_resultset(boost::shared_ptr &conn, std::string const& sql, CnxPool_ptr const& pool, processor_context_ptr ctx= processor_context_ptr()) const; static const std::string GEOMETRY_COLUMNS; static const std::string SPATIAL_REF_SYS; static const double FMAX; @@ -102,10 +107,12 @@ private: const std::string scale_denom_token_; const std::string pixel_width_token_; const std::string pixel_height_token_; + int pool_max_size_; bool persist_connection_; bool extent_from_subquery_; bool estimate_extent_; - // params below are for testing purposes only (will likely be removed at any time) + int max_async_connections_; + bool asynchronous_request_; int intersect_min_scale_; int intersect_max_scale_; }; diff --git a/plugins/input/postgis/resultset.hpp b/plugins/input/postgis/resultset.hpp index c8734f538..600fc96dd 100644 --- a/plugins/input/postgis/resultset.hpp +++ b/plugins/input/postgis/resultset.hpp @@ -45,49 +45,16 @@ public: virtual const char* getValue(const char* name) const = 0; }; -class ResultSet : public IResultSet +class ResultSet : public IResultSet, private mapnik::noncopyable { public: ResultSet(PGresult *res) : res_(res), - pos_(-1), - refCount_(new int(1)) + pos_(-1) { numTuples_ = PQntuples(res_); } - ResultSet(const ResultSet& rhs) - : res_(rhs.res_), - pos_(rhs.pos_), - numTuples_(rhs.numTuples_), - refCount_(rhs.refCount_) - { - (*refCount_)++; - } - - ResultSet& operator=(const ResultSet& rhs) - { - if (this == &rhs) - { - return *this; - } - - if (--(refCount_) == 0) - { - close(); - - delete refCount_; - refCount_ = 0; - } - - res_ = rhs.res_; - pos_ = rhs.pos_; - numTuples_ = rhs.numTuples_; - refCount_ = rhs.refCount_; - (*refCount_)++; - return *this; - } - virtual void close() { PQclear(res_); @@ -96,13 +63,7 @@ public: virtual ~ResultSet() { - if (--(*refCount_) == 0) - { - PQclear(res_); - - delete refCount_; - refCount_ = 0; - } + PQclear(res_); } virtual int getNumFields() const @@ -184,7 +145,6 @@ private: PGresult* res_; int pos_; int numTuples_; - int *refCount_; }; #endif // POSTGIS_RESULTSET_HPP diff --git a/src/image_compositing.cpp b/src/image_compositing.cpp index e6c7b147a..c9fcfdaf5 100644 --- a/src/image_compositing.cpp +++ b/src/image_compositing.cpp @@ -75,7 +75,6 @@ static const comp_op_lookup_type comp_lookup = boost::assign::list_of comp_op_from_string(std::string const& name) diff --git a/tests/data/good_maps/colorize-alpha.xml b/tests/data/good_maps/colorize-alpha.xml index 8fd39e7cd..373d1d939 100644 --- a/tests/data/good_maps/colorize-alpha.xml +++ b/tests/data/good_maps/colorize-alpha.xml @@ -8,10 +8,7 @@ - -