mirror of
https://github.com/maplibre/maplibre-rs.git
synced 2025-12-08 19:05:57 +00:00
Refactor thread state and pipeline
This commit is contained in:
parent
c40f39a72c
commit
deb442287e
@ -7,6 +7,7 @@ use maplibre::io::pipeline::{PipelineContext, PipelineProcessor};
|
||||
use maplibre::io::pipeline_steps::build_vector_tile_pipeline;
|
||||
use maplibre::io::scheduler::ScheduleMethod;
|
||||
use maplibre::io::source_client::{HttpClient, HttpSourceClient};
|
||||
use maplibre::io::tile_repository::StoredLayer;
|
||||
use maplibre::io::{TileRequest, TileRequestID};
|
||||
use maplibre::map_schedule::{EventuallyMapContext, InteractiveMapSchedule};
|
||||
use maplibre::platform::http_client::ReqwestHttpClient;
|
||||
@ -65,24 +66,12 @@ fn run_in_window() {
|
||||
})
|
||||
}
|
||||
|
||||
struct TessellatedLayer {
|
||||
coords: WorldTileCoords,
|
||||
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
|
||||
/// Holds for each feature the count of indices.
|
||||
feature_indices: Vec<u32>,
|
||||
layer_data: tile::Layer,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct HeadlessPipelineProcessor {
|
||||
layers: Vec<TessellatedLayer>,
|
||||
layers: Vec<StoredLayer>,
|
||||
}
|
||||
|
||||
impl PipelineProcessor for HeadlessPipelineProcessor {
|
||||
fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords) {}
|
||||
|
||||
fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str) {}
|
||||
|
||||
fn finished_layer_tesselation(
|
||||
&mut self,
|
||||
coords: &WorldTileCoords,
|
||||
@ -90,7 +79,7 @@ impl PipelineProcessor for HeadlessPipelineProcessor {
|
||||
feature_indices: Vec<u32>,
|
||||
layer_data: tile::Layer,
|
||||
) {
|
||||
self.layers.push(TessellatedLayer {
|
||||
self.layers.push(StoredLayer::TessellatedLayer {
|
||||
coords: *coords,
|
||||
buffer,
|
||||
feature_indices,
|
||||
@ -127,7 +116,7 @@ fn run_headless() {
|
||||
.unwrap()
|
||||
.into_boxed_slice();
|
||||
|
||||
let mut processor = HeadlessPipelineProcessor::default();
|
||||
let processor = HeadlessPipelineProcessor::default();
|
||||
let mut pipeline_context = PipelineContext {
|
||||
processor: Box::new(processor),
|
||||
};
|
||||
@ -155,8 +144,8 @@ fn run_headless() {
|
||||
{
|
||||
map.map_schedule_mut()
|
||||
.map_context
|
||||
.tile_cache
|
||||
.put_tessellated_layer_(v.coords, v.buffer, v.feature_indices, v.layer_data);
|
||||
.tile_repository
|
||||
.put_tessellated_layer(v);
|
||||
}
|
||||
|
||||
match map.map_schedule_mut().update_and_redraw() {
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
use crate::coords::{Zoom, TILE_SIZE};
|
||||
use crate::io::tile_cache::TileCache;
|
||||
use crate::io::tile_repository::TileRepository;
|
||||
use crate::render::camera::{Camera, Perspective, ViewProjection};
|
||||
use crate::util::ChangeObserver;
|
||||
use crate::{Renderer, ScheduleMethod, Style, WindowSize};
|
||||
@ -59,6 +59,6 @@ pub struct MapContext {
|
||||
pub view_state: ViewState,
|
||||
pub style: Style,
|
||||
|
||||
pub tile_cache: TileCache,
|
||||
pub tile_repository: TileRepository,
|
||||
pub renderer: Renderer,
|
||||
}
|
||||
|
||||
@ -16,7 +16,7 @@ pub mod static_tile_fetcher;
|
||||
pub mod geometry_index;
|
||||
pub mod pipeline;
|
||||
pub mod pipeline_steps;
|
||||
pub mod tile_cache;
|
||||
pub mod tile_repository;
|
||||
pub mod tile_request_state;
|
||||
|
||||
/// A request for a tile at the given coordinates and in the given layers.
|
||||
|
||||
@ -2,18 +2,48 @@
|
||||
|
||||
use crate::coords::{Quadkey, WorldTileCoords};
|
||||
use crate::render::ShaderVertex;
|
||||
use crate::stages::LayerTessellateMessage;
|
||||
use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer};
|
||||
use geozero::mvt::tile;
|
||||
use std::collections::{btree_map, BTreeMap, HashSet};
|
||||
|
||||
/// Stores the multiple [crate::io::LayerTessellateMessage] of a cached tile.
|
||||
pub struct CachedTile {
|
||||
layers: Vec<LayerTessellateMessage>, // TODO: Changen type here, its no message
|
||||
/// A layer which is stored for future use.
|
||||
pub enum StoredLayer {
|
||||
UnavailableLayer {
|
||||
coords: WorldTileCoords,
|
||||
layer_name: String,
|
||||
},
|
||||
TessellatedLayer {
|
||||
coords: WorldTileCoords,
|
||||
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
|
||||
/// Holds for each feature the count of indices.
|
||||
feature_indices: Vec<u32>,
|
||||
layer_data: tile::Layer,
|
||||
},
|
||||
}
|
||||
|
||||
impl CachedTile {
|
||||
pub fn new(first_layer: LayerTessellateMessage) -> Self {
|
||||
impl StoredLayer {
|
||||
pub fn get_coords(&self) -> WorldTileCoords {
|
||||
match self {
|
||||
StoredLayer::UnavailableLayer { coords, .. } => *coords,
|
||||
StoredLayer::TessellatedLayer { coords, .. } => *coords,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn layer_name(&self) -> &str {
|
||||
match self {
|
||||
StoredLayer::UnavailableLayer { layer_name, .. } => layer_name.as_str(),
|
||||
StoredLayer::TessellatedLayer { layer_data, .. } => &layer_data.name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores multiple [StoredLayers](StoredLayer).
|
||||
pub struct StoredTile {
|
||||
layers: Vec<StoredLayer>,
|
||||
}
|
||||
|
||||
impl StoredTile {
|
||||
pub fn new(first_layer: StoredLayer) -> Self {
|
||||
Self {
|
||||
layers: vec![first_layer],
|
||||
}
|
||||
@ -22,50 +52,34 @@ impl CachedTile {
|
||||
|
||||
/// Stores and provides access to a quad tree of cached tiles with world tile coords.
|
||||
#[derive(Default)]
|
||||
pub struct TileCache {
|
||||
// TODO: Change name to TileStore
|
||||
cache: BTreeMap<Quadkey, CachedTile>,
|
||||
pub struct TileRepository {
|
||||
tree: BTreeMap<Quadkey, StoredTile>,
|
||||
}
|
||||
|
||||
impl TileCache {
|
||||
impl TileRepository {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
cache: BTreeMap::new(),
|
||||
tree: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn put_tessellated_layer_(
|
||||
&mut self,
|
||||
coords: WorldTileCoords,
|
||||
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
|
||||
feature_indices: Vec<u32>,
|
||||
layer_data: tile::Layer,
|
||||
) {
|
||||
self.put_tessellated_layer(LayerTessellateMessage::TessellatedLayer {
|
||||
coords,
|
||||
buffer,
|
||||
feature_indices,
|
||||
layer_data,
|
||||
})
|
||||
}
|
||||
|
||||
/// Inserts a tessellated layer into the quad tree at its world tile coords.
|
||||
/// If the space is vacant, the tessellated layer is inserted into a new
|
||||
/// [crate::io::tile_cache::CachedTile].
|
||||
/// [crate::io::tile_repository::CachedTile].
|
||||
/// If the space is occupied, the tessellated layer is added to the current
|
||||
/// [crate::io::tile_cache::CachedTile].
|
||||
pub fn put_tessellated_layer(&mut self, message: LayerTessellateMessage) {
|
||||
if let Some(entry) = message
|
||||
/// [crate::io::tile_repository::CachedTile].
|
||||
pub fn put_tessellated_layer(&mut self, layer: StoredLayer) {
|
||||
if let Some(entry) = layer
|
||||
.get_coords()
|
||||
.build_quad_key()
|
||||
.map(|key| self.cache.entry(key))
|
||||
.map(|key| self.tree.entry(key))
|
||||
{
|
||||
match entry {
|
||||
btree_map::Entry::Vacant(entry) => {
|
||||
entry.insert(CachedTile::new(message));
|
||||
entry.insert(StoredTile::new(layer));
|
||||
}
|
||||
btree_map::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().layers.push(message);
|
||||
entry.get_mut().layers.push(layer);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -76,10 +90,10 @@ impl TileCache {
|
||||
pub fn iter_tessellated_layers_at(
|
||||
&self,
|
||||
coords: &WorldTileCoords,
|
||||
) -> Option<impl Iterator<Item = &LayerTessellateMessage> + '_> {
|
||||
) -> Option<impl Iterator<Item = &StoredLayer> + '_> {
|
||||
coords
|
||||
.build_quad_key()
|
||||
.and_then(|key| self.cache.get(&key))
|
||||
.and_then(|key| self.tree.get(&key))
|
||||
.map(|results| results.layers.iter())
|
||||
}
|
||||
|
||||
@ -90,7 +104,7 @@ impl TileCache {
|
||||
coords: &WorldTileCoords,
|
||||
layers: &mut HashSet<String>,
|
||||
) {
|
||||
if let Some(cached_tile) = coords.build_quad_key().and_then(|key| self.cache.get(&key)) {
|
||||
if let Some(cached_tile) = coords.build_quad_key().and_then(|key| self.tree.get(&key)) {
|
||||
let tessellated_set: HashSet<String> = cached_tile
|
||||
.layers
|
||||
.iter()
|
||||
@ -103,7 +117,7 @@ impl TileCache {
|
||||
|
||||
/// Checks if a layer is missing from the given layers set at the given coords.
|
||||
pub fn is_layers_missing(&self, coords: &WorldTileCoords, layers: &HashSet<String>) -> bool {
|
||||
if let Some(cached_tile) = coords.build_quad_key().and_then(|key| self.cache.get(&key)) {
|
||||
if let Some(cached_tile) = coords.build_quad_key().and_then(|key| self.tree.get(&key)) {
|
||||
let tessellated_set: HashSet<&str> = cached_tile
|
||||
.layers
|
||||
.iter()
|
||||
@ -1,11 +1,11 @@
|
||||
//! Stores the state of the map such as `[crate::coords::Zoom]`, `[crate::camera::Camera]`, `[crate::style::Style]`, `[crate::io::tile_cache::TileCache]` and more.
|
||||
//! Stores the state of the map such as `[crate::coords::Zoom]`, `[crate::camera::Camera]`, `[crate::style::Style]`, `[crate::io::tile_repository::TileCache]` and more.
|
||||
|
||||
use crate::context::{MapContext, ViewState};
|
||||
use crate::error::Error;
|
||||
use crate::io::geometry_index::GeometryIndex;
|
||||
use crate::io::scheduler::Scheduler;
|
||||
use crate::io::source_client::{HttpClient, HttpSourceClient, SourceClient};
|
||||
use crate::io::tile_cache::TileCache;
|
||||
use crate::io::tile_repository::TileRepository;
|
||||
use crate::io::tile_request_state::TileRequestState;
|
||||
use crate::render::register_render_stages;
|
||||
use crate::schedule::{Schedule, Stage};
|
||||
@ -23,7 +23,7 @@ pub struct PrematureMapContext {
|
||||
view_state: ViewState,
|
||||
style: Style,
|
||||
|
||||
tile_cache: TileCache,
|
||||
tile_repository: TileRepository,
|
||||
|
||||
wgpu_settings: WgpuSettings,
|
||||
renderer_settings: RendererSettings,
|
||||
@ -44,7 +44,7 @@ impl EventuallyMapContext {
|
||||
EventuallyMapContext::Premature(PrematureMapContext {
|
||||
view_state,
|
||||
style,
|
||||
tile_cache,
|
||||
tile_repository,
|
||||
..
|
||||
}) => {
|
||||
mem::replace(
|
||||
@ -52,7 +52,7 @@ impl EventuallyMapContext {
|
||||
EventuallyMapContext::Full(MapContext {
|
||||
view_state,
|
||||
style,
|
||||
tile_cache,
|
||||
tile_repository,
|
||||
renderer,
|
||||
}),
|
||||
);
|
||||
@ -98,7 +98,7 @@ where
|
||||
renderer_settings: RendererSettings,
|
||||
) -> Self {
|
||||
let view_state = ViewState::new(&window_size);
|
||||
let tile_cache = TileCache::new();
|
||||
let tile_repository = TileRepository::new();
|
||||
let mut schedule = Schedule::default();
|
||||
|
||||
let http_source_client: HttpSourceClient<HC> = HttpSourceClient::new(http_client);
|
||||
@ -112,14 +112,14 @@ where
|
||||
None => EventuallyMapContext::Premature(PrematureMapContext {
|
||||
view_state,
|
||||
style,
|
||||
tile_cache,
|
||||
tile_repository,
|
||||
wgpu_settings,
|
||||
renderer_settings,
|
||||
}),
|
||||
Some(renderer) => EventuallyMapContext::Full(MapContext {
|
||||
view_state,
|
||||
style,
|
||||
tile_cache,
|
||||
tile_repository,
|
||||
renderer,
|
||||
}),
|
||||
},
|
||||
@ -244,7 +244,7 @@ where
|
||||
style: Style,
|
||||
) -> Self {
|
||||
let view_state = ViewState::new(&window_size);
|
||||
let tile_cache = TileCache::new();
|
||||
let tile_repository = TileRepository::new();
|
||||
let mut schedule = Schedule::default();
|
||||
|
||||
register_render_stages(&mut schedule);
|
||||
@ -254,7 +254,7 @@ where
|
||||
map_context: MapContext {
|
||||
view_state,
|
||||
style,
|
||||
tile_cache,
|
||||
tile_repository,
|
||||
renderer,
|
||||
},
|
||||
schedule,
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
use crate::context::MapContext;
|
||||
use crate::coords::{ViewRegion, Zoom};
|
||||
use crate::io::tile_cache::TileCache;
|
||||
use crate::io::tile_repository::TileRepository;
|
||||
use crate::render::camera::ViewProjection;
|
||||
use crate::render::render_phase::RenderPhase;
|
||||
use crate::render::resource::IndexEntry;
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
use crate::context::MapContext;
|
||||
use crate::coords::{ViewRegion, Zoom};
|
||||
use crate::io::tile_cache::TileCache;
|
||||
use crate::io::tile_repository::TileRepository;
|
||||
use crate::render::camera::ViewProjection;
|
||||
use crate::render::render_phase::RenderPhase;
|
||||
use crate::render::resource::IndexEntry;
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
use crate::context::MapContext;
|
||||
use crate::coords::{ViewRegion, Zoom};
|
||||
use crate::io::tile_cache::TileCache;
|
||||
use crate::io::tile_repository::TileRepository;
|
||||
use crate::render::camera::ViewProjection;
|
||||
use crate::render::resource::IndexEntry;
|
||||
use crate::render::shaders::{
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
use crate::context::MapContext;
|
||||
use crate::coords::{ViewRegion, Zoom};
|
||||
use crate::io::tile_cache::TileCache;
|
||||
use crate::io::tile_repository::{StoredLayer, TileRepository};
|
||||
use crate::render::camera::ViewProjection;
|
||||
use crate::render::resource::IndexEntry;
|
||||
use crate::render::shaders::{
|
||||
@ -13,7 +13,6 @@ use crate::render::util::Eventually::Initialized;
|
||||
use crate::schedule::Stage;
|
||||
use crate::{RenderState, Renderer, Style};
|
||||
|
||||
use crate::stages::LayerTessellateMessage;
|
||||
use std::iter;
|
||||
|
||||
#[derive(Default)]
|
||||
@ -26,7 +25,7 @@ impl Stage for UploadStage {
|
||||
MapContext {
|
||||
view_state,
|
||||
style,
|
||||
tile_cache,
|
||||
tile_repository,
|
||||
renderer: Renderer { queue, state, .. },
|
||||
..
|
||||
}: &mut MapContext,
|
||||
@ -59,7 +58,7 @@ impl Stage for UploadStage {
|
||||
.map(|bounding_box| ViewRegion::new(bounding_box, 0, *view_state.zoom, visible_level));
|
||||
|
||||
if let Some(view_region) = &view_region {
|
||||
self.upload_tile_geometry(state, queue, tile_cache, style, view_region);
|
||||
self.upload_tile_geometry(state, queue, tile_repository, style, view_region);
|
||||
self.upload_tile_view_pattern(state, queue, &view_proj);
|
||||
self.update_metadata();
|
||||
}
|
||||
@ -89,7 +88,7 @@ impl UploadStage {
|
||||
/*let source_layer = entry.style_layer.source_layer.as_ref().unwrap();
|
||||
|
||||
if let Some(result) = scheduler
|
||||
.get_tile_cache()
|
||||
.get_tile_repository()
|
||||
.iter_tessellated_layers_at(&world_coords)
|
||||
.unwrap()
|
||||
.find(|layer| source_layer.as_str() == layer.layer_name())
|
||||
@ -155,7 +154,7 @@ impl UploadStage {
|
||||
&self,
|
||||
RenderState { buffer_pool, .. }: &mut RenderState,
|
||||
queue: &wgpu::Queue,
|
||||
tile_cache: &TileCache,
|
||||
tile_repository: &TileRepository,
|
||||
style: &Style,
|
||||
view_region: &ViewRegion,
|
||||
) {
|
||||
@ -165,7 +164,7 @@ impl UploadStage {
|
||||
let loaded_layers = buffer_pool
|
||||
.get_loaded_layers_at(&world_coords)
|
||||
.unwrap_or_default();
|
||||
if let Some(available_layers) = tile_cache
|
||||
if let Some(available_layers) = tile_repository
|
||||
.iter_tessellated_layers_at(&world_coords)
|
||||
.map(|layers| {
|
||||
layers
|
||||
@ -187,10 +186,10 @@ impl UploadStage {
|
||||
.map(|color| color.into());
|
||||
|
||||
match message {
|
||||
LayerTessellateMessage::UnavailableLayer { coords: _, .. } => {
|
||||
StoredLayer::UnavailableLayer { coords: _, .. } => {
|
||||
/*self.buffer_pool.mark_layer_unavailable(*coords);*/
|
||||
}
|
||||
LayerTessellateMessage::TessellatedLayer {
|
||||
StoredLayer::TessellatedLayer {
|
||||
coords,
|
||||
feature_indices,
|
||||
layer_data,
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
use crate::context::MapContext;
|
||||
use crate::coords::{ViewRegion, Zoom};
|
||||
use crate::io::tile_cache::TileCache;
|
||||
use crate::io::tile_repository::TileRepository;
|
||||
use crate::render::camera::ViewProjection;
|
||||
use crate::render::render_phase::RenderPhase;
|
||||
use crate::render::resource::{BufferDimensions, BufferedTextureHead, Head, IndexEntry};
|
||||
|
||||
155
maplibre/src/stages/message.rs
Normal file
155
maplibre/src/stages/message.rs
Normal file
@ -0,0 +1,155 @@
|
||||
use crate::coords::{WorldCoords, WorldTileCoords, Zoom};
|
||||
use crate::error::Error;
|
||||
use crate::io::geometry_index::{GeometryIndex, IndexedGeometry};
|
||||
use crate::io::pipeline::PipelineContext;
|
||||
use crate::io::pipeline::Processable;
|
||||
use crate::io::pipeline_steps::build_vector_tile_pipeline;
|
||||
use crate::io::tile_repository::StoredLayer;
|
||||
use crate::io::tile_request_state::TileRequestState;
|
||||
use crate::io::{TileRequest, TileRequestID};
|
||||
use crate::render::ShaderVertex;
|
||||
use crate::stages::HeadedPipelineProcessor;
|
||||
use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer};
|
||||
use geozero::mvt::tile;
|
||||
use std::fmt;
|
||||
use std::sync::{mpsc, Arc, Mutex};
|
||||
|
||||
pub type MessageSender = mpsc::Sender<TessellateMessage>;
|
||||
pub type MessageReceiver = mpsc::Receiver<TessellateMessage>;
|
||||
|
||||
/// [crate::io::TileTessellateMessage] or [crate::io::LayerTessellateMessage] tessellation message.
|
||||
pub enum TessellateMessage {
|
||||
Tile(TileTessellateMessage),
|
||||
Layer(LayerTessellateMessage),
|
||||
}
|
||||
|
||||
/// The result of the tessellation of a tile.
|
||||
pub struct TileTessellateMessage {
|
||||
pub request_id: TileRequestID,
|
||||
pub coords: WorldTileCoords,
|
||||
}
|
||||
|
||||
/// `TessellatedLayer` contains the result of the tessellation for a specific layer, otherwise
|
||||
/// `UnavailableLayer` if the layer doesn't exist.
|
||||
pub enum LayerTessellateMessage {
|
||||
UnavailableLayer {
|
||||
coords: WorldTileCoords,
|
||||
layer_name: String,
|
||||
},
|
||||
TessellatedLayer {
|
||||
coords: WorldTileCoords,
|
||||
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
|
||||
/// Holds for each feature the count of indices.
|
||||
feature_indices: Vec<u32>,
|
||||
layer_data: tile::Layer,
|
||||
},
|
||||
}
|
||||
|
||||
impl Into<StoredLayer> for LayerTessellateMessage {
|
||||
fn into(self) -> StoredLayer {
|
||||
match self {
|
||||
LayerTessellateMessage::UnavailableLayer { coords, layer_name } => {
|
||||
StoredLayer::UnavailableLayer { coords, layer_name }
|
||||
}
|
||||
LayerTessellateMessage::TessellatedLayer {
|
||||
coords,
|
||||
buffer,
|
||||
feature_indices,
|
||||
layer_data,
|
||||
} => StoredLayer::TessellatedLayer {
|
||||
coords,
|
||||
buffer,
|
||||
feature_indices,
|
||||
layer_data,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for LayerTessellateMessage {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"LayerTessellateMessage{}",
|
||||
match self {
|
||||
LayerTessellateMessage::UnavailableLayer { coords, .. } => coords,
|
||||
LayerTessellateMessage::TessellatedLayer { coords, .. } => coords,
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores and provides access to the thread safe data shared between the schedulers.
|
||||
#[derive(Clone)]
|
||||
pub struct SharedThreadState {
|
||||
pub tile_request_state: Arc<Mutex<TileRequestState>>,
|
||||
pub message_sender: mpsc::Sender<TessellateMessage>,
|
||||
pub geometry_index: Arc<Mutex<GeometryIndex>>,
|
||||
}
|
||||
|
||||
impl SharedThreadState {
|
||||
fn get_tile_request(&self, request_id: TileRequestID) -> Option<TileRequest> {
|
||||
self.tile_request_state
|
||||
.lock()
|
||||
.ok()
|
||||
.and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> {
|
||||
if let Some(tile_request) = self.get_tile_request(request_id) {
|
||||
let mut processor = HeadedPipelineProcessor {
|
||||
state: self.clone(),
|
||||
};
|
||||
let mut pipeline_context = PipelineContext {
|
||||
processor: Box::new(processor),
|
||||
};
|
||||
let pipeline = build_vector_tile_pipeline();
|
||||
pipeline.process((tile_request, request_id, data), &mut pipeline_context);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn tile_unavailable(
|
||||
&self,
|
||||
coords: &WorldTileCoords,
|
||||
request_id: TileRequestID,
|
||||
) -> Result<(), Error> {
|
||||
if let Some(tile_request) = self.get_tile_request(request_id) {
|
||||
for to_load in &tile_request.layers {
|
||||
tracing::warn!("layer {} at {} unavailable", to_load, coords);
|
||||
self.message_sender.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::UnavailableLayer {
|
||||
coords: tile_request.coords,
|
||||
layer_name: to_load.to_string(),
|
||||
},
|
||||
))?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn query_point(
|
||||
&self,
|
||||
world_coords: &WorldCoords,
|
||||
z: u8,
|
||||
zoom: Zoom,
|
||||
) -> Option<Vec<IndexedGeometry<f64>>> {
|
||||
if let Ok(geometry_index) = self.geometry_index.lock() {
|
||||
geometry_index
|
||||
.query_point(world_coords, z, zoom)
|
||||
.map(|geometries| {
|
||||
geometries
|
||||
.iter()
|
||||
.cloned()
|
||||
.cloned()
|
||||
.collect::<Vec<IndexedGeometry<f64>>>()
|
||||
})
|
||||
} else {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -24,7 +24,13 @@ use std::fmt;
|
||||
use std::sync::{mpsc, Arc, Mutex};
|
||||
|
||||
use crate::io::pipeline::Processable;
|
||||
use crate::io::tile_repository::StoredLayer;
|
||||
use crate::stages::message::{
|
||||
LayerTessellateMessage, MessageReceiver, MessageSender, SharedThreadState, TessellateMessage,
|
||||
TileTessellateMessage,
|
||||
};
|
||||
|
||||
mod message;
|
||||
mod populate_tile_store_stage;
|
||||
mod request_stage;
|
||||
|
||||
@ -52,59 +58,6 @@ pub fn register_stages<HC: HttpClient, SM: ScheduleMethod>(
|
||||
);
|
||||
}
|
||||
|
||||
type MessageSender = mpsc::Sender<TessellateMessage>;
|
||||
type MessageReceiver = mpsc::Receiver<TessellateMessage>;
|
||||
|
||||
/// [crate::io::TileTessellateMessage] or [crate::io::LayerTessellateMessage] tessellation message.
|
||||
enum TessellateMessage {
|
||||
Tile(TileTessellateMessage),
|
||||
Layer(LayerTessellateMessage),
|
||||
}
|
||||
|
||||
/// The result of the tessellation of a tile.
|
||||
struct TileTessellateMessage {
|
||||
pub request_id: TileRequestID,
|
||||
pub coords: WorldTileCoords,
|
||||
}
|
||||
|
||||
/// `TessellatedLayer` contains the result of the tessellation for a specific layer, otherwise
|
||||
/// `UnavailableLayer` if the layer doesn't exist.
|
||||
enum LayerTessellateMessage {
|
||||
UnavailableLayer {
|
||||
coords: WorldTileCoords,
|
||||
layer_name: String,
|
||||
},
|
||||
TessellatedLayer {
|
||||
coords: WorldTileCoords,
|
||||
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
|
||||
/// Holds for each feature the count of indices.
|
||||
feature_indices: Vec<u32>,
|
||||
layer_data: tile::Layer,
|
||||
},
|
||||
}
|
||||
|
||||
impl fmt::Debug for LayerTessellateMessage {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "LayerTessellateMessage{}", self.get_coords())
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerTessellateMessage {
|
||||
pub fn get_coords(&self) -> WorldTileCoords {
|
||||
match self {
|
||||
LayerTessellateMessage::UnavailableLayer { coords, .. } => *coords,
|
||||
LayerTessellateMessage::TessellatedLayer { coords, .. } => *coords,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn layer_name(&self) -> &str {
|
||||
match self {
|
||||
LayerTessellateMessage::UnavailableLayer { layer_name, .. } => layer_name.as_str(),
|
||||
LayerTessellateMessage::TessellatedLayer { layer_data, .. } => &layer_data.name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct HeadedPipelineProcessor {
|
||||
state: SharedThreadState,
|
||||
}
|
||||
@ -159,78 +112,3 @@ impl PipelineProcessor for HeadedPipelineProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores and provides access to the thread safe data shared between the schedulers.
|
||||
#[derive(Clone)]
|
||||
pub struct SharedThreadState {
|
||||
pub tile_request_state: Arc<Mutex<TileRequestState>>,
|
||||
pub message_sender: mpsc::Sender<TessellateMessage>,
|
||||
pub geometry_index: Arc<Mutex<GeometryIndex>>,
|
||||
}
|
||||
|
||||
impl SharedThreadState {
|
||||
fn get_tile_request(&self, request_id: TileRequestID) -> Option<TileRequest> {
|
||||
self.tile_request_state
|
||||
.lock()
|
||||
.ok()
|
||||
.and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> {
|
||||
if let Some(tile_request) = self.get_tile_request(request_id) {
|
||||
let mut processor = HeadedPipelineProcessor {
|
||||
state: self.clone(),
|
||||
};
|
||||
let mut pipeline_context = PipelineContext {
|
||||
processor: Box::new(processor),
|
||||
};
|
||||
let pipeline = build_vector_tile_pipeline();
|
||||
pipeline.process((tile_request, request_id, data), &mut pipeline_context);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn tile_unavailable(
|
||||
&self,
|
||||
coords: &WorldTileCoords,
|
||||
request_id: TileRequestID,
|
||||
) -> Result<(), Error> {
|
||||
if let Some(tile_request) = self.get_tile_request(request_id) {
|
||||
for to_load in &tile_request.layers {
|
||||
tracing::warn!("layer {} at {} unavailable", to_load, coords);
|
||||
self.message_sender.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::UnavailableLayer {
|
||||
coords: tile_request.coords,
|
||||
layer_name: to_load.to_string(),
|
||||
},
|
||||
))?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn query_point(
|
||||
&self,
|
||||
world_coords: &WorldCoords,
|
||||
z: u8,
|
||||
zoom: Zoom,
|
||||
) -> Option<Vec<IndexedGeometry<f64>>> {
|
||||
if let Ok(geometry_index) = self.geometry_index.lock() {
|
||||
geometry_index
|
||||
.query_point(world_coords, z, zoom)
|
||||
.map(|geometries| {
|
||||
geometries
|
||||
.iter()
|
||||
.cloned()
|
||||
.cloned()
|
||||
.collect::<Vec<IndexedGeometry<f64>>>()
|
||||
})
|
||||
} else {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,8 +1,9 @@
|
||||
//! Receives data from async threads and populates the [`crate::io::tile_cache::TileCache`].
|
||||
//! Receives data from async threads and populates the [`crate::io::tile_repository::TileCache`].
|
||||
|
||||
use super::{MessageReceiver, SharedThreadState, TessellateMessage, TileTessellateMessage};
|
||||
use crate::context::MapContext;
|
||||
use crate::io::tile_repository::StoredLayer;
|
||||
use crate::schedule::Stage;
|
||||
use crate::stages::{MessageReceiver, SharedThreadState, TessellateMessage, TileTessellateMessage};
|
||||
use std::sync::mpsc;
|
||||
|
||||
pub struct PopulateTileStore {
|
||||
@ -20,16 +21,22 @@ impl PopulateTileStore {
|
||||
}
|
||||
|
||||
impl Stage for PopulateTileStore {
|
||||
fn run(&mut self, MapContext { tile_cache, .. }: &mut MapContext) {
|
||||
fn run(
|
||||
&mut self,
|
||||
MapContext {
|
||||
tile_repository, ..
|
||||
}: &mut MapContext,
|
||||
) {
|
||||
if let Ok(result) = self.message_receiver.try_recv() {
|
||||
match result {
|
||||
TessellateMessage::Layer(layer_result) => {
|
||||
let layer: StoredLayer = layer_result.into();
|
||||
tracing::trace!(
|
||||
"Layer {} at {} reached main thread",
|
||||
layer_result.layer_name(),
|
||||
layer_result.get_coords()
|
||||
layer.layer_name(),
|
||||
layer.get_coords()
|
||||
);
|
||||
tile_cache.put_tessellated_layer(layer_result);
|
||||
tile_repository.put_tessellated_layer(layer);
|
||||
}
|
||||
TessellateMessage::Tile(TileTessellateMessage { request_id, coords }) => loop {
|
||||
if let Ok(mut tile_request_state) =
|
||||
|
||||
@ -4,7 +4,7 @@ use crate::context::MapContext;
|
||||
use crate::coords::{ViewRegion, WorldTileCoords};
|
||||
use crate::error::Error;
|
||||
use crate::io::source_client::{HttpSourceClient, SourceClient};
|
||||
use crate::io::tile_cache::TileCache;
|
||||
use crate::io::tile_repository::TileRepository;
|
||||
use crate::io::TileRequest;
|
||||
use crate::schedule::Stage;
|
||||
use crate::stages::SharedThreadState;
|
||||
@ -48,7 +48,7 @@ where
|
||||
MapContext {
|
||||
view_state,
|
||||
style,
|
||||
tile_cache,
|
||||
tile_repository,
|
||||
..
|
||||
}: &mut MapContext,
|
||||
) {
|
||||
@ -65,7 +65,7 @@ where
|
||||
{
|
||||
if let Some(view_region) = &view_region {
|
||||
// FIXME: We also need to request tiles from layers above if we are over the maximum zoom level
|
||||
self.try_failed = self.request_tiles_in_view(tile_cache, style, view_region);
|
||||
self.try_failed = self.request_tiles_in_view(tile_repository, style, view_region);
|
||||
}
|
||||
}
|
||||
|
||||
@ -82,7 +82,7 @@ where
|
||||
#[tracing::instrument(skip_all)]
|
||||
fn request_tiles_in_view(
|
||||
&self,
|
||||
tile_cache: &TileCache,
|
||||
tile_repository: &TileRepository,
|
||||
style: &Style,
|
||||
view_region: &ViewRegion,
|
||||
) -> bool {
|
||||
@ -97,7 +97,7 @@ where
|
||||
if coords.build_quad_key().is_some() {
|
||||
// TODO: Make tesselation depend on style?
|
||||
try_failed = self
|
||||
.try_request_tile(tile_cache, &coords, &source_layers)
|
||||
.try_request_tile(tile_repository, &coords, &source_layers)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
@ -106,11 +106,11 @@ where
|
||||
|
||||
fn try_request_tile(
|
||||
&self,
|
||||
tile_cache: &TileCache,
|
||||
tile_repository: &TileRepository,
|
||||
coords: &WorldTileCoords,
|
||||
layers: &HashSet<String>,
|
||||
) -> Result<bool, Error> {
|
||||
if !tile_cache.is_layers_missing(coords, layers) {
|
||||
if !tile_repository.is_layers_missing(coords, layers) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user