Add a more general threadpool implementation for wasm

This commit is contained in:
Maximilian Ammann 2022-03-23 16:22:14 +01:00
parent 5ce9f07d73
commit a208e299bc
8 changed files with 355 additions and 59 deletions

View File

@ -18,7 +18,7 @@ build = "build.rs"
license = "MIT OR Apache-2.0"
[package.metadata.wasm-pack.profile.release]
wasm-opt = true
wasm-opt = false
[profile.release]
lto = true
@ -38,6 +38,7 @@ web-sys = { version = "0.3", features = [
"Window",
"Headers",
"WorkerGlobalScope", "Request", "RequestInit", "RequestMode", "Response",
"ErrorEvent", "DedicatedWorkerGlobalScope"
] }
js-sys = "0.3"
wasm-bindgen = "0.2"

View File

@ -19,6 +19,8 @@ pub enum ScheduleMethod {
Tokio(crate::platform::scheduler::TokioScheduleMethod),
#[cfg(target_arch = "wasm32")]
WebWorker(crate::platform::scheduler::WebWorkerScheduleMethod),
#[cfg(target_arch = "wasm32")]
WebWorkerPool(crate::platform::scheduler::WebWorkerPoolScheduleMethod),
}
impl Default for ScheduleMethod {
@ -50,6 +52,10 @@ impl ScheduleMethod {
ScheduleMethod::WebWorker(method) => {
method.schedule_tile_request(scheduler, request_id, coords)
}
#[cfg(target_arch = "wasm32")]
ScheduleMethod::WebWorkerPool(method) => {
method.schedule_tile_request(scheduler, request_id, coords)
}
}
}
}

View File

@ -1,3 +1,5 @@
mod pool;
use std::panic;
use log::error;
@ -14,6 +16,7 @@ use crate::io::scheduler::ThreadLocalTessellatorState;
use crate::MapBuilder;
use console_error_panic_hook;
pub use instant::Instant;
use scheduler::WebWorkerPoolScheduleMethod;
use scheduler::WebWorkerScheduleMethod;
use style_spec::source::TileAdressingScheme;
use wasm_bindgen::prelude::*;
@ -44,8 +47,8 @@ extern "C" {
#[wasm_bindgen]
pub fn create_scheduler() -> *mut IOScheduler {
let scheduler = Box::new(IOScheduler::new(ScheduleMethod::WebWorker(
WebWorkerScheduleMethod::new(),
let scheduler = Box::new(IOScheduler::new(ScheduleMethod::WebWorkerPool(
WebWorkerPoolScheduleMethod::new(),
)));
let scheduler_ptr = Box::into_raw(scheduler);
return scheduler_ptr;
@ -113,11 +116,14 @@ pub async fn run(scheduler_ptr: *mut IOScheduler) {
}
pub mod scheduler {
use super::pool::WorkerPool;
use super::schedule_tile_request;
use crate::coords::{TileCoords, WorldTileCoords};
use crate::io::scheduler::{IOScheduler, ScheduleMethod, ThreadLocalTessellatorState};
use crate::io::tile_cache::TileCache;
use crate::io::TileRequestID;
use log::warn;
use std::thread::Thread;
pub struct WebWorkerScheduleMethod;
@ -144,6 +150,91 @@ pub mod scheduler {
)
}
}
use crate::error::Error;
use js_sys::{ArrayBuffer, Error as JSError, Uint8Array};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;
use web_sys::{Request, RequestInit, RequestMode, Response, WorkerGlobalScope};
impl From<JsValue> for Error {
fn from(maybe_error: JsValue) -> Self {
assert!(maybe_error.is_instance_of::<JSError>());
let error: JSError = maybe_error.dyn_into().unwrap();
Error::Network(error.message().as_string().unwrap())
}
}
pub struct WebWorkerPoolScheduleMethod {
pool: WorkerPool,
}
impl WebWorkerPoolScheduleMethod {
pub fn new() -> Self {
Self {
pool: WorkerPool::new(4).unwrap(),
}
}
async fn fetch(
state: ThreadLocalTessellatorState,
request_id: TileRequestID,
url: &str,
) -> Result<JsValue, JsValue> {
let mut opts = RequestInit::new();
opts.method("GET");
let request = Request::new_with_str_and_init(&url, &opts)?;
// Get the global scope
let global = js_sys::global();
assert!(global.is_instance_of::<WorkerGlobalScope>());
let scope = global.dyn_into::<WorkerGlobalScope>().unwrap();
// Call fetch on global scope
let maybe_response = JsFuture::from(scope.fetch_with_request(&request)).await?;
assert!(maybe_response.is_instance_of::<Response>());
let response: Response = maybe_response.dyn_into().unwrap();
// Get ArrayBuffer
let maybe_array_buffer = JsFuture::from(response.array_buffer()?).await?;
assert!(maybe_array_buffer.is_instance_of::<ArrayBuffer>());
let array_buffer: ArrayBuffer = maybe_array_buffer.dyn_into().unwrap();
// Copy data to Vec<u8>
let buffer: Uint8Array = Uint8Array::new(&array_buffer);
let mut output: Vec<u8> = vec![0; array_buffer.byte_length() as usize];
buffer.copy_to(output.as_mut_slice());
state
.tessellate_layers(request_id, output.into_boxed_slice())
.unwrap();
Ok(JsValue::undefined())
}
pub fn schedule_tile_request(
&self,
scheduler: &IOScheduler,
request_id: TileRequestID,
coords: TileCoords,
) {
let state = scheduler.new_tessellator_state();
self.pool
.run(move || {
wasm_bindgen_futures::future_to_promise(async move {
let string = format!(
"https://maps.tuerantuer.org/europe_germany/{z}/{x}/{y}.pbf",
x = coords.x,
y = coords.y,
z = coords.z,
);
Self::fetch(state, request_id, string.as_str()).await
})
})
.unwrap();
}
}
}
/*use crate::error::Error;

225
src/platform/web/pool.rs Normal file
View File

@ -0,0 +1,225 @@
//! A small module that's intended to provide an example of creating a pool of
//! web workers which can be used to execute `rayon`-style work.
use js_sys::Promise;
use log::{info, warn};
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
use web_sys::{ErrorEvent, Event, Worker};
#[wasm_bindgen()]
extern "C" {
#[wasm_bindgen(js_name = newWorker)]
fn new_worker() -> JsValue;
}
pub struct WorkerPool {
state: Rc<PoolState>,
}
struct PoolState {
workers: RefCell<Vec<Worker>>,
callback: Closure<dyn FnMut(Event)>,
}
struct Work {
func: Box<dyn (FnOnce() -> Promise) + Send>,
}
impl WorkerPool {
/// Creates a new `WorkerPool` which immediately creates `initial` workers.
///
/// The pool created here can be used over a long period of time, and it
/// will be initially primed with `initial` workers. Currently workers are
/// never released or gc'd until the whole pool is destroyed.
///
/// # Errors
///
/// Returns any error that may happen while a JS web worker is created and a
/// message is sent to it.
pub fn new(initial: usize) -> Result<WorkerPool, JsValue> {
let pool = WorkerPool {
state: Rc::new(PoolState {
workers: RefCell::new(Vec::with_capacity(initial)),
callback: Closure::wrap(Box::new(|event: Event| {
warn!("unhandled event: {}", event.type_());
}) as Box<dyn FnMut(Event)>),
}),
};
for _ in 0..initial {
let worker = pool.spawn()?;
pool.state.push(worker);
}
Ok(pool)
}
/// Unconditionally spawns a new worker
///
/// The worker isn't registered with this `WorkerPool` but is capable of
/// executing work for this wasm module.
///
/// # Errors
///
/// Returns any error that may happen while a JS web worker is created and a
/// message is sent to it.
fn spawn(&self) -> Result<Worker, JsValue> {
info!("spawning new worker");
// TODO: what do do about `./worker.js`:
//
// * the path is only known by the bundler. How can we, as a
// library, know what's going on?
// * How do we not fetch a script N times? It internally then
// causes another script to get fetched N times...
warn!("New worker spawned");
let maybe_worker = new_worker();
assert!(maybe_worker.is_instance_of::<Worker>());
let worker: Worker = maybe_worker.dyn_into().unwrap();
// With a worker spun up send it the module/memory so it can start
// instantiating the wasm module. Later it might receive further
// messages about code to run on the wasm module.
let array = js_sys::Array::new();
array.push(&wasm_bindgen::module());
array.push(&wasm_bindgen::memory());
worker.post_message(&array)?;
Ok(worker)
}
/// Fetches a worker from this pool, spawning one if necessary.
///
/// This will attempt to pull an already-spawned web worker from our cache
/// if one is available, otherwise it will spawn a new worker and return the
/// newly spawned worker.
///
/// # Errors
///
/// Returns any error that may happen while a JS web worker is created and a
/// message is sent to it.
fn worker(&self) -> Result<Worker, JsValue> {
match self.state.workers.borrow_mut().pop() {
Some(worker) => Ok(worker),
None => self.spawn(),
}
}
/// Executes the work `f` in a web worker, spawning a web worker if
/// necessary.
///
/// This will acquire a web worker and then send the closure `f` to the
/// worker to execute. The worker won't be usable for anything else while
/// `f` is executing, and no callbacks are registered for when the worker
/// finishes.
///
/// # Errors
///
/// Returns any error that may happen while a JS web worker is created and a
/// message is sent to it.
pub fn execute(
&self,
f: impl (FnOnce() -> Promise) + Send + 'static,
) -> Result<Worker, JsValue> {
let worker = self.worker()?;
let work = Box::new(Work { func: Box::new(f) });
let ptr = Box::into_raw(work);
match worker.post_message(&JsValue::from(ptr as u32)) {
Ok(()) => Ok(worker),
Err(e) => {
unsafe {
drop(Box::from_raw(ptr));
}
Err(e)
}
}
}
/// Configures an `onmessage` callback for the `worker` specified for the
/// web worker to be reclaimed and re-inserted into this pool when a message
/// is received.
///
/// Currently this `WorkerPool` abstraction is intended to execute one-off
/// style work where the work itself doesn't send any notifications and
/// whatn it's done the worker is ready to execute more work. This method is
/// used for all spawned workers to ensure that when the work is finished
/// the worker is reclaimed back into this pool.
fn reclaim_on_message(&self, worker: Worker) {
let state = Rc::downgrade(&self.state);
let worker2 = worker.clone();
let reclaim_slot = Rc::new(RefCell::new(None));
let slot2 = reclaim_slot.clone();
let reclaim = Closure::wrap(Box::new(move |event: Event| {
if let Some(error) = event.dyn_ref::<ErrorEvent>() {
warn!("error in worker: {}", error.message());
// TODO: this probably leaks memory somehow? It's sort of
// unclear what to do about errors in workers right now.
return;
}
// If this is a completion event then can deallocate our own
// callback by clearing out `slot2` which contains our own closure.
if let Some(_msg) = event.dyn_ref::<MessageEvent>() {
if let Some(state) = state.upgrade() {
state.push(worker2.clone());
}
*slot2.borrow_mut() = None;
return;
}
warn!("unhandled event: {}", event.type_());
// TODO: like above, maybe a memory leak here?
}) as Box<dyn FnMut(Event)>);
worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref()));
*reclaim_slot.borrow_mut() = Some(reclaim);
}
/// Executes `f` in a web worker.
///
/// This pool manages a set of web workers to draw from, and `f` will be
/// spawned quickly into one if the worker is idle. If no idle workers are
/// available then a new web worker will be spawned.
///
/// Once `f` returns the worker assigned to `f` is automatically reclaimed
/// by this `WorkerPool`. This method provides no method of learning when
/// `f` completes, and for that you'll need to use `run_notify`.
///
/// # Errors
///
/// If an error happens while spawning a web worker or sending a message to
/// a web worker, that error is returned.
pub fn run(&self, f: impl (FnOnce() -> Promise) + Send + 'static) -> Result<(), JsValue> {
let worker = self.execute(f)?;
self.reclaim_on_message(worker);
Ok(())
}
}
impl PoolState {
fn push(&self, worker: Worker) {
worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref()));
worker.set_onerror(Some(self.callback.as_ref().unchecked_ref()));
let mut workers = self.workers.borrow_mut();
for prev in workers.iter() {
let prev: &JsValue = prev;
let worker: &JsValue = &worker;
assert!(prev != worker);
}
workers.push(worker);
}
}
/// Entry point invoked by `worker.js`, a bit of a hack but see the "TODO" above
/// about `worker.js` in general.
#[wasm_bindgen]
pub async fn child_entry_point(ptr: u32) -> Result<(), JsValue> {
let ptr = unsafe { Box::from_raw(ptr as *mut Work) };
let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>();
JsFuture::from((ptr.func)()).await?;
global.post_message(&JsValue::undefined())?;
Ok(())
}

View File

@ -2,10 +2,10 @@ import init, {create_scheduler, new_tessellator_state, run} from "./dist/libs/ma
import {Spector} from "spectorjs"
import {WebWorkerMessageType} from "./types"
declare global {
interface Window {
schedule_tile_request: (url: string, request_id: number) => void;
newWorker: () => void;
}
}
@ -83,37 +83,20 @@ const start = async () => {
preventDefaultTouchActions();
let MEMORY_PAGES = 16 * 1024
let WORKER_COUNT = 2
window.newWorker = () => {
return new Worker(new URL('./worker.ts', import.meta.url), {
type: 'module'
});
}
const memory = new WebAssembly.Memory({initial: 1024, maximum: MEMORY_PAGES, shared: true})
await init(undefined, memory)
const schedulerPtr = create_scheduler()
const createWorker = (id: number) => {
const worker = new Worker(new URL('./worker.ts', import.meta.url), {
type: "module",
name: `worker_${id}`
})
worker.postMessage({type: "init", memory} as WebWorkerMessageType)
return worker
}
let workers: [number, Worker][] = Array.from(
new Array(WORKER_COUNT).keys(),
(id) => [new_tessellator_state(schedulerPtr), createWorker(id)]
)
window.schedule_tile_request = (url: string, request_id: number) => {
const [tessellatorState, worker] = workers[Math.floor(Math.random() * workers.length)]
worker.postMessage({
type: "fetch_tile",
tessellatorState: tessellatorState,
url,
request_id
} as WebWorkerMessageType)
}
await run(schedulerPtr)
}
start().then(() => console.log("started via wasm"))

View File

@ -4,7 +4,7 @@
<title>mapr Demo</title>
</head>
<body style="margin: 0; padding: 0;">
<script src="index.js"></script>
<script src="main.js"></script>
<canvas id="mapr" style="border: 2px solid black"></canvas>
</body>
</html>

View File

@ -7,7 +7,7 @@ let dist = path.join(__dirname, 'dist/demo');
module.exports = (env) => ({
mode: "development",
entry: {
index: "./index.ts",
main: "./index.ts",
},
experiments: {
syncWebAssembly: true

View File

@ -1,32 +1,22 @@
import init, {InitOutput, tessellate_layers} from "./dist/libs/mapr"
import {WebWorkerMessageType} from "./types"
let module: Promise<InitOutput> = null
import init, {child_entry_point} from "./dist/libs/mapr"
onmessage = async message => {
let messageData: WebWorkerMessageType = message.data
console.dir(messageData)
console.warn(message.data)
switch (messageData.type) {
case "init":
if (module != null) {
return
}
module = init(undefined, messageData.memory)
break
case "fetch_tile":
let {tessellatorState, url, request_id} = messageData
await module
const initialised = init(undefined, message.data[1]).catch(err => {
// Propagate to main `onerror`:
setTimeout(() => {
throw err;
});
// Rethrow to keep promise rejected and prevent execution of further commands:
throw err;
});
console.log("Fetching from " + self.name)
self.onmessage = async message => {
console.warn(message.data)
let result = await fetch(url)
let buffer = await result.arrayBuffer()
tessellate_layers(tessellatorState, request_id, new Uint8Array(buffer))
break
default:
console.warn("WebWorker received unknown message!")
break
}
}
// This will queue further commands up until the module is fully initialised:
await initialised;
child_entry_point(message.data);
};
}