diff --git a/plugins/input/tiles/xyz_tiles.hpp b/plugins/input/tiles/xyz_tiles.hpp index 5de7a2916..bc46ceda9 100644 --- a/plugins/input/tiles/xyz_tiles.hpp +++ b/plugins/input/tiles/xyz_tiles.hpp @@ -53,8 +53,9 @@ struct tile_data std::size_t y; std::string data; - tile_data() = default; - tile_data(tile_data const&) = default; + tile_data() = default; // default-ctor + tile_data(tile_data const&) = default; // copy-ctor + tile_data& operator=(tile_data const&) = default; // copy-assignable tile_data(std::size_t zoom_, std::size_t x_, std::size_t y_, std::string && data_) : zoom(zoom_), x(x_), y(y_), data(std::move(data_)) {} @@ -90,10 +91,10 @@ public: }); } - void push(tile_data && data) - { - queue_.push(std::move(data)); - } + // void push(tile_data && data) + // { + // queue_.push(std::move(data)); + // } std::optional get_zxy() { @@ -257,15 +258,17 @@ class worker : public std::enable_shared_from_this http::request req_; http::response res_; zxy tile_; + std::atomic & done_; public: worker(worker&&) = default; - explicit worker(boost::asio::io_context& ioc, std::string const& url_template, tiles_stash & stash) + explicit worker(boost::asio::io_context& ioc, std::string const& url_template, tiles_stash & stash, std::atomic & done) : stash_(stash), url_template_(url_template), ex_(boost::asio::make_strand(ioc.get_executor())), resolver_(boost::asio::make_strand(ioc)), - stream_(boost::asio::make_strand(ioc)) + stream_(boost::asio::make_strand(ioc)), + done_(done) { req_.version(11); // HTTP 1.1 req_.method(http::verb::get); @@ -286,7 +289,11 @@ public: void on_resolve(beast::error_code ec, tcp::resolver::results_type results) { - if (ec) return fail(ec, "resolve"); + if (ec) + { + done_.store(true); + return fail(ec, "resolve"); + } // Set a timeout on the operation stream_.expires_after(std::chrono::seconds(10)); // Make the connection on the IP address we get from a lookup @@ -299,11 +306,16 @@ public: void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type) { - if (ec) return fail(ec, "connect"); + if (ec) + { + done_.store(true); + return fail(ec, "connect"); + } auto zxy = stash_.get_zxy(); if (!zxy) { // Work is done, gracefully close the socket + //std::cerr << "\e[1;41m stream_.socket().shutdown \e[0m" << std::endl; stream_.socket().shutdown(tcp::socket::shutdown_both, ec); // not_connected happens sometimes so don't bother reporting it. if (ec && ec != beast::errc::not_connected) @@ -319,7 +331,6 @@ public: {"x", std::get<1>(tile_)}, {"y", std::get<2>(tile_)}}); - std::cerr << "#1 URL: " << url << std::endl; std::string target = url.path(); if (!url.query().empty()) target += "?" + url.query(); req_.target(target); @@ -335,7 +346,11 @@ public: { boost::ignore_unused(bytes_transferred); - if(ec) return fail(ec, "write"); + if(ec) + { + done_.store(true); + return fail(ec, "write"); + } res_ = {}; stream_.expires_after(std::chrono::seconds(10)); http::async_read(stream_, buffer_, res_, @@ -347,8 +362,12 @@ public: void on_read(beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); - if(ec) return fail(ec, "read"); - stash_.push_async(tile_data(std::get<0>(tile_), std::get<1>(tile_), std::get<2>(tile_), std::move(res_.body()))); + if(ec) + { + done_.store(true); + return fail(ec, "read"); + } + stash_.push_async(tile_data{std::get<0>(tile_), std::get<1>(tile_), std::get<2>(tile_), std::move(res_.body())}); auto zxy = stash_.get_zxy(); if (!zxy) { @@ -365,7 +384,6 @@ public: {"x", std::get<1>(tile_)}, {"y", std::get<2>(tile_)}}); - std::cerr << "#2 URL: " << url << std::endl; std::string target = url.path(); if (!url.query().empty()) target += "?" + url.query(); req_.target(target); @@ -388,15 +406,17 @@ class worker_ssl : public std::enable_shared_from_this http::request req_; http::response res_; zxy tile_; + std::atomic & done_; public: worker_ssl(worker_ssl&&) = default; - explicit worker_ssl(boost::asio::io_context& ioc, boost::asio::ssl::context& ctx, std::string const& url_template, tiles_stash & stash) + explicit worker_ssl(boost::asio::io_context& ioc, boost::asio::ssl::context& ctx, std::string const& url_template, tiles_stash & stash, std::atomic & done) : stash_(stash), url_template_(url_template), ex_(boost::asio::make_strand(ioc.get_executor())), resolver_(boost::asio::make_strand(ioc)), - stream_(boost::asio::make_strand(ioc), ctx) + stream_(boost::asio::make_strand(ioc), ctx), + done_(done) { req_.version(11); // HTTP 1.1 req_.method(http::verb::get); @@ -409,7 +429,7 @@ public: req_.set(http::field::host, host); // SSL - beast::error_code ec{}; + beast::error_code ec {}; if (!SSL_set_tlsext_host_name(stream_.native_handle(), host.c_str())) { ec.assign(static_cast(::ERR_get_error()), boost::asio::error::get_ssl_category()); @@ -427,7 +447,11 @@ public: void on_resolve(beast::error_code ec, tcp::resolver::results_type results) { - if (ec) return fail(ec, "resolve"); + if (ec) + { + done_.store(true); + return fail(ec, "resolve"); + } // Set a timeout on the operation beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(10)); // Make the connection on the IP address we get from a lookup @@ -439,7 +463,11 @@ public: } void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type) { - if (ec) return fail(ec, "connect"); + if (ec) + { + done_.store(true); + return fail(ec, "connect"); + } // SSL handshake stream_.async_handshake( boost::asio::ssl::stream_base::client, @@ -450,7 +478,11 @@ public: void on_handshake(beast::error_code ec) { - if (ec) return fail(ec, "connect"); + if (ec) + { + done_.store(true); + return fail(ec, "connect"); + } auto zxy = stash_.get_zxy(); if (!zxy) @@ -467,7 +499,7 @@ public: {{"z", std::get<0>(tile_)}, {"x", std::get<1>(tile_)}, {"y", std::get<2>(tile_)}}); - std::cerr << "#1 SSL URL: " << url << std::endl; + std::string target = url.path(); if (!url.query().empty()) target += "?" + url.query(); req_.target(target); @@ -481,7 +513,11 @@ public: void on_write(beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); - if(ec) return fail(ec, "write"); + if(ec) + { + done_.store(true); + return fail(ec, "write"); + } // Receive the HTTP response res_ = {}; http::async_read(stream_, buffer_, res_, @@ -494,18 +530,19 @@ public: { boost::ignore_unused(bytes_transferred); - if(ec) return fail(ec, "read"); - stash_.push(tile_data(std::get<0>(tile_), std::get<1>(tile_), std::get<2>(tile_), std::move(res_.body()))); + if(ec) + { + done_.store(true); + return fail(ec, "read"); + } + stash_.push_async(tile_data(std::get<0>(tile_), std::get<1>(tile_), std::get<2>(tile_), std::move(res_.body()))); auto zxy = stash_.get_zxy(); if (!zxy) { - // Work is done, gracefully close the socket beast::get_lowest_layer(stream_).socket().shutdown(tcp::socket::shutdown_both, ec); - // not_connected happens sometimes so don't bother reporting it. if (ec && ec != beast::errc::not_connected) { return fail(ec, "shutdown"); - // If we get here then the connection is closed gracefully } return; } @@ -514,11 +551,11 @@ public: {{"z", std::get<0>(tile_)}, {"x", std::get<1>(tile_)}, {"y", std::get<2>(tile_)}}); - std::cerr << "#2 SSL URL: " << url << std::endl; std::string target = url.path(); if (!url.query().empty()) target += "?" + url.query(); req_.target(target); std::cerr << "\e[45m target:" << target << " thread:" << std::this_thread::get_id() <<"\e[0m" << std::endl; + beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(10)); http::async_write(stream_, req_, @@ -528,5 +565,4 @@ public: } }; - #endif // XYZ_FEATURESET_HPP