Add experiment for unsync scheduling

This commit is contained in:
Maximilian Ammann 2022-09-10 11:12:42 +02:00
parent be34569bde
commit dc7a6ff7ef
9 changed files with 158 additions and 26 deletions

View File

@ -65,4 +65,4 @@ module.exports = (env) => ({
title: 'maplibre demo',
}),
]
});
});

View File

@ -1,4 +1,4 @@
import init, {create_pool_scheduler, run} from "../wasm/maplibre"
import init, {create_scheduler, run} from "../wasm/maplibre"
import {Spector} from "spectorjs"
import {checkRequirements, checkWasmFeatures} from "../browser";
import {preventDefaultTouchActions} from "../canvas";
@ -25,10 +25,27 @@ export const startMapLibre = async (wasmPath: string | undefined, workerPath: st
await init(wasmPath);
const schedulerPtr = create_pool_scheduler(() => {
return workerPath ? new Worker(workerPath, {
const schedulerPtr = create_scheduler(() => {
let worker = workerPath ? new Worker(workerPath, {
type: 'module'
}) : PoolWorker();
let memories = []
worker.onmessage = (message) => {
console.warn("new message");
//let uint8Array = new Uint8Array(message.data[0], message.data[1]);
memories.push(message.data[0])
console.warn(memories.map(v => new Uint8Array(v, message.data[1])[0]));
console.warn(memories[0] == memories[1]);
worker.postMessage("test")
}
return worker;
})
await run(schedulerPtr)

View File

@ -1,3 +1,22 @@
onmessage = async message => {
import init, {worker_entry} from "../wasm/maplibre"
const initializeExisting = async (module: string) => {
await init(module)
}
onmessage = async message => {
const initialised = initializeExisting(message.data[0]).catch(err => {
// Propagate to main `onerror`:
setTimeout(() => {
throw err;
});
// Rethrow to keep promise rejected and prevent execution of further commands:
throw err;
});
self.onmessage = async message => {
// This will queue further commands up until the module is fully initialised:
await initialised;
await worker_entry();
};
}

View File

@ -1,10 +1,11 @@
use std::panic;
use maplibre::{io::scheduler::Scheduler, MapBuilder};
use maplibre_winit::winit::WinitMapWindowConfig;
use maplibre_winit::winit::{WinitEnvironment, WinitMapWindowConfig};
use wasm_bindgen::prelude::*;
use crate::platform::{http_client::WHATWGFetchHttpClient, NopScheduleMethod};
use crate::platform::unsync::UnsyncScheduler;
use crate::platform::{http_client::WHATWGFetchHttpClient, NopScheduler};
mod error;
mod platform;
@ -12,6 +13,8 @@ mod platform;
#[cfg(not(target_arch = "wasm32"))]
compile_error!("web works only on wasm32.");
type CurrentScheduler = UnsyncScheduler;
#[cfg(feature = "trace")]
fn enable_tracing() {
use tracing_subscriber::{layer::SubscriberExt, Registry};
@ -37,20 +40,20 @@ pub fn wasm_bindgen_start() {
}
#[wasm_bindgen]
pub fn create_pool_scheduler(new_worker: js_sys::Function) -> *mut Scheduler<NopScheduleMethod> {
let scheduler = Scheduler::new(NopScheduleMethod);
pub fn create_scheduler(new_worker: js_sys::Function) -> *mut CurrentScheduler {
let scheduler = UnsyncScheduler::new(new_worker);
Box::into_raw(Box::new(scheduler))
}
#[wasm_bindgen]
pub async fn run(scheduler_ptr: *mut Scheduler<NopScheduleMethod>) {
pub async fn run(scheduler_ptr: *mut CurrentScheduler) {
let scheduler = unsafe { Box::from_raw(scheduler_ptr) };
// Either call forget or the main loop to keep worker loop alive
MapBuilder::new()
MapBuilder::<WinitEnvironment<_, _>>::new()
.with_map_window_config(WinitMapWindowConfig::new("maplibre".to_string()))
.with_http_client(WHATWGFetchHttpClient::new())
.with_existing_scheduler(*scheduler)
.with_scheduler(*scheduler)
.build()
.initialize()
.await

View File

@ -1,17 +1,17 @@
use maplibre::error::Error;
use maplibre::io::scheduler::ScheduleMethod;
use maplibre::io::scheduler::Scheduler;
use std::future::Future;
pub mod http_client;
#[cfg(target_feature = "atomics")]
pub mod pool;
#[cfg(target_feature = "atomics")]
pub mod pool_schedule_method;
pub mod sync;
pub struct NopScheduleMethod;
pub mod unsync;
impl ScheduleMethod for NopScheduleMethod {
pub struct NopScheduler;
impl Scheduler for NopScheduler {
fn schedule<T>(&self, future_factory: impl FnOnce() -> T + Send + 'static) -> Result<(), Error>
where
T: Future<Output = ()> + 'static,

View File

@ -0,0 +1,2 @@
pub mod pool;
pub mod pool_scheduler;

View File

@ -25,7 +25,6 @@ pub struct WorkerPool {
struct PoolState {
workers: RefCell<Vec<Worker>>,
callback: Closure<dyn FnMut(Event)>,
}
struct Work {
@ -48,9 +47,6 @@ impl WorkerPool {
new_worker,
state: Rc::new(PoolState {
workers: RefCell::new(Vec::with_capacity(initial)),
callback: Closure::wrap(Box::new(|event: Event| {
log::error!("unhandled event: {}", event.type_());
}) as Box<dyn FnMut(Event)>),
}),
};
for _ in 0..initial {

View File

@ -2,15 +2,15 @@ use log::warn;
use std::future::Future;
use super::pool::WorkerPool;
use maplibre::{error::Error, io::scheduler::ScheduleMethod};
use maplibre::{error::Error, io::scheduler::Scheduler};
use wasm_bindgen::{prelude::*, JsCast};
use web_sys::Worker;
pub struct WebWorkerPoolScheduleMethod {
pub struct WebWorkerPoolScheduler {
pool: WorkerPool,
}
impl WebWorkerPoolScheduleMethod {
impl WebWorkerPoolScheduler {
pub fn new(new_worker: js_sys::Function) -> Self {
Self {
pool: WorkerPool::new(
@ -28,7 +28,7 @@ impl WebWorkerPoolScheduleMethod {
}
}
impl ScheduleMethod for WebWorkerPoolScheduleMethod {
impl Scheduler for WebWorkerPoolScheduler {
fn schedule<T>(
&self,
future_factory: impl (FnOnce() -> T) + Send + 'static,

View File

@ -0,0 +1,95 @@
use js_sys::{ArrayBuffer, Uint8Array};
use maplibre::error::Error;
use maplibre::io::scheduler::Scheduler;
use std::borrow::{Borrow, BorrowMut};
use std::cell::RefCell;
use std::future::Future;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen::JsValue;
use web_sys::{DedicatedWorkerGlobalScope, Worker};
pub struct UnsyncScheduler {
new_worker: Box<dyn Fn() -> Worker>,
workers: RefCell<Vec<Worker>>,
}
impl UnsyncScheduler {
pub fn new(new_worker: js_sys::Function) -> Self {
let create_new_worker = Box::new(move || {
new_worker
.call0(&JsValue::undefined())
.unwrap()
.dyn_into::<Worker>()
.unwrap()
});
let worker = create_new_worker();
let array = js_sys::Array::new();
array.push(&wasm_bindgen::module());
worker.post_message(&array).unwrap();
log::info!("new unsync");
Self {
new_worker: create_new_worker,
workers: RefCell::new(vec![worker]),
}
}
}
impl Scheduler for UnsyncScheduler {
fn schedule<T>(&self, future_factory: impl FnOnce() -> T + Send + 'static) -> Result<(), Error>
where
T: Future<Output = ()> + 'static,
{
self.workers.borrow()[0]
.post_message(&JsValue::undefined())
.unwrap();
Ok(())
}
}
thread_local! {
static VEC: RefCell<Vec<u8>> = RefCell::new(vec![165u8, 162, 145, 224, 111]);
}
/// Entry point invoked by the worker.
#[wasm_bindgen]
pub async fn worker_entry() -> Result<(), JsValue> {
log::info!("worker_entry unsync");
//let vec = vec![165u8, 162, 145, 224, 111];
VEC.with(|d| {
let mut ref_mut = d.borrow_mut();
ref_mut[0] += 1;
unsafe {
let uint8 = Uint8Array::view(&ref_mut);
let array_buffer = uint8.buffer();
log::info!(
"{}",
array_buffer
== wasm_bindgen::memory()
.dyn_into::<js_sys::WebAssembly::Memory>()
.unwrap()
.buffer()
.dyn_into::<ArrayBuffer>()
.unwrap()
);
let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>();
let array = js_sys::Array::new();
array.push(&array_buffer);
array.push(&JsValue::from(uint8.byte_offset()));
global.post_message(&array).unwrap();
};
});
Ok(())
}