Introduce AsyncProcedureCall

This commit is contained in:
Maximilian Ammann 2022-09-10 11:11:12 +02:00
parent f012d23b7e
commit a66519ccca
18 changed files with 509 additions and 380 deletions

View File

@ -1,6 +1,8 @@
use instant::Instant;
use maplibre::environment::Environment;
use maplibre::environment::{DefaultTransferables, Environment};
use maplibre::io::apc::Transferable;
use maplibre::io::scheduler::Scheduler;
use maplibre::platform::apc::TokioAsyncProcedureCall;
use maplibre::{
error::Error,
io::source_client::HttpClient,
@ -71,8 +73,10 @@ pub struct WinitEnvironment<S: Scheduler, HC: HttpClient> {
impl<S: Scheduler, HC: HttpClient> Environment for WinitEnvironment<S, HC> {
type MapWindowConfig = WinitMapWindowConfig;
type AsyncProcedureCall = TokioAsyncProcedureCall<DefaultTransferables>;
type Scheduler = S;
type HttpClient = HC;
type Transferables = DefaultTransferables;
}
///Main (platform-specific) main loop which handles:

View File

@ -15,6 +15,7 @@ use crate::{
SignificantlyDifferent,
},
};
use serde::Serialize;
pub const EXTENT_UINT: u32 = 4096;
pub const EXTENT_SINT: i32 = EXTENT_UINT as i32;
@ -67,7 +68,7 @@ impl fmt::Debug for Quadkey {
}
}
#[derive(Ord, PartialOrd, Eq, PartialEq, Hash, Copy, Clone, Debug, Default)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Hash, Copy, Clone, Debug, Default, Serialize)]
pub struct ZoomLevel(u8);
impl ZoomLevel {
@ -289,7 +290,7 @@ impl From<(u32, u32, ZoomLevel)> for TileCoords {
/// # Coordinate System Origin
///
/// The origin of the coordinate system is in the upper-left corner.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize)]
pub struct WorldTileCoords {
pub x: i32,
pub y: i32,

View File

@ -1,7 +1,25 @@
use crate::io::apc::AsyncProcedureCall;
use crate::io::transferables::Transferables;
use crate::io::transferables::{
DefaultTessellatedLayer, DefaultTileTessellated, DefaultUnavailableLayer,
};
use crate::{HttpClient, MapWindowConfig, Scheduler};
/// Central hook for platform-specific behavior: bundles every type which varies
/// between platforms (windowing, async procedure calls, scheduling, HTTP, transfer messages).
/// `'static` so implementations can be used as type parameters across task boundaries.
pub trait Environment: 'static {
    type MapWindowConfig: MapWindowConfig;
    /// Mechanism for scheduling asynchronous procedures and receiving their results.
    type AsyncProcedureCall: AsyncProcedureCall<Self::Transferables>;
    type Scheduler: Scheduler;
    type HttpClient: HttpClient;
    /// Concrete message types sent from async procedures back to the main loop.
    type Transferables: Transferables;
}
/// Default [`Transferables`] bundle wiring up the in-process `Default*` message types.
/// Suitable when messages stay within one process (no serialization boundary).
#[derive(Copy, Clone)]
pub struct DefaultTransferables;

impl Transferables for DefaultTransferables {
    type TileTessellated = DefaultTileTessellated;
    type UnavailableLayer = DefaultUnavailableLayer;
    type TessellatedLayer = DefaultTessellatedLayer;
}

View File

@ -12,6 +12,8 @@ use std::{
use tokio::{runtime::Handle, task};
use wgpu::{BufferAsyncError, BufferSlice};
use crate::environment::DefaultTransferables;
use crate::platform::apc::TokioAsyncProcedureCall;
use crate::{
context::{MapContext, ViewState},
coords::{LatLon, ViewRegion, WorldCoords, WorldTileCoords, Zoom, TILE_SIZE},
@ -22,7 +24,6 @@ use crate::{
source_client::HttpSourceClient,
tile_pipelines::build_vector_tile_pipeline,
tile_repository::{StoredLayer, TileRepository},
tile_request_state::TileRequestState,
TileRequest,
},
render::{
@ -68,8 +69,10 @@ pub struct HeadlessEnvironment<S: Scheduler, HC: HttpClient> {
impl<S: Scheduler, HC: HttpClient> Environment for HeadlessEnvironment<S, HC> {
type MapWindowConfig = HeadlessMapWindowConfig;
type AsyncProcedureCall = TokioAsyncProcedureCall<DefaultTransferables>;
type Scheduler = S;
type HttpClient = HC;
type Transferables = DefaultTransferables;
}
pub struct HeadlessMap<E: Environment> {
@ -92,7 +95,6 @@ pub struct HeadlessMapSchedule<E: Environment> {
schedule: Schedule,
scheduler: E::Scheduler,
http_client: E::HttpClient,
tile_request_state: TileRequestState,
}
impl<E: Environment> HeadlessMapSchedule<E> {
@ -139,7 +141,6 @@ impl<E: Environment> HeadlessMapSchedule<E> {
schedule,
scheduler,
http_client,
tile_request_state: Default::default(),
}
}
@ -185,11 +186,7 @@ impl<E: Environment> HeadlessMapSchedule<E> {
layers: source_layers,
};
let request_id = self
.tile_request_state
.start_tile_request(request.clone())?;
pipeline.process((request, request_id, data), &mut pipeline_context);
self.tile_request_state.finish_tile_request(request_id);
pipeline.process((request, data), &mut pipeline_context);
let mut processor = pipeline_context
.take_processor::<HeadlessPipelineProcessor>()

38
maplibre/src/io/apc.rs Normal file
View File

@ -0,0 +1,38 @@
use crate::coords::WorldTileCoords;
use crate::io::transferables::Transferables;
use crate::Environment;
use serde::Serialize;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
/// A message transferred from an asynchronously running procedure back to the caller side.
///
/// * `TileTessellated` — tessellation of a whole tile finished.
/// * `UnavailableLayer` — a requested layer does not exist in the fetched tile.
/// * `TessellatedLayer` — the tessellation result for a single layer.
#[derive(Clone)]
pub enum Transferable<T: Transferables> {
    TileTessellated(T::TileTessellated),
    UnavailableLayer(T::UnavailableLayer),
    TessellatedLayer(T::TessellatedLayer),
}
/// The sending half handed to each [`AsyncProcedure`]; lets a running procedure
/// transfer results back to the side that called [`AsyncProcedureCall::schedule`].
pub trait Context<T: Transferables> {
    /// Sends a result; it becomes observable via [`AsyncProcedureCall::receive`].
    fn send(&self, data: Transferable<T>);
}
/// A procedure which is scheduled to run asynchronously: receives an `input` and a
/// [`Context`] to send results through. A plain `fn` pointer (no captured state),
/// returning a boxed `Send` future.
pub type AsyncProcedure<I, C> =
    fn(input: I, context: C) -> Pin<Box<dyn Future<Output = ()> + Send>>;
/// Abstraction over scheduling asynchronous procedures and polling for their results.
pub trait AsyncProcedureCall<T: Transferables> {
    /// The context type passed into each scheduled procedure.
    type Context: Context<T> + Send;

    /// Creates a new instance of this call mechanism.
    fn new() -> Self;

    /// Non-blocking poll: returns the next available result, or `None` if nothing
    /// has arrived yet.
    fn receive(&self) -> Option<Transferable<T>>;

    /// Schedules `procedure` to run asynchronously with `input`.
    // NOTE(review): the `Serialize` bound suggests inputs may cross a serialization
    // boundary on some platforms — not exercised by the tokio implementation; confirm.
    fn schedule<I: Send + Serialize + 'static>(
        &self,
        input: I,
        procedure: AsyncProcedure<I, Self::Context>,
    );
}

View File

@ -3,7 +3,9 @@
use std::{collections::HashSet, fmt};
use crate::coords::WorldTileCoords;
use serde::Serialize;
pub mod apc;
pub mod geometry_index;
pub mod pipeline;
pub mod scheduler;
@ -12,12 +14,12 @@ pub mod source_client;
pub mod static_tile_fetcher;
pub mod tile_pipelines;
pub mod tile_repository;
pub mod tile_request_state;
pub mod transferables;
pub use geozero::mvt::tile::Layer as RawLayer;
/// A request for a tile at the given coordinates and in the given layers.
#[derive(Clone)]
#[derive(Clone, Serialize)]
pub struct TileRequest {
pub coords: WorldTileCoords,
pub layers: HashSet<String>,
@ -28,6 +30,3 @@ impl fmt::Debug for TileRequest {
write!(f, "TileRequest({}, {:?})", &self.coords, &self.layers)
}
}
/// The ID format for a tile request.
pub type TileRequestID = u32;

View File

@ -5,14 +5,14 @@ use geozero::mvt::tile;
use crate::{
coords::WorldTileCoords,
io::{geometry_index::IndexedGeometry, TileRequestID},
io::geometry_index::IndexedGeometry,
render::ShaderVertex,
tessellation::{IndexDataType, OverAlignedVertexBuffer},
};
/// Processes events which happen during the pipeline execution
pub trait PipelineProcessor: Downcast {
fn tile_finished(&mut self, _request_id: TileRequestID, _coords: &WorldTileCoords) {}
fn tile_finished(&mut self, _coords: &WorldTileCoords) {}
fn layer_unavailable(&mut self, _coords: &WorldTileCoords, _layer_name: &str) {}
fn layer_tesselation_finished(
&mut self,

View File

@ -7,7 +7,7 @@ use crate::{
io::{
geometry_index::IndexProcessor,
pipeline::{DataPipeline, PipelineContext, PipelineEnd, Processable},
TileRequest, TileRequestID,
TileRequest,
},
tessellation::{zero_tessellator::ZeroTessellator, IndexDataType},
};
@ -16,17 +16,17 @@ use crate::{
pub struct ParseTile;
impl Processable for ParseTile {
type Input = (TileRequest, TileRequestID, Box<[u8]>);
type Output = (TileRequest, TileRequestID, geozero::mvt::Tile);
type Input = (TileRequest, Box<[u8]>);
type Output = (TileRequest, geozero::mvt::Tile);
// TODO (perf): Maybe force inline
fn process(
&self,
(tile_request, request_id, data): Self::Input,
(tile_request, data): Self::Input,
_context: &mut PipelineContext,
) -> Self::Output {
let tile = geozero::mvt::Tile::decode(data.as_ref()).expect("failed to load tile");
(tile_request, request_id, tile)
(tile_request, tile)
}
}
@ -34,13 +34,13 @@ impl Processable for ParseTile {
pub struct IndexLayer;
impl Processable for IndexLayer {
type Input = (TileRequest, TileRequestID, geozero::mvt::Tile);
type Output = (TileRequest, TileRequestID, geozero::mvt::Tile);
type Input = (TileRequest, geozero::mvt::Tile);
type Output = (TileRequest, geozero::mvt::Tile);
// TODO (perf): Maybe force inline
fn process(
&self,
(tile_request, request_id, tile): Self::Input,
(tile_request, tile): Self::Input,
context: &mut PipelineContext,
) -> Self::Output {
let index = IndexProcessor::new();
@ -48,7 +48,7 @@ impl Processable for IndexLayer {
context
.processor_mut()
.layer_indexing_finished(&tile_request.coords, index.get_geometries());
(tile_request, request_id, tile)
(tile_request, tile)
}
}
@ -56,13 +56,13 @@ impl Processable for IndexLayer {
pub struct TessellateLayer;
impl Processable for TessellateLayer {
type Input = (TileRequest, TileRequestID, geozero::mvt::Tile);
type Output = (TileRequest, TileRequestID, geozero::mvt::Tile);
type Input = (TileRequest, geozero::mvt::Tile);
type Output = (TileRequest, geozero::mvt::Tile);
// TODO (perf): Maybe force inline
fn process(
&self,
(tile_request, request_id, mut tile): Self::Input,
(tile_request, mut tile): Self::Input,
context: &mut PipelineContext,
) -> Self::Output {
let coords = &tile_request.coords;
@ -118,11 +118,9 @@ impl Processable for TessellateLayer {
tracing::info!("tile tessellated at {} finished", &tile_request.coords);
context
.processor_mut()
.tile_finished(request_id, &tile_request.coords);
context.processor_mut().tile_finished(&tile_request.coords);
(tile_request, request_id, tile)
(tile_request, tile)
}
}

View File

@ -41,15 +41,24 @@ impl StoredLayer {
}
}
/// Lifecycle state of a tile stored in the repository.
///
/// A tile starts as `Pending` when its request is created and transitions to
/// `Success` or `Failed` once loading finishes.
// `Debug` added so the status can appear in logs/assertions; `Clone`/`Copy` are
// free for a fieldless enum and let callers read the status without borrowing.
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub enum TileStatus {
    /// The tile has been requested but no result has arrived yet.
    Pending,
    /// Loading the tile failed.
    Failed,
    /// The tile finished loading successfully.
    Success,
}
/// Stores multiple [StoredLayers](StoredLayer).
pub struct StoredTile {
layers: Vec<StoredLayer>,
status: TileStatus,
}
impl StoredTile {
pub fn new(first_layer: StoredLayer) -> Self {
pub fn new() -> Self {
Self {
layers: vec![first_layer],
layers: vec![],
status: TileStatus::Pending,
}
}
}
@ -84,7 +93,7 @@ impl TileRepository {
{
match entry {
btree_map::Entry::Vacant(entry) => {
entry.insert(StoredTile::new(layer));
panic!("Can not add a tessellated layer if no request has been started before.")
}
btree_map::Entry::Occupied(mut entry) => {
entry.get_mut().layers.push(layer);
@ -105,6 +114,46 @@ impl TileRepository {
.map(|results| results.layers.iter())
}
/// Creates an empty `Pending` tile entry at `coords` if none exists yet.
///
/// An existing (possibly already populated) entry is left untouched, and coords
/// which do not map to a valid quad key are ignored.
///
/// Always returns `true` (kept for interface compatibility; the call cannot fail).
pub fn create_tile(&mut self, coords: &WorldTileCoords) -> bool {
    // `if let` on the Vacant variant replaces the previous `match` with an empty
    // catch-all arm — same behavior, less noise.
    if let Some(btree_map::Entry::Vacant(entry)) =
        coords.build_quad_key().map(|key| self.tree.entry(key))
    {
        entry.insert(StoredTile::new());
    }
    true
}
/// Returns `true` when the tile at `coords` has no repository entry yet and
/// therefore still needs to be fetched.
///
/// Coords that do not map to a valid quad key also report `true` here; callers
/// are expected to have validated the coords (see `request_tiles_in_view`).
pub fn needs_fetching(&self, coords: &WorldTileCoords) -> bool {
    // `is_none()` replaces the previous `if let Some(_) … return false; true` pattern.
    coords
        .build_quad_key()
        .and_then(|key| self.tree.get(&key))
        .is_none()
}
/// Marks the tile at `coords` as successfully loaded.
/// No-op when the coords are invalid or no tile entry exists.
pub fn success(&mut self, coords: &WorldTileCoords) {
    if let Some(cached_tile) = coords
        .build_quad_key()
        .and_then(|key| self.tree.get_mut(&key))
    {
        cached_tile.status = TileStatus::Success;
    }
}
/// Marks the tile at `coords` as failed to load.
/// No-op when the coords are invalid or no tile entry exists.
pub fn fail(&mut self, coords: &WorldTileCoords) {
    if let Some(cached_tile) = coords
        .build_quad_key()
        .and_then(|key| self.tree.get_mut(&key))
    {
        cached_tile.status = TileStatus::Failed;
    }
}
/// Removes all the cached tessellate layers that are not contained within the given
/// layers hashset.
pub fn retain_missing_layer_names(

View File

@ -1,53 +0,0 @@
//! Tile request state.
use std::collections::{HashMap, HashSet};
use crate::{
coords::WorldTileCoords,
io::{TileRequest, TileRequestID},
};
/// Stores a map of pending requests, coords and the current tile being requested.
#[derive(Default)]
pub struct TileRequestState {
current_id: TileRequestID,
pending_tile_requests: HashMap<TileRequestID, TileRequest>,
pending_coords: HashSet<WorldTileCoords>,
}
impl TileRequestState {
pub fn new() -> Self {
Self {
current_id: 1,
pending_tile_requests: Default::default(),
pending_coords: Default::default(),
}
}
pub fn is_tile_request_pending(&self, coords: &WorldTileCoords) -> bool {
self.pending_coords.contains(coords)
}
pub fn start_tile_request(&mut self, tile_request: TileRequest) -> Option<TileRequestID> {
if self.is_tile_request_pending(&tile_request.coords) {
return None;
}
self.pending_coords.insert(tile_request.coords);
let id = self.current_id;
self.pending_tile_requests.insert(id, tile_request);
self.current_id += 1;
Some(id)
}
pub fn finish_tile_request(&mut self, id: TileRequestID) -> Option<TileRequest> {
self.pending_tile_requests.remove(&id).map(|request| {
self.pending_coords.remove(&request.coords);
request
})
}
pub fn get_tile_request(&self, id: TileRequestID) -> Option<&TileRequest> {
self.pending_tile_requests.get(&id)
}
}

View File

@ -0,0 +1,99 @@
use crate::coords::WorldTileCoords;
use crate::io::tile_repository::StoredLayer;
use crate::render::ShaderVertex;
use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer};
use geozero::mvt::tile;
use geozero::mvt::tile::Layer;
/// Message signalling that tessellation of a whole tile finished.
/// `Send` so the message can cross thread/task boundaries.
pub trait TileTessellated: Send {
    /// Creates the message for the tile at `coords`.
    fn new(coords: WorldTileCoords) -> Self;

    /// The world coordinates of the tessellated tile.
    fn coords(&self) -> &WorldTileCoords;
}
/// Message signalling that a named layer is not available within a fetched tile.
pub trait UnavailableLayer: Send {
    fn new(coords: WorldTileCoords, layer_name: String) -> Self;

    /// Converts this message into a [`StoredLayer`] for the tile repository.
    // NOTE(review): consuming conversions are conventionally named `into_…`, not `to_…`.
    fn to_stored_layer(self) -> StoredLayer;
}
/// Message carrying the tessellation result (vertex buffer + feature index counts)
/// for a single layer of a tile.
pub trait TessellatedLayer: Send {
    fn new(
        coords: WorldTileCoords,
        buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
        feature_indices: Vec<u32>,
        layer_data: tile::Layer,
    ) -> Self;

    /// Converts this message into a [`StoredLayer`] for the tile repository.
    fn to_stored_layer(self) -> StoredLayer;
}
/// In-process [`TileTessellated`] implementation: simply holds the tile coords.
pub struct DefaultTileTessellated {
    pub coords: WorldTileCoords,
}

impl TileTessellated for DefaultTileTessellated {
    fn new(coords: WorldTileCoords) -> Self {
        Self { coords }
    }

    fn coords(&self) -> &WorldTileCoords {
        &self.coords
    }
}
/// In-process [`UnavailableLayer`] implementation: coords plus the missing layer's name.
pub struct DefaultUnavailableLayer {
    pub coords: WorldTileCoords,
    pub layer_name: String,
}

impl UnavailableLayer for DefaultUnavailableLayer {
    fn new(coords: WorldTileCoords, layer_name: String) -> Self {
        Self { coords, layer_name }
    }

    fn to_stored_layer(self) -> StoredLayer {
        StoredLayer::UnavailableLayer {
            coords: self.coords,
            layer_name: self.layer_name,
        }
    }
}
/// In-process [`TessellatedLayer`] implementation: carries the tessellation output verbatim.
pub struct DefaultTessellatedLayer {
    pub coords: WorldTileCoords,
    pub buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
    /// Holds for each feature the count of indices.
    pub feature_indices: Vec<u32>,
    pub layer_data: tile::Layer,
}

impl TessellatedLayer for DefaultTessellatedLayer {
    fn new(
        coords: WorldTileCoords,
        buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
        feature_indices: Vec<u32>,
        layer_data: Layer,
    ) -> Self {
        Self {
            coords,
            buffer,
            feature_indices,
            layer_data,
        }
    }

    // Moves every field into the repository's storage representation — no copies.
    fn to_stored_layer(self) -> StoredLayer {
        StoredLayer::TessellatedLayer {
            coords: self.coords,
            buffer: self.buffer,
            feature_indices: self.feature_indices,
            layer_data: self.layer_data,
        }
    }
}
/// Bundles the concrete message types used by an async procedure call mechanism.
/// `'static` so the types can be moved into spawned tasks.
pub trait Transferables: 'static {
    type TileTessellated: TileTessellated;
    type UnavailableLayer: UnavailableLayer;
    type TessellatedLayer: TessellatedLayer;
}

View File

@ -41,6 +41,12 @@ pub mod scheduler {
pub use super::noweb::scheduler::*;
}
/// APC for non-web targets.
pub mod apc {
#[cfg(not(target_arch = "wasm32"))]
pub use super::noweb::apc::*;
}
#[cfg(not(target_arch = "wasm32"))]
pub use noweb::run_multithreaded;

View File

@ -0,0 +1,65 @@
use crate::environment::DefaultTransferables;
use crate::io::apc::{AsyncProcedure, AsyncProcedureCall, Context, Transferable};
use crate::io::transferables::Transferables;
use crate::Environment;
use serde::Serialize;
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
/// [`Context`] backed by the sending half of an `std::sync::mpsc` channel; a clone
/// is moved into every spawned tokio task.
#[derive(Clone)]
pub struct TokioContext<T: Transferables> {
    sender: Sender<Transferable<T>>,
}

impl<T: Transferables> Context<T> for TokioContext<T>
where
    T: Clone,
{
    fn send(&self, data: Transferable<T>) {
        // Panics if the receiving half was dropped; the owning
        // `TokioAsyncProcedureCall` is expected to outlive all scheduled procedures.
        self.sender.send(data).unwrap();
        log::debug!("sent");
    }
}
/// Tokio-based [`AsyncProcedureCall`]: procedures are spawned onto the tokio runtime
/// and report their results through an in-process mpsc channel.
pub struct TokioAsyncProcedureCall<T: Transferables> {
    // (sender, receiver) pair; the sender is cloned into each scheduled procedure.
    channel: (Sender<Transferable<T>>, Receiver<Transferable<T>>),
}

impl<T: Transferables> TokioAsyncProcedureCall<T> {
    // NOTE(review): duplicates `AsyncProcedureCall::new` below — consider removing
    // one of the two to keep a single construction path.
    pub fn new() -> Self {
        Self {
            channel: mpsc::channel(),
        }
    }
}
impl<T: Transferables> AsyncProcedureCall<T> for TokioAsyncProcedureCall<T>
where
    T: Clone,
{
    type Context = TokioContext<T>;

    fn new() -> Self {
        Self {
            channel: mpsc::channel(),
        }
    }

    /// Non-blocking poll for the next transferred result; `None` if the channel is empty.
    fn receive(&self) -> Option<Transferable<T>> {
        let transferred = self.channel.1.try_recv().ok()?;
        log::debug!("received");
        Some(transferred)
    }

    /// Spawns `procedure` onto the tokio runtime with a fresh context.
    // NOTE(review): `tokio::task::spawn` panics when called outside a runtime — confirm
    // all call sites run inside one.
    fn schedule<I: Serialize + Send + 'static>(
        &self,
        input: I,
        procedure: AsyncProcedure<I, TokioContext<T>>,
    ) {
        let sender = self.channel.0.clone();
        tokio::task::spawn(async move {
            (procedure)(input, TokioContext { sender }).await;
        });
    }
}

View File

@ -2,6 +2,7 @@
use std::future::Future;
pub mod apc;
pub mod http_client;
pub mod scheduler;

View File

@ -1,75 +0,0 @@
use std::{fmt, sync::mpsc};
use geozero::mvt::tile;
use crate::{
coords::WorldTileCoords,
io::{tile_repository::StoredLayer, TileRequestID},
render::ShaderVertex,
tessellation::{IndexDataType, OverAlignedVertexBuffer},
};
pub type MessageSender = mpsc::Sender<TessellateMessage>;
pub type MessageReceiver = mpsc::Receiver<TessellateMessage>;
/// [crate::io::TileTessellateMessage] or [crate::io::LayerTessellateMessage] tessellation message.
pub enum TessellateMessage {
Tile(TileTessellateMessage),
Layer(LayerTessellateMessage),
}
/// The result of the tessellation of a tile.
pub struct TileTessellateMessage {
pub request_id: TileRequestID,
pub coords: WorldTileCoords,
}
/// `TessellatedLayer` contains the result of the tessellation for a specific layer, otherwise
/// `UnavailableLayer` if the layer doesn't exist.
pub enum LayerTessellateMessage {
UnavailableLayer {
coords: WorldTileCoords,
layer_name: String,
},
TessellatedLayer {
coords: WorldTileCoords,
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
/// Holds for each feature the count of indices.
feature_indices: Vec<u32>,
layer_data: tile::Layer,
},
}
impl Into<StoredLayer> for LayerTessellateMessage {
fn into(self) -> StoredLayer {
match self {
LayerTessellateMessage::UnavailableLayer { coords, layer_name } => {
StoredLayer::UnavailableLayer { coords, layer_name }
}
LayerTessellateMessage::TessellatedLayer {
coords,
buffer,
feature_indices,
layer_data,
} => StoredLayer::TessellatedLayer {
coords,
buffer,
feature_indices,
layer_data,
},
}
}
}
impl fmt::Debug for LayerTessellateMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"LayerTessellateMessage{}",
match self {
LayerTessellateMessage::UnavailableLayer { coords, .. } => coords,
LayerTessellateMessage::TessellatedLayer { coords, .. } => coords,
}
)
}
}

View File

@ -1,10 +1,19 @@
//! [Stages](Stage) for requesting and preparing data
use std::rc::Rc;
use std::sync::{mpsc, Arc, Mutex};
use geozero::{mvt::tile, GeozeroDatasource};
use request_stage::RequestStage;
use crate::environment::DefaultTransferables;
use crate::io::apc::{AsyncProcedureCall, Context, Transferable};
use crate::io::transferables::Transferables;
use crate::io::transferables::{
DefaultTessellatedLayer, DefaultTileTessellated, DefaultUnavailableLayer, TessellatedLayer,
TileTessellated, UnavailableLayer,
};
use crate::platform::apc::{TokioAsyncProcedureCall, TokioContext};
use crate::{
coords::{WorldCoords, WorldTileCoords, Zoom, ZoomLevel},
error::Error,
@ -13,23 +22,15 @@ use crate::{
pipeline::{PipelineContext, PipelineProcessor, Processable},
source_client::HttpSourceClient,
tile_pipelines::build_vector_tile_pipeline,
tile_request_state::TileRequestState,
TileRequest, TileRequestID,
TileRequest,
},
render::ShaderVertex,
schedule::Schedule,
stages::{
message::{
LayerTessellateMessage, MessageReceiver, MessageSender, TessellateMessage,
TileTessellateMessage,
},
populate_tile_store_stage::PopulateTileStore,
},
stages::populate_tile_store_stage::PopulateTileStore,
tessellation::{IndexDataType, OverAlignedVertexBuffer},
Environment, HttpClient, Scheduler,
};
mod message;
mod populate_tile_store_stage;
mod request_stage;
@ -39,49 +40,33 @@ pub fn register_stages<E: Environment>(
http_source_client: HttpSourceClient<E::HttpClient>,
scheduler: Box<E::Scheduler>,
) {
let (message_sender, message_receiver): (MessageSender, MessageReceiver) = mpsc::channel();
let shared_thread_state = SharedThreadState {
tile_request_state: Arc::new(Mutex::new(TileRequestState::new())),
message_sender,
// TODO: Readd
//geometry_index: Arc::new(Mutex::new(GeometryIndex::new())),
};
let apc = Rc::new(E::AsyncProcedureCall::new());
schedule.add_stage(
"request",
RequestStage::<E>::new(shared_thread_state.clone(), http_source_client, *scheduler),
);
schedule.add_stage(
"populate_tile_store",
PopulateTileStore::new(shared_thread_state, message_receiver),
RequestStage::<E>::new(http_source_client, apc.clone()),
);
schedule.add_stage("populate_tile_store", PopulateTileStore::<E>::new(apc));
}
pub struct HeadedPipelineProcessor {
state: SharedThreadState,
pub struct HeadedPipelineProcessor<E: Environment> {
context: <E::AsyncProcedureCall as AsyncProcedureCall<E::Transferables>>::Context,
}
impl PipelineProcessor for HeadedPipelineProcessor {
fn tile_finished(&mut self, request_id: TileRequestID, coords: &WorldTileCoords) {
self.state
.message_sender
.send(TessellateMessage::Tile(TileTessellateMessage {
request_id,
coords: *coords,
}))
.unwrap();
impl<E: Environment> PipelineProcessor for HeadedPipelineProcessor<E> {
fn tile_finished(&mut self, coords: &WorldTileCoords) {
self.context.send(Transferable::TileTessellated(
<E::Transferables as Transferables>::TileTessellated::new(*coords),
))
}
fn layer_unavailable(&mut self, coords: &WorldTileCoords, layer_name: &str) {
self.state
.message_sender
.send(TessellateMessage::Layer(
LayerTessellateMessage::UnavailableLayer {
coords: *coords,
layer_name: layer_name.to_owned(),
},
))
.unwrap();
self.context.send(Transferable::UnavailableLayer(
<E::Transferables as Transferables>::UnavailableLayer::new(
*coords,
layer_name.to_owned(),
),
))
}
fn layer_tesselation_finished(
@ -91,17 +76,15 @@ impl PipelineProcessor for HeadedPipelineProcessor {
feature_indices: Vec<u32>,
layer_data: tile::Layer,
) {
self.state
.message_sender
.send(TessellateMessage::Layer(
LayerTessellateMessage::TessellatedLayer {
coords: *coords,
buffer,
feature_indices,
layer_data,
},
))
.unwrap();
log::info!("layer_tesselation_finished");
self.context.send(Transferable::TessellatedLayer(
<E::Transferables as Transferables>::TessellatedLayer::new(
*coords,
buffer,
feature_indices,
layer_data,
),
))
}
fn layer_indexing_finished(
@ -116,76 +99,76 @@ impl PipelineProcessor for HeadedPipelineProcessor {
}
}
/// Stores and provides access to the thread safe data shared between the schedulers.
#[derive(Clone)]
pub struct SharedThreadState {
pub tile_request_state: Arc<Mutex<TileRequestState>>,
pub message_sender: mpsc::Sender<TessellateMessage>,
// TODO: Readd
//pub geometry_index: Arc<Mutex<GeometryIndex>>,
///// Stores and provides access to the thread safe data shared between the schedulers.
//[derive(Clone)]
//pub struct SharedThreadState {
/* pub tile_request_state: Arc<Mutex<TileRequestState>>,
pub message_sender: mpsc::Sender<TessellateMessage>,*/
// TODO: Readd
//pub geometry_index: Arc<Mutex<GeometryIndex>>,
//}
//impl SharedThreadState {
/* fn get_tile_request(&self, request_id: TileRequestID) -> Option<TileRequest> {
self.tile_request_state
.lock()
.ok()
.and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned())
}
impl SharedThreadState {
fn get_tile_request(&self, request_id: TileRequestID) -> Option<TileRequest> {
self.tile_request_state
.lock()
.ok()
.and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned())
#[tracing::instrument(skip_all)]
pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> {
if let Some(tile_request) = self.get_tile_request(request_id) {
let mut pipeline_context = PipelineContext::new(HeadedPipelineProcessor {
state: self.clone(),
});
let pipeline = build_vector_tile_pipeline();
pipeline.process((tile_request, request_id, data), &mut pipeline_context);
}
#[tracing::instrument(skip_all)]
pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> {
if let Some(tile_request) = self.get_tile_request(request_id) {
let mut pipeline_context = PipelineContext::new(HeadedPipelineProcessor {
state: self.clone(),
});
let pipeline = build_vector_tile_pipeline();
pipeline.process((tile_request, request_id, data), &mut pipeline_context);
}
Ok(())
}
pub fn tile_unavailable(
&self,
coords: &WorldTileCoords,
request_id: TileRequestID,
) -> Result<(), Error> {
if let Some(tile_request) = self.get_tile_request(request_id) {
for to_load in &tile_request.layers {
tracing::warn!("layer {} at {} unavailable", to_load, coords);
self.message_sender.send(TessellateMessage::Layer(
LayerTessellateMessage::UnavailableLayer {
coords: tile_request.coords,
layer_name: to_load.to_string(),
},
))?;
}
}
Ok(())
}
// TODO: Readd
/* #[tracing::instrument(skip_all)]
pub fn query_point(
&self,
world_coords: &WorldCoords,
z: ZoomLevel,
zoom: Zoom,
) -> Option<Vec<IndexedGeometry<f64>>> {
if let Ok(geometry_index) = self.geometry_index.lock() {
geometry_index
.query_point(world_coords, z, zoom)
.map(|geometries| {
geometries
.iter()
.cloned()
.cloned()
.collect::<Vec<IndexedGeometry<f64>>>()
})
} else {
unimplemented!()
}
}*/
Ok(())
}
pub fn tile_unavailable(
&self,
coords: &WorldTileCoords,
request_id: TileRequestID,
) -> Result<(), Error> {
if let Some(tile_request) = self.get_tile_request(request_id) {
for to_load in &tile_request.layers {
tracing::warn!("layer {} at {} unavailable", to_load, coords);
self.message_sender.send(TessellateMessage::Layer(
LayerTessellateMessage::UnavailableLayer {
coords: tile_request.coords,
layer_name: to_load.to_string(),
},
))?;
}
}
Ok(())
}*/
// TODO: Readd
/* #[tracing::instrument(skip_all)]
pub fn query_point(
&self,
world_coords: &WorldCoords,
z: ZoomLevel,
zoom: Zoom,
) -> Option<Vec<IndexedGeometry<f64>>> {
if let Ok(geometry_index) = self.geometry_index.lock() {
geometry_index
.query_point(world_coords, z, zoom)
.map(|geometries| {
geometries
.iter()
.cloned()
.cloned()
.collect::<Vec<IndexedGeometry<f64>>>()
})
} else {
unimplemented!()
}
}*/
//}

View File

@ -1,49 +1,52 @@
//! Receives data from async threads and populates the [`crate::io::tile_repository::TileRepository`].
use super::{MessageReceiver, SharedThreadState, TessellateMessage, TileTessellateMessage};
use crate::{context::MapContext, io::tile_repository::StoredLayer, schedule::Stage};
use crate::io::apc::{AsyncProcedureCall, Transferable};
use crate::io::transferables::{TessellatedLayer, TileTessellated, UnavailableLayer};
use crate::{context::MapContext, io::tile_repository::StoredLayer, schedule::Stage, Environment};
use std::rc::Rc;
pub struct PopulateTileStore {
shared_thread_state: SharedThreadState,
message_receiver: MessageReceiver,
pub struct PopulateTileStore<E: Environment> {
apc: Rc<E::AsyncProcedureCall>,
}
impl PopulateTileStore {
pub fn new(shared_thread_state: SharedThreadState, message_receiver: MessageReceiver) -> Self {
Self {
shared_thread_state,
message_receiver,
}
impl<E: Environment> PopulateTileStore<E> {
pub fn new(apc: Rc<E::AsyncProcedureCall>) -> Self {
Self { apc }
}
}
impl Stage for PopulateTileStore {
impl<E: Environment> Stage for PopulateTileStore<E> {
fn run(
&mut self,
MapContext {
tile_repository, ..
}: &mut MapContext,
) {
if let Ok(result) = self.message_receiver.try_recv() {
if let Some(result) = self.apc.receive() {
match result {
TessellateMessage::Layer(layer_result) => {
let layer: StoredLayer = layer_result.into();
tracing::trace!(
Transferable::TileTessellated(tranferred) => {
let coords = tranferred.coords();
tile_repository.success(coords);
tracing::trace!("Tile at {} finished loading", coords);
}
Transferable::UnavailableLayer(tranferred) => {
let layer: StoredLayer = tranferred.to_stored_layer();
tracing::debug!(
"Layer {} at {} reached main thread",
layer.layer_name(),
layer.get_coords()
);
tile_repository.put_tessellated_layer(layer);
}
Transferable::TessellatedLayer(data) => {
let layer: StoredLayer = data.to_stored_layer();
tracing::debug!(
"Layer {} at {} reached main thread",
layer.layer_name(),
layer.get_coords()
);
tile_repository.put_tessellated_layer(layer);
}
TessellateMessage::Tile(TileTessellateMessage { request_id, coords }) => loop {
if let Ok(mut tile_request_state) =
self.shared_thread_state.tile_request_state.try_lock()
{
tile_request_state.finish_tile_request(request_id);
tracing::trace!("Tile at {} finished loading", coords);
break;
}
},
}
}
}

View File

@ -1,7 +1,12 @@
//! Requests tiles which are currently in view
use std::collections::HashSet;
use crate::coords::ZoomLevel;
use crate::io::apc::AsyncProcedureCall;
use crate::io::pipeline::PipelineContext;
use crate::io::pipeline::Processable;
use crate::io::tile_pipelines::build_vector_tile_pipeline;
use crate::platform::http_client::ReqwestHttpClient;
use crate::stages::HeadedPipelineProcessor;
use crate::{
context::MapContext,
coords::{ViewRegion, WorldTileCoords},
@ -12,28 +17,28 @@ use crate::{
TileRequest,
},
schedule::Stage,
stages::SharedThreadState,
Environment, HttpClient, Scheduler, Style,
};
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::process::Output;
use std::rc::Rc;
use std::str::FromStr;
pub struct RequestStage<E: Environment> {
shared_thread_state: SharedThreadState,
scheduler: E::Scheduler,
apc: Rc<E::AsyncProcedureCall>,
http_source_client: HttpSourceClient<E::HttpClient>,
try_failed: bool,
}
impl<E: Environment> RequestStage<E> {
pub fn new(
shared_thread_state: SharedThreadState,
http_source_client: HttpSourceClient<E::HttpClient>,
scheduler: E::Scheduler,
apc: Rc<E::AsyncProcedureCall>,
) -> Self {
Self {
shared_thread_state,
scheduler,
apc,
http_source_client,
try_failed: false,
}
}
}
@ -50,11 +55,10 @@ impl<E: Environment> Stage for RequestStage<E> {
) {
let view_region = view_state.create_view_region();
if view_state.camera.did_change(0.05) || view_state.zoom.did_change(0.05) || self.try_failed
{
if view_state.camera.did_change(0.05) || view_state.zoom.did_change(0.05) {
if let Some(view_region) = &view_region {
// FIXME: We also need to request tiles from layers above if we are over the maximum zoom level
self.try_failed = self.request_tiles_in_view(tile_repository, style, view_region);
self.request_tiles_in_view(tile_repository, style, view_region);
}
}
@ -63,16 +67,41 @@ impl<E: Environment> Stage for RequestStage<E> {
}
}
/// [`AsyncProcedure`] which fetches the tile named by `input.coords` over HTTP and, on
/// success, runs the vector-tile pipeline, reporting results through `context`.
///
/// On fetch failure the error is only logged; the repository entry stays `Pending`.
// TODO: report layer unavailability through the APC context instead of only logging.
pub fn schedule<E: Environment>(
    input: TileRequest,
    context: <E::AsyncProcedureCall as AsyncProcedureCall<E::Transferables>>::Context,
) -> Pin<Box<(dyn Future<Output = ()> + Send + 'static)>> {
    Box::pin(async move {
        let coords = input.coords;
        // FIXME: hard-codes ReqwestHttpClient instead of using E::HttpClient.
        let client = SourceClient::Http(HttpSourceClient::new(ReqwestHttpClient::new(None)));
        // (removed unused `let request_id = 0;` — request IDs are gone with TileRequestState)
        match client.fetch(&coords).await {
            Ok(data) => {
                let data = data.into_boxed_slice();
                let mut pipeline_context =
                    PipelineContext::new(HeadedPipelineProcessor::<E> { context });
                let pipeline = build_vector_tile_pipeline();
                pipeline.process((input, data), &mut pipeline_context);
            }
            Err(e) => {
                log::error!("{:?}", &e);
                //state.tile_unavailable(&coords, request_id).unwrap()
            }
        }
    })
}
impl<E: Environment> RequestStage<E> {
/// Request tiles which are currently in view.
#[tracing::instrument(skip_all)]
fn request_tiles_in_view(
&self,
tile_repository: &TileRepository,
tile_repository: &mut TileRepository,
style: &Style,
view_region: &ViewRegion,
) -> bool {
let mut try_failed = false;
) {
let source_layers: HashSet<String> = style
.layers
.iter()
@ -82,68 +111,35 @@ impl<E: Environment> RequestStage<E> {
for coords in view_region.iter() {
if coords.build_quad_key().is_some() {
// TODO: Make tesselation depend on style?
try_failed = self
.try_request_tile(tile_repository, &coords, &source_layers)
self.request_tile(tile_repository, &coords, &source_layers)
.unwrap();
}
}
try_failed
}
fn try_request_tile(
fn request_tile(
&self,
tile_repository: &TileRepository,
tile_repository: &mut TileRepository,
coords: &WorldTileCoords,
layers: &HashSet<String>,
) -> Result<bool, Error> {
if !tile_repository.is_layers_missing(coords, layers) {
) -> Result<(), Error> {
/* if !tile_repository.is_layers_missing(coords, layers) {
return Ok(false);
}*/
if tile_repository.needs_fetching(&coords) {
tile_repository.create_tile(coords);
tracing::info!("new tile request: {}", &coords);
self.apc.schedule(
TileRequest {
coords: *coords,
layers: layers.clone(),
},
schedule::<E>,
);
}
if let Ok(mut tile_request_state) = self.shared_thread_state.tile_request_state.try_lock() {
if let Some(request_id) = tile_request_state.start_tile_request(TileRequest {
coords: *coords,
layers: layers.clone(),
}) {
tracing::info!("new tile request: {}", &coords);
// The following snippet can be added instead of the next code block to demonstrate
// an understanable approach of fetching
/*#[cfg(target_arch = "wasm32")]
if let Some(tile_coords) = coords.into_tile(TileAddressingScheme::TMS) {
crate::platform::legacy_webworker_fetcher::request_tile(
request_id,
tile_coords,
);
}*/
let client = SourceClient::Http(self.http_source_client.clone());
let coords = *coords;
let state = self.shared_thread_state.clone();
self.scheduler
.schedule(Box::new(move || {
Box::pin(async move {
match client.fetch(&coords).await {
Ok(data) => {
log::warn!("fetching done");
state
.process_tile(request_id, data.into_boxed_slice())
.unwrap()
}
Err(e) => {
log::error!("{:?}", &e);
state.tile_unavailable(&coords, request_id).unwrap()
}
}
})
}))
.unwrap();
}
Ok(false)
} else {
Ok(true)
}
Ok(())
}
}