Refactor thread state and pipeline

This commit is contained in:
Maximilian Ammann 2022-05-31 14:20:52 +02:00
parent ad6a0be20d
commit c40f39a72c
16 changed files with 426 additions and 456 deletions

View File

@ -2,12 +2,12 @@ use geozero::mvt::tile;
use maplibre::benchmarking::tessellation::{IndexDataType, OverAlignedVertexBuffer}; use maplibre::benchmarking::tessellation::{IndexDataType, OverAlignedVertexBuffer};
use maplibre::coords::WorldTileCoords; use maplibre::coords::WorldTileCoords;
use maplibre::error::Error; use maplibre::error::Error;
use maplibre::io::pipeline::steps::build_vector_tile_pipeline;
use maplibre::io::pipeline::Processable; use maplibre::io::pipeline::Processable;
use maplibre::io::pipeline::{PipelineContext, PipelineProcessor}; use maplibre::io::pipeline::{PipelineContext, PipelineProcessor};
use maplibre::io::pipeline_steps::build_vector_tile_pipeline;
use maplibre::io::scheduler::ScheduleMethod; use maplibre::io::scheduler::ScheduleMethod;
use maplibre::io::source_client::{HttpClient, HttpSourceClient}; use maplibre::io::source_client::{HttpClient, HttpSourceClient};
use maplibre::io::{LayerTessellateMessage, TileRequest, TileRequestID}; use maplibre::io::{TileRequest, TileRequestID};
use maplibre::map_schedule::{EventuallyMapContext, InteractiveMapSchedule}; use maplibre::map_schedule::{EventuallyMapContext, InteractiveMapSchedule};
use maplibre::platform::http_client::ReqwestHttpClient; use maplibre::platform::http_client::ReqwestHttpClient;
use maplibre::platform::run_multithreaded; use maplibre::platform::run_multithreaded;
@ -65,9 +65,17 @@ fn run_in_window() {
}) })
} }
struct TessellatedLayer {
coords: WorldTileCoords,
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
/// Holds for each feature the count of indices.
feature_indices: Vec<u32>,
layer_data: tile::Layer,
}
#[derive(Default)] #[derive(Default)]
struct HeadlessPipelineProcessor { struct HeadlessPipelineProcessor {
layers: Vec<LayerTessellateMessage>, layers: Vec<TessellatedLayer>,
} }
impl PipelineProcessor for HeadlessPipelineProcessor { impl PipelineProcessor for HeadlessPipelineProcessor {
@ -82,7 +90,7 @@ impl PipelineProcessor for HeadlessPipelineProcessor {
feature_indices: Vec<u32>, feature_indices: Vec<u32>,
layer_data: tile::Layer, layer_data: tile::Layer,
) { ) {
self.layers.push(LayerTessellateMessage::TessellatedLayer { self.layers.push(TessellatedLayer {
coords: *coords, coords: *coords,
buffer, buffer,
feature_indices, feature_indices,
@ -148,7 +156,7 @@ fn run_headless() {
map.map_schedule_mut() map.map_schedule_mut()
.map_context .map_context
.tile_cache .tile_cache
.put_tessellated_layer(v); .put_tessellated_layer_(v.coords, v.buffer, v.feature_indices, v.layer_data);
} }
match map.map_schedule_mut().update_and_redraw() { match map.map_schedule_mut().update_and_redraw() {

View File

@ -1,6 +1,5 @@
use crate::coords::{Zoom, TILE_SIZE}; use crate::coords::{Zoom, TILE_SIZE};
use crate::io::tile_cache::TileCache; use crate::io::tile_cache::TileCache;
use crate::io::TessellateMessage;
use crate::render::camera::{Camera, Perspective, ViewProjection}; use crate::render::camera::{Camera, Perspective, ViewProjection};
use crate::util::ChangeObserver; use crate::util::ChangeObserver;
use crate::{Renderer, ScheduleMethod, Style, WindowSize}; use crate::{Renderer, ScheduleMethod, Style, WindowSize};

View File

@ -15,83 +15,10 @@ pub mod static_tile_fetcher;
pub mod geometry_index; pub mod geometry_index;
pub mod pipeline; pub mod pipeline;
pub mod pipeline_steps;
pub mod tile_cache; pub mod tile_cache;
pub mod tile_request_state; pub mod tile_request_state;
/// Contains a `Tile` if the fetch was successful otherwise `Unavailable`.
pub enum TileFetchResult {
Unavailable {
coords: WorldTileCoords,
},
Tile {
coords: WorldTileCoords,
data: Box<[u8]>,
},
}
impl fmt::Debug for TileFetchResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"TileFetchResult({})",
match self {
TileFetchResult::Unavailable { coords, .. } => coords,
TileFetchResult::Tile { coords, .. } => coords,
}
)
}
}
/// [crate::io::TileTessellateMessage] or [crate::io::LayerTessellateMessage] tessellation message.
pub enum TessellateMessage {
Tile(TileTessellateMessage),
Layer(LayerTessellateMessage),
}
/// The result of the tessellation of a tile.
pub struct TileTessellateMessage {
pub request_id: TileRequestID,
pub coords: WorldTileCoords,
}
/// `TessellatedLayer` contains the result of the tessellation for a specific layer, otherwise
/// `UnavailableLayer` if the layer doesn't exist.
pub enum LayerTessellateMessage {
UnavailableLayer {
coords: WorldTileCoords,
layer_name: String,
},
TessellatedLayer {
coords: WorldTileCoords,
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
/// Holds for each feature the count of indices.
feature_indices: Vec<u32>,
layer_data: tile::Layer,
},
}
impl fmt::Debug for LayerTessellateMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "LayerTessellateMessage{}", self.get_coords())
}
}
impl LayerTessellateMessage {
pub fn get_coords(&self) -> WorldTileCoords {
match self {
LayerTessellateMessage::UnavailableLayer { coords, .. } => *coords,
LayerTessellateMessage::TessellatedLayer { coords, .. } => *coords,
}
}
pub fn layer_name(&self) -> &str {
match self {
LayerTessellateMessage::UnavailableLayer { layer_name, .. } => layer_name.as_str(),
LayerTessellateMessage::TessellatedLayer { layer_data, .. } => &layer_data.name,
}
}
}
/// A request for a tile at the given coordinates and in the given layers. /// A request for a tile at the given coordinates and in the given layers.
#[derive(Clone)] #[derive(Clone)]
pub struct TileRequest { pub struct TileRequest {

View File

@ -1,5 +1,6 @@
use crate::coords::WorldTileCoords; use crate::coords::WorldTileCoords;
use crate::io::{LayerTessellateMessage, TessellateMessage, TileRequestID, TileTessellateMessage}; use crate::io::geometry_index::IndexedGeometry;
use crate::io::TileRequestID;
use crate::render::ShaderVertex; use crate::render::ShaderVertex;
use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer}; use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer};
use downcast_rs::{impl_downcast, Downcast}; use downcast_rs::{impl_downcast, Downcast};
@ -10,8 +11,8 @@ use std::process::Output;
use std::sync::mpsc; use std::sync::mpsc;
pub trait PipelineProcessor: Downcast { pub trait PipelineProcessor: Downcast {
fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords); fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords) {}
fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str); fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str) {}
fn finished_layer_tesselation( fn finished_layer_tesselation(
&mut self, &mut self,
coords: &WorldTileCoords, coords: &WorldTileCoords,
@ -19,53 +20,18 @@ pub trait PipelineProcessor: Downcast {
// Holds for each feature the count of indices. // Holds for each feature the count of indices.
feature_indices: Vec<u32>, feature_indices: Vec<u32>,
layer_data: tile::Layer, layer_data: tile::Layer,
); ) {
}
fn finished_layer_indexing(
&mut self,
coords: &WorldTileCoords,
geometries: Vec<IndexedGeometry<f64>>,
) {
}
} }
impl_downcast!(PipelineProcessor); impl_downcast!(PipelineProcessor);
pub struct HeadedPipelineProcessor {
pub message_sender: mpsc::Sender<TessellateMessage>,
}
impl PipelineProcessor for HeadedPipelineProcessor {
fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords) {
self.message_sender
.send(TessellateMessage::Tile(TileTessellateMessage {
request_id,
coords: *coords,
}))
.unwrap();
}
fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str) {
self.message_sender.send(TessellateMessage::Layer(
LayerTessellateMessage::UnavailableLayer {
coords: *coords,
layer_name: layer_name.to_owned(),
},
));
}
fn finished_layer_tesselation(
&mut self,
coords: &WorldTileCoords,
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
feature_indices: Vec<u32>,
layer_data: tile::Layer,
) {
self.message_sender
.send(TessellateMessage::Layer(
LayerTessellateMessage::TessellatedLayer {
coords: *coords,
buffer,
feature_indices,
layer_data,
},
))
.unwrap();
}
}
pub struct PipelineContext { pub struct PipelineContext {
pub processor: Box<dyn PipelineProcessor>, pub processor: Box<dyn PipelineProcessor>,
} }
@ -92,6 +58,16 @@ where
next: N, next: N,
} }
impl<P, N> PipelineStep<P, N>
where
P: Processable,
N: Processable<Input = P::Output>,
{
pub fn new(process: P, next: N) -> Self {
Self { process, next }
}
}
impl<P, N> Processable for PipelineStep<P, N> impl<P, N> Processable for PipelineStep<P, N>
where where
P: Processable, P: Processable,
@ -106,11 +82,18 @@ where
} }
} }
#[derive(Default)]
pub struct EndStep<I> { pub struct EndStep<I> {
phantom: PhantomData<I>, phantom: PhantomData<I>,
} }
impl<I> Default for EndStep<I> {
fn default() -> Self {
Self {
phantom: PhantomData::default(),
}
}
}
impl<I> Processable for EndStep<I> { impl<I> Processable for EndStep<I> {
type Input = I; type Input = I;
type Output = I; type Output = I;
@ -185,149 +168,18 @@ impl<I, O> Processable for Closure2Processable<I, O> {
} }
} }
pub mod steps {
use crate::io::pipeline::{EndStep, PipelineContext, PipelineStep, Processable};
use crate::io::{TileRequest, TileRequestID};
use crate::tessellation::zero_tessellator::ZeroTessellator;
use crate::tessellation::IndexDataType;
use geozero::GeozeroDatasource;
use prost::Message;
use std::collections::HashSet;
pub struct ParseTileStep {}
impl Processable for ParseTileStep {
type Input = (TileRequest, TileRequestID, Box<[u8]>);
type Output = (TileRequest, TileRequestID, geozero::mvt::Tile);
fn process(
&self,
(tile_request, request_id, data): Self::Input,
_context: &mut PipelineContext,
) -> Self::Output {
let tile = geozero::mvt::Tile::decode(data.as_ref()).expect("failed to load tile");
(tile_request, request_id, tile)
}
}
pub struct IndexLayerStep {}
pub struct TessellateLayerStep {}
impl Processable for TessellateLayerStep {
type Input = (TileRequest, TileRequestID, geozero::mvt::Tile);
type Output = ();
fn process(
&self,
(tile_request, request_id, mut tile): Self::Input,
context: &mut PipelineContext,
) -> Self::Output {
let coords = &tile_request.coords;
for layer in &mut tile.layers {
let cloned_layer = layer.clone();
let layer_name: &str = &cloned_layer.name;
if !tile_request.layers.contains(layer_name) {
continue;
}
tracing::info!("layer {} at {} ready", layer_name, coords);
let mut tessellator = ZeroTessellator::<IndexDataType>::default();
if let Err(e) = layer.process(&mut tessellator) {
context.processor.unavailable_layer(coords, layer_name);
tracing::error!(
"layer {} at {} tesselation failed {:?}",
layer_name,
&coords,
e
);
} else {
context.processor.finished_layer_tesselation(
coords,
tessellator.buffer.into(),
tessellator.feature_indices,
cloned_layer,
)
}
}
let available_layers: HashSet<_> = tile
.layers
.iter()
.map(|layer| layer.name.clone())
.collect::<HashSet<_>>();
for missing_layer in tile_request.layers.difference(&available_layers) {
context.processor.unavailable_layer(coords, missing_layer);
tracing::info!(
"requested layer {} at {} not found in tile",
missing_layer,
&coords
);
}
tracing::info!("tile tessellated at {} finished", &tile_request.coords);
context
.processor
.finished_tile_tesselation(request_id, &tile_request.coords);
}
}
pub fn build_vector_tile_pipeline(
) -> impl Processable<Input = <ParseTileStep as Processable>::Input> {
PipelineStep {
process: ParseTileStep {},
next: PipelineStep {
process: TessellateLayerStep {},
next: EndStep::default(),
},
}
}
#[cfg(test)]
mod tests {
use crate::io::pipeline::steps::build_vector_tile_pipeline;
use crate::io::pipeline::{HeadedPipelineProcessor, PipelineContext, Processable};
use crate::io::TileRequest;
use std::sync::mpsc;
#[test]
fn test() {
let mut context = PipelineContext {
processor: Box::new(HeadedPipelineProcessor {
message_sender: mpsc::channel().0,
}),
};
let pipeline = build_vector_tile_pipeline();
let output = pipeline.process(
(
TileRequest {
coords: (0, 0, 0).into(),
layers: Default::default(),
},
0,
Box::new([0]),
),
&mut context,
);
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::io::pipeline::{ use crate::io::pipeline::{
Closure2Processable, ClosureProcessable, EndStep, FnProcessable, HeadedPipelineProcessor, Closure2Processable, ClosureProcessable, EndStep, FnProcessable, HeadedPipelineProcessor,
PipelineContext, PipelineStep, Processable, PipelineContext, PipelineProcessor, PipelineStep, Processable,
}; };
use std::sync::mpsc; use std::sync::mpsc;
pub struct DummyPipelineProcessor;
impl PipelineProcessor for DummyPipelineProcessor {}
fn add_one(input: u32, context: &mut PipelineContext) -> u8 { fn add_one(input: u32, context: &mut PipelineContext) -> u8 {
input as u8 + 1 input as u8 + 1
} }
@ -339,9 +191,7 @@ mod tests {
#[test] #[test]
fn test() { fn test() {
let mut context = PipelineContext { let mut context = PipelineContext {
processor: Box::new(HeadedPipelineProcessor { processor: Box::new(DummyPipelineProcessor),
message_sender: mpsc::channel().0,
}),
}; };
let output: u32 = PipelineStep { let output: u32 = PipelineStep {
process: FnProcessable { process: FnProcessable {

View File

@ -0,0 +1,153 @@
use crate::io::geometry_index::IndexProcessor;
use crate::io::pipeline::{EndStep, PipelineContext, PipelineStep, Processable};
use crate::io::{TileRequest, TileRequestID};
use crate::tessellation::zero_tessellator::ZeroTessellator;
use crate::tessellation::IndexDataType;
use geozero::GeozeroDatasource;
use prost::Message;
use std::collections::HashSet;
pub struct ParseTileStep;
impl Processable for ParseTileStep {
type Input = (TileRequest, TileRequestID, Box<[u8]>);
type Output = (TileRequest, TileRequestID, geozero::mvt::Tile);
// TODO (perf): Maybe force inline
fn process(
&self,
(tile_request, request_id, data): Self::Input,
_context: &mut PipelineContext,
) -> Self::Output {
let tile = geozero::mvt::Tile::decode(data.as_ref()).expect("failed to load tile");
(tile_request, request_id, tile)
}
}
pub struct IndexLayerStep;
impl Processable for IndexLayerStep {
type Input = (TileRequest, TileRequestID, geozero::mvt::Tile);
type Output = (TileRequest, TileRequestID, geozero::mvt::Tile);
// TODO (perf): Maybe force inline
fn process(
&self,
(tile_request, request_id, tile): Self::Input,
context: &mut PipelineContext,
) -> Self::Output {
let index = IndexProcessor::new();
context
.processor
.finished_layer_indexing(&tile_request.coords, index.get_geometries());
(tile_request, request_id, tile)
}
}
pub struct TessellateLayerStep;
impl Processable for TessellateLayerStep {
type Input = (TileRequest, TileRequestID, geozero::mvt::Tile);
type Output = (TileRequest, TileRequestID, geozero::mvt::Tile);
// TODO (perf): Maybe force inline
fn process(
&self,
(tile_request, request_id, mut tile): Self::Input,
context: &mut PipelineContext,
) -> Self::Output {
let coords = &tile_request.coords;
for layer in &mut tile.layers {
let cloned_layer = layer.clone();
let layer_name: &str = &cloned_layer.name;
if !tile_request.layers.contains(layer_name) {
continue;
}
tracing::info!("layer {} at {} ready", layer_name, coords);
let mut tessellator = ZeroTessellator::<IndexDataType>::default();
if let Err(e) = layer.process(&mut tessellator) {
context.processor.unavailable_layer(coords, layer_name);
tracing::error!(
"layer {} at {} tesselation failed {:?}",
layer_name,
&coords,
e
);
} else {
context.processor.finished_layer_tesselation(
coords,
tessellator.buffer.into(),
tessellator.feature_indices,
cloned_layer,
)
}
}
let available_layers: HashSet<_> = tile
.layers
.iter()
.map(|layer| layer.name.clone())
.collect::<HashSet<_>>();
for missing_layer in tile_request.layers.difference(&available_layers) {
context.processor.unavailable_layer(coords, missing_layer);
tracing::info!(
"requested layer {} at {} not found in tile",
missing_layer,
&coords
);
}
tracing::info!("tile tessellated at {} finished", &tile_request.coords);
context
.processor
.finished_tile_tesselation(request_id, &tile_request.coords);
(tile_request, request_id, tile)
}
}
pub fn build_vector_tile_pipeline(
) -> impl Processable<Input = <ParseTileStep as Processable>::Input> {
PipelineStep::new(
ParseTileStep,
PipelineStep::new(TessellateLayerStep, EndStep::default()),
)
}
#[cfg(test)]
mod tests {
use super::build_vector_tile_pipeline;
use crate::io::pipeline::{PipelineContext, PipelineProcessor, Processable};
use crate::io::TileRequest;
pub struct DummyPipelineProcessor;
impl PipelineProcessor for DummyPipelineProcessor {}
#[test]
fn test() {
let mut context = PipelineContext {
processor: Box::new(DummyPipelineProcessor),
};
let pipeline = build_vector_tile_pipeline();
let output = pipeline.process(
(
TileRequest {
coords: (0, 0, 0).into(),
layers: Default::default(),
},
0,
Box::new([0]),
),
&mut context,
);
}
}

View File

@ -1,14 +1,15 @@
//! Tile cache. //! Tile cache.
use crate::coords::{Quadkey, WorldTileCoords}; use crate::coords::{Quadkey, WorldTileCoords};
use crate::render::ShaderVertex;
use crate::io::LayerTessellateMessage; use crate::stages::LayerTessellateMessage;
use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer};
use geozero::mvt::tile;
use std::collections::{btree_map, BTreeMap, HashSet}; use std::collections::{btree_map, BTreeMap, HashSet};
/// Stores the multiple [crate::io::LayerTessellateMessage] of a cached tile. /// Stores the multiple [crate::io::LayerTessellateMessage] of a cached tile.
pub struct CachedTile { pub struct CachedTile {
layers: Vec<LayerTessellateMessage>, layers: Vec<LayerTessellateMessage>, // TODO: Changen type here, its no message
} }
impl CachedTile { impl CachedTile {
@ -22,6 +23,7 @@ impl CachedTile {
/// Stores and provides access to a quad tree of cached tiles with world tile coords. /// Stores and provides access to a quad tree of cached tiles with world tile coords.
#[derive(Default)] #[derive(Default)]
pub struct TileCache { pub struct TileCache {
// TODO: Change name to TileStore
cache: BTreeMap<Quadkey, CachedTile>, cache: BTreeMap<Quadkey, CachedTile>,
} }
@ -32,6 +34,21 @@ impl TileCache {
} }
} }
pub fn put_tessellated_layer_(
&mut self,
coords: WorldTileCoords,
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
feature_indices: Vec<u32>,
layer_data: tile::Layer,
) {
self.put_tessellated_layer(LayerTessellateMessage::TessellatedLayer {
coords,
buffer,
feature_indices,
layer_data,
})
}
/// Inserts a tessellated layer into the quad tree at its world tile coords. /// Inserts a tessellated layer into the quad tree at its world tile coords.
/// If the space is vacant, the tessellated layer is inserted into a new /// If the space is vacant, the tessellated layer is inserted into a new
/// [crate::io::tile_cache::CachedTile]. /// [crate::io::tile_cache::CachedTile].

View File

@ -7,7 +7,6 @@ use crate::io::scheduler::Scheduler;
use crate::io::source_client::{HttpClient, HttpSourceClient, SourceClient}; use crate::io::source_client::{HttpClient, HttpSourceClient, SourceClient};
use crate::io::tile_cache::TileCache; use crate::io::tile_cache::TileCache;
use crate::io::tile_request_state::TileRequestState; use crate::io::tile_request_state::TileRequestState;
use crate::io::TessellateMessage;
use crate::render::register_render_stages; use crate::render::register_render_stages;
use crate::schedule::{Schedule, Stage}; use crate::schedule::{Schedule, Stage};
use crate::stages::register_stages; use crate::stages::register_stages;

View File

@ -3,7 +3,6 @@
use crate::context::MapContext; use crate::context::MapContext;
use crate::coords::{ViewRegion, Zoom}; use crate::coords::{ViewRegion, Zoom};
use crate::io::tile_cache::TileCache; use crate::io::tile_cache::TileCache;
use crate::io::LayerTessellateMessage;
use crate::render::camera::ViewProjection; use crate::render::camera::ViewProjection;
use crate::render::render_phase::RenderPhase; use crate::render::render_phase::RenderPhase;
use crate::render::resource::IndexEntry; use crate::render::resource::IndexEntry;

View File

@ -3,7 +3,6 @@
use crate::context::MapContext; use crate::context::MapContext;
use crate::coords::{ViewRegion, Zoom}; use crate::coords::{ViewRegion, Zoom};
use crate::io::tile_cache::TileCache; use crate::io::tile_cache::TileCache;
use crate::io::LayerTessellateMessage;
use crate::render::camera::ViewProjection; use crate::render::camera::ViewProjection;
use crate::render::render_phase::RenderPhase; use crate::render::render_phase::RenderPhase;
use crate::render::resource::IndexEntry; use crate::render::resource::IndexEntry;

View File

@ -3,7 +3,6 @@
use crate::context::MapContext; use crate::context::MapContext;
use crate::coords::{ViewRegion, Zoom}; use crate::coords::{ViewRegion, Zoom};
use crate::io::tile_cache::TileCache; use crate::io::tile_cache::TileCache;
use crate::io::LayerTessellateMessage;
use crate::render::camera::ViewProjection; use crate::render::camera::ViewProjection;
use crate::render::resource::IndexEntry; use crate::render::resource::IndexEntry;
use crate::render::shaders::{ use crate::render::shaders::{

View File

@ -3,7 +3,6 @@
use crate::context::MapContext; use crate::context::MapContext;
use crate::coords::{ViewRegion, Zoom}; use crate::coords::{ViewRegion, Zoom};
use crate::io::tile_cache::TileCache; use crate::io::tile_cache::TileCache;
use crate::io::LayerTessellateMessage;
use crate::render::camera::ViewProjection; use crate::render::camera::ViewProjection;
use crate::render::resource::IndexEntry; use crate::render::resource::IndexEntry;
use crate::render::shaders::{ use crate::render::shaders::{
@ -14,6 +13,7 @@ use crate::render::util::Eventually::Initialized;
use crate::schedule::Stage; use crate::schedule::Stage;
use crate::{RenderState, Renderer, Style}; use crate::{RenderState, Renderer, Style};
use crate::stages::LayerTessellateMessage;
use std::iter; use std::iter;
#[derive(Default)] #[derive(Default)]

View File

@ -3,7 +3,6 @@
use crate::context::MapContext; use crate::context::MapContext;
use crate::coords::{ViewRegion, Zoom}; use crate::coords::{ViewRegion, Zoom};
use crate::io::tile_cache::TileCache; use crate::io::tile_cache::TileCache;
use crate::io::LayerTessellateMessage;
use crate::render::camera::ViewProjection; use crate::render::camera::ViewProjection;
use crate::render::render_phase::RenderPhase; use crate::render::render_phase::RenderPhase;
use crate::render::resource::{BufferDimensions, BufferedTextureHead, Head, IndexEntry}; use crate::render::resource::{BufferDimensions, BufferedTextureHead, Head, IndexEntry};

View File

@ -1,22 +1,32 @@
//! [Stages](Stage) for requesting and preparing data //! [Stages](Stage) for requesting and preparing data
use crate::coords::{WorldCoords, WorldTileCoords, Zoom};
use crate::error::Error;
use crate::io::geometry_index::GeometryIndex; use crate::io::geometry_index::GeometryIndex;
use crate::io::geometry_index::{IndexProcessor, IndexedGeometry, TileIndex};
use crate::io::pipeline::{PipelineContext, PipelineProcessor};
use crate::io::pipeline_steps::build_vector_tile_pipeline;
use crate::io::source_client::{HttpSourceClient, SourceClient}; use crate::io::source_client::{HttpSourceClient, SourceClient};
use crate::io::tile_request_state::TileRequestState; use crate::io::tile_request_state::TileRequestState;
use crate::io::TessellateMessage; use crate::io::{TileRequest, TileRequestID};
use crate::render::ShaderVertex;
use crate::schedule::Schedule; use crate::schedule::Schedule;
use crate::stages::populate_tile_store_stage::PopulateTileStore; use crate::stages::populate_tile_store_stage::PopulateTileStore;
use crate::stages::shared_thread_state::SharedThreadState; use crate::tessellation::zero_tessellator::ZeroTessellator;
use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer};
use crate::{HttpClient, ScheduleMethod, Scheduler}; use crate::{HttpClient, ScheduleMethod, Scheduler};
use geozero::mvt::tile;
use geozero::GeozeroDatasource;
use prost::Message;
use request_stage::RequestStage; use request_stage::RequestStage;
use std::collections::HashSet;
use std::fmt;
use std::sync::{mpsc, Arc, Mutex}; use std::sync::{mpsc, Arc, Mutex};
use crate::io::pipeline::Processable;
mod populate_tile_store_stage; mod populate_tile_store_stage;
mod request_stage; mod request_stage;
mod shared_thread_state;
pub type MessageSender = mpsc::Sender<TessellateMessage>;
pub type MessageReceiver = mpsc::Receiver<TessellateMessage>;
pub fn register_stages<HC: HttpClient, SM: ScheduleMethod>( pub fn register_stages<HC: HttpClient, SM: ScheduleMethod>(
schedule: &mut Schedule, schedule: &mut Schedule,
@ -41,3 +51,186 @@ pub fn register_stages<HC: HttpClient, SM: ScheduleMethod>(
PopulateTileStore::new(shared_thread_state, message_receiver), PopulateTileStore::new(shared_thread_state, message_receiver),
); );
} }
type MessageSender = mpsc::Sender<TessellateMessage>;
type MessageReceiver = mpsc::Receiver<TessellateMessage>;
/// [crate::io::TileTessellateMessage] or [crate::io::LayerTessellateMessage] tessellation message.
enum TessellateMessage {
Tile(TileTessellateMessage),
Layer(LayerTessellateMessage),
}
/// The result of the tessellation of a tile.
struct TileTessellateMessage {
pub request_id: TileRequestID,
pub coords: WorldTileCoords,
}
/// `TessellatedLayer` contains the result of the tessellation for a specific layer, otherwise
/// `UnavailableLayer` if the layer doesn't exist.
enum LayerTessellateMessage {
UnavailableLayer {
coords: WorldTileCoords,
layer_name: String,
},
TessellatedLayer {
coords: WorldTileCoords,
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
/// Holds for each feature the count of indices.
feature_indices: Vec<u32>,
layer_data: tile::Layer,
},
}
impl fmt::Debug for LayerTessellateMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "LayerTessellateMessage{}", self.get_coords())
}
}
impl LayerTessellateMessage {
pub fn get_coords(&self) -> WorldTileCoords {
match self {
LayerTessellateMessage::UnavailableLayer { coords, .. } => *coords,
LayerTessellateMessage::TessellatedLayer { coords, .. } => *coords,
}
}
pub fn layer_name(&self) -> &str {
match self {
LayerTessellateMessage::UnavailableLayer { layer_name, .. } => layer_name.as_str(),
LayerTessellateMessage::TessellatedLayer { layer_data, .. } => &layer_data.name,
}
}
}
pub struct HeadedPipelineProcessor {
state: SharedThreadState,
}
impl PipelineProcessor for HeadedPipelineProcessor {
fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords) {
self.state
.message_sender
.send(TessellateMessage::Tile(TileTessellateMessage {
request_id,
coords: *coords,
}))
.unwrap();
}
fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str) {
self.state.message_sender.send(TessellateMessage::Layer(
LayerTessellateMessage::UnavailableLayer {
coords: *coords,
layer_name: layer_name.to_owned(),
},
));
}
fn finished_layer_tesselation(
&mut self,
coords: &WorldTileCoords,
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
feature_indices: Vec<u32>,
layer_data: tile::Layer,
) {
self.state
.message_sender
.send(TessellateMessage::Layer(
LayerTessellateMessage::TessellatedLayer {
coords: *coords,
buffer,
feature_indices,
layer_data,
},
))
.unwrap();
}
fn finished_layer_indexing(
&mut self,
coords: &WorldTileCoords,
geometries: Vec<IndexedGeometry<f64>>,
) {
if let Ok(mut geometry_index) = self.state.geometry_index.lock() {
geometry_index.index_tile(&coords, TileIndex::Linear { list: geometries })
}
}
}
/// Stores and provides access to the thread safe data shared between the schedulers.
#[derive(Clone)]
pub struct SharedThreadState {
pub tile_request_state: Arc<Mutex<TileRequestState>>,
pub message_sender: mpsc::Sender<TessellateMessage>,
pub geometry_index: Arc<Mutex<GeometryIndex>>,
}
impl SharedThreadState {
fn get_tile_request(&self, request_id: TileRequestID) -> Option<TileRequest> {
self.tile_request_state
.lock()
.ok()
.and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned())
}
#[tracing::instrument(skip_all)]
pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> {
if let Some(tile_request) = self.get_tile_request(request_id) {
let mut processor = HeadedPipelineProcessor {
state: self.clone(),
};
let mut pipeline_context = PipelineContext {
processor: Box::new(processor),
};
let pipeline = build_vector_tile_pipeline();
pipeline.process((tile_request, request_id, data), &mut pipeline_context);
}
Ok(())
}
pub fn tile_unavailable(
&self,
coords: &WorldTileCoords,
request_id: TileRequestID,
) -> Result<(), Error> {
if let Some(tile_request) = self.get_tile_request(request_id) {
for to_load in &tile_request.layers {
tracing::warn!("layer {} at {} unavailable", to_load, coords);
self.message_sender.send(TessellateMessage::Layer(
LayerTessellateMessage::UnavailableLayer {
coords: tile_request.coords,
layer_name: to_load.to_string(),
},
))?;
}
}
Ok(())
}
#[tracing::instrument(skip_all)]
pub fn query_point(
&self,
world_coords: &WorldCoords,
z: u8,
zoom: Zoom,
) -> Option<Vec<IndexedGeometry<f64>>> {
if let Ok(geometry_index) = self.geometry_index.lock() {
geometry_index
.query_point(world_coords, z, zoom)
.map(|geometries| {
geometries
.iter()
.cloned()
.cloned()
.collect::<Vec<IndexedGeometry<f64>>>()
})
} else {
unimplemented!()
}
}
}

View File

@ -1,10 +1,8 @@
//! Receives data from async threads and populates the [`crate::io::tile_cache::TileCache`]. //! Receives data from async threads and populates the [`crate::io::tile_cache::TileCache`].
use crate::context::MapContext; use crate::context::MapContext;
use crate::io::{TessellateMessage, TileTessellateMessage};
use crate::schedule::Stage; use crate::schedule::Stage;
use crate::stages::shared_thread_state::SharedThreadState; use crate::stages::{MessageReceiver, SharedThreadState, TessellateMessage, TileTessellateMessage};
use crate::stages::MessageReceiver;
use std::sync::mpsc; use std::sync::mpsc;
pub struct PopulateTileStore { pub struct PopulateTileStore {

View File

@ -7,7 +7,7 @@ use crate::io::source_client::{HttpSourceClient, SourceClient};
use crate::io::tile_cache::TileCache; use crate::io::tile_cache::TileCache;
use crate::io::TileRequest; use crate::io::TileRequest;
use crate::schedule::Stage; use crate::schedule::Stage;
use crate::stages::shared_thread_state::SharedThreadState; use crate::stages::SharedThreadState;
use crate::{HttpClient, ScheduleMethod, Style}; use crate::{HttpClient, ScheduleMethod, Style};
use std::collections::HashSet; use std::collections::HashSet;

View File

@ -1,170 +0,0 @@
//! Shared thread state.
use crate::coords::{WorldCoords, WorldTileCoords, Zoom};
use crate::error::Error;
use crate::io::geometry_index::{GeometryIndex, IndexProcessor, IndexedGeometry, TileIndex};
use crate::io::tile_request_state::TileRequestState;
use crate::io::{
LayerTessellateMessage, TessellateMessage, TileRequest, TileRequestID, TileTessellateMessage,
};
use std::collections::HashSet;
use crate::tessellation::zero_tessellator::ZeroTessellator;
use geozero::GeozeroDatasource;
use prost::Message;
use std::sync::{mpsc, Arc, Mutex};
/// Stores and provides access to the thread safe data shared between the schedulers.
#[derive(Clone)]
pub struct SharedThreadState {
pub tile_request_state: Arc<Mutex<TileRequestState>>,
pub message_sender: mpsc::Sender<TessellateMessage>,
pub geometry_index: Arc<Mutex<GeometryIndex>>,
}
impl SharedThreadState {
fn get_tile_request(&self, request_id: TileRequestID) -> Option<TileRequest> {
self.tile_request_state
.lock()
.ok()
.and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned())
}
#[tracing::instrument(skip_all)]
pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> {
if let Some(tile_request) = self.get_tile_request(request_id) {
let coords = tile_request.coords;
tracing::info!("parsing tile {} with {}bytes", &coords, data.len());
let _span_ = tracing::span!(tracing::Level::TRACE, "parse_tile_bytes").entered();
let mut tile = geozero::mvt::Tile::decode(data.as_ref()).expect("failed to load tile");
let index = IndexProcessor::new();
for layer in &mut tile.layers {
let cloned_layer = layer.clone();
let layer_name: &str = &cloned_layer.name;
if !tile_request.layers.contains(layer_name) {
continue;
}
tracing::info!("layer {} at {} ready", layer_name, &coords);
let mut tessellator = ZeroTessellator::default();
if let Err(e) = layer.process(&mut tessellator) {
self.message_sender.send(TessellateMessage::Layer(
LayerTessellateMessage::UnavailableLayer {
coords,
layer_name: layer_name.to_owned(),
},
))?;
tracing::error!(
"layer {} at {} tesselation failed {:?}",
layer_name,
&coords,
e
);
} else {
self.message_sender.send(TessellateMessage::Layer(
LayerTessellateMessage::TessellatedLayer {
coords,
buffer: tessellator.buffer.into(),
feature_indices: tessellator.feature_indices,
layer_data: cloned_layer,
},
))?;
}
// TODO
// layer.process(&mut index).unwrap();
}
let available_layers: HashSet<_> = tile
.layers
.iter()
.map(|layer| layer.name.clone())
.collect::<HashSet<_>>();
for missing_layer in tile_request.layers.difference(&available_layers) {
self.message_sender.send(TessellateMessage::Layer(
LayerTessellateMessage::UnavailableLayer {
coords,
layer_name: missing_layer.to_owned(),
},
))?;
tracing::info!(
"requested layer {} at {} not found in tile",
missing_layer,
&coords
);
}
tracing::info!("tile tessellated at {} finished", &tile_request.coords);
self.message_sender
.send(TessellateMessage::Tile(TileTessellateMessage {
request_id,
coords: tile_request.coords,
}))?;
if let Ok(mut geometry_index) = self.geometry_index.lock() {
geometry_index.index_tile(
&coords,
TileIndex::Linear {
list: index.get_geometries(),
},
)
}
}
Ok(())
}
pub fn tile_unavailable(
&self,
coords: &WorldTileCoords,
request_id: TileRequestID,
) -> Result<(), Error> {
if let Some(tile_request) = self.get_tile_request(request_id) {
for to_load in &tile_request.layers {
tracing::warn!("layer {} at {} unavailable", to_load, coords);
self.message_sender.send(TessellateMessage::Layer(
LayerTessellateMessage::UnavailableLayer {
coords: tile_request.coords,
layer_name: to_load.to_string(),
},
))?;
}
}
Ok(())
}
#[tracing::instrument(skip_all)]
pub fn query_point(
&self,
world_coords: &WorldCoords,
z: u8,
zoom: Zoom,
) -> Option<Vec<IndexedGeometry<f64>>> {
if let Ok(geometry_index) = self.geometry_index.lock() {
geometry_index
.query_point(world_coords, z, zoom)
.map(|geometries| {
geometries
.iter()
.cloned()
.cloned()
.collect::<Vec<IndexedGeometry<f64>>>()
})
} else {
unimplemented!()
}
}
}