mirror of
https://github.com/maplibre/maplibre-rs.git
synced 2025-12-08 19:05:57 +00:00
Move function from scheduler to map state
This commit is contained in:
parent
3fb76fa33e
commit
4c520ae317
@ -71,28 +71,28 @@ impl UpdateState for QueryHandler {
|
||||
.camera()
|
||||
.window_to_world_at_ground(&window_position, &inverted_view_proj)
|
||||
{
|
||||
state
|
||||
.scheduler()
|
||||
.get_method()
|
||||
.schedule(state.scheduler(), move |thread_local| async move {
|
||||
if let Some(geometries) = thread_local.query_point(
|
||||
&WorldCoords {
|
||||
x: coordinates.x,
|
||||
y: coordinates.y,
|
||||
},
|
||||
z,
|
||||
zoom,
|
||||
) {
|
||||
log::info!(
|
||||
"{:?}",
|
||||
geometries
|
||||
.iter()
|
||||
.map(|geometry| &geometry.properties)
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
/*state
|
||||
.scheduler()
|
||||
.schedule_method()
|
||||
.schedule(state.scheduler(), move |thread_local| async move {
|
||||
if let Some(geometries) = thread_local.query_point(
|
||||
&WorldCoords {
|
||||
x: coordinates.x,
|
||||
y: coordinates.y,
|
||||
},
|
||||
z,
|
||||
zoom,
|
||||
) {
|
||||
log::info!(
|
||||
"{:?}",
|
||||
geometries
|
||||
.iter()
|
||||
.map(|geometry| &geometry.properties)
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
})
|
||||
.unwrap();*/
|
||||
}
|
||||
}
|
||||
self.clicking = false;
|
||||
|
||||
@ -10,12 +10,14 @@ use std::fmt;
|
||||
|
||||
use vector_tile::tile::Layer;
|
||||
|
||||
mod geometry_index;
|
||||
pub mod scheduler;
|
||||
mod source_client;
|
||||
pub mod source_client;
|
||||
pub mod static_tile_fetcher;
|
||||
|
||||
pub mod geometry_index;
|
||||
pub mod shared_thread_state;
|
||||
pub mod tile_cache;
|
||||
mod tile_request_state;
|
||||
pub mod tile_request_state;
|
||||
|
||||
pub enum TileFetchResult {
|
||||
Unavailable {
|
||||
@ -46,8 +48,8 @@ pub enum TessellateMessage {
|
||||
}
|
||||
|
||||
pub struct TileTessellateMessage {
|
||||
request_id: TileRequestID,
|
||||
coords: WorldTileCoords,
|
||||
pub request_id: TileRequestID,
|
||||
pub coords: WorldTileCoords,
|
||||
}
|
||||
|
||||
pub enum LayerTessellateMessage {
|
||||
|
||||
@ -8,7 +8,6 @@ use std::sync::{Arc, Mutex};
|
||||
|
||||
use vector_tile::parse_tile_bytes;
|
||||
|
||||
/// Describes through which channels work-requests travel. It describes the flow of work.
|
||||
use crate::coords::{WorldCoords, WorldTileCoords, Zoom};
|
||||
use crate::io::tile_cache::TileCache;
|
||||
use crate::io::{
|
||||
@ -18,11 +17,26 @@ use crate::io::{
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::io::geometry_index::{GeometryIndex, IndexProcessor, IndexedGeometry, TileIndex};
|
||||
use crate::io::shared_thread_state::SharedThreadState;
|
||||
use crate::io::source_client::{HttpSourceClient, SourceClient};
|
||||
use crate::io::tile_request_state::TileRequestState;
|
||||
use crate::tessellation::Tessellated;
|
||||
use prost::Message;
|
||||
|
||||
pub struct Scheduler {
|
||||
schedule_method: ScheduleMethod,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub fn new(schedule_method: ScheduleMethod) -> Self {
|
||||
Self { schedule_method }
|
||||
}
|
||||
|
||||
pub fn schedule_method(&self) -> &ScheduleMethod {
|
||||
&self.schedule_method
|
||||
}
|
||||
}
|
||||
|
||||
pub enum ScheduleMethod {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
Tokio(crate::platform::schedule_method::TokioScheduleMethod),
|
||||
@ -47,14 +61,16 @@ impl ScheduleMethod {
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
pub fn schedule<T>(
|
||||
&self,
|
||||
scheduler: &Scheduler,
|
||||
future_factory: impl (FnOnce(ThreadLocalState) -> T) + Send + 'static,
|
||||
shared_thread_state: SharedThreadState,
|
||||
future_factory: impl (FnOnce(SharedThreadState) -> T) + Send + 'static,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
T: Future<Output = ()> + 'static,
|
||||
{
|
||||
match self {
|
||||
ScheduleMethod::WebWorkerPool(method) => Ok(method.schedule(scheduler, future_factory)),
|
||||
ScheduleMethod::WebWorkerPool(method) => {
|
||||
Ok(method.schedule(shared_thread_state, future_factory))
|
||||
}
|
||||
_ => Err(Error::Schedule),
|
||||
}
|
||||
}
|
||||
@ -62,8 +78,8 @@ impl ScheduleMethod {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub fn schedule<T>(
|
||||
&self,
|
||||
scheduler: &Scheduler,
|
||||
future_factory: impl (FnOnce(ThreadLocalState) -> T) + Send + 'static,
|
||||
shared_thread_state: SharedThreadState,
|
||||
future_factory: impl (FnOnce(SharedThreadState) -> T) + Send + 'static,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
T: Future + Send + 'static,
|
||||
@ -71,319 +87,10 @@ impl ScheduleMethod {
|
||||
{
|
||||
match self {
|
||||
ScheduleMethod::Tokio(method) => {
|
||||
method.schedule(scheduler, future_factory);
|
||||
method.schedule(shared_thread_state, future_factory);
|
||||
Ok(())
|
||||
}
|
||||
_ => Err(Error::Schedule),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ThreadLocalState {
|
||||
tile_request_state: Arc<Mutex<TileRequestState>>,
|
||||
tessellate_result_sender: Sender<TessellateMessage>,
|
||||
geometry_index: Arc<Mutex<GeometryIndex>>,
|
||||
}
|
||||
|
||||
impl ThreadLocalState {
|
||||
fn get_tile_request(&self, request_id: TileRequestID) -> Option<TileRequest> {
|
||||
self.tile_request_state
|
||||
.lock()
|
||||
.ok()
|
||||
.and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn process_tile(
|
||||
&self,
|
||||
request_id: TileRequestID,
|
||||
data: Box<[u8]>,
|
||||
) -> Result<(), SendError<TessellateMessage>> {
|
||||
if let Some(tile_request) = self.get_tile_request(request_id) {
|
||||
let tile_result = TileFetchResult::Tile {
|
||||
coords: tile_request.coords,
|
||||
data,
|
||||
};
|
||||
|
||||
self.tessellate_layers_with_request(&tile_result, &tile_request, request_id)?;
|
||||
self.index_geometry(&tile_result);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn tile_unavailable(
|
||||
&self,
|
||||
request_id: TileRequestID,
|
||||
) -> Result<(), SendError<TessellateMessage>> {
|
||||
if let Some(tile_request) = self.get_tile_request(request_id) {
|
||||
let tile_result = TileFetchResult::Unavailable {
|
||||
coords: tile_request.coords,
|
||||
};
|
||||
self.tessellate_layers_with_request(&tile_result, &tile_request, request_id)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
fn index_geometry(&self, tile_result: &TileFetchResult) {
|
||||
match tile_result {
|
||||
TileFetchResult::Tile { data, coords } => {
|
||||
let tile: Tile = Tile::decode(data.as_ref()).unwrap();
|
||||
|
||||
let mut processor = IndexProcessor::new();
|
||||
for mut layer in tile.layers {
|
||||
layer.process(&mut processor).unwrap();
|
||||
}
|
||||
|
||||
if let Ok(mut geometry_index) = self.geometry_index.lock() {
|
||||
geometry_index.index_tile(
|
||||
&coords,
|
||||
TileIndex::Linear {
|
||||
list: processor.get_geometries(),
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn query_point(
|
||||
&self,
|
||||
world_coords: &WorldCoords,
|
||||
z: u8,
|
||||
zoom: Zoom,
|
||||
) -> Option<Vec<IndexedGeometry<f64>>> {
|
||||
if let Ok(geometry_index) = self.geometry_index.lock() {
|
||||
geometry_index
|
||||
.query_point(world_coords, z, zoom)
|
||||
.map(|geometries| {
|
||||
geometries
|
||||
.iter()
|
||||
.cloned()
|
||||
.cloned()
|
||||
.collect::<Vec<IndexedGeometry<f64>>>()
|
||||
})
|
||||
} else {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
fn tessellate_layers_with_request(
|
||||
&self,
|
||||
tile_result: &TileFetchResult,
|
||||
tile_request: &TileRequest,
|
||||
request_id: TileRequestID,
|
||||
) -> Result<(), SendError<TessellateMessage>> {
|
||||
match tile_result {
|
||||
TileFetchResult::Unavailable { coords } => {
|
||||
for to_load in &tile_request.layers {
|
||||
tracing::warn!("layer {} at {} unavailable", to_load, &coords);
|
||||
self.tessellate_result_sender
|
||||
.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::UnavailableLayer {
|
||||
coords: *coords,
|
||||
layer_name: to_load.to_string(),
|
||||
},
|
||||
))?;
|
||||
}
|
||||
}
|
||||
TileFetchResult::Tile { data, coords } => {
|
||||
tracing::info!("parsing tile {} with {}bytes", &coords, data.len());
|
||||
|
||||
let tile = {
|
||||
let _span_ =
|
||||
tracing::span!(tracing::Level::TRACE, "parse_tile_bytes").entered();
|
||||
parse_tile_bytes(data).expect("failed to load tile")
|
||||
};
|
||||
|
||||
for to_load in &tile_request.layers {
|
||||
if let Some(layer) = tile
|
||||
.layers()
|
||||
.iter()
|
||||
.find(|layer| to_load.as_str() == layer.name())
|
||||
{
|
||||
match layer.tessellate() {
|
||||
Ok((buffer, feature_indices)) => {
|
||||
tracing::info!("layer {} at {} ready", to_load, &coords);
|
||||
self.tessellate_result_sender
|
||||
.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::TessellatedLayer {
|
||||
coords: *coords,
|
||||
buffer: buffer.into(),
|
||||
feature_indices,
|
||||
layer_data: layer.clone(),
|
||||
},
|
||||
))?;
|
||||
}
|
||||
Err(e) => {
|
||||
self.tessellate_result_sender
|
||||
.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::UnavailableLayer {
|
||||
coords: *coords,
|
||||
layer_name: to_load.to_string(),
|
||||
},
|
||||
))?;
|
||||
|
||||
tracing::error!(
|
||||
"layer {} at {} tesselation failed {:?}",
|
||||
to_load,
|
||||
&coords,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.tessellate_result_sender
|
||||
.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::UnavailableLayer {
|
||||
coords: *coords,
|
||||
layer_name: to_load.to_string(),
|
||||
},
|
||||
))?;
|
||||
|
||||
tracing::info!(
|
||||
"requested layer {} at {} not found in tile",
|
||||
to_load,
|
||||
&coords
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("tile at {} finished", &tile_request.coords);
|
||||
|
||||
self.tessellate_result_sender
|
||||
.send(TessellateMessage::Tile(TileTessellateMessage {
|
||||
request_id,
|
||||
coords: tile_request.coords,
|
||||
}))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Scheduler {
|
||||
tessellate_channel: (Sender<TessellateMessage>, Receiver<TessellateMessage>),
|
||||
tile_request_state: Arc<Mutex<TileRequestState>>,
|
||||
geometry_index: Arc<Mutex<GeometryIndex>>,
|
||||
tile_cache: TileCache,
|
||||
schedule_method: ScheduleMethod,
|
||||
}
|
||||
|
||||
const _: () = {
|
||||
fn assert_send<T: Send>() {}
|
||||
|
||||
fn assert_all() {
|
||||
assert_send::<ThreadLocalState>();
|
||||
}
|
||||
};
|
||||
|
||||
impl Scheduler {
|
||||
pub fn new(schedule_method: ScheduleMethod) -> Self {
|
||||
Self {
|
||||
tessellate_channel: channel(),
|
||||
tile_request_state: Arc::new(Mutex::new(TileRequestState::new())),
|
||||
geometry_index: Arc::new(Mutex::new(GeometryIndex::new())),
|
||||
tile_cache: TileCache::new(),
|
||||
schedule_method,
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn try_populate_cache(&mut self) {
|
||||
if let Ok(result) = self.tessellate_channel.1.try_recv() {
|
||||
match result {
|
||||
TessellateMessage::Layer(layer_result) => {
|
||||
tracing::trace!(
|
||||
"Layer {} at {} reached main thread",
|
||||
layer_result.layer_name(),
|
||||
layer_result.get_coords()
|
||||
);
|
||||
self.tile_cache.put_tessellated_layer(layer_result);
|
||||
}
|
||||
TessellateMessage::Tile(TileTessellateMessage { request_id, coords }) => loop {
|
||||
if let Ok(mut tile_request_state) = self.tile_request_state.try_lock() {
|
||||
tile_request_state.finish_tile_request(request_id);
|
||||
tracing::trace!("Tile at {} finished loading", coords);
|
||||
break;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_thread_local_state(&self) -> ThreadLocalState {
|
||||
ThreadLocalState {
|
||||
tile_request_state: self.tile_request_state.clone(),
|
||||
tessellate_result_sender: self.tessellate_channel.0.clone(),
|
||||
geometry_index: self.geometry_index.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_request_tile(
|
||||
&mut self,
|
||||
coords: &WorldTileCoords,
|
||||
layers: &HashSet<String>,
|
||||
) -> Result<bool, SendError<TileRequest>> {
|
||||
if !self.tile_cache.is_layers_missing(coords, layers) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if let Ok(mut tile_request_state) = self.tile_request_state.try_lock() {
|
||||
if let Some(request_id) = tile_request_state.start_tile_request(TileRequest {
|
||||
coords: *coords,
|
||||
layers: layers.clone(),
|
||||
}) {
|
||||
tracing::info!("new tile request: {}", &coords);
|
||||
|
||||
// The following snippet can be added instead of the next code block to demonstrate
|
||||
// an understanable approach of fetching
|
||||
/*#[cfg(target_arch = "wasm32")]
|
||||
if let Some(tile_coords) = coords.into_tile(TileAddressingScheme::TMS) {
|
||||
crate::platform::legacy_webworker_fetcher::request_tile(
|
||||
request_id,
|
||||
tile_coords,
|
||||
);
|
||||
}*/
|
||||
|
||||
{
|
||||
let client = SourceClient::Http(HttpSourceClient::new());
|
||||
let copied_coords = *coords;
|
||||
|
||||
let future_fn = move |thread_local_state: ThreadLocalState| async move {
|
||||
if let Ok(data) = client.fetch(&copied_coords).await {
|
||||
thread_local_state
|
||||
.process_tile(request_id, data.into_boxed_slice())
|
||||
.unwrap();
|
||||
} else {
|
||||
thread_local_state.tile_unavailable(request_id).unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
self.schedule_method.schedule(self, future_fn).unwrap();
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
self.schedule_method.schedule(self, future_fn).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
} else {
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_tile_cache(&self) -> &TileCache {
|
||||
&self.tile_cache
|
||||
}
|
||||
|
||||
pub fn get_method(&self) -> &ScheduleMethod {
|
||||
&self.schedule_method
|
||||
}
|
||||
}
|
||||
|
||||
199
src/io/shared_thread_state.rs
Normal file
199
src/io/shared_thread_state.rs
Normal file
@ -0,0 +1,199 @@
|
||||
use crate::coords::{WorldCoords, Zoom};
|
||||
use crate::io::geometry_index::{GeometryIndex, IndexProcessor, IndexedGeometry, TileIndex};
|
||||
use crate::io::tile_request_state::TileRequestState;
|
||||
use crate::io::{
|
||||
LayerTessellateMessage, TessellateMessage, TileFetchResult, TileRequest, TileRequestID,
|
||||
TileTessellateMessage,
|
||||
};
|
||||
use crate::tessellation::Tessellated;
|
||||
use std::sync::mpsc::{SendError, Sender};
|
||||
use std::sync::{mpsc, Arc, Mutex};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedThreadState {
|
||||
pub tile_request_state: Arc<Mutex<TileRequestState>>,
|
||||
pub message_sender: mpsc::Sender<TessellateMessage>,
|
||||
pub geometry_index: Arc<Mutex<GeometryIndex>>,
|
||||
}
|
||||
|
||||
impl SharedThreadState {
|
||||
fn get_tile_request(&self, request_id: TileRequestID) -> Option<TileRequest> {
|
||||
self.tile_request_state
|
||||
.lock()
|
||||
.ok()
|
||||
.and_then(|tile_request_state| tile_request_state.get_tile_request(request_id).cloned())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn process_tile(
|
||||
&self,
|
||||
request_id: TileRequestID,
|
||||
data: Box<[u8]>,
|
||||
) -> Result<(), SendError<TessellateMessage>> {
|
||||
if let Some(tile_request) = self.get_tile_request(request_id) {
|
||||
let tile_result = TileFetchResult::Tile {
|
||||
coords: tile_request.coords,
|
||||
data,
|
||||
};
|
||||
|
||||
self.tessellate_layers_with_request(&tile_result, &tile_request, request_id)?;
|
||||
self.index_geometry(&tile_result);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn tile_unavailable(
|
||||
&self,
|
||||
request_id: TileRequestID,
|
||||
) -> Result<(), SendError<TessellateMessage>> {
|
||||
if let Some(tile_request) = self.get_tile_request(request_id) {
|
||||
let tile_result = TileFetchResult::Unavailable {
|
||||
coords: tile_request.coords,
|
||||
};
|
||||
self.tessellate_layers_with_request(&tile_result, &tile_request, request_id)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
fn index_geometry(&self, tile_result: &TileFetchResult) {
|
||||
match tile_result {
|
||||
TileFetchResult::Tile { data, coords } => {
|
||||
use geozero::GeozeroDatasource;
|
||||
use prost::Message;
|
||||
|
||||
let tile = geozero::mvt::Tile::decode(data.as_ref()).unwrap();
|
||||
|
||||
let mut processor = IndexProcessor::new();
|
||||
for mut layer in tile.layers {
|
||||
layer.process(&mut processor).unwrap();
|
||||
}
|
||||
|
||||
if let Ok(mut geometry_index) = self.geometry_index.lock() {
|
||||
geometry_index.index_tile(
|
||||
&coords,
|
||||
TileIndex::Linear {
|
||||
list: processor.get_geometries(),
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn query_point(
|
||||
&self,
|
||||
world_coords: &WorldCoords,
|
||||
z: u8,
|
||||
zoom: Zoom,
|
||||
) -> Option<Vec<IndexedGeometry<f64>>> {
|
||||
if let Ok(geometry_index) = self.geometry_index.lock() {
|
||||
geometry_index
|
||||
.query_point(world_coords, z, zoom)
|
||||
.map(|geometries| {
|
||||
geometries
|
||||
.iter()
|
||||
.cloned()
|
||||
.cloned()
|
||||
.collect::<Vec<IndexedGeometry<f64>>>()
|
||||
})
|
||||
} else {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
fn tessellate_layers_with_request(
|
||||
&self,
|
||||
tile_result: &TileFetchResult,
|
||||
tile_request: &TileRequest,
|
||||
request_id: TileRequestID,
|
||||
) -> Result<(), SendError<TessellateMessage>> {
|
||||
match tile_result {
|
||||
TileFetchResult::Unavailable { coords } => {
|
||||
for to_load in &tile_request.layers {
|
||||
tracing::warn!("layer {} at {} unavailable", to_load, &coords);
|
||||
self.message_sender.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::UnavailableLayer {
|
||||
coords: *coords,
|
||||
layer_name: to_load.to_string(),
|
||||
},
|
||||
))?;
|
||||
}
|
||||
}
|
||||
TileFetchResult::Tile { data, coords } => {
|
||||
tracing::info!("parsing tile {} with {}bytes", &coords, data.len());
|
||||
|
||||
let tile = {
|
||||
let _span_ =
|
||||
tracing::span!(tracing::Level::TRACE, "parse_tile_bytes").entered();
|
||||
vector_tile::parse_tile_bytes(data).expect("failed to load tile")
|
||||
};
|
||||
|
||||
for to_load in &tile_request.layers {
|
||||
if let Some(layer) = tile
|
||||
.layers()
|
||||
.iter()
|
||||
.find(|layer| to_load.as_str() == layer.name())
|
||||
{
|
||||
match layer.tessellate() {
|
||||
Ok((buffer, feature_indices)) => {
|
||||
tracing::info!("layer {} at {} ready", to_load, &coords);
|
||||
self.message_sender.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::TessellatedLayer {
|
||||
coords: *coords,
|
||||
buffer: buffer.into(),
|
||||
feature_indices,
|
||||
layer_data: layer.clone(),
|
||||
},
|
||||
))?;
|
||||
}
|
||||
Err(e) => {
|
||||
self.message_sender.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::UnavailableLayer {
|
||||
coords: *coords,
|
||||
layer_name: to_load.to_string(),
|
||||
},
|
||||
))?;
|
||||
|
||||
tracing::error!(
|
||||
"layer {} at {} tesselation failed {:?}",
|
||||
to_load,
|
||||
&coords,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.message_sender.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::UnavailableLayer {
|
||||
coords: *coords,
|
||||
layer_name: to_load.to_string(),
|
||||
},
|
||||
))?;
|
||||
|
||||
tracing::info!(
|
||||
"requested layer {} at {} not found in tile",
|
||||
to_load,
|
||||
&coords
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("tile at {} finished", &tile_request.coords);
|
||||
|
||||
self.message_sender
|
||||
.send(TessellateMessage::Tile(TileTessellateMessage {
|
||||
request_id,
|
||||
coords: tile_request.coords,
|
||||
}))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
153
src/map_state.rs
153
src/map_state.rs
@ -1,4 +1,4 @@
|
||||
use crate::coords::{ViewRegion, Zoom, TILE_SIZE};
|
||||
use crate::coords::{ViewRegion, WorldTileCoords, Zoom, TILE_SIZE};
|
||||
|
||||
use crate::io::scheduler::Scheduler;
|
||||
|
||||
@ -8,7 +8,14 @@ use crate::render::render_state::RenderState;
|
||||
use crate::util::ChangeObserver;
|
||||
use crate::WindowSize;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::{mpsc, Arc, Mutex};
|
||||
|
||||
use crate::io::geometry_index::GeometryIndex;
|
||||
use crate::io::shared_thread_state::SharedThreadState;
|
||||
use crate::io::source_client::{HttpSourceClient, SourceClient};
|
||||
use crate::io::tile_cache::TileCache;
|
||||
use crate::io::tile_request_state::TileRequestState;
|
||||
use crate::io::{TessellateMessage, TileRequest, TileTessellateMessage};
|
||||
use style_spec::Style;
|
||||
use wgpu::SurfaceError;
|
||||
|
||||
@ -16,21 +23,24 @@ pub trait Runnable<E> {
|
||||
fn run(self, event_loop: E, max_frames: Option<u64>);
|
||||
}
|
||||
|
||||
pub struct MapState<W> {
|
||||
render_state: RenderState,
|
||||
pub type Channel<T> = (mpsc::Sender<T>, mpsc::Receiver<T>);
|
||||
|
||||
pub struct MapState<W> {
|
||||
window: W,
|
||||
|
||||
zoom: ChangeObserver<Zoom>,
|
||||
camera: ChangeObserver<camera::Camera>,
|
||||
perspective: camera::Perspective,
|
||||
|
||||
render_state: RenderState,
|
||||
scheduler: Scheduler,
|
||||
|
||||
try_failed: bool,
|
||||
message_receiver: mpsc::Receiver<TessellateMessage>,
|
||||
shared_thread_state: SharedThreadState,
|
||||
tile_cache: TileCache,
|
||||
|
||||
style: Style,
|
||||
|
||||
camera: ChangeObserver<camera::Camera>,
|
||||
perspective: camera::Perspective,
|
||||
try_failed: bool,
|
||||
}
|
||||
|
||||
impl<W> MapState<W> {
|
||||
@ -57,6 +67,8 @@ impl<W> MapState<W> {
|
||||
2000.0,
|
||||
);
|
||||
|
||||
let (message_sender, message_receiver) = mpsc::channel();
|
||||
|
||||
Self {
|
||||
render_state,
|
||||
window,
|
||||
@ -66,16 +78,48 @@ impl<W> MapState<W> {
|
||||
scheduler,
|
||||
camera: ChangeObserver::new(camera),
|
||||
perspective,
|
||||
message_receiver,
|
||||
tile_cache: TileCache::new(),
|
||||
shared_thread_state: SharedThreadState {
|
||||
tile_request_state: Arc::new(Mutex::new(TileRequestState::new())),
|
||||
message_sender,
|
||||
geometry_index: Arc::new(Mutex::new(GeometryIndex::new())),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_and_redraw(&mut self) -> Result<(), SurfaceError> {
|
||||
self.scheduler.try_populate_cache();
|
||||
self.try_populate_cache();
|
||||
|
||||
self.prepare_render();
|
||||
self.render_state.render()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn try_populate_cache(&mut self) {
|
||||
if let Ok(result) = self.message_receiver.try_recv() {
|
||||
match result {
|
||||
TessellateMessage::Layer(layer_result) => {
|
||||
tracing::trace!(
|
||||
"Layer {} at {} reached main thread",
|
||||
layer_result.layer_name(),
|
||||
layer_result.get_coords()
|
||||
);
|
||||
self.tile_cache.put_tessellated_layer(layer_result);
|
||||
}
|
||||
TessellateMessage::Tile(TileTessellateMessage { request_id, coords }) => loop {
|
||||
if let Ok(mut tile_request_state) =
|
||||
self.shared_thread_state.tile_request_state.try_lock()
|
||||
{
|
||||
tile_request_state.finish_tile_request(request_id);
|
||||
tracing::trace!("Tile at {} finished loading", coords);
|
||||
break;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Request tiles which are currently in view
|
||||
#[tracing::instrument(skip_all)]
|
||||
fn request_tiles_in_view(&mut self, view_region: &ViewRegion) -> bool {
|
||||
@ -90,10 +134,7 @@ impl<W> MapState<W> {
|
||||
for coords in view_region.iter() {
|
||||
if coords.build_quad_key().is_some() {
|
||||
// TODO: Make tesselation depend on style?
|
||||
try_failed = self
|
||||
.scheduler
|
||||
.try_request_tile(&coords, &source_layers)
|
||||
.unwrap();
|
||||
try_failed = self.try_request_tile(&coords, &source_layers).unwrap();
|
||||
}
|
||||
}
|
||||
try_failed
|
||||
@ -116,11 +157,8 @@ impl<W> MapState<W> {
|
||||
drop(_guard);
|
||||
|
||||
if let Some(view_region) = &view_region {
|
||||
self.render_state.upload_tile_geometry(
|
||||
view_region,
|
||||
&self.style,
|
||||
self.scheduler.get_tile_cache(),
|
||||
);
|
||||
self.render_state
|
||||
.upload_tile_geometry(view_region, &self.style, &self.tile_cache);
|
||||
|
||||
self.render_state
|
||||
.update_tile_view_pattern(view_region, &view_proj, self.zoom());
|
||||
@ -144,6 +182,76 @@ impl<W> MapState<W> {
|
||||
self.zoom.update_reference();
|
||||
}
|
||||
|
||||
pub fn try_request_tile(
|
||||
&mut self,
|
||||
coords: &WorldTileCoords,
|
||||
layers: &HashSet<String>,
|
||||
) -> Result<bool, mpsc::SendError<TileRequest>> {
|
||||
if !self.tile_cache.is_layers_missing(coords, layers) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if let Ok(mut tile_request_state) = self.shared_thread_state.tile_request_state.try_lock() {
|
||||
if let Some(request_id) = tile_request_state.start_tile_request(TileRequest {
|
||||
coords: *coords,
|
||||
layers: layers.clone(),
|
||||
}) {
|
||||
tracing::info!("new tile request: {}", &coords);
|
||||
|
||||
// The following snippet can be added instead of the next code block to demonstrate
|
||||
// an understanable approach of fetching
|
||||
/*#[cfg(target_arch = "wasm32")]
|
||||
if let Some(tile_coords) = coords.into_tile(TileAddressingScheme::TMS) {
|
||||
crate::platform::legacy_webworker_fetcher::request_tile(
|
||||
request_id,
|
||||
tile_coords,
|
||||
);
|
||||
}*/
|
||||
|
||||
{
|
||||
let client = SourceClient::Http(HttpSourceClient::new());
|
||||
let copied_coords = *coords;
|
||||
|
||||
let future_fn = move |state: SharedThreadState| async move {
|
||||
if let Ok(data) = client.fetch(&copied_coords).await {
|
||||
state
|
||||
.process_tile(request_id, data.into_boxed_slice())
|
||||
.unwrap();
|
||||
} else {
|
||||
state.tile_unavailable(request_id).unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
self.scheduler
|
||||
.schedule_method()
|
||||
.schedule(self.shared_thread_state.clone(), future_fn)
|
||||
.unwrap();
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
self.scheduler
|
||||
.schedule_method()
|
||||
.schedule(self.shared_thread_state.clone(), future_fn)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
} else {
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn resize(&mut self, width: u32, height: u32) {
|
||||
if width <= 0 || height <= 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
self.perspective.resize(width, height);
|
||||
self.camera.resize(width, height);
|
||||
|
||||
self.render_state.resize(width, height)
|
||||
}
|
||||
|
||||
pub fn scheduler(&self) -> &Scheduler {
|
||||
&self.scheduler
|
||||
}
|
||||
@ -164,17 +272,6 @@ impl<W> MapState<W> {
|
||||
&self.perspective
|
||||
}
|
||||
|
||||
pub fn resize(&mut self, width: u32, height: u32) {
|
||||
if width <= 0 || height <= 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
self.perspective.resize(width, height);
|
||||
self.camera.resize(width, height);
|
||||
|
||||
self.render_state.resize(width, height)
|
||||
}
|
||||
|
||||
pub fn zoom(&self) -> Zoom {
|
||||
*self.zoom
|
||||
}
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
use crate::io::scheduler::{Scheduler, ThreadLocalState};
|
||||
use crate::io::scheduler::Scheduler;
|
||||
use crate::io::shared_thread_state::SharedThreadState;
|
||||
|
||||
pub struct TokioScheduleMethod;
|
||||
|
||||
@ -9,12 +10,12 @@ impl TokioScheduleMethod {
|
||||
|
||||
pub fn schedule<T>(
|
||||
&self,
|
||||
scheduler: &Scheduler,
|
||||
future_factory: impl (FnOnce(ThreadLocalState) -> T) + Send + 'static,
|
||||
shared_thread_state: SharedThreadState,
|
||||
future_factory: impl (FnOnce(SharedThreadState) -> T) + Send + 'static,
|
||||
) where
|
||||
T: std::future::Future + Send + 'static,
|
||||
T::Output: Send + 'static,
|
||||
{
|
||||
tokio::task::spawn(future_factory(scheduler.new_thread_local_state()));
|
||||
tokio::task::spawn(future_factory(shared_thread_state));
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,17 +38,16 @@ impl WebWorkerPoolScheduleMethod {
|
||||
|
||||
pub fn schedule<T>(
|
||||
&self,
|
||||
scheduler: &Scheduler,
|
||||
shared_thread_state: SharedThreadState,
|
||||
future_factory: impl (FnOnce(ThreadLocalState) -> T) + Send + 'static,
|
||||
) where
|
||||
T: std::future::Future + 'static,
|
||||
T::Output: Send + 'static,
|
||||
{
|
||||
let state = scheduler.new_thread_local_state();
|
||||
self.pool
|
||||
.run(move || {
|
||||
wasm_bindgen_futures::future_to_promise(async move {
|
||||
future_factory(state).await;
|
||||
future_factory(shared_thread_state).await;
|
||||
Ok(JsValue::undefined())
|
||||
})
|
||||
})
|
||||
|
||||
@ -64,8 +64,6 @@ pub struct RenderState {
|
||||
tile_view_pattern: TileViewPattern<Queue, Buffer>,
|
||||
}
|
||||
|
||||
pub type SurfaceFactory = dyn FnOnce(&wgpu::Instance) -> (Surface, SurfaceConfiguration);
|
||||
|
||||
impl RenderState {
|
||||
pub async fn initialize<W: raw_window_handle::HasRawWindowHandle>(
|
||||
window: &W,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user