diff --git a/maplibre/src/lib.rs b/maplibre/src/lib.rs index 79f7e76a..6790e5e1 100644 --- a/maplibre/src/lib.rs +++ b/maplibre/src/lib.rs @@ -37,12 +37,13 @@ pub mod style; pub mod window; // Exposed because of doc-strings pub mod schedule; +// Exposed because of SharedThreadState +pub mod stages; // Used for benchmarking pub mod benchmarking; // Internal modules -pub(crate) mod stages; pub(crate) mod tessellation; pub mod util; diff --git a/maplibre/src/stages/message.rs b/maplibre/src/stages/message.rs index 7c441db7..133ef399 100644 --- a/maplibre/src/stages/message.rs +++ b/maplibre/src/stages/message.rs @@ -78,75 +78,3 @@ impl fmt::Debug for LayerTessellateMessage { ) } } - -/// Stores and provides access to the thread safe data shared between the schedulers. -#[derive(Clone)] -pub struct SharedThreadState { - pub tile_request_state: Arc>, - pub message_sender: mpsc::Sender, - pub geometry_index: Arc>, -} - -impl SharedThreadState { - fn get_tile_request(&self, request_id: TileRequestID) -> Option { - self.tile_request_state - .lock() - .ok() - .and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned()) - } - - #[tracing::instrument(skip_all)] - pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> { - if let Some(tile_request) = self.get_tile_request(request_id) { - let mut pipeline_context = PipelineContext::new(HeadedPipelineProcessor { - state: self.clone(), - }); - let pipeline = build_vector_tile_pipeline(); - pipeline.process((tile_request, request_id, data), &mut pipeline_context); - } - - Ok(()) - } - - pub fn tile_unavailable( - &self, - coords: &WorldTileCoords, - request_id: TileRequestID, - ) -> Result<(), Error> { - if let Some(tile_request) = self.get_tile_request(request_id) { - for to_load in &tile_request.layers { - tracing::warn!("layer {} at {} unavailable", to_load, coords); - self.message_sender.send(TessellateMessage::Layer( - LayerTessellateMessage::UnavailableLayer { - coords: tile_request.coords, - layer_name: to_load.to_string(), - }, - ))?; - } - } - - Ok(()) - } - - #[tracing::instrument(skip_all)] - pub fn query_point( - &self, - world_coords: &WorldCoords, - z: u8, - zoom: Zoom, - ) -> Option>> { - if let Ok(geometry_index) = self.geometry_index.lock() { - geometry_index - .query_point(world_coords, z, zoom) - .map(|geometries| { - geometries - .iter() - .cloned() - .cloned() - .collect::>>() - }) - } else { - unimplemented!() - } - } -} diff --git a/maplibre/src/stages/mod.rs b/maplibre/src/stages/mod.rs index 686e7efa..396a9a14 100644 --- a/maplibre/src/stages/mod.rs +++ b/maplibre/src/stages/mod.rs @@ -26,7 +26,7 @@ use std::sync::{mpsc, Arc, Mutex}; use crate::io::pipeline::Processable; use crate::io::tile_repository::StoredLayer; use crate::stages::message::{ - LayerTessellateMessage, MessageReceiver, MessageSender, SharedThreadState, TessellateMessage, + LayerTessellateMessage, MessageReceiver, MessageSender, TessellateMessage, TileTessellateMessage, }; @@ -113,3 +113,75 @@ impl PipelineProcessor for HeadedPipelineProcessor { } } } + +/// Stores and provides access to the thread safe data shared between the schedulers. +#[derive(Clone)] +pub struct SharedThreadState { + pub tile_request_state: Arc>, + pub message_sender: mpsc::Sender, + pub geometry_index: Arc>, +} + +impl SharedThreadState { + fn get_tile_request(&self, request_id: TileRequestID) -> Option { + self.tile_request_state + .lock() + .ok() + .and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned()) + } + + #[tracing::instrument(skip_all)] + pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> { + if let Some(tile_request) = self.get_tile_request(request_id) { + let mut pipeline_context = PipelineContext::new(HeadedPipelineProcessor { + state: self.clone(), + }); + let pipeline = build_vector_tile_pipeline(); + pipeline.process((tile_request, request_id, data), &mut pipeline_context); + } + + Ok(()) + } + + pub fn tile_unavailable( + &self, + coords: &WorldTileCoords, + request_id: TileRequestID, + ) -> Result<(), Error> { + if let Some(tile_request) = self.get_tile_request(request_id) { + for to_load in &tile_request.layers { + tracing::warn!("layer {} at {} unavailable", to_load, coords); + self.message_sender.send(TessellateMessage::Layer( + LayerTessellateMessage::UnavailableLayer { + coords: tile_request.coords, + layer_name: to_load.to_string(), + }, + ))?; + } + } + + Ok(()) + } + + #[tracing::instrument(skip_all)] + pub fn query_point( + &self, + world_coords: &WorldCoords, + z: u8, + zoom: Zoom, + ) -> Option>> { + if let Ok(geometry_index) = self.geometry_index.lock() { + geometry_index + .query_point(world_coords, z, zoom) + .map(|geometries| { + geometries + .iter() + .cloned() + .cloned() + .collect::>>() + }) + } else { + unimplemented!() + } + } +} diff --git a/web/lib/src/index.ts b/web/lib/src/index.ts index 56c00921..ffc0e661 100644 --- a/web/lib/src/index.ts +++ b/web/lib/src/index.ts @@ -1,4 +1,4 @@ -import init, {create_pool_scheduler, run} from "./wasm-pack" +import init, {create_pool_scheduler, new_thread_local_state, run} from "./wasm-pack" import {Spector} from "spectorjs" import {WebWorkerMessageType} from "./types" import { diff --git a/web/lib/src/legacy.worker.ts b/web/lib/src/legacy.worker.ts index c9a6010a..0f7b1c25 100644 --- a/web/lib/src/legacy.worker.ts +++ b/web/lib/src/legacy.worker.ts @@ -1,4 +1,3 @@ -/* import init, {InitOutput, tessellate_layers} from "./wasm-pack" import {WebWorkerMessageType} from "./types" @@ -31,4 +30,3 @@ onmessage = async message => { break } } -*/ diff --git a/web/src/platform/legacy_webworker_fetcher.rs b/web/src/platform/legacy_webworker_fetcher.rs index 4129fb88..975a78d3 100644 --- a/web/src/platform/legacy_webworker_fetcher.rs +++ b/web/src/platform/legacy_webworker_fetcher.rs @@ -7,6 +7,7 @@ use maplibre::coords::TileCoords; use maplibre::io::scheduler::Scheduler; use maplibre::io::TileRequestID; +use maplibre::stages::SharedThreadState; #[wasm_bindgen] extern "C" { diff --git a/web/src/platform/mod.rs b/web/src/platform/mod.rs index 8522af98..4b07af78 100644 --- a/web/src/platform/mod.rs +++ b/web/src/platform/mod.rs @@ -1,4 +1,4 @@ pub mod http_client; -//pub mod legacy_webworker_fetcher; +pub mod legacy_webworker_fetcher; pub mod pool; pub mod schedule_method; diff --git a/web/src/platform/schedule_method.rs b/web/src/platform/schedule_method.rs index 8949ccca..caed574c 100644 --- a/web/src/platform/schedule_method.rs +++ b/web/src/platform/schedule_method.rs @@ -34,12 +34,13 @@ impl WebWorkerPoolScheduleMethod { } impl ScheduleMethod for WebWorkerPoolScheduleMethod { - fn schedule( + fn schedule( &self, - future_factory: Box< - (dyn (FnOnce() -> Pin + 'static>>) + Send + 'static), - >, - ) -> Result<(), Error> { + future_factory: impl (FnOnce() -> T) + Send + 'static, + ) -> Result<(), Error> + where + T: Future + 'static, + { self.pool .run(move || { wasm_bindgen_futures::future_to_promise(async move {