From c40f39a72cffc3a80fb05ab8f00f22cb9a957a18 Mon Sep 17 00:00:00 2001 From: Maximilian Ammann Date: Tue, 31 May 2022 14:20:52 +0200 Subject: [PATCH] Refactor thread state and pipeline --- maplibre-demo/src/main.rs | 18 +- maplibre/src/context.rs | 1 - maplibre/src/io/mod.rs | 75 +----- maplibre/src/io/pipeline.rs | 222 +++--------------- maplibre/src/io/pipeline_steps.rs | 153 ++++++++++++ maplibre/src/io/tile_cache.rs | 25 +- maplibre/src/map_schedule.rs | 1 - maplibre/src/render/stages/extract_stage.rs | 1 - .../src/render/stages/phase_sort_stage.rs | 1 - maplibre/src/render/stages/queue_stage.rs | 1 - maplibre/src/render/stages/upload_stage.rs | 2 +- .../stages/write_surface_buffer_stage.rs | 1 - maplibre/src/stages/mod.rs | 205 +++++++++++++++- .../src/stages/populate_tile_store_stage.rs | 4 +- maplibre/src/stages/request_stage.rs | 2 +- maplibre/src/stages/shared_thread_state.rs | 170 -------------- 16 files changed, 426 insertions(+), 456 deletions(-) create mode 100644 maplibre/src/io/pipeline_steps.rs delete mode 100644 maplibre/src/stages/shared_thread_state.rs diff --git a/maplibre-demo/src/main.rs b/maplibre-demo/src/main.rs index fbd7ce23..19b81808 100644 --- a/maplibre-demo/src/main.rs +++ b/maplibre-demo/src/main.rs @@ -2,12 +2,12 @@ use geozero::mvt::tile; use maplibre::benchmarking::tessellation::{IndexDataType, OverAlignedVertexBuffer}; use maplibre::coords::WorldTileCoords; use maplibre::error::Error; -use maplibre::io::pipeline::steps::build_vector_tile_pipeline; use maplibre::io::pipeline::Processable; use maplibre::io::pipeline::{PipelineContext, PipelineProcessor}; +use maplibre::io::pipeline_steps::build_vector_tile_pipeline; use maplibre::io::scheduler::ScheduleMethod; use maplibre::io::source_client::{HttpClient, HttpSourceClient}; -use maplibre::io::{LayerTessellateMessage, TileRequest, TileRequestID}; +use maplibre::io::{TileRequest, TileRequestID}; use maplibre::map_schedule::{EventuallyMapContext, InteractiveMapSchedule}; use maplibre::platform::http_client::ReqwestHttpClient; use maplibre::platform::run_multithreaded; @@ -65,9 +65,17 @@ fn run_in_window() { }) } +struct TessellatedLayer { + coords: WorldTileCoords, + buffer: OverAlignedVertexBuffer, + /// Holds for each feature the count of indices. + feature_indices: Vec, + layer_data: tile::Layer, +} + #[derive(Default)] struct HeadlessPipelineProcessor { - layers: Vec, + layers: Vec, } impl PipelineProcessor for HeadlessPipelineProcessor { @@ -82,7 +90,7 @@ impl PipelineProcessor for HeadlessPipelineProcessor { feature_indices: Vec, layer_data: tile::Layer, ) { - self.layers.push(LayerTessellateMessage::TessellatedLayer { + self.layers.push(TessellatedLayer { coords: *coords, buffer, feature_indices, @@ -148,7 +156,7 @@ fn run_headless() { map.map_schedule_mut() .map_context .tile_cache - .put_tessellated_layer(v); + .put_tessellated_layer_(v.coords, v.buffer, v.feature_indices, v.layer_data); } match map.map_schedule_mut().update_and_redraw() { diff --git a/maplibre/src/context.rs b/maplibre/src/context.rs index e5a3f04e..5182ebf8 100644 --- a/maplibre/src/context.rs +++ b/maplibre/src/context.rs @@ -1,6 +1,5 @@ use crate::coords::{Zoom, TILE_SIZE}; use crate::io::tile_cache::TileCache; -use crate::io::TessellateMessage; use crate::render::camera::{Camera, Perspective, ViewProjection}; use crate::util::ChangeObserver; use crate::{Renderer, ScheduleMethod, Style, WindowSize}; diff --git a/maplibre/src/io/mod.rs b/maplibre/src/io/mod.rs index 001d8611..6614e4b5 100644 --- a/maplibre/src/io/mod.rs +++ b/maplibre/src/io/mod.rs @@ -15,83 +15,10 @@ pub mod static_tile_fetcher; pub mod geometry_index; pub mod pipeline; +pub mod pipeline_steps; pub mod tile_cache; pub mod tile_request_state; -/// Contains a `Tile` if the fetch was successful otherwise `Unavailable`. -pub enum TileFetchResult { - Unavailable { - coords: WorldTileCoords, - }, - Tile { - coords: WorldTileCoords, - data: Box<[u8]>, - }, -} - -impl fmt::Debug for TileFetchResult { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "TileFetchResult({})", - match self { - TileFetchResult::Unavailable { coords, .. } => coords, - TileFetchResult::Tile { coords, .. } => coords, - } - ) - } -} - -/// [crate::io::TileTessellateMessage] or [crate::io::LayerTessellateMessage] tessellation message. -pub enum TessellateMessage { - Tile(TileTessellateMessage), - Layer(LayerTessellateMessage), -} - -/// The result of the tessellation of a tile. -pub struct TileTessellateMessage { - pub request_id: TileRequestID, - pub coords: WorldTileCoords, -} - -/// `TessellatedLayer` contains the result of the tessellation for a specific layer, otherwise -/// `UnavailableLayer` if the layer doesn't exist. -pub enum LayerTessellateMessage { - UnavailableLayer { - coords: WorldTileCoords, - layer_name: String, - }, - TessellatedLayer { - coords: WorldTileCoords, - buffer: OverAlignedVertexBuffer, - /// Holds for each feature the count of indices. - feature_indices: Vec, - layer_data: tile::Layer, - }, -} - -impl fmt::Debug for LayerTessellateMessage { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "LayerTessellateMessage{}", self.get_coords()) - } -} - -impl LayerTessellateMessage { - pub fn get_coords(&self) -> WorldTileCoords { - match self { - LayerTessellateMessage::UnavailableLayer { coords, .. } => *coords, - LayerTessellateMessage::TessellatedLayer { coords, .. } => *coords, - } - } - - pub fn layer_name(&self) -> &str { - match self { - LayerTessellateMessage::UnavailableLayer { layer_name, .. } => layer_name.as_str(), - LayerTessellateMessage::TessellatedLayer { layer_data, .. } => &layer_data.name, - } - } -} - /// A request for a tile at the given coordinates and in the given layers. #[derive(Clone)] pub struct TileRequest { diff --git a/maplibre/src/io/pipeline.rs b/maplibre/src/io/pipeline.rs index 3e451039..fc2025a6 100644 --- a/maplibre/src/io/pipeline.rs +++ b/maplibre/src/io/pipeline.rs @@ -1,5 +1,6 @@ use crate::coords::WorldTileCoords; -use crate::io::{LayerTessellateMessage, TessellateMessage, TileRequestID, TileTessellateMessage}; +use crate::io::geometry_index::IndexedGeometry; +use crate::io::TileRequestID; use crate::render::ShaderVertex; use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer}; use downcast_rs::{impl_downcast, Downcast}; @@ -10,8 +11,8 @@ use std::process::Output; use std::sync::mpsc; pub trait PipelineProcessor: Downcast { - fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords); - fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str); + fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords) {} + fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str) {} fn finished_layer_tesselation( &mut self, coords: &WorldTileCoords, @@ -19,53 +20,18 @@ pub trait PipelineProcessor: Downcast { // Holds for each feature the count of indices. feature_indices: Vec, layer_data: tile::Layer, - ); + ) { + } + fn finished_layer_indexing( + &mut self, + coords: &WorldTileCoords, + geometries: Vec>, + ) { + } } impl_downcast!(PipelineProcessor); -pub struct HeadedPipelineProcessor { - pub message_sender: mpsc::Sender, -} - -impl PipelineProcessor for HeadedPipelineProcessor { - fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords) { - self.message_sender - .send(TessellateMessage::Tile(TileTessellateMessage { - request_id, - coords: *coords, - })) - .unwrap(); - } - - fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str) { - self.message_sender.send(TessellateMessage::Layer( - LayerTessellateMessage::UnavailableLayer { - coords: *coords, - layer_name: layer_name.to_owned(), - }, - )); - } - fn finished_layer_tesselation( - &mut self, - coords: &WorldTileCoords, - buffer: OverAlignedVertexBuffer, - feature_indices: Vec, - layer_data: tile::Layer, - ) { - self.message_sender - .send(TessellateMessage::Layer( - LayerTessellateMessage::TessellatedLayer { - coords: *coords, - buffer, - feature_indices, - layer_data, - }, - )) - .unwrap(); - } -} - pub struct PipelineContext { pub processor: Box, } @@ -92,6 +58,16 @@ where next: N, } +impl PipelineStep +where + P: Processable, + N: Processable, +{ + pub fn new(process: P, next: N) -> Self { + Self { process, next } + } +} + impl Processable for PipelineStep where P: Processable, @@ -106,11 +82,18 @@ where } } -#[derive(Default)] pub struct EndStep { phantom: PhantomData, } +impl Default for EndStep { + fn default() -> Self { + Self { + phantom: PhantomData::default(), + } + } +} + impl Processable for EndStep { type Input = I; type Output = I; @@ -185,149 +168,18 @@ impl Processable for Closure2Processable { } } -pub mod steps { - use crate::io::pipeline::{EndStep, PipelineContext, PipelineStep, Processable}; - use crate::io::{TileRequest, TileRequestID}; - use crate::tessellation::zero_tessellator::ZeroTessellator; - use crate::tessellation::IndexDataType; - use geozero::GeozeroDatasource; - use prost::Message; - use std::collections::HashSet; - - pub struct ParseTileStep {} - - impl Processable for ParseTileStep { - type Input = (TileRequest, TileRequestID, Box<[u8]>); - type Output = (TileRequest, TileRequestID, geozero::mvt::Tile); - - fn process( - &self, - (tile_request, request_id, data): Self::Input, - _context: &mut PipelineContext, - ) -> Self::Output { - let tile = geozero::mvt::Tile::decode(data.as_ref()).expect("failed to load tile"); - (tile_request, request_id, tile) - } - } - - pub struct IndexLayerStep {} - - pub struct TessellateLayerStep {} - - impl Processable for TessellateLayerStep { - type Input = (TileRequest, TileRequestID, geozero::mvt::Tile); - type Output = (); - - fn process( - &self, - (tile_request, request_id, mut tile): Self::Input, - context: &mut PipelineContext, - ) -> Self::Output { - let coords = &tile_request.coords; - - for layer in &mut tile.layers { - let cloned_layer = layer.clone(); - let layer_name: &str = &cloned_layer.name; - if !tile_request.layers.contains(layer_name) { - continue; - } - - tracing::info!("layer {} at {} ready", layer_name, coords); - - let mut tessellator = ZeroTessellator::::default(); - if let Err(e) = layer.process(&mut tessellator) { - context.processor.unavailable_layer(coords, layer_name); - - tracing::error!( - "layer {} at {} tesselation failed {:?}", - layer_name, - &coords, - e - ); - } else { - context.processor.finished_layer_tesselation( - coords, - tessellator.buffer.into(), - tessellator.feature_indices, - cloned_layer, - ) - } - } - - let available_layers: HashSet<_> = tile - .layers - .iter() - .map(|layer| layer.name.clone()) - .collect::>(); - - for missing_layer in tile_request.layers.difference(&available_layers) { - context.processor.unavailable_layer(coords, missing_layer); - - tracing::info!( - "requested layer {} at {} not found in tile", - missing_layer, - &coords - ); - } - - tracing::info!("tile tessellated at {} finished", &tile_request.coords); - - context - .processor - .finished_tile_tesselation(request_id, &tile_request.coords); - } - } - - pub fn build_vector_tile_pipeline( - ) -> impl Processable::Input> { - PipelineStep { - process: ParseTileStep {}, - next: PipelineStep { - process: TessellateLayerStep {}, - next: EndStep::default(), - }, - } - } - - #[cfg(test)] - mod tests { - use crate::io::pipeline::steps::build_vector_tile_pipeline; - use crate::io::pipeline::{HeadedPipelineProcessor, PipelineContext, Processable}; - use crate::io::TileRequest; - use std::sync::mpsc; - - #[test] - fn test() { - let mut context = PipelineContext { - processor: Box::new(HeadedPipelineProcessor { - message_sender: mpsc::channel().0, - }), - }; - - let pipeline = build_vector_tile_pipeline(); - let output = pipeline.process( - ( - TileRequest { - coords: (0, 0, 0).into(), - layers: Default::default(), - }, - 0, - Box::new([0]), - ), - &mut context, - ); - } - } -} - #[cfg(test)] mod tests { use crate::io::pipeline::{ Closure2Processable, ClosureProcessable, EndStep, FnProcessable, HeadedPipelineProcessor, - PipelineContext, PipelineStep, Processable, + PipelineContext, PipelineProcessor, PipelineStep, Processable, }; use std::sync::mpsc; + pub struct DummyPipelineProcessor; + + impl PipelineProcessor for DummyPipelineProcessor {} + fn add_one(input: u32, context: &mut PipelineContext) -> u8 { input as u8 + 1 } @@ -339,9 +191,7 @@ mod tests { #[test] fn test() { let mut context = PipelineContext { - processor: Box::new(HeadedPipelineProcessor { - message_sender: mpsc::channel().0, - }), + processor: Box::new(DummyPipelineProcessor), }; let output: u32 = PipelineStep { process: FnProcessable { diff --git a/maplibre/src/io/pipeline_steps.rs b/maplibre/src/io/pipeline_steps.rs new file mode 100644 index 00000000..9749c90a --- /dev/null +++ b/maplibre/src/io/pipeline_steps.rs @@ -0,0 +1,153 @@ +use crate::io::geometry_index::IndexProcessor; +use crate::io::pipeline::{EndStep, PipelineContext, PipelineStep, Processable}; +use crate::io::{TileRequest, TileRequestID}; +use crate::tessellation::zero_tessellator::ZeroTessellator; +use crate::tessellation::IndexDataType; +use geozero::GeozeroDatasource; +use prost::Message; +use std::collections::HashSet; + +pub struct ParseTileStep; + +impl Processable for ParseTileStep { + type Input = (TileRequest, TileRequestID, Box<[u8]>); + type Output = (TileRequest, TileRequestID, geozero::mvt::Tile); + + // TODO (perf): Maybe force inline + fn process( + &self, + (tile_request, request_id, data): Self::Input, + _context: &mut PipelineContext, + ) -> Self::Output { + let tile = geozero::mvt::Tile::decode(data.as_ref()).expect("failed to load tile"); + (tile_request, request_id, tile) + } +} + +pub struct IndexLayerStep; + +impl Processable for IndexLayerStep { + type Input = (TileRequest, TileRequestID, geozero::mvt::Tile); + type Output = (TileRequest, TileRequestID, geozero::mvt::Tile); + + // TODO (perf): Maybe force inline + fn process( + &self, + (tile_request, request_id, tile): Self::Input, + context: &mut PipelineContext, + ) -> Self::Output { + let index = IndexProcessor::new(); + + context + .processor + .finished_layer_indexing(&tile_request.coords, index.get_geometries()); + (tile_request, request_id, tile) + } +} + +pub struct TessellateLayerStep; + +impl Processable for TessellateLayerStep { + type Input = (TileRequest, TileRequestID, geozero::mvt::Tile); + type Output = (TileRequest, TileRequestID, geozero::mvt::Tile); + + // TODO (perf): Maybe force inline + fn process( + &self, + (tile_request, request_id, mut tile): Self::Input, + context: &mut PipelineContext, + ) -> Self::Output { + let coords = &tile_request.coords; + + for layer in &mut tile.layers { + let cloned_layer = layer.clone(); + let layer_name: &str = &cloned_layer.name; + if !tile_request.layers.contains(layer_name) { + continue; + } + + tracing::info!("layer {} at {} ready", layer_name, coords); + + let mut tessellator = ZeroTessellator::::default(); + if let Err(e) = layer.process(&mut tessellator) { + context.processor.unavailable_layer(coords, layer_name); + + tracing::error!( + "layer {} at {} tesselation failed {:?}", + layer_name, + &coords, + e + ); + } else { + context.processor.finished_layer_tesselation( + coords, + tessellator.buffer.into(), + tessellator.feature_indices, + cloned_layer, + ) + } + } + + let available_layers: HashSet<_> = tile + .layers + .iter() + .map(|layer| layer.name.clone()) + .collect::>(); + + for missing_layer in tile_request.layers.difference(&available_layers) { + context.processor.unavailable_layer(coords, missing_layer); + + tracing::info!( + "requested layer {} at {} not found in tile", + missing_layer, + &coords + ); + } + + tracing::info!("tile tessellated at {} finished", &tile_request.coords); + + context + .processor + .finished_tile_tesselation(request_id, &tile_request.coords); + + (tile_request, request_id, tile) + } +} + +pub fn build_vector_tile_pipeline( +) -> impl Processable::Input> { + PipelineStep::new( + ParseTileStep, + PipelineStep::new(TessellateLayerStep, EndStep::default()), + ) +} + +#[cfg(test)] +mod tests { + use super::build_vector_tile_pipeline; + use crate::io::pipeline::{PipelineContext, PipelineProcessor, Processable}; + use crate::io::TileRequest; + pub struct DummyPipelineProcessor; + + impl PipelineProcessor for DummyPipelineProcessor {} + + #[test] + fn test() { + let mut context = PipelineContext { + processor: Box::new(DummyPipelineProcessor), + }; + + let pipeline = build_vector_tile_pipeline(); + let output = pipeline.process( + ( + TileRequest { + coords: (0, 0, 0).into(), + layers: Default::default(), + }, + 0, + Box::new([0]), + ), + &mut context, + ); + } +} diff --git a/maplibre/src/io/tile_cache.rs b/maplibre/src/io/tile_cache.rs index 2166b391..56f6140b 100644 --- a/maplibre/src/io/tile_cache.rs +++ b/maplibre/src/io/tile_cache.rs @@ -1,14 +1,15 @@ //! Tile cache. use crate::coords::{Quadkey, WorldTileCoords}; - -use crate::io::LayerTessellateMessage; - +use crate::render::ShaderVertex; +use crate::stages::LayerTessellateMessage; +use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer}; +use geozero::mvt::tile; use std::collections::{btree_map, BTreeMap, HashSet}; /// Stores the multiple [crate::io::LayerTessellateMessage] of a cached tile. pub struct CachedTile { - layers: Vec, + layers: Vec, // TODO: Changen type here, its no message } impl CachedTile { @@ -22,6 +23,7 @@ impl CachedTile { /// Stores and provides access to a quad tree of cached tiles with world tile coords. #[derive(Default)] pub struct TileCache { + // TODO: Change name to TileStore cache: BTreeMap, } @@ -32,6 +34,21 @@ impl TileCache { } } + pub fn put_tessellated_layer_( + &mut self, + coords: WorldTileCoords, + buffer: OverAlignedVertexBuffer, + feature_indices: Vec, + layer_data: tile::Layer, + ) { + self.put_tessellated_layer(LayerTessellateMessage::TessellatedLayer { + coords, + buffer, + feature_indices, + layer_data, + }) + } + /// Inserts a tessellated layer into the quad tree at its world tile coords. /// If the space is vacant, the tessellated layer is inserted into a new /// [crate::io::tile_cache::CachedTile]. diff --git a/maplibre/src/map_schedule.rs b/maplibre/src/map_schedule.rs index 7931078e..59ae97c1 100644 --- a/maplibre/src/map_schedule.rs +++ b/maplibre/src/map_schedule.rs @@ -7,7 +7,6 @@ use crate::io::scheduler::Scheduler; use crate::io::source_client::{HttpClient, HttpSourceClient, SourceClient}; use crate::io::tile_cache::TileCache; use crate::io::tile_request_state::TileRequestState; -use crate::io::TessellateMessage; use crate::render::register_render_stages; use crate::schedule::{Schedule, Stage}; use crate::stages::register_stages; diff --git a/maplibre/src/render/stages/extract_stage.rs b/maplibre/src/render/stages/extract_stage.rs index 0c2bb925..c6920c63 100644 --- a/maplibre/src/render/stages/extract_stage.rs +++ b/maplibre/src/render/stages/extract_stage.rs @@ -3,7 +3,6 @@ use crate::context::MapContext; use crate::coords::{ViewRegion, Zoom}; use crate::io::tile_cache::TileCache; -use crate::io::LayerTessellateMessage; use crate::render::camera::ViewProjection; use crate::render::render_phase::RenderPhase; use crate::render::resource::IndexEntry; diff --git a/maplibre/src/render/stages/phase_sort_stage.rs b/maplibre/src/render/stages/phase_sort_stage.rs index 51022053..1e989b6d 100644 --- a/maplibre/src/render/stages/phase_sort_stage.rs +++ b/maplibre/src/render/stages/phase_sort_stage.rs @@ -3,7 +3,6 @@ use crate::context::MapContext; use crate::coords::{ViewRegion, Zoom}; use crate::io::tile_cache::TileCache; -use crate::io::LayerTessellateMessage; use crate::render::camera::ViewProjection; use crate::render::render_phase::RenderPhase; use crate::render::resource::IndexEntry; diff --git a/maplibre/src/render/stages/queue_stage.rs b/maplibre/src/render/stages/queue_stage.rs index 7b605d87..c9051491 100644 --- a/maplibre/src/render/stages/queue_stage.rs +++ b/maplibre/src/render/stages/queue_stage.rs @@ -3,7 +3,6 @@ use crate::context::MapContext; use crate::coords::{ViewRegion, Zoom}; use crate::io::tile_cache::TileCache; -use crate::io::LayerTessellateMessage; use crate::render::camera::ViewProjection; use crate::render::resource::IndexEntry; use crate::render::shaders::{ diff --git a/maplibre/src/render/stages/upload_stage.rs b/maplibre/src/render/stages/upload_stage.rs index ac3d8b1a..7122a33b 100644 --- a/maplibre/src/render/stages/upload_stage.rs +++ b/maplibre/src/render/stages/upload_stage.rs @@ -3,7 +3,6 @@ use crate::context::MapContext; use crate::coords::{ViewRegion, Zoom}; use crate::io::tile_cache::TileCache; -use crate::io::LayerTessellateMessage; use crate::render::camera::ViewProjection; use crate::render::resource::IndexEntry; use crate::render::shaders::{ @@ -14,6 +13,7 @@ use crate::render::util::Eventually::Initialized; use crate::schedule::Stage; use crate::{RenderState, Renderer, Style}; +use crate::stages::LayerTessellateMessage; use std::iter; #[derive(Default)] diff --git a/maplibre/src/render/stages/write_surface_buffer_stage.rs b/maplibre/src/render/stages/write_surface_buffer_stage.rs index adcba764..139ecfc4 100644 --- a/maplibre/src/render/stages/write_surface_buffer_stage.rs +++ b/maplibre/src/render/stages/write_surface_buffer_stage.rs @@ -3,7 +3,6 @@ use crate::context::MapContext; use crate::coords::{ViewRegion, Zoom}; use crate::io::tile_cache::TileCache; -use crate::io::LayerTessellateMessage; use crate::render::camera::ViewProjection; use crate::render::render_phase::RenderPhase; use crate::render::resource::{BufferDimensions, BufferedTextureHead, Head, IndexEntry}; diff --git a/maplibre/src/stages/mod.rs b/maplibre/src/stages/mod.rs index e4c8a317..a6b0ab02 100644 --- a/maplibre/src/stages/mod.rs +++ b/maplibre/src/stages/mod.rs @@ -1,22 +1,32 @@ //! [Stages](Stage) for requesting and preparing data +use crate::coords::{WorldCoords, WorldTileCoords, Zoom}; +use crate::error::Error; use crate::io::geometry_index::GeometryIndex; +use crate::io::geometry_index::{IndexProcessor, IndexedGeometry, TileIndex}; +use crate::io::pipeline::{PipelineContext, PipelineProcessor}; +use crate::io::pipeline_steps::build_vector_tile_pipeline; use crate::io::source_client::{HttpSourceClient, SourceClient}; use crate::io::tile_request_state::TileRequestState; -use crate::io::TessellateMessage; +use crate::io::{TileRequest, TileRequestID}; +use crate::render::ShaderVertex; use crate::schedule::Schedule; use crate::stages::populate_tile_store_stage::PopulateTileStore; -use crate::stages::shared_thread_state::SharedThreadState; +use crate::tessellation::zero_tessellator::ZeroTessellator; +use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer}; use crate::{HttpClient, ScheduleMethod, Scheduler}; +use geozero::mvt::tile; +use geozero::GeozeroDatasource; +use prost::Message; use request_stage::RequestStage; +use std::collections::HashSet; +use std::fmt; use std::sync::{mpsc, Arc, Mutex}; +use crate::io::pipeline::Processable; + mod populate_tile_store_stage; mod request_stage; -mod shared_thread_state; - -pub type MessageSender = mpsc::Sender; -pub type MessageReceiver = mpsc::Receiver; pub fn register_stages( schedule: &mut Schedule, @@ -41,3 +51,186 @@ pub fn register_stages( PopulateTileStore::new(shared_thread_state, message_receiver), ); } + +type MessageSender = mpsc::Sender; +type MessageReceiver = mpsc::Receiver; + +/// [crate::io::TileTessellateMessage] or [crate::io::LayerTessellateMessage] tessellation message. +enum TessellateMessage { + Tile(TileTessellateMessage), + Layer(LayerTessellateMessage), +} + +/// The result of the tessellation of a tile. +struct TileTessellateMessage { + pub request_id: TileRequestID, + pub coords: WorldTileCoords, +} + +/// `TessellatedLayer` contains the result of the tessellation for a specific layer, otherwise +/// `UnavailableLayer` if the layer doesn't exist. +enum LayerTessellateMessage { + UnavailableLayer { + coords: WorldTileCoords, + layer_name: String, + }, + TessellatedLayer { + coords: WorldTileCoords, + buffer: OverAlignedVertexBuffer, + /// Holds for each feature the count of indices. + feature_indices: Vec, + layer_data: tile::Layer, + }, +} + +impl fmt::Debug for LayerTessellateMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "LayerTessellateMessage{}", self.get_coords()) + } +} + +impl LayerTessellateMessage { + pub fn get_coords(&self) -> WorldTileCoords { + match self { + LayerTessellateMessage::UnavailableLayer { coords, .. } => *coords, + LayerTessellateMessage::TessellatedLayer { coords, .. } => *coords, + } + } + + pub fn layer_name(&self) -> &str { + match self { + LayerTessellateMessage::UnavailableLayer { layer_name, .. } => layer_name.as_str(), + LayerTessellateMessage::TessellatedLayer { layer_data, .. } => &layer_data.name, + } + } +} + +pub struct HeadedPipelineProcessor { + state: SharedThreadState, +} + +impl PipelineProcessor for HeadedPipelineProcessor { + fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords) { + self.state + .message_sender + .send(TessellateMessage::Tile(TileTessellateMessage { + request_id, + coords: *coords, + })) + .unwrap(); + } + + fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str) { + self.state.message_sender.send(TessellateMessage::Layer( + LayerTessellateMessage::UnavailableLayer { + coords: *coords, + layer_name: layer_name.to_owned(), + }, + )); + } + + fn finished_layer_tesselation( + &mut self, + coords: &WorldTileCoords, + buffer: OverAlignedVertexBuffer, + feature_indices: Vec, + layer_data: tile::Layer, + ) { + self.state + .message_sender + .send(TessellateMessage::Layer( + LayerTessellateMessage::TessellatedLayer { + coords: *coords, + buffer, + feature_indices, + layer_data, + }, + )) + .unwrap(); + } + + fn finished_layer_indexing( + &mut self, + coords: &WorldTileCoords, + geometries: Vec>, + ) { + if let Ok(mut geometry_index) = self.state.geometry_index.lock() { + geometry_index.index_tile(&coords, TileIndex::Linear { list: geometries }) + } + } +} + +/// Stores and provides access to the thread safe data shared between the schedulers. +#[derive(Clone)] +pub struct SharedThreadState { + pub tile_request_state: Arc>, + pub message_sender: mpsc::Sender, + pub geometry_index: Arc>, +} + +impl SharedThreadState { + fn get_tile_request(&self, request_id: TileRequestID) -> Option { + self.tile_request_state + .lock() + .ok() + .and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned()) + } + + #[tracing::instrument(skip_all)] + pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> { + if let Some(tile_request) = self.get_tile_request(request_id) { + let mut processor = HeadedPipelineProcessor { + state: self.clone(), + }; + let mut pipeline_context = PipelineContext { + processor: Box::new(processor), + }; + let pipeline = build_vector_tile_pipeline(); + pipeline.process((tile_request, request_id, data), &mut pipeline_context); + } + + Ok(()) + } + + pub fn tile_unavailable( + &self, + coords: &WorldTileCoords, + request_id: TileRequestID, + ) -> Result<(), Error> { + if let Some(tile_request) = self.get_tile_request(request_id) { + for to_load in &tile_request.layers { + tracing::warn!("layer {} at {} unavailable", to_load, coords); + self.message_sender.send(TessellateMessage::Layer( + LayerTessellateMessage::UnavailableLayer { + coords: tile_request.coords, + layer_name: to_load.to_string(), + }, + ))?; + } + } + + Ok(()) + } + + #[tracing::instrument(skip_all)] + pub fn query_point( + &self, + world_coords: &WorldCoords, + z: u8, + zoom: Zoom, + ) -> Option>> { + if let Ok(geometry_index) = self.geometry_index.lock() { + geometry_index + .query_point(world_coords, z, zoom) + .map(|geometries| { + geometries + .iter() + .cloned() + .cloned() + .collect::>>() + }) + } else { + unimplemented!() + } + } +} diff --git a/maplibre/src/stages/populate_tile_store_stage.rs b/maplibre/src/stages/populate_tile_store_stage.rs index 51e4d4ef..9d0bb961 100644 --- a/maplibre/src/stages/populate_tile_store_stage.rs +++ b/maplibre/src/stages/populate_tile_store_stage.rs @@ -1,10 +1,8 @@ //! Receives data from async threads and populates the [`crate::io::tile_cache::TileCache`]. use crate::context::MapContext; -use crate::io::{TessellateMessage, TileTessellateMessage}; use crate::schedule::Stage; -use crate::stages::shared_thread_state::SharedThreadState; -use crate::stages::MessageReceiver; +use crate::stages::{MessageReceiver, SharedThreadState, TessellateMessage, TileTessellateMessage}; use std::sync::mpsc; pub struct PopulateTileStore { diff --git a/maplibre/src/stages/request_stage.rs b/maplibre/src/stages/request_stage.rs index d2240e98..d4756672 100644 --- a/maplibre/src/stages/request_stage.rs +++ b/maplibre/src/stages/request_stage.rs @@ -7,7 +7,7 @@ use crate::io::source_client::{HttpSourceClient, SourceClient}; use crate::io::tile_cache::TileCache; use crate::io::TileRequest; use crate::schedule::Stage; -use crate::stages::shared_thread_state::SharedThreadState; +use crate::stages::SharedThreadState; use crate::{HttpClient, ScheduleMethod, Style}; use std::collections::HashSet; diff --git a/maplibre/src/stages/shared_thread_state.rs b/maplibre/src/stages/shared_thread_state.rs deleted file mode 100644 index 97e9fdeb..00000000 --- a/maplibre/src/stages/shared_thread_state.rs +++ /dev/null @@ -1,170 +0,0 @@ -//! Shared thread state. - -use crate::coords::{WorldCoords, WorldTileCoords, Zoom}; -use crate::error::Error; -use crate::io::geometry_index::{GeometryIndex, IndexProcessor, IndexedGeometry, TileIndex}; -use crate::io::tile_request_state::TileRequestState; -use crate::io::{ - LayerTessellateMessage, TessellateMessage, TileRequest, TileRequestID, TileTessellateMessage, -}; - -use std::collections::HashSet; - -use crate::tessellation::zero_tessellator::ZeroTessellator; - -use geozero::GeozeroDatasource; -use prost::Message; -use std::sync::{mpsc, Arc, Mutex}; - -/// Stores and provides access to the thread safe data shared between the schedulers. -#[derive(Clone)] -pub struct SharedThreadState { - pub tile_request_state: Arc>, - pub message_sender: mpsc::Sender, - pub geometry_index: Arc>, -} - -impl SharedThreadState { - fn get_tile_request(&self, request_id: TileRequestID) -> Option { - self.tile_request_state - .lock() - .ok() - .and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned()) - } - - #[tracing::instrument(skip_all)] - pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> { - if let Some(tile_request) = self.get_tile_request(request_id) { - let coords = tile_request.coords; - - tracing::info!("parsing tile {} with {}bytes", &coords, data.len()); - - let _span_ = tracing::span!(tracing::Level::TRACE, "parse_tile_bytes").entered(); - - let mut tile = geozero::mvt::Tile::decode(data.as_ref()).expect("failed to load tile"); - - let index = IndexProcessor::new(); - - for layer in &mut tile.layers { - let cloned_layer = layer.clone(); - let layer_name: &str = &cloned_layer.name; - if !tile_request.layers.contains(layer_name) { - continue; - } - - tracing::info!("layer {} at {} ready", layer_name, &coords); - - let mut tessellator = ZeroTessellator::default(); - if let Err(e) = layer.process(&mut tessellator) { - self.message_sender.send(TessellateMessage::Layer( - LayerTessellateMessage::UnavailableLayer { - coords, - layer_name: layer_name.to_owned(), - }, - ))?; - - tracing::error!( - "layer {} at {} tesselation failed {:?}", - layer_name, - &coords, - e - ); - } else { - self.message_sender.send(TessellateMessage::Layer( - LayerTessellateMessage::TessellatedLayer { - coords, - buffer: tessellator.buffer.into(), - feature_indices: tessellator.feature_indices, - layer_data: cloned_layer, - }, - ))?; - } - - // TODO - // layer.process(&mut index).unwrap(); - } - - let available_layers: HashSet<_> = tile - .layers - .iter() - .map(|layer| layer.name.clone()) - .collect::>(); - - for missing_layer in tile_request.layers.difference(&available_layers) { - self.message_sender.send(TessellateMessage::Layer( - LayerTessellateMessage::UnavailableLayer { - coords, - layer_name: missing_layer.to_owned(), - }, - ))?; - - tracing::info!( - "requested layer {} at {} not found in tile", - missing_layer, - &coords - ); - } - - tracing::info!("tile tessellated at {} finished", &tile_request.coords); - - self.message_sender - .send(TessellateMessage::Tile(TileTessellateMessage { - request_id, - coords: tile_request.coords, - }))?; - - if let Ok(mut geometry_index) = self.geometry_index.lock() { - geometry_index.index_tile( - &coords, - TileIndex::Linear { - list: index.get_geometries(), - }, - ) - } - } - - Ok(()) - } - - pub fn tile_unavailable( - &self, - coords: &WorldTileCoords, - request_id: TileRequestID, - ) -> Result<(), Error> { - if let Some(tile_request) = self.get_tile_request(request_id) { - for to_load in &tile_request.layers { - tracing::warn!("layer {} at {} unavailable", to_load, coords); - self.message_sender.send(TessellateMessage::Layer( - LayerTessellateMessage::UnavailableLayer { - coords: tile_request.coords, - layer_name: to_load.to_string(), - }, - ))?; - } - } - - Ok(()) - } - - #[tracing::instrument(skip_all)] - pub fn query_point( - &self, - world_coords: &WorldCoords, - z: u8, - zoom: Zoom, - ) -> Option>> { - if let Ok(geometry_index) = self.geometry_index.lock() { - geometry_index - .query_point(world_coords, z, zoom) - .map(|geometries| { - geometries - .iter() - .cloned() - .cloned() - .collect::>>() - }) - } else { - unimplemented!() - } - } -}