mirror of
https://github.com/mapnik/mapnik.git
synced 2026-02-01 17:36:36 +00:00
initial cleanup + use atomic_bool to notify consumer on net io errors [WIP] [skip ci]
This commit is contained in:
parent
2c488c0d21
commit
4f2d5f8a6f
@ -53,8 +53,9 @@ struct tile_data
|
||||
std::size_t y;
|
||||
std::string data;
|
||||
|
||||
tile_data() = default;
|
||||
tile_data(tile_data const&) = default;
|
||||
tile_data() = default; // default-ctor
|
||||
tile_data(tile_data const&) = default; // copy-ctor
|
||||
tile_data& operator=(tile_data const&) = default; // copy-assignable
|
||||
tile_data(std::size_t zoom_, std::size_t x_, std::size_t y_, std::string && data_)
|
||||
: zoom(zoom_), x(x_), y(y_), data(std::move(data_))
|
||||
{}
|
||||
@ -90,10 +91,10 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
void push(tile_data && data)
|
||||
{
|
||||
queue_.push(std::move(data));
|
||||
}
|
||||
// void push(tile_data && data)
|
||||
// {
|
||||
// queue_.push(std::move(data));
|
||||
// }
|
||||
|
||||
std::optional<zxy> get_zxy()
|
||||
{
|
||||
@ -257,15 +258,17 @@ class worker : public std::enable_shared_from_this<worker>
|
||||
http::request<http::empty_body> req_;
|
||||
http::response<http::string_body> res_;
|
||||
zxy tile_;
|
||||
std::atomic<bool> & done_;
|
||||
public:
|
||||
worker(worker&&) = default;
|
||||
|
||||
explicit worker(boost::asio::io_context& ioc, std::string const& url_template, tiles_stash & stash)
|
||||
explicit worker(boost::asio::io_context& ioc, std::string const& url_template, tiles_stash & stash, std::atomic<bool> & done)
|
||||
: stash_(stash),
|
||||
url_template_(url_template),
|
||||
ex_(boost::asio::make_strand(ioc.get_executor())),
|
||||
resolver_(boost::asio::make_strand(ioc)),
|
||||
stream_(boost::asio::make_strand(ioc))
|
||||
stream_(boost::asio::make_strand(ioc)),
|
||||
done_(done)
|
||||
{
|
||||
req_.version(11); // HTTP 1.1
|
||||
req_.method(http::verb::get);
|
||||
@ -286,7 +289,11 @@ public:
|
||||
|
||||
void on_resolve(beast::error_code ec, tcp::resolver::results_type results)
|
||||
{
|
||||
if (ec) return fail(ec, "resolve");
|
||||
if (ec)
|
||||
{
|
||||
done_.store(true);
|
||||
return fail(ec, "resolve");
|
||||
}
|
||||
// Set a timeout on the operation
|
||||
stream_.expires_after(std::chrono::seconds(10));
|
||||
// Make the connection on the IP address we get from a lookup
|
||||
@ -299,11 +306,16 @@ public:
|
||||
|
||||
void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type)
|
||||
{
|
||||
if (ec) return fail(ec, "connect");
|
||||
if (ec)
|
||||
{
|
||||
done_.store(true);
|
||||
return fail(ec, "connect");
|
||||
}
|
||||
auto zxy = stash_.get_zxy();
|
||||
if (!zxy)
|
||||
{
|
||||
// Work is done, gracefully close the socket
|
||||
//std::cerr << "\e[1;41m stream_.socket().shutdown \e[0m" << std::endl;
|
||||
stream_.socket().shutdown(tcp::socket::shutdown_both, ec);
|
||||
// not_connected happens sometimes so don't bother reporting it.
|
||||
if (ec && ec != beast::errc::not_connected)
|
||||
@ -319,7 +331,6 @@ public:
|
||||
{"x", std::get<1>(tile_)},
|
||||
{"y", std::get<2>(tile_)}});
|
||||
|
||||
std::cerr << "#1 URL: " << url << std::endl;
|
||||
std::string target = url.path();
|
||||
if (!url.query().empty()) target += "?" + url.query();
|
||||
req_.target(target);
|
||||
@ -335,7 +346,11 @@ public:
|
||||
{
|
||||
boost::ignore_unused(bytes_transferred);
|
||||
|
||||
if(ec) return fail(ec, "write");
|
||||
if(ec)
|
||||
{
|
||||
done_.store(true);
|
||||
return fail(ec, "write");
|
||||
}
|
||||
res_ = {};
|
||||
stream_.expires_after(std::chrono::seconds(10));
|
||||
http::async_read(stream_, buffer_, res_,
|
||||
@ -347,8 +362,12 @@ public:
|
||||
void on_read(beast::error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
boost::ignore_unused(bytes_transferred);
|
||||
if(ec) return fail(ec, "read");
|
||||
stash_.push_async(tile_data(std::get<0>(tile_), std::get<1>(tile_), std::get<2>(tile_), std::move(res_.body())));
|
||||
if(ec)
|
||||
{
|
||||
done_.store(true);
|
||||
return fail(ec, "read");
|
||||
}
|
||||
stash_.push_async(tile_data{std::get<0>(tile_), std::get<1>(tile_), std::get<2>(tile_), std::move(res_.body())});
|
||||
auto zxy = stash_.get_zxy();
|
||||
if (!zxy)
|
||||
{
|
||||
@ -365,7 +384,6 @@ public:
|
||||
{"x", std::get<1>(tile_)},
|
||||
{"y", std::get<2>(tile_)}});
|
||||
|
||||
std::cerr << "#2 URL: " << url << std::endl;
|
||||
std::string target = url.path();
|
||||
if (!url.query().empty()) target += "?" + url.query();
|
||||
req_.target(target);
|
||||
@ -388,15 +406,17 @@ class worker_ssl : public std::enable_shared_from_this<worker_ssl>
|
||||
http::request<http::empty_body> req_;
|
||||
http::response<http::string_body> res_;
|
||||
zxy tile_;
|
||||
std::atomic<bool> & done_;
|
||||
public:
|
||||
worker_ssl(worker_ssl&&) = default;
|
||||
|
||||
explicit worker_ssl(boost::asio::io_context& ioc, boost::asio::ssl::context& ctx, std::string const& url_template, tiles_stash & stash)
|
||||
explicit worker_ssl(boost::asio::io_context& ioc, boost::asio::ssl::context& ctx, std::string const& url_template, tiles_stash & stash, std::atomic<bool> & done)
|
||||
: stash_(stash),
|
||||
url_template_(url_template),
|
||||
ex_(boost::asio::make_strand(ioc.get_executor())),
|
||||
resolver_(boost::asio::make_strand(ioc)),
|
||||
stream_(boost::asio::make_strand(ioc), ctx)
|
||||
stream_(boost::asio::make_strand(ioc), ctx),
|
||||
done_(done)
|
||||
{
|
||||
req_.version(11); // HTTP 1.1
|
||||
req_.method(http::verb::get);
|
||||
@ -409,7 +429,7 @@ public:
|
||||
req_.set(http::field::host, host);
|
||||
|
||||
// SSL
|
||||
beast::error_code ec{};
|
||||
beast::error_code ec {};
|
||||
if (!SSL_set_tlsext_host_name(stream_.native_handle(), host.c_str()))
|
||||
{
|
||||
ec.assign(static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category());
|
||||
@ -427,7 +447,11 @@ public:
|
||||
|
||||
void on_resolve(beast::error_code ec, tcp::resolver::results_type results)
|
||||
{
|
||||
if (ec) return fail(ec, "resolve");
|
||||
if (ec)
|
||||
{
|
||||
done_.store(true);
|
||||
return fail(ec, "resolve");
|
||||
}
|
||||
// Set a timeout on the operation
|
||||
beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(10));
|
||||
// Make the connection on the IP address we get from a lookup
|
||||
@ -439,7 +463,11 @@ public:
|
||||
}
|
||||
void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type)
|
||||
{
|
||||
if (ec) return fail(ec, "connect");
|
||||
if (ec)
|
||||
{
|
||||
done_.store(true);
|
||||
return fail(ec, "connect");
|
||||
}
|
||||
// SSL handshake
|
||||
stream_.async_handshake(
|
||||
boost::asio::ssl::stream_base::client,
|
||||
@ -450,7 +478,11 @@ public:
|
||||
|
||||
void on_handshake(beast::error_code ec)
|
||||
{
|
||||
if (ec) return fail(ec, "connect");
|
||||
if (ec)
|
||||
{
|
||||
done_.store(true);
|
||||
return fail(ec, "connect");
|
||||
}
|
||||
|
||||
auto zxy = stash_.get_zxy();
|
||||
if (!zxy)
|
||||
@ -467,7 +499,7 @@ public:
|
||||
{{"z", std::get<0>(tile_)},
|
||||
{"x", std::get<1>(tile_)},
|
||||
{"y", std::get<2>(tile_)}});
|
||||
std::cerr << "#1 SSL URL: " << url << std::endl;
|
||||
|
||||
std::string target = url.path();
|
||||
if (!url.query().empty()) target += "?" + url.query();
|
||||
req_.target(target);
|
||||
@ -481,7 +513,11 @@ public:
|
||||
void on_write(beast::error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
boost::ignore_unused(bytes_transferred);
|
||||
if(ec) return fail(ec, "write");
|
||||
if(ec)
|
||||
{
|
||||
done_.store(true);
|
||||
return fail(ec, "write");
|
||||
}
|
||||
// Receive the HTTP response
|
||||
res_ = {};
|
||||
http::async_read(stream_, buffer_, res_,
|
||||
@ -494,18 +530,19 @@ public:
|
||||
{
|
||||
boost::ignore_unused(bytes_transferred);
|
||||
|
||||
if(ec) return fail(ec, "read");
|
||||
stash_.push(tile_data(std::get<0>(tile_), std::get<1>(tile_), std::get<2>(tile_), std::move(res_.body())));
|
||||
if(ec)
|
||||
{
|
||||
done_.store(true);
|
||||
return fail(ec, "read");
|
||||
}
|
||||
stash_.push_async(tile_data(std::get<0>(tile_), std::get<1>(tile_), std::get<2>(tile_), std::move(res_.body())));
|
||||
auto zxy = stash_.get_zxy();
|
||||
if (!zxy)
|
||||
{
|
||||
// Work is done, gracefully close the socket
|
||||
beast::get_lowest_layer(stream_).socket().shutdown(tcp::socket::shutdown_both, ec);
|
||||
// not_connected happens sometimes so don't bother reporting it.
|
||||
if (ec && ec != beast::errc::not_connected)
|
||||
{
|
||||
return fail(ec, "shutdown");
|
||||
// If we get here then the connection is closed gracefully
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -514,11 +551,11 @@ public:
|
||||
{{"z", std::get<0>(tile_)},
|
||||
{"x", std::get<1>(tile_)},
|
||||
{"y", std::get<2>(tile_)}});
|
||||
std::cerr << "#2 SSL URL: " << url << std::endl;
|
||||
std::string target = url.path();
|
||||
if (!url.query().empty()) target += "?" + url.query();
|
||||
req_.target(target);
|
||||
std::cerr << "\e[45m target:" << target << " thread:" << std::this_thread::get_id() <<"\e[0m" << std::endl;
|
||||
|
||||
beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(10));
|
||||
|
||||
http::async_write(stream_, req_,
|
||||
@ -528,5 +565,4 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
#endif // XYZ_FEATURESET_HPP
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user