Enable the possibility to use two different schedulers for web

This commit is contained in:
Maximilian Ammann 2022-03-23 16:45:57 +01:00
parent a208e299bc
commit 9f0447d98b
5 changed files with 117 additions and 82 deletions

View File

@ -47,8 +47,29 @@ extern "C" {
#[wasm_bindgen]
pub fn create_scheduler() -> *mut IOScheduler {
let scheduler = Box::new(IOScheduler::new(ScheduleMethod::WebWorker(
WebWorkerScheduleMethod::new(),
)));
let scheduler_ptr = Box::into_raw(scheduler);
return scheduler_ptr;
}
use web_sys::Worker;
#[wasm_bindgen]
pub fn create_pool_scheduler(new_worker: js_sys::Function) -> *mut IOScheduler {
let f = move || {
new_worker
.call0(&JsValue::null())
.unwrap()
.dyn_into::<Worker>()
.unwrap()
};
f();
let scheduler = Box::new(IOScheduler::new(ScheduleMethod::WebWorkerPool(
WebWorkerPoolScheduleMethod::new(),
WebWorkerPoolScheduleMethod::new(Box::new(f)),
)));
let scheduler_ptr = Box::into_raw(scheduler);
return scheduler_ptr;
@ -119,11 +140,18 @@ pub mod scheduler {
use super::pool::WorkerPool;
use super::schedule_tile_request;
use crate::coords::{TileCoords, WorldTileCoords};
use crate::error::Error;
use crate::io::scheduler::{IOScheduler, ScheduleMethod, ThreadLocalTessellatorState};
use crate::io::tile_cache::TileCache;
use crate::io::TileRequestID;
use js_sys::{ArrayBuffer, Error as JSError, Uint8Array};
use log::warn;
use std::thread::Thread;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;
use web_sys::Worker;
use web_sys::{Request, RequestInit, RequestMode, Response, WorkerGlobalScope};
pub struct WebWorkerScheduleMethod;
@ -151,12 +179,6 @@ pub mod scheduler {
}
}
use crate::error::Error;
use js_sys::{ArrayBuffer, Error as JSError, Uint8Array};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;
use web_sys::{Request, RequestInit, RequestMode, Response, WorkerGlobalScope};
impl From<JsValue> for Error {
fn from(maybe_error: JsValue) -> Self {
assert!(maybe_error.is_instance_of::<JSError>());
@ -170,9 +192,9 @@ pub mod scheduler {
}
impl WebWorkerPoolScheduleMethod {
pub fn new() -> Self {
pub fn new(f: Box<dyn Fn() -> Worker>) -> Self {
Self {
pool: WorkerPool::new(4).unwrap(),
pool: WorkerPool::new(4, f).unwrap(),
}
}
@ -236,50 +258,3 @@ pub mod scheduler {
}
}
}
/*use crate::error::Error;
use js_sys::{ArrayBuffer, Error as JSError, Uint8Array};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;
use web_sys::{Request, RequestInit, RequestMode, Response, WorkerGlobalScope};
impl From<JsValue> for Error {
fn from(maybe_error: JsValue) -> Self {
assert!(maybe_error.is_instance_of::<JSError>());
let error: JSError = maybe_error.dyn_into().unwrap();
Error::Network(error.message().as_string().unwrap())
}
}
async fn fetch(&self, url: &str) -> Result<Vec<u8>, Error> {
let mut opts = RequestInit::new();
opts.method("GET");
let request = Request::new_with_str_and_init(&url, &opts)?;
// Get the global scope
let global = js_sys::global();
assert!(global.is_instance_of::<WorkerGlobalScope>());
let scope = global.dyn_into::<WorkerGlobalScope>().unwrap();
// Call fetch on global scope
let maybe_response = JsFuture::from(scope.fetch_with_request(&request)).await?;
assert!(maybe_response.is_instance_of::<Response>());
let response: Response = maybe_response.dyn_into().unwrap();
// Get ArrayBuffer
let maybe_array_buffer = JsFuture::from(response.array_buffer()?).await?;
assert!(maybe_array_buffer.is_instance_of::<ArrayBuffer>());
let array_buffer: ArrayBuffer = maybe_array_buffer.dyn_into().unwrap();
// Copy data to Vec<u8>
let buffer: Uint8Array = Uint8Array::new(&array_buffer);
let mut output: Vec<u8> = vec![0; array_buffer.byte_length() as usize];
buffer.copy_to(output.as_mut_slice());
Ok(output)
}*/

View File

@ -19,6 +19,7 @@ extern "C" {
}
pub struct WorkerPool {
new_worker: Box<dyn Fn() -> Worker>,
state: Rc<PoolState>,
}
@ -42,8 +43,9 @@ impl WorkerPool {
///
/// Returns any error that may happen while a JS web worker is created and a
/// message is sent to it.
pub fn new(initial: usize) -> Result<WorkerPool, JsValue> {
pub fn new(initial: usize, new_worker: Box<dyn Fn() -> Worker>) -> Result<WorkerPool, JsValue> {
let pool = WorkerPool {
new_worker,
state: Rc::new(PoolState {
workers: RefCell::new(Vec::with_capacity(initial)),
callback: Closure::wrap(Box::new(|event: Event| {
@ -77,9 +79,10 @@ impl WorkerPool {
// * How do we not fetch a script N times? It internally then
// causes another script to get fetched N times...
warn!("New worker spawned");
let maybe_worker = new_worker();
/*let maybe_worker = new_worker();
assert!(maybe_worker.is_instance_of::<Worker>());
let worker: Worker = maybe_worker.dyn_into().unwrap();
let worker: Worker = maybe_worker.dyn_into().unwrap();*/
let worker = (self.new_worker)();
// With a worker spun up send it the module/memory so it can start
// instantiating the wasm module. Later it might receive further

View File

@ -1,4 +1,4 @@
import init, {create_scheduler, new_tessellator_state, run} from "./dist/libs/mapr"
import init, {create_pool_scheduler, create_scheduler, new_tessellator_state, run} from "./dist/libs/mapr"
import {Spector} from "spectorjs"
import {WebWorkerMessageType} from "./types"
@ -84,16 +84,43 @@ const start = async () => {
let MEMORY_PAGES = 16 * 1024
window.newWorker = () => {
return new Worker(new URL('./worker.ts', import.meta.url), {
type: 'module'
});
}
const memory = new WebAssembly.Memory({initial: 1024, maximum: MEMORY_PAGES, shared: true})
await init(undefined, memory)
/*const schedulerPtr = create_pool_scheduler(() => {
console.log("spawni")
return new Worker(new URL('./pool_worker.ts', import.meta.url), {
type: 'module'
});
})*/
const schedulerPtr = create_scheduler()
let WORKER_COUNT = 4
const createWorker = (id: number) => {
const worker = new Worker(new URL('./worker.ts', import.meta.url), {
type: "module",
name: `worker_${id}`
})
worker.postMessage({type: "init", memory} as WebWorkerMessageType)
return worker
}
let workers: [number, Worker][] = Array.from(
new Array(WORKER_COUNT).keys(),
(id) => [new_tessellator_state(schedulerPtr), createWorker(id)]
)
window.schedule_tile_request = (url: string, request_id: number) => {
const [tessellatorState, worker] = workers[Math.floor(Math.random() * workers.length)]
worker.postMessage({
type: "fetch_tile",
tessellatorState: tessellatorState,
url,
request_id
} as WebWorkerMessageType)
}
await run(schedulerPtr)
}

20
web/pool_worker.ts Normal file
View File

@ -0,0 +1,20 @@
import init, {child_entry_point} from "./dist/libs/mapr"
onmessage = async message => {
const initialised = init(undefined, message.data[1]).catch(err => {
// Propagate to main `onerror`:
setTimeout(() => {
throw err;
});
// Rethrow to keep promise rejected and prevent execution of further commands:
throw err;
});
self.onmessage = async message => {
console.warn(message.data)
// This will queue further commands up until the module is fully initialised:
await initialised;
child_entry_point(message.data);
};
}

View File

@ -1,22 +1,32 @@
import init, {child_entry_point} from "./dist/libs/mapr"
import init, {InitOutput, tessellate_layers} from "./dist/libs/mapr"
import {WebWorkerMessageType} from "./types"
let module: Promise<InitOutput> = null
onmessage = async message => {
console.warn(message.data)
let messageData: WebWorkerMessageType = message.data
console.dir(messageData)
const initialised = init(undefined, message.data[1]).catch(err => {
// Propagate to main `onerror`:
setTimeout(() => {
throw err;
});
// Rethrow to keep promise rejected and prevent execution of further commands:
throw err;
});
switch (messageData.type) {
case "init":
if (module != null) {
return
}
module = init(undefined, messageData.memory)
break
case "fetch_tile":
let {tessellatorState, url, request_id} = messageData
await module
self.onmessage = async message => {
console.warn(message.data)
console.log("Fetching from " + self.name)
// This will queue further commands up until the module is fully initialised:
await initialised;
child_entry_point(message.data);
};
let result = await fetch(url)
let buffer = await result.arrayBuffer()
tessellate_layers(tessellatorState, request_id, new Uint8Array(buffer))
break
default:
console.warn("WebWorker received unknown message!")
break
}
}