Revive old legacy webworker example

Maximilian Ammann 2022-05-31 21:55:20 +02:00
parent 0e2f8ef2c9
commit 005dca064e
8 changed files with 84 additions and 83 deletions

View File

@@ -37,12 +37,13 @@ pub mod style;
 pub mod window;
 // Exposed because of doc-strings
 pub mod schedule;
+// Exposed because of SharedThreadState
+pub mod stages;
 // Used for benchmarking
 pub mod benchmarking;
 // Internal modules
-pub(crate) mod stages;
 pub(crate) mod tessellation;
 pub mod util;
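The visibility change above is what allows code outside the core crate to name the shared state directly. A minimal sketch of such downstream use; the import path matches the web crate's diff further down, while the surrounding function is purely illustrative:

// Hypothetical downstream code: only the `maplibre::stages::SharedThreadState`
// path comes from this commit; the function itself is an assumption.
use maplibre::stages::SharedThreadState;

fn attach_worker(state: &SharedThreadState) {
    // SharedThreadState derives Clone, so each worker can hold its own handle to
    // the shared tile-request state, message sender, and geometry index.
    let _worker_state: SharedThreadState = state.clone();
}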

View File

@@ -78,75 +78,3 @@ impl fmt::Debug for LayerTessellateMessage {
         )
     }
 }
-
-/// Stores and provides access to the thread safe data shared between the schedulers.
-#[derive(Clone)]
-pub struct SharedThreadState {
-    pub tile_request_state: Arc<Mutex<TileRequestState>>,
-    pub message_sender: mpsc::Sender<TessellateMessage>,
-    pub geometry_index: Arc<Mutex<GeometryIndex>>,
-}
-
-impl SharedThreadState {
-    fn get_tile_request(&self, request_id: TileRequestID) -> Option<TileRequest> {
-        self.tile_request_state
-            .lock()
-            .ok()
-            .and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned())
-    }
-
-    #[tracing::instrument(skip_all)]
-    pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> {
-        if let Some(tile_request) = self.get_tile_request(request_id) {
-            let mut pipeline_context = PipelineContext::new(HeadedPipelineProcessor {
-                state: self.clone(),
-            });
-            let pipeline = build_vector_tile_pipeline();
-            pipeline.process((tile_request, request_id, data), &mut pipeline_context);
-        }
-
-        Ok(())
-    }
-
-    pub fn tile_unavailable(
-        &self,
-        coords: &WorldTileCoords,
-        request_id: TileRequestID,
-    ) -> Result<(), Error> {
-        if let Some(tile_request) = self.get_tile_request(request_id) {
-            for to_load in &tile_request.layers {
-                tracing::warn!("layer {} at {} unavailable", to_load, coords);
-                self.message_sender.send(TessellateMessage::Layer(
-                    LayerTessellateMessage::UnavailableLayer {
-                        coords: tile_request.coords,
-                        layer_name: to_load.to_string(),
-                    },
-                ))?;
-            }
-        }
-
-        Ok(())
-    }
-
-    #[tracing::instrument(skip_all)]
-    pub fn query_point(
-        &self,
-        world_coords: &WorldCoords,
-        z: u8,
-        zoom: Zoom,
-    ) -> Option<Vec<IndexedGeometry<f64>>> {
-        if let Ok(geometry_index) = self.geometry_index.lock() {
-            geometry_index
-                .query_point(world_coords, z, zoom)
-                .map(|geometries| {
-                    geometries
-                        .iter()
-                        .cloned()
-                        .cloned()
-                        .collect::<Vec<IndexedGeometry<f64>>>()
-                })
-        } else {
-            unimplemented!()
-        }
-    }
-}

View File

@@ -26,7 +26,7 @@ use std::sync::{mpsc, Arc, Mutex};
 use crate::io::pipeline::Processable;
 use crate::io::tile_repository::StoredLayer;
 use crate::stages::message::{
-    LayerTessellateMessage, MessageReceiver, MessageSender, SharedThreadState, TessellateMessage,
+    LayerTessellateMessage, MessageReceiver, MessageSender, TessellateMessage,
     TileTessellateMessage,
 };
@@ -113,3 +113,75 @@ impl PipelineProcessor for HeadedPipelineProcessor {
         }
     }
 }
+
+/// Stores and provides access to the thread safe data shared between the schedulers.
+#[derive(Clone)]
+pub struct SharedThreadState {
+    pub tile_request_state: Arc<Mutex<TileRequestState>>,
+    pub message_sender: mpsc::Sender<TessellateMessage>,
+    pub geometry_index: Arc<Mutex<GeometryIndex>>,
+}
+
+impl SharedThreadState {
+    fn get_tile_request(&self, request_id: TileRequestID) -> Option<TileRequest> {
+        self.tile_request_state
+            .lock()
+            .ok()
+            .and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned())
+    }
+
+    #[tracing::instrument(skip_all)]
+    pub fn process_tile(&self, request_id: TileRequestID, data: Box<[u8]>) -> Result<(), Error> {
+        if let Some(tile_request) = self.get_tile_request(request_id) {
+            let mut pipeline_context = PipelineContext::new(HeadedPipelineProcessor {
+                state: self.clone(),
+            });
+            let pipeline = build_vector_tile_pipeline();
+            pipeline.process((tile_request, request_id, data), &mut pipeline_context);
+        }
+
+        Ok(())
+    }
+
+    pub fn tile_unavailable(
+        &self,
+        coords: &WorldTileCoords,
+        request_id: TileRequestID,
+    ) -> Result<(), Error> {
+        if let Some(tile_request) = self.get_tile_request(request_id) {
+            for to_load in &tile_request.layers {
+                tracing::warn!("layer {} at {} unavailable", to_load, coords);
+                self.message_sender.send(TessellateMessage::Layer(
+                    LayerTessellateMessage::UnavailableLayer {
+                        coords: tile_request.coords,
+                        layer_name: to_load.to_string(),
+                    },
+                ))?;
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tracing::instrument(skip_all)]
+    pub fn query_point(
+        &self,
+        world_coords: &WorldCoords,
+        z: u8,
+        zoom: Zoom,
+    ) -> Option<Vec<IndexedGeometry<f64>>> {
+        if let Ok(geometry_index) = self.geometry_index.lock() {
+            geometry_index
+                .query_point(world_coords, z, zoom)
+                .map(|geometries| {
+                    geometries
+                        .iter()
+                        .cloned()
+                        .cloned()
+                        .collect::<Vec<IndexedGeometry<f64>>>()
+                })
+        } else {
+            unimplemented!()
+        }
+    }
+}
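For orientation, a minimal sketch of how a scheduler thread might drive the relocated SharedThreadState once a tile fetch resolves. Only process_tile and tile_unavailable come from the code above; the function name, its arguments, and the Option-based fetch result are assumptions:

// Hypothetical fetch callback: everything except SharedThreadState's own
// methods and the types it already uses is assumed for illustration.
fn on_fetch_finished(
    state: &SharedThreadState,
    coords: &WorldTileCoords,
    request_id: TileRequestID,
    data: Option<Box<[u8]>>,
) -> Result<(), Error> {
    match data {
        // Raw tile bytes arrived: hand them to the vector-tile pipeline.
        Some(data) => state.process_tile(request_id, data),
        // Nothing was fetched: report every requested layer as unavailable.
        None => state.tile_unavailable(coords, request_id),
    }
}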

View File

@@ -1,4 +1,4 @@
-import init, {create_pool_scheduler, run} from "./wasm-pack"
+import init, {create_pool_scheduler, new_thread_local_state, run} from "./wasm-pack"
 import {Spector} from "spectorjs"
 import {WebWorkerMessageType} from "./types"
 import {

View File

@@ -1,4 +1,3 @@
-/*
 import init, {InitOutput, tessellate_layers} from "./wasm-pack"
 import {WebWorkerMessageType} from "./types"
@@ -31,4 +30,3 @@ onmessage = async message => {
             break
     }
 }
-*/

View File

@@ -7,6 +7,7 @@ use maplibre::coords::TileCoords;
 use maplibre::io::scheduler::Scheduler;
 use maplibre::io::TileRequestID;
+use maplibre::stages::SharedThreadState;
 
 #[wasm_bindgen]
 extern "C" {

View File

@@ -1,4 +1,4 @@
 pub mod http_client;
-//pub mod legacy_webworker_fetcher;
+pub mod legacy_webworker_fetcher;
 pub mod pool;
 pub mod schedule_method;

View File

@@ -34,12 +34,13 @@ impl WebWorkerPoolScheduleMethod {
 }
 
 impl ScheduleMethod for WebWorkerPoolScheduleMethod {
-    fn schedule(
+    fn schedule<T>(
         &self,
-        future_factory: Box<
-            (dyn (FnOnce() -> Pin<Box<dyn Future<Output = ()> + 'static>>) + Send + 'static),
-        >,
-    ) -> Result<(), Error> {
+        future_factory: impl (FnOnce() -> T) + Send + 'static,
+    ) -> Result<(), Error>
+    where
+        T: Future<Output = ()> + 'static,
+    {
         self.pool
             .run(move || {
                 wasm_bindgen_futures::future_to_promise(async move {