mirror of
https://github.com/maplibre/maplibre-rs.git
synced 2025-12-08 19:05:57 +00:00
Schedule on random WebWorkers instead of spawning new ones as needed (#80)
This commit is contained in:
parent
1934555013
commit
7783493f09
@ -21,7 +21,9 @@ crate-type = ["cdylib", "rlib"]
|
||||
async-trait = "0.1.56"
|
||||
maplibre = { path = "../maplibre", features = ["no-thread-safe-futures"] }
|
||||
maplibre-winit = { path = "../maplibre-winit", version = "0.0.1" }
|
||||
|
||||
log = "0.4.17"
|
||||
rand = { version = "0.7", features = ["wasm-bindgen"] }
|
||||
|
||||
console_error_panic_hook = "0.1.7"
|
||||
# Exact version requirement can be removed as soon as https://github.com/gfx-rs/wgpu/pull/2954 is merged
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import init, {create_pool_scheduler, new_thread_local_state, run} from "./wasm-pack"
|
||||
import init, {create_pool_scheduler, run} from "./wasm-pack"
|
||||
import {Spector} from "spectorjs"
|
||||
import {WebWorkerMessageType} from "./types"
|
||||
import {
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import init, {InitOutput, tessellate_layers} from "./wasm-pack"
|
||||
/*import init, {InitOutput, tessellate_layers} from "./wasm-pack"
|
||||
import {WebWorkerMessageType} from "./types"
|
||||
|
||||
let module: Promise<InitOutput> = null
|
||||
@ -29,4 +29,4 @@ onmessage = async message => {
|
||||
console.warn("WebWorker received unknown message!")
|
||||
break
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
pub mod http_client;
|
||||
pub mod legacy_webworker_fetcher;
|
||||
pub mod pool;
|
||||
pub mod schedule_method;
|
||||
|
||||
@ -5,6 +5,7 @@
|
||||
use std::{cell::RefCell, rc::Rc};
|
||||
|
||||
use js_sys::Promise;
|
||||
use rand::prelude::*;
|
||||
use wasm_bindgen::{prelude::*, JsCast};
|
||||
use wasm_bindgen_futures::JsFuture;
|
||||
use web_sys::{DedicatedWorkerGlobalScope, ErrorEvent, Event, MessageEvent, Worker};
|
||||
@ -51,8 +52,7 @@ impl WorkerPool {
|
||||
}),
|
||||
};
|
||||
for _ in 0..initial {
|
||||
let worker = pool.spawn()?;
|
||||
pool.state.push(worker);
|
||||
pool.spawn()?;
|
||||
}
|
||||
|
||||
Ok(pool)
|
||||
@ -67,15 +67,8 @@ impl WorkerPool {
|
||||
///
|
||||
/// Returns any error that may happen while a JS web worker is created and a
|
||||
/// message is sent to it.
|
||||
fn spawn(&self) -> Result<Worker, JsValue> {
|
||||
fn spawn(&self) -> Result<(), JsValue> {
|
||||
log::info!("spawning new worker");
|
||||
// TODO: what do do about `./worker.js`:
|
||||
//
|
||||
// * the path is only known by the bundler. How can we, as a
|
||||
// library, know what's going on?
|
||||
// * How do we not fetch a script N times? It internally then
|
||||
// causes another script to get fetched N times...
|
||||
|
||||
let worker = (self.new_worker)();
|
||||
|
||||
// With a worker spun up send it the module/memory so it can start
|
||||
@ -86,7 +79,8 @@ impl WorkerPool {
|
||||
array.push(&wasm_bindgen::memory());
|
||||
worker.post_message(&array)?;
|
||||
|
||||
Ok(worker)
|
||||
self.state.push(worker);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetches a worker from this pool, spawning one if necessary.
|
||||
@ -100,9 +94,19 @@ impl WorkerPool {
|
||||
/// Returns any error that may happen while a JS web worker is created and a
|
||||
/// message is sent to it.
|
||||
fn worker(&self) -> Result<Worker, JsValue> {
|
||||
match self.state.workers.borrow_mut().pop() {
|
||||
Some(worker) => Ok(worker),
|
||||
None => self.spawn(),
|
||||
let workers = self.state.workers.borrow();
|
||||
let result = match workers.choose(&mut rand::thread_rng()) {
|
||||
Some(worker) => Some(worker),
|
||||
None => None,
|
||||
};
|
||||
|
||||
if result.is_none() {
|
||||
self.spawn();
|
||||
}
|
||||
|
||||
match result {
|
||||
Some(worker) => Ok(worker.clone()),
|
||||
None => self.worker(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -118,15 +122,12 @@ impl WorkerPool {
|
||||
///
|
||||
/// Returns any error that may happen while a JS web worker is created and a
|
||||
/// message is sent to it.
|
||||
pub fn execute(
|
||||
&self,
|
||||
f: impl (FnOnce() -> Promise) + Send + 'static,
|
||||
) -> Result<Worker, JsValue> {
|
||||
pub fn execute(&self, f: impl (FnOnce() -> Promise) + Send + 'static) -> Result<(), JsValue> {
|
||||
let worker = self.worker()?;
|
||||
let work = Box::new(Work { func: Box::new(f) });
|
||||
let ptr = Box::into_raw(work);
|
||||
match worker.post_message(&JsValue::from(ptr as u32)) {
|
||||
Ok(()) => Ok(worker),
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => {
|
||||
unsafe {
|
||||
drop(Box::from_raw(ptr));
|
||||
@ -135,76 +136,15 @@ impl WorkerPool {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Configures an `onmessage` callback for the `worker` specified for the
|
||||
/// web worker to be reclaimed and re-inserted into this pool when a message
|
||||
/// is received.
|
||||
///
|
||||
/// Currently this `WorkerPool` abstraction is intended to execute one-off
|
||||
/// style work where the work itself doesn't send any notifications and
|
||||
/// whatn it's done the worker is ready to execute more work. This method is
|
||||
/// used for all spawned workers to ensure that when the work is finished
|
||||
/// the worker is reclaimed back into this pool.
|
||||
fn reclaim_on_message(&self, worker: Worker) {
|
||||
let state = Rc::downgrade(&self.state);
|
||||
let worker2 = worker.clone();
|
||||
let reclaim_slot = Rc::new(RefCell::new(None));
|
||||
let slot2 = reclaim_slot.clone();
|
||||
let reclaim = Closure::wrap(Box::new(move |event: Event| {
|
||||
if let Some(error) = event.dyn_ref::<ErrorEvent>() {
|
||||
log::error!("error in worker: {}", error.message());
|
||||
// TODO: this probably leaks memory somehow? It's sort of
|
||||
// unclear what to do about errors in workers right now.
|
||||
return;
|
||||
}
|
||||
|
||||
// If this is a completion event then can deallocate our own
|
||||
// callback by clearing out `slot2` which contains our own closure.
|
||||
if let Some(_msg) = event.dyn_ref::<MessageEvent>() {
|
||||
if let Some(state) = state.upgrade() {
|
||||
state.push(worker2.clone());
|
||||
}
|
||||
*slot2.borrow_mut() = None;
|
||||
return;
|
||||
}
|
||||
|
||||
log::error!("unhandled event: {}", event.type_());
|
||||
// TODO: like above, maybe a memory leak here?
|
||||
}) as Box<dyn FnMut(Event)>);
|
||||
worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref()));
|
||||
*reclaim_slot.borrow_mut() = Some(reclaim);
|
||||
}
|
||||
|
||||
/// Executes `f` in a web worker.
|
||||
///
|
||||
/// This pool manages a set of web workers to draw from, and `f` will be
|
||||
/// spawned quickly into one if the worker is idle. If no idle workers are
|
||||
/// available then a new web worker will be spawned.
|
||||
///
|
||||
/// Once `f` returns the worker assigned to `f` is automatically reclaimed
|
||||
/// by this `WorkerPool`. This method provides no method of learning when
|
||||
/// `f` completes, and for that you'll need to use `run_notify`.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// If an error happens while spawning a web worker or sending a message to
|
||||
/// a web worker, that error is returned.
|
||||
pub fn run(&self, f: impl (FnOnce() -> Promise) + Send + 'static) -> Result<(), JsValue> {
|
||||
let worker = self.execute(f)?;
|
||||
self.reclaim_on_message(worker);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl PoolState {
|
||||
fn push(&self, worker: Worker) {
|
||||
worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref()));
|
||||
worker.set_onerror(Some(self.callback.as_ref().unchecked_ref()));
|
||||
//worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref()));
|
||||
//worker.set_onerror(Some(self.callback.as_ref().unchecked_ref()));
|
||||
let mut workers = self.workers.borrow_mut();
|
||||
for prev in workers.iter() {
|
||||
let prev: &JsValue = prev;
|
||||
let worker: &JsValue = &worker;
|
||||
assert!(prev != worker);
|
||||
for existing_worker in workers.iter() {
|
||||
assert!(existing_worker as &JsValue != &worker as &JsValue);
|
||||
}
|
||||
workers.push(worker);
|
||||
}
|
||||
@ -215,8 +155,8 @@ impl PoolState {
|
||||
#[wasm_bindgen]
|
||||
pub async fn child_entry_point(ptr: u32) -> Result<(), JsValue> {
|
||||
let ptr = unsafe { Box::from_raw(ptr as *mut Work) };
|
||||
let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>();
|
||||
//let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>();
|
||||
JsFuture::from((ptr.func)()).await?;
|
||||
global.post_message(&JsValue::undefined())?;
|
||||
//global.post_message(&JsValue::undefined())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -37,7 +37,7 @@ impl ScheduleMethod for WebWorkerPoolScheduleMethod {
|
||||
T: Future<Output = ()> + 'static,
|
||||
{
|
||||
self.pool
|
||||
.run(move || {
|
||||
.execute(move || {
|
||||
wasm_bindgen_futures::future_to_promise(async move {
|
||||
future_factory().await;
|
||||
Ok(JsValue::undefined())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user