Clear pending requests from main thread

This commit is contained in:
Maximilian Ammann 2022-03-15 11:27:57 +01:00
parent 4821282589
commit 08495f2784
6 changed files with 139 additions and 115 deletions

View File

@ -14,7 +14,7 @@ pub mod static_tile_fetcher;
pub mod tile_cache;
#[derive(Clone)]
pub enum TileResult {
pub enum TileFetchResult {
Unavailable {
coords: WorldTileCoords,
},
@ -24,6 +24,12 @@ pub enum TileResult {
},
}
#[derive(Clone)]
pub enum TileTessellateResult {
Tile { request_id: TileRequestID },
Layer(LayerResult),
}
#[derive(Clone)]
pub enum LayerResult {
UnavailableLayer {

View File

@ -1,18 +1,16 @@
use std::collections::{HashMap, HashSet};
use log::{info, warn};
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
use std::sync::{Arc, Mutex};
//use crossbeam_channel::{unbounded as channel, Receiver, RecvError, SendError, Sender};
use log::{info, warn};
use style_spec::source::TileAdressingScheme;
use vector_tile::parse_tile_bytes;
/// Describes through which channels work-requests travel. It describes the flow of work.
use crate::coords::{TileCoords, WorldTileCoords};
use crate::io::tile_cache::TileCache;
use crate::io::{LayerResult, TileRequest, TileRequestID, TileResult};
use crate::io::{LayerResult, TileFetchResult, TileRequest, TileRequestID, TileTessellateResult};
use crate::tessellation::Tessellated;
@ -23,6 +21,19 @@ pub enum ScheduleMethod {
WebWorker(crate::platform::scheduler::WebWorkerScheduleMethod),
}
impl Default for ScheduleMethod {
fn default() -> Self {
#[cfg(not(target_arch = "wasm32"))]
{
ScheduleMethod::Tokio(crate::platform::scheduler::TokioScheduleMethod::new(None))
}
#[cfg(target_arch = "wasm32")]
{
ScheduleMethod::Tokio(crate::platform::scheduler::WebWorkerScheduleMethod::new())
}
}
}
impl ScheduleMethod {
pub fn schedule_tile_request(
&self,
@ -45,7 +56,17 @@ impl ScheduleMethod {
pub struct ThreadLocalTessellatorState {
tile_request_state: Arc<Mutex<TileRequestState>>,
layer_result_sender: Sender<LayerResult>,
layer_result_sender: Sender<TileTessellateResult>,
}
#[cfg(target_arch = "wasm32")]
impl Drop for ThreadLocalTessellatorState {
fn drop(&mut self) {
warn!(
"ThreadLocalTessellatorState dropped. \
On web this should only happen when the application is stopped!"
);
}
}
impl ThreadLocalTessellatorState {
@ -53,15 +74,16 @@ impl ThreadLocalTessellatorState {
&self,
request_id: TileRequestID,
data: Box<[u8]>,
) -> Result<(), SendError<LayerResult>> {
if let Ok(tile_request_state) = self.tile_request_state.lock() {
if let Some(tile_request) = tile_request_state.finish_tile_request(request_id) {
) -> Result<(), SendError<TileTessellateResult>> {
if let Ok(mut tile_request_state) = self.tile_request_state.lock() {
if let Some(tile_request) = tile_request_state.get_tile_request(request_id) {
self.tessellate_layers_with_request(
TileResult::Tile {
TileFetchResult::Tile {
coords: tile_request.coords,
data,
},
tile_request,
&tile_request,
request_id,
)
} else {
Ok(())
@ -73,10 +95,11 @@ impl ThreadLocalTessellatorState {
fn tessellate_layers_with_request(
&self,
tile_result: TileResult,
tile_result: TileFetchResult,
tile_request: &TileRequest,
) -> Result<(), SendError<LayerResult>> {
if let TileResult::Tile { data, coords } = tile_result {
request_id: TileRequestID,
) -> Result<(), SendError<TileTessellateResult>> {
if let TileFetchResult::Tile { data, coords } = tile_result {
info!("parsing tile {} with {}bytes", &coords, data.len());
let tile = parse_tile_bytes(&data).expect("failed to load tile");
@ -87,26 +110,37 @@ impl ThreadLocalTessellatorState {
.find(|layer| to_load.as_str() == layer.name())
{
if let Some((buffer, feature_indices)) = layer.tessellate() {
self.layer_result_sender
.send(LayerResult::TessellatedLayer {
self.layer_result_sender.send(TileTessellateResult::Layer(
LayerResult::TessellatedLayer {
coords,
buffer: buffer.into(),
feature_indices,
layer_data: layer.clone(),
})?;
},
))?;
} else {
self.layer_result_sender.send(TileTessellateResult::Layer(
LayerResult::UnavailableLayer {
coords,
layer_name: to_load.to_string(),
},
))?;
}
info!("layer {} ready: {}", to_load, &coords);
} else {
self.layer_result_sender
.send(LayerResult::UnavailableLayer {
self.layer_result_sender.send(TileTessellateResult::Layer(
LayerResult::UnavailableLayer {
coords,
layer_name: to_load.to_string(),
})?;
},
))?;
info!("layer {} not found: {}", to_load, &coords);
}
}
self.layer_result_sender
.send(TileTessellateResult::Tile { request_id })?;
}
Ok(())
@ -114,8 +148,8 @@ impl ThreadLocalTessellatorState {
}
pub struct IOScheduler {
layer_result_sender: Sender<LayerResult>,
layer_result_receiver: Receiver<LayerResult>,
result_sender: Sender<TileTessellateResult>,
result_receiver: Receiver<TileTessellateResult>,
tile_request_state: Arc<Mutex<TileRequestState>>,
tile_cache: TileCache,
schedule_method: ScheduleMethod,
@ -129,69 +163,68 @@ const _: () = {
}
};
impl Drop for IOScheduler {
fn drop(&mut self) {
warn!("WorkerLoop dropped. This should only happen when the application is stopped!");
}
}
impl IOScheduler {
pub fn new(schedule_method: ScheduleMethod) -> Self {
let (layer_result_sender, layer_result_receiver) = channel();
let (result_sender, result_receiver) = channel();
Self {
layer_result_sender,
layer_result_receiver,
result_sender,
result_receiver,
tile_request_state: Arc::new(Mutex::new(TileRequestState::new())),
tile_cache: TileCache::new(),
schedule_method,
}
}
pub fn populate_cache(&self) {
while let Ok(result) = self.layer_result_receiver.try_recv() {
self.tile_cache.push(result);
pub fn try_populate_cache(&mut self) {
if let Ok(result) = self.result_receiver.try_recv() {
match result {
TileTessellateResult::Tile { request_id } => loop {
if let Ok(mut tile_request_state) = self.tile_request_state.try_lock() {
tile_request_state.finish_tile_request(request_id);
break;
}
},
TileTessellateResult::Layer(layer_result) => {
self.tile_cache.push(layer_result);
}
}
}
}
pub fn new_tessellator_state(&self) -> ThreadLocalTessellatorState {
ThreadLocalTessellatorState {
tile_request_state: self.tile_request_state.clone(),
layer_result_sender: self.layer_result_sender.clone(),
layer_result_sender: self.result_sender.clone(),
}
}
pub fn request_tile(
pub fn try_request_tile(
&mut self,
tile_request: TileRequest,
) -> Result<(), SendError<TileRequest>> {
let TileRequest { coords, layers } = &tile_request;
if let Some(missing_layers) = self
.tile_cache
.get_missing_tessellated_layer_names_at(coords, layers.clone())
{
if missing_layers.is_empty() {
return Ok(());
}
let mut missing_layers = layers.clone();
self.tile_cache
.retain_missing_layer_names(coords, &mut missing_layers);
loop {
if let Ok(mut tile_request_state) = self.tile_request_state.try_lock() {
if let Some(id) = tile_request_state.start_tile_request(tile_request.clone()) {
info!("new tile request: {}", &coords);
let tile_coords = coords.into_tile(TileAdressingScheme::TMS);
self.schedule_method
.schedule_tile_request(self, id, tile_coords)
}
break;
}
}
Ok(())
} else {
Ok(())
if missing_layers.is_empty() {
return Ok(());
}
if let Ok(mut tile_request_state) = self.tile_request_state.try_lock() {
let tile_coords = *coords;
if let Some(id) = tile_request_state.start_tile_request(tile_request) {
info!("new tile request: {}", &tile_coords);
let tile_coords = tile_coords.into_tile(TileAdressingScheme::TMS);
self.schedule_method
.schedule_tile_request(self, id, tile_coords)
}
}
Ok(())
}
pub fn get_tessellated_layers_at(
@ -235,14 +268,14 @@ impl TileRequestState {
Some(id)
}
/*pub fn finish_tile_request(&mut self, id: TileRequestID) -> Option<TileRequest> {
pub fn finish_tile_request(&mut self, id: TileRequestID) -> Option<TileRequest> {
self.pending_tile_requests.remove(&id).map(|request| {
self.pending_coords.remove(&request.coords);
request
})
}*/
}
pub fn finish_tile_request(&self, id: TileRequestID) -> Option<&TileRequest> {
self.pending_tile_requests.get(&id)
pub fn get_tile_request(&self, id: TileRequestID) -> Option<TileRequest> {
self.pending_tile_requests.get(&id).cloned()
}
}

View File

@ -3,31 +3,25 @@ use crate::io::LayerResult;
use std::collections::{btree_map, BTreeMap, HashSet};
use std::sync::{Arc, Mutex};
#[derive(Clone)]
pub struct TileCache {
store: Arc<Mutex<BTreeMap<WorldTileCoords, Vec<LayerResult>>>>,
index: BTreeMap<WorldTileCoords, Vec<LayerResult>>,
}
impl TileCache {
pub fn new() -> Self {
Self {
store: Arc::new(Mutex::new(BTreeMap::new())),
index: BTreeMap::new(),
}
}
pub fn push(&self, result: LayerResult) -> bool {
if let Ok(mut map) = self.store.lock() {
match map.entry(result.get_coords()) {
btree_map::Entry::Vacant(entry) => {
entry.insert(vec![result]);
}
btree_map::Entry::Occupied(mut entry) => {
entry.get_mut().push(result);
}
pub fn push(&mut self, result: LayerResult) {
match self.index.entry(result.get_coords()) {
btree_map::Entry::Vacant(entry) => {
entry.insert(vec![result]);
}
btree_map::Entry::Occupied(mut entry) => {
entry.get_mut().push(result);
}
true
} else {
false
}
}
@ -37,12 +31,11 @@ impl TileCache {
skip_layers: &HashSet<String>,
) -> Vec<LayerResult> {
let mut ret = Vec::new();
if let Ok(map) = self.store.try_lock() {
if let Some(results) = map.get(coords) {
for result in results {
if !skip_layers.contains(&result.layer_name().to_string()) {
ret.push(result.clone());
}
if let Some(results) = self.index.get(coords) {
for result in results {
if !skip_layers.contains(&result.layer_name().to_string()) {
ret.push(result.clone());
}
}
}
@ -50,26 +43,18 @@ impl TileCache {
ret
}
pub fn get_missing_tessellated_layer_names_at(
pub fn retain_missing_layer_names(
&self,
coords: &WorldTileCoords,
mut layers: HashSet<String>,
) -> Option<HashSet<String>> {
if let Ok(loaded) = self.store.try_lock() {
if let Some(tessellated_layers) = loaded.get(coords) {
let tessellated_set: HashSet<String> = tessellated_layers
.iter()
.map(|tessellated_layer| tessellated_layer.layer_name().to_string())
.collect();
layers: &mut HashSet<String>,
) {
if let Some(results) = self.index.get(coords) {
let tessellated_set: HashSet<String> = results
.iter()
.map(|tessellated_layer| tessellated_layer.layer_name().to_string())
.collect();
layers.retain(|layer| !tessellated_set.contains(layer));
Some(layers)
} else {
Some(layers)
}
} else {
None
layers.retain(|layer| !tessellated_set.contains(layer));
}
}
}

View File

@ -17,7 +17,7 @@ use crate::render::render_state::RenderState;
pub async fn run(
window: winit::window::Window,
event_loop: EventLoop<()>,
mut workflow: Box<IOScheduler>,
mut scheduler: Box<IOScheduler>,
style: Box<Style>,
) {
let mut input = InputController::new(0.2, 100.0, 0.1);
@ -87,10 +87,10 @@ pub async fn run(
let dt = now - last_render_time;
last_render_time = now;
workflow.populate_cache();
scheduler.try_populate_cache();
input.update_state(state, dt);
state.upload_tile_geometry(&mut workflow);
state.upload_tile_geometry(&mut scheduler);
match state.render() {
Ok(_) => {}
Err(wgpu::SurfaceError::Lost) => {

View File

@ -52,12 +52,12 @@ pub fn create_scheduler() -> *mut IOScheduler {
}
#[wasm_bindgen]
pub fn new_tessellator_state(workflow_ptr: *mut IOScheduler) -> *mut ThreadLocalTessellatorState {
let workflow: Box<IOScheduler> = unsafe { Box::from_raw(workflow_ptr) };
let tessellator_state = Box::new(workflow.new_tessellator_state());
pub fn new_tessellator_state(scheduler_ptr: *mut IOScheduler) -> *mut ThreadLocalTessellatorState {
let scheduler: Box<IOScheduler> = unsafe { Box::from_raw(scheduler_ptr) };
let tessellator_state = Box::new(scheduler.new_tessellator_state());
let tessellator_state_ptr = Box::into_raw(tessellator_state);
// Call forget such that workflow does not get deallocated
std::mem::forget(workflow);
// Call forget such that scheduler does not get deallocated
std::mem::forget(scheduler);
return tessellator_state_ptr;
}
@ -74,7 +74,7 @@ pub fn tessellate_layers(
.tessellate_layers(request_id, data)
.unwrap();
// Call forget such that workflow does not get deallocated
// Call forget such that scheduler does not get deallocated
std::mem::forget(tessellator_state);
}
@ -99,8 +99,8 @@ pub fn get_canvas(element_id: &'static str) -> web_sys::HtmlCanvasElement {
}
#[wasm_bindgen]
pub async fn run(workflow_ptr: *mut IOScheduler) {
let scheduler: Box<IOScheduler> = unsafe { Box::from_raw(workflow_ptr) };
pub async fn run(scheduler_ptr: *mut IOScheduler) {
let scheduler: Box<IOScheduler> = unsafe { Box::from_raw(scheduler_ptr) };
// Either call forget or the main loop to keep worker loop alive
MapBuilder::from_canvas("mapr")
@ -109,7 +109,7 @@ pub async fn run(workflow_ptr: *mut IOScheduler) {
.run_async()
.await;
// std::mem::forget(workflow);
// std::mem::forget(scheduler);
}
pub mod scheduler {

View File

@ -374,7 +374,7 @@ impl RenderState {
coords,
layers: source_layers.clone(),
};
scheduler.request_tile(tile_request).unwrap();
scheduler.try_request_tile(tile_request).unwrap();
}
}