Fix multithreading in web

This commit is contained in:
Maximilian Ammann 2022-09-15 18:21:01 +02:00
parent 851a02cd69
commit c7f7408a10
12 changed files with 149 additions and 208 deletions

View File

@ -1,6 +1,6 @@
use maplibre::environment::DefaultTransferables;
use maplibre::platform::apc::TokioAsyncProcedureCall;
use maplibre::platform::apc::SchedulerAsyncProcedureCall;
use maplibre::{
platform::{http_client::ReqwestHttpClient, scheduler::TokioScheduler},
MapBuilder,
@ -9,10 +9,13 @@ use maplibre_winit::winit::{WinitEnvironment, WinitMapWindowConfig};
pub async fn run_headed() {
let client = ReqwestHttpClient::new(None);
MapBuilder::<WinitEnvironment<_, _, _, TokioAsyncProcedureCall<_>>>::new()
MapBuilder::<WinitEnvironment<_, _, _, SchedulerAsyncProcedureCall<_, _>>>::new()
.with_map_window_config(WinitMapWindowConfig::new("maplibre".to_string()))
.with_http_client(client.clone())
.with_apc(TokioAsyncProcedureCall::new(client))
.with_apc(SchedulerAsyncProcedureCall::new(
client,
TokioScheduler::new(),
))
.with_scheduler(TokioScheduler::new())
.build()
.initialize()

View File

@ -1,5 +1,5 @@
use maplibre::headless::HeadlessEnvironment;
use maplibre::platform::apc::TokioAsyncProcedureCall;
use maplibre::platform::apc::SchedulerAsyncProcedureCall;
use maplibre::{
coords::{LatLon, WorldTileCoords},
error::Error,
@ -15,20 +15,24 @@ use tile_grid::{extent_wgs84_to_merc, Extent, GridIterator};
pub async fn run_headless(tile_size: u32, min: LatLon, max: LatLon) {
let client = ReqwestHttpClient::new(None);
let mut map = MapBuilder::<HeadlessEnvironment<_, _, _, TokioAsyncProcedureCall<_>>>::new()
.with_map_window_config(HeadlessMapWindowConfig {
size: WindowSize::new(tile_size, tile_size).unwrap(),
})
.with_http_client(client.clone())
.with_apc(TokioAsyncProcedureCall::new(client)) // TODO: avoid passing client here
.with_scheduler(TokioScheduler::new())
.with_renderer_settings(RendererSettings {
texture_format: TextureFormat::Rgba8UnormSrgb,
..RendererSettings::default()
})
.build()
.initialize_headless()
.await;
let mut map =
MapBuilder::<HeadlessEnvironment<_, _, _, SchedulerAsyncProcedureCall<_, _>>>::new()
.with_map_window_config(HeadlessMapWindowConfig {
size: WindowSize::new(tile_size, tile_size).unwrap(),
})
.with_http_client(client.clone())
.with_apc(SchedulerAsyncProcedureCall::new(
client,
TokioScheduler::new(),
)) // TODO: avoid passing client and scheduler here
.with_scheduler(TokioScheduler::new())
.with_renderer_settings(RendererSettings {
texture_format: TextureFormat::Rgba8UnormSrgb,
..RendererSettings::default()
})
.build()
.initialize_headless()
.await;
let tile_limits = google_mercator().tile_limits(
extent_wgs84_to_merc(&Extent {

View File

@ -15,7 +15,7 @@ use wgpu::{BufferAsyncError, BufferSlice};
use crate::environment::DefaultTransferables;
use crate::io::apc::AsyncProcedureCall;
use crate::io::transferables::Transferables;
use crate::platform::apc::TokioAsyncProcedureCall;
use crate::platform::apc::SchedulerAsyncProcedureCall;
use crate::{
context::{MapContext, ViewState},
coords::{LatLon, ViewRegion, WorldCoords, WorldTileCoords, Zoom, TILE_SIZE},

View File

@ -1,12 +1,16 @@
use crate::coords::WorldTileCoords;
use crate::environment::DefaultTransferables;
use crate::io::source_client::{HttpSourceClient, SourceClient};
use crate::io::transferables::Transferables;
use crate::io::TileRequest;
use crate::Scheduler;
use crate::{Environment, HttpClient};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
/// The result of the tessellation of a tile.
/// `TessellatedLayer` contains the result of the tessellation for a specific layer, otherwise
@ -38,3 +42,68 @@ pub trait AsyncProcedureCall<T: Transferables, HC: HttpClient>: 'static {
fn schedule(&self, input: Input, procedure: AsyncProcedure<Self::Context>);
}
// FIXME: Make this generic using the Schedule
#[derive(Clone)]
pub struct TokioContext<T: Transferables, HC: HttpClient> {
sender: Sender<Message<T>>,
source_client: SourceClient<HC>,
}
impl<T: Transferables, HC: HttpClient> Context<T, HC> for TokioContext<T, HC> {
fn send(&self, data: Message<T>) {
self.sender.send(data).unwrap();
}
fn source_client(&self) -> &SourceClient<HC> {
&self.source_client
}
}
pub struct SchedulerAsyncProcedureCall<HC: HttpClient, S: Scheduler> {
channel: (
Sender<Message<DefaultTransferables>>,
Receiver<Message<DefaultTransferables>>,
),
http_client: HC,
scheduler: S,
}
impl<HC: HttpClient, S: Scheduler> SchedulerAsyncProcedureCall<HC, S> {
pub fn new(http_client: HC, scheduler: S) -> Self {
Self {
channel: mpsc::channel(),
http_client,
scheduler,
}
}
}
impl<HC: HttpClient, S: Scheduler> AsyncProcedureCall<DefaultTransferables, HC>
for SchedulerAsyncProcedureCall<HC, S>
{
type Context = TokioContext<DefaultTransferables, HC>;
fn receive(&mut self) -> Option<Message<DefaultTransferables>> {
let transferred = self.channel.1.try_recv().ok()?;
Some(transferred)
}
fn schedule(&self, input: Input, procedure: AsyncProcedure<Self::Context>) {
let sender = self.channel.0.clone();
let client = self.http_client.clone(); // FIXME: do not clone each time
self.scheduler
.schedule(move || async move {
(procedure)(
input,
TokioContext {
sender,
source_client: SourceClient::Http(HttpSourceClient::new(client)),
},
)
.await;
})
.unwrap();
}
}

View File

@ -1,74 +0,0 @@
use crate::environment::DefaultTransferables;
use crate::io::apc::{AsyncProcedure, AsyncProcedureCall, Context, Input, Message};
use crate::io::source_client::{HttpSourceClient, SourceClient};
use crate::io::transferables::Transferables;
use crate::{Environment, HttpClient};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use tokio_util::task::LocalPoolHandle;
// FIXME: Make this generic using the Schedule
#[derive(Clone)]
pub struct TokioContext<T: Transferables, HC: HttpClient> {
sender: Sender<Message<T>>,
source_client: SourceClient<HC>,
}
impl<T: Transferables, HC: HttpClient> Context<T, HC> for TokioContext<T, HC>
where
T: Clone,
{
fn send(&self, data: Message<T>) {
self.sender.send(data).unwrap();
}
fn source_client(&self) -> &SourceClient<HC> {
&self.source_client
}
}
pub struct TokioAsyncProcedureCall<HC: HttpClient> {
channel: (
Sender<Message<DefaultTransferables>>,
Receiver<Message<DefaultTransferables>>,
),
pool: LocalPoolHandle,
http_client: HC,
}
impl<HC: HttpClient> TokioAsyncProcedureCall<HC> {
pub fn new(http_client: HC) -> Self {
Self {
channel: mpsc::channel(),
pool: LocalPoolHandle::new(4),
http_client,
}
}
}
impl<HC: HttpClient> AsyncProcedureCall<DefaultTransferables, HC> for TokioAsyncProcedureCall<HC> {
type Context = TokioContext<DefaultTransferables, HC>;
fn receive(&mut self) -> Option<Message<DefaultTransferables>> {
let transferred = self.channel.1.try_recv().ok()?;
Some(transferred)
}
fn schedule(&self, input: Input, procedure: AsyncProcedure<Self::Context>) {
let sender = self.channel.0.clone();
let client = self.http_client.clone(); // FIXME: do not clone each time
self.pool.spawn_pinned(move || async move {
(procedure)(
input,
TokioContext {
sender,
source_client: SourceClient::Http(HttpSourceClient::new(client)),
},
)
.await;
});
}
}

View File

@ -2,7 +2,6 @@
use std::future::Future;
pub mod apc;
pub mod http_client;
pub mod scheduler;

View File

@ -1,7 +1,6 @@
import {run} from "../wasm/maplibre"
import init, {create_map, run} from "../wasm/maplibre"
import {Spector} from "spectorjs"
import {checkRequirements, checkWasmFeatures} from "../browser";
import init from "../wasm/maplibre";
import {preventDefaultTouchActions} from "../canvas";
// @ts-ignore esbuild plugin is handling this
import PoolWorker from './multithreaded-pool.worker.js';
@ -33,9 +32,11 @@ export const startMapLibre = async (wasmPath: string | undefined, workerPath: st
await initializeSharedModule(wasmPath);
await run(() => {
let map = await create_map(() => {
return workerPath ? new Worker(workerPath, {
type: 'module'
}) : PoolWorker();
})
await run(map)
}

View File

@ -12,9 +12,6 @@ use maplibre_winit::winit::{WinitEnvironment, WinitMapWindowConfig};
use wasm_bindgen::prelude::*;
use crate::platform::http_client::WHATWGFetchHttpClient;
use crate::platform::unsync::apc::PassingAsyncProcedureCall;
use crate::platform::unsync::transferables::LinearTransferables;
use crate::platform::AsyncProcedureCall;
mod error;
mod platform;
@ -46,34 +43,58 @@ pub fn wasm_bindgen_start() {
enable_tracing();
}
#[cfg(not(target_feature = "atomics"))]
pub type MapType = Map<
WinitEnvironment<
NopScheduler,
WHATWGFetchHttpClient,
LinearTransferables,
PassingAsyncProcedureCall,
platform::unsync::transferables::LinearTransferables,
platform::unsync::apc::PassingAsyncProcedureCall,
>,
>;
#[cfg(target_feature = "atomics")]
pub type MapType = Map<
WinitEnvironment<
platform::sync::pool_scheduler::WebWorkerPoolScheduler,
WHATWGFetchHttpClient,
maplibre::environment::DefaultTransferables,
maplibre::io::apc::SchedulerAsyncProcedureCall<
WHATWGFetchHttpClient,
platform::sync::pool_scheduler::WebWorkerPoolScheduler,
>,
>,
>;
#[wasm_bindgen]
pub async fn create_map(new_worker: js_sys::Function) -> u32 {
// Either call forget or the main loop to keep worker loop alive
let mut builder = MapBuilder::new()
.with_map_window_config(WinitMapWindowConfig::new("maplibre".to_string()))
.with_http_client(WHATWGFetchHttpClient::new());
#[cfg(target_feature = "atomics")]
let scheduler = platform::Scheduler::new(new_worker.clone());
#[cfg(target_feature = "atomics")]
let apc = AsyncProcedureCall::new(scheduler);
{
builder = builder
.with_apc(maplibre::io::apc::SchedulerAsyncProcedureCall::new(
WHATWGFetchHttpClient::new(),
platform::sync::pool_scheduler::WebWorkerPoolScheduler::new(new_worker.clone()),
))
.with_scheduler(platform::sync::pool_scheduler::WebWorkerPoolScheduler::new(
new_worker,
));
}
#[cfg(not(target_feature = "atomics"))]
let apc = AsyncProcedureCall::new(new_worker);
{
builder = builder
.with_apc(platform::unsync::apc::PassingAsyncProcedureCall::new(
new_worker, 4,
))
.with_scheduler(NopScheduler);
}
// Either call forget or the main loop to keep worker loop alive
let map: MapType = MapBuilder::new()
.with_map_window_config(WinitMapWindowConfig::new("maplibre".to_string()))
.with_http_client(WHATWGFetchHttpClient::new())
.with_scheduler(NopScheduler)
.with_apc(apc)
.build()
.initialize()
.await;
let map: MapType = builder.build().initialize().await;
Rc::into_raw(Rc::new(RefCell::new(map))) as u32
}

View File

@ -4,12 +4,6 @@ pub mod http_client;
#[cfg(target_feature = "atomics")]
pub mod sync;
#[cfg(target_feature = "atomics")]
type Scheduler = sync::pool_scheduler::WebWorkerPoolScheduler;
#[cfg(target_feature = "atomics")]
pub type AsyncProcedureCall = sync::apc::AtomicAsyncProcedureCall;
#[cfg(not(target_feature = "atomics"))]
pub mod unsync;
#[cfg(not(target_feature = "atomics"))]
pub type AsyncProcedureCall = unsync::apc::PassingAsyncProcedureCall;

View File

@ -1,78 +0,0 @@
use crate::platform::sync::pool_scheduler::WebWorkerPoolScheduler;
use maplibre::environment::DefaultTransferables;
use maplibre::environment::Environment;
use maplibre::io::apc::{AsyncProcedure, AsyncProcedureCall, Context, Message};
use maplibre::io::scheduler::Scheduler;
use maplibre::io::source_client::{HttpClient, HttpSourceClient, SourceClient};
use maplibre::io::transferables::Transferables;
use serde::Serialize;
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
#[derive(Clone)]
pub struct AtomicContext<T: Transferables, HC: HttpClient> {
sender: Sender<Message<T>>,
source_client: SourceClient<HC>,
}
impl<T: Transferables, HC: HttpClient> Context<T, HC> for AtomicContext<T, HC>
where
T: Clone,
{
fn send(&self, data: Message<T>) {
self.sender.send(data).unwrap();
}
fn source_client(&self) -> &SourceClient<HC> {
&self.source_client
}
}
pub struct AtomicAsyncProcedureCall {
channel: (
Sender<Message<DefaultTransferables>>,
Receiver<Message<DefaultTransferables>>,
),
scheduler: WebWorkerPoolScheduler,
}
impl AtomicAsyncProcedureCall {
pub fn new(scheduler: WebWorkerPoolScheduler) -> Self {
Self {
channel: mpsc::channel(),
scheduler,
}
}
}
impl<HC: HttpClient> AsyncProcedureCall<DefaultTransferables, HC> for AtomicAsyncProcedureCall {
type Context = AtomicContext<DefaultTransferables, HC>;
fn receive(&self) -> Option<Message<DefaultTransferables>> {
let transferred = self.channel.1.try_recv().ok()?;
Some(transferred)
}
fn schedule<I: Serialize + Send + 'static>(
&self,
input: I,
procedure: AsyncProcedure<I, AtomicContext<DefaultTransferables, HC>>,
http_client: HttpSourceClient<HC>,
) {
let sender = self.channel.0.clone();
self.scheduler
.schedule(move || async move {
(procedure)(
input,
AtomicContext {
sender,
source_client: SourceClient::Http(http_client),
},
)
.await;
})
.unwrap();
}
}

View File

@ -1,3 +1,2 @@
pub mod apc;
pub mod pool;
pub mod pool_scheduler;

View File

@ -36,8 +36,6 @@ pub struct PassingContext {
impl Context<UsedTransferables, UsedHttpClient> for PassingContext {
fn send(&self, data: Message<UsedTransferables>) {
// TODO: send back to main thread via postMessage
let (tag, serialized): (u32, &[u8]) = match &data {
Message::TileTessellated(data) => (1, bytemuck::bytes_of(data)),
Message::UnavailableLayer(data) => (2, bytemuck::bytes_of(data)),
@ -70,7 +68,7 @@ pub struct PassingAsyncProcedureCall {
}
impl PassingAsyncProcedureCall {
pub fn new(new_worker: js_sys::Function) -> Self {
pub fn new(new_worker: js_sys::Function, initial_workers: u8) -> Self {
let create_new_worker = Box::new(move || {
new_worker
.call0(&JsValue::undefined())
@ -79,15 +77,20 @@ impl PassingAsyncProcedureCall {
.unwrap()
});
let worker = create_new_worker();
let workers = (0..initial_workers)
.map(|_| {
let worker: Worker = create_new_worker();
let array = js_sys::Array::new();
array.push(&wasm_bindgen::module());
worker.post_message(&array).unwrap();
let array = js_sys::Array::new();
array.push(&wasm_bindgen::module());
worker.post_message(&array).unwrap();
worker
})
.collect::<Vec<_>>();
Self {
new_worker: create_new_worker,
workers: vec![worker],
workers,
received: vec![],
}
}