Improve and fix http cache & add offscreen kernel config (#307)

* Switch HTTP cache crate

* Set compiler on linux

* Start work on offscreen kernel sharing

* Finish basic config sharing mechanism on web and native; not really used on web so far though
This commit is contained in:
Max Ammann 2024-07-23 16:08:57 +02:00 committed by GitHub
parent 972fd528c9
commit 151ee9fb77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 288 additions and 141 deletions

View File

@ -32,7 +32,7 @@ authors = ["Maximilian Ammann <max@maxammann.org>"]
rand = { version = "0.7.3", features = ["wasm-bindgen"] }
# FIXME: Untrusted dependency, 0.2.x doesn't compile with cache middleware
reqwest-middleware = "0.1.6"
reqwest-middleware = "0.3.2"
winit = { version = "0.30", default-features = false, features = ["rwh_06"] }
@ -67,8 +67,7 @@ lyon = { version = "1.0.1", features = [] }
naga = { version = "0.13.0", features = ["wgsl-in"] }
android_logger = "0.13.3"
png = { version = "0.17.10" }
reqwest = { version = "0.11.20", default-features = false, features = ["rustls-tls", "gzip"] } # Use rusttls on android because cross compiling is difficult
reqwest-middleware-cache = "0.1.1" # FIXME: Untrusted dependency
reqwest = { version = "0.12.5", default-features = false, features = ["rustls-tls", "gzip"] } # Use rusttls on android because cross compiling is difficult
rstar = "0.11.0"
rusqlite = { version = "0.29.0" }
serde = { version = "1.0.188", features = ["derive"] }
@ -88,6 +87,7 @@ wasm-bindgen-futures = "0.4"
wasm-bindgen-test = "0.3"
web-sys = "0.3" # Individual features are customized in each crate
wgpu = "0.19.4"
http-cache-reqwest = "0.14.0"
[profile.release]
lto = true

View File

@ -14,7 +14,7 @@ pub fn android_main(app: android_activity::AndroidApp) {
android_logger::Config::default().with_max_level(log::LevelFilter::Info),
);
log::log!(Level::Info, "maplibre starting");
run_headed_map(
run_headed_map::<String>(
None,
WinitMapWindowConfig::new("maplibre".to_string(), app),
WgpuSettings {

View File

@ -10,7 +10,7 @@ compile_error!("apple works only on macOS and iOS.");
pub fn maplibre_apple_main() {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
run_headed_map(
run_headed_map::<String>(
None,
WinitMapWindowConfig::new("maplibre".to_string()),
WgpuSettings {

View File

@ -1,6 +1,6 @@
#![deny(unused_imports)]
use std::io::ErrorKind;
use std::{io::ErrorKind, path::PathBuf};
use clap::{Parser, Subcommand};
use maplibre::{coords::LatLon, render::settings::WgpuSettings};
@ -64,7 +64,7 @@ fn main() {
// matches just as you would the top level cmd
match &cli.command {
Commands::Headed {} => run_headed_map(
None,
Some(PathBuf::from("./maplibre-cache".to_string())),
WinitMapWindowConfig::new("maplibre".to_string()),
WgpuSettings {
backends: Some(maplibre::render::settings::Backends::all()),

View File

@ -4,7 +4,7 @@ use std::{fmt::Debug, marker::PhantomData};
use instant::Instant;
use maplibre::{
environment::{Environment, OffscreenKernelEnvironment},
environment::{Environment, OffscreenKernel},
event_loop::{EventLoop, EventLoopProxy, SendEventError},
io::{apc::AsyncProcedureCall, scheduler::Scheduler, source_client::HttpClient},
map::Map,
@ -207,7 +207,7 @@ impl<ET: 'static> EventLoopProxy<ET> for WinitEventLoopProxy<ET> {
pub struct WinitEnvironment<
S: Scheduler,
HC: HttpClient,
K: OffscreenKernelEnvironment,
K: OffscreenKernel,
APC: AsyncProcedureCall<K>,
ET,
> {
@ -221,7 +221,7 @@ pub struct WinitEnvironment<
impl<
S: Scheduler,
HC: HttpClient,
K: OffscreenKernelEnvironment,
K: OffscreenKernel,
APC: AsyncProcedureCall<K>,
ET: 'static + Clone,
> Environment for WinitEnvironment<S, HC, K, APC, ET>

View File

@ -3,9 +3,10 @@
//! * Platform Events like suspend/resume
//! * Render a new frame
use std::marker::PhantomData;
use std::{marker::PhantomData, path::PathBuf};
use maplibre::{
environment::OffscreenKernelConfig,
event_loop::EventLoop,
io::apc::SchedulerAsyncProcedureCall,
kernel::{Kernel, KernelBuilder},
@ -98,21 +99,29 @@ impl<ET: 'static + Clone> MapWindowConfig for WinitMapWindowConfig<ET> {
}
}
pub fn run_headed_map(
cache_path: Option<String>,
pub fn run_headed_map<P>(
cache_path: Option<P>,
window_config: WinitMapWindowConfig<()>,
wgpu_settings: WgpuSettings,
) {
) where
P: Into<PathBuf>,
{
run_multithreaded(async {
type Environment<S, HC, APC> =
WinitEnvironment<S, HC, ReqwestOffscreenKernelEnvironment, APC, ()>;
let client = ReqwestHttpClient::new(cache_path);
let cache_path = cache_path.map(|path| path.into());
let client = ReqwestHttpClient::new(cache_path.clone());
let kernel: Kernel<Environment<_, _, _>> = KernelBuilder::new()
.with_map_window_config(window_config)
.with_http_client(client.clone())
.with_apc(SchedulerAsyncProcedureCall::new(TokioScheduler::new()))
.with_apc(SchedulerAsyncProcedureCall::new(
TokioScheduler::new(),
OffscreenKernelConfig {
cache_directory: cache_path.map(|path| path.to_str().unwrap().to_string()),
},
))
.with_scheduler(TokioScheduler::new())
.build();

View File

@ -29,7 +29,7 @@ tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync
tokio-util.workspace = true
env_logger.workspace = true
reqwest.workspace = true
reqwest-middleware-cache.workspace = true
http-cache-reqwest.workspace = true
reqwest-middleware.workspace = true
tracing-tracy = { workspace = true, optional = true }

View File

@ -1,3 +1,5 @@
use serde::{Deserialize, Serialize};
use crate::{
io::{
apc::AsyncProcedureCall,
@ -24,12 +26,17 @@ pub trait Environment: 'static {
type HttpClient: HttpClient;
type OffscreenKernelEnvironment: OffscreenKernelEnvironment;
type OffscreenKernelEnvironment: OffscreenKernel;
}
pub trait OffscreenKernelEnvironment: Send + Sync + 'static {
#[derive(Serialize, Deserialize, Clone)]
pub struct OffscreenKernelConfig {
pub cache_directory: Option<String>,
}
pub trait OffscreenKernel: Send + Sync + 'static {
type HttpClient: HttpClient;
fn create() -> Self;
fn create(config: OffscreenKernelConfig) -> Self;
fn source_client(&self) -> SourceClient<Self::HttpClient>;
}

View File

@ -1,6 +1,7 @@
use std::rc::Rc;
use crate::{
environment::OffscreenKernelConfig,
headless::{
environment::HeadlessEnvironment,
graph_node::CopySurfaceBufferNode,
@ -37,7 +38,12 @@ pub async fn create_headless_renderer(
PhysicalSize::new(tile_size, tile_size).unwrap(),
))
.with_http_client(client.clone())
.with_apc(SchedulerAsyncProcedureCall::new(TokioScheduler::new()))
.with_apc(SchedulerAsyncProcedureCall::new(
TokioScheduler::new(),
OffscreenKernelConfig {
cache_directory: None,
},
))
.with_scheduler(TokioScheduler::new())
.build();

View File

@ -16,8 +16,11 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::{
coords::WorldTileCoords, define_label, environment::OffscreenKernelEnvironment,
io::scheduler::Scheduler, style::Style,
coords::WorldTileCoords,
define_label,
environment::{OffscreenKernel, OffscreenKernelConfig},
io::scheduler::Scheduler,
style::Style,
};
define_label!(MessageTag);
@ -164,7 +167,7 @@ pub type AsyncProcedure<K, C> = fn(input: Input, context: C, kernel: K) -> Async
///
///
// TODO: Rename to AsyncProcedureCaller?
pub trait AsyncProcedureCall<K: OffscreenKernelEnvironment>: 'static {
pub trait AsyncProcedureCall<K: OffscreenKernel>: 'static {
type Context: Context + Send + Clone;
type ReceiveIterator<F: FnMut(&Message) -> bool>: Iterator<Item = Message>;
@ -194,27 +197,29 @@ impl Context for SchedulerContext {
}
}
pub struct SchedulerAsyncProcedureCall<K: OffscreenKernelEnvironment, S: Scheduler> {
// An APC that uses a scheduler to execute work asynchronously.
// An async sender and receiver to exchange return values of calls.
pub struct SchedulerAsyncProcedureCall<K: OffscreenKernel, S: Scheduler> {
channel: (Sender<Message>, Receiver<Message>),
buffer: RefCell<Vec<Message>>,
scheduler: S,
phantom_k: PhantomData<K>,
offscreen_kernel_config: OffscreenKernelConfig,
}
impl<K: OffscreenKernelEnvironment, S: Scheduler> SchedulerAsyncProcedureCall<K, S> {
pub fn new(scheduler: S) -> Self {
impl<K: OffscreenKernel, S: Scheduler> SchedulerAsyncProcedureCall<K, S> {
pub fn new(scheduler: S, offscreen_kernel_config: OffscreenKernelConfig) -> Self {
Self {
channel: mpsc::channel(),
buffer: RefCell::new(Vec::new()),
phantom_k: PhantomData::default(),
scheduler,
offscreen_kernel_config,
}
}
}
impl<K: OffscreenKernelEnvironment, S: Scheduler> AsyncProcedureCall<K>
for SchedulerAsyncProcedureCall<K, S>
{
impl<K: OffscreenKernel, S: Scheduler> AsyncProcedureCall<K> for SchedulerAsyncProcedureCall<K, S> {
type Context = SchedulerContext;
type ReceiveIterator<F: FnMut(&Message) -> bool> = IntoIter<Message>;
@ -255,12 +260,14 @@ impl<K: OffscreenKernelEnvironment, S: Scheduler> AsyncProcedureCall<K>
procedure: AsyncProcedure<K, Self::Context>,
) -> Result<(), CallError> {
let sender = self.channel.0.clone();
let offscreen_kernel_config = self.offscreen_kernel_config.clone();
self.scheduler
.schedule(move || async move {
log::info!("Processing on thread: {:?}", std::thread::current().name());
procedure(input, SchedulerContext { sender }, K::create())
let kernel = K::create(offscreen_kernel_config);
procedure(input, SchedulerContext { sender }, kernel)
.await
.unwrap();
})

View File

@ -1,7 +1,9 @@
use std::path::PathBuf;
use async_trait::async_trait;
use http_cache_reqwest::{CACacheManager, Cache, CacheMode, HttpCache, HttpCacheOptions};
use reqwest::{Client, StatusCode};
use reqwest_middleware::ClientWithMiddleware;
use reqwest_middleware_cache::{managers::CACacheManager, Cache, CacheMode};
use crate::io::source_client::{HttpClient, SourceFetchError};
@ -24,20 +26,24 @@ impl From<reqwest_middleware::Error> for SourceFetchError {
impl ReqwestHttpClient {
/// cache_path: Under which path should we cache requests.
// TODO: Use Into<Path> instead of String
pub fn new(cache_path: Option<String>) -> Self {
pub fn new<P>(cache_path: Option<P>) -> Self
where
P: Into<PathBuf>,
{
let mut builder = reqwest_middleware::ClientBuilder::new(Client::new());
if let Some(cache_path) = cache_path {
builder = builder.with(Cache {
builder = builder.with(Cache(HttpCache {
mode: CacheMode::Default,
cache_manager: CACacheManager { path: cache_path },
});
manager: CACacheManager {
path: cache_path.into(),
},
options: HttpCacheOptions::default(),
}))
}
let client = builder.build();
Self {
client: builder.build(),
}
Self { client }
}
}

View File

@ -6,7 +6,7 @@ use std::{
};
use crate::{
environment::OffscreenKernelEnvironment,
environment::{OffscreenKernel, OffscreenKernelConfig},
io::source_client::{HttpSourceClient, SourceClient},
platform::http_client::ReqwestHttpClient,
};
@ -35,16 +35,18 @@ pub fn run_multithreaded<F: Future>(future: F) -> F::Output {
.block_on(future)
}
pub struct ReqwestOffscreenKernelEnvironment;
pub struct ReqwestOffscreenKernelEnvironment(OffscreenKernelConfig);
impl OffscreenKernelEnvironment for ReqwestOffscreenKernelEnvironment {
impl OffscreenKernel for ReqwestOffscreenKernelEnvironment {
type HttpClient = ReqwestHttpClient;
fn create() -> Self {
ReqwestOffscreenKernelEnvironment
fn create(config: OffscreenKernelConfig) -> Self {
ReqwestOffscreenKernelEnvironment(config)
}
fn source_client(&self) -> SourceClient<Self::HttpClient> {
SourceClient::new(HttpSourceClient::new(ReqwestHttpClient::new(None)))
SourceClient::new(HttpSourceClient::new(ReqwestHttpClient::new::<String>(
self.0.cache_directory.clone(),
)))
}
}

View File

@ -4,7 +4,7 @@ use std::{borrow::Cow, collections::HashSet, marker::PhantomData, rc::Rc};
use crate::{
context::MapContext,
environment::{Environment, OffscreenKernelEnvironment},
environment::{Environment, OffscreenKernel},
io::{
apc::{AsyncProcedureCall, AsyncProcedureFuture, Context, Input, ProcedureError},
source_type::{RasterSource, SourceType},
@ -101,11 +101,7 @@ impl<E: Environment, T: RasterTransferables> System for RequestSystem<E, T> {
view_state.update_references();
}
}
pub fn fetch_raster_apc<
K: OffscreenKernelEnvironment,
T: RasterTransferables,
C: Context + Clone + Send,
>(
pub fn fetch_raster_apc<K: OffscreenKernel, T: RasterTransferables, C: Context + Clone + Send>(
input: Input,
context: C,
kernel: K,

View File

@ -4,7 +4,7 @@ use std::{borrow::Cow, collections::HashSet, marker::PhantomData, rc::Rc};
use crate::{
context::MapContext,
environment::{Environment, OffscreenKernelEnvironment},
environment::{Environment, OffscreenKernel},
io::{
apc::{AsyncProcedureCall, AsyncProcedureFuture, Context, Input, ProcedureError},
source_type::{SourceType, TessellateSource},
@ -103,11 +103,7 @@ impl<E: Environment, T: VectorTransferables> System for RequestSystem<E, T> {
}
}
pub fn fetch_vector_apc<
K: OffscreenKernelEnvironment,
T: VectorTransferables,
C: Context + Clone + Send,
>(
pub fn fetch_vector_apc<K: OffscreenKernel, T: VectorTransferables, C: Context + Clone + Send>(
input: Input,
context: C,
kernel: K,

View File

@ -2,7 +2,10 @@
# The repository supports direnv (https://direnv.net/). If your IDE supports direnv,
# then you do not need to care about dependencies.
{ pkgs ? import <nixpkgs> { } }:
{ pkgs ? import <nixpkgs> {
overlays = [];
}
}:
with pkgs;
let
unstable = import
@ -15,15 +18,19 @@ in
# Wew are using the host clang on macOS; the Nix clang adds a clag that breaks cross compilation here:
# https://github.com/NixOS/nixpkgs/blob/362cb82b75394680990cbe89f40fe65d35f66617/pkgs/build-support/cc-wrapper/default.nix#L490
# It caused this error during the compilation of ring: clang-15: error: invalid argument '-mmacos-version-min=11.0' not allowed with '-miphoneos-version-min=7.0'
stdenv = stdenvNoCC;
stdenv = if stdenv.isDarwin then stdenvNoCC else llvmPackages_16.stdenv;
}) {
nativeBuildInputs = [
# Tools
unstable.rustup
rustup
unstable.just
unstable.nodejs
unstable.mdbook
# unstable.wasm-bindgen-cli # we need wasm-bindgen-cli@0.2.92, so pull it from cargo instead
(pkgs.wasm-bindgen-cli.override {
version = "0.2.92"; # This needs to match the wasm-bindgen version of the web module
hash = "sha256-1VwY8vQy7soKEgbki4LD+v259751kKxSxmo/gqE6yV0=";
cargoHash = "sha256-aACJ+lYNEU8FFBs158G1/JG8sc6Rq080PeKCMnwdpH0=";
})
unstable.tracy
unstable.nixpkgs-fmt # To format this file: nixpkgs-fmt *.nix
# System dependencies
@ -32,14 +39,15 @@ in
pkgs.jdk17
unstable.sqlite
unstable.pkg-config
] ++ lib.optionals stdenv.isLinux [
unstable.xorg.libXrandr
unstable.xorg.libXi
unstable.xorg.libXcursor
unstable.xorg.libX11
unstable.libxkbcommon
unstable.sqlite
unstable.wayland
unstable.pkg-config
];
shellHook = ''
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:${ pkgs.lib.makeLibraryPath [ unstable.libxkbcommon ] }";

View File

@ -8,6 +8,7 @@ import {dirname} from "path";
import {fileURLToPath} from "url";
let argv = yargs(process.argv.slice(2))
.strict(true)
.option('release', {
type: 'boolean',
description: 'Release mode'
@ -140,6 +141,8 @@ const wasmPack = () => {
"-C", "link-args=--shared-memory --import-memory --max-memory=209715200"
]`
spawnTool('cargo', ["--version"])
let cargo = spawnTool('cargo', [
...(multithreaded ? ["--config", multithreaded_config] : []),
"build",

View File

@ -40,22 +40,23 @@ export const startMapLibre = async (wasmPath: string | undefined, workerPath: st
const memory = new WebAssembly.Memory({initial: 1024, shared: false})
await maplibre.default(wasmPath, memory);
await maplibre.run_maplibre((ptr) => {
await maplibre.run_maplibre((received_ptr: number) => {
let worker: Worker = workerPath ? new Worker(workerPath, {
type: 'module',
}) : PoolWorker(); // Setting a "name" for this webworker is not yet supported, because it needs support from esbuild-plugin-inline-worker
worker.onmessage = (message: MessageEvent) => {
// Handle messages coming back from the Worker
worker.onmessage = (message: MessageEvent<[tag: number, buffer: ArrayBuffer]>) => {
// WARNING: Do not modify data passed from Rust!
let in_transfer = message.data;
let data = message.data;
const main_entry = maplibre["singlethreaded_main_entry"];
const receive_data: (received_ptr: number, tag: number, buffer: ArrayBuffer) => void = maplibre["singlethreaded_receive_data"];
if (!main_entry) {
if (!receive_data) {
throw Error("singlethreaded_main_entry is not defined. Maybe the Rust build used the wrong build configuration.")
}
main_entry(ptr, in_transfer)
receive_data(received_ptr, data[0], data[1])
}
return worker;

View File

@ -1,25 +1,41 @@
import * as maplibre from "../wasm/maplibre"
onmessage = async message => {
const initialised = maplibre.default(message.data[0], message.data[1]).catch(err => {
// Propagate to main `onerror`:
setTimeout(() => {
type MessageData = {type: 'wasm_init', module: WebAssembly.Module, memory: WebAssembly.Memory}
| {type: 'call', work_ptr: number}
let initialised: Promise<maplibre.InitOutput> = null
onmessage = async (message: MessageEvent<MessageData>) => {
if (initialised) {
// This will queue further commands up until the module is fully initialised:
await initialised;
}
const type = message.data.type;
if (type === 'wasm_init') {
const data = message.data;
const module = data.module;
const memory = data.memory;
const initialised = maplibre.default(module, memory).catch(err => {
// Propagate to main `onerror`:
setTimeout(() => {
throw err;
});
// Rethrow to keep promise rejected and prevent execution of further commands:
throw err;
});
// Rethrow to keep promise rejected and prevent execution of further commands:
throw err;
});
self.onmessage = async message => {
} else if (type === 'call') {
const work_ptr = message.data.work_ptr; // because memory is shared, this pointer is valid in the memory of the main thread and this worker thread
// This will queue further commands up until the module is fully initialised:
await initialised;
const worker_entry = maplibre["multithreaded_worker_entry"]
const process_data: (msg: any) => Promise<void> = maplibre["multithreaded_process_data"]
if (!worker_entry) {
if (!process_data) {
throw Error("multithreaded_worker_entry is not defined. Maybe the Rust build used the wrong build configuration.")
}
await worker_entry(message.data);
};
}
await process_data(work_ptr);
}
}

View File

@ -1,31 +1,54 @@
import * as maplibre from "../wasm/maplibre"
onmessage = async message => {
const memory = new WebAssembly.Memory({initial: 1024, shared: false})
let module = message.data[0];
const initialised = maplibre.default(module, memory).catch(err => {
// Propagate to main `onerror`:
setTimeout(() => {
throw err;
});
// Rethrow to keep promise rejected and prevent execution of further commands:
throw err;
});
type MessageData = { type: 'wasm_init', module: WebAssembly.Module }
| { type: 'kernel_config', config: string }
| { type: 'call', procedure_ptr: number, input: string }
self.onmessage = async message => {
let initialised: Promise<maplibre.InitOutput> = null
onmessage = async (message: MessageEvent<MessageData>) => {
if (initialised) {
// This will queue further commands up until the module is fully initialised:
await initialised;
}
const type = message.data.type;
if (type === 'wasm_init') {
const data = message.data;
const memory = new WebAssembly.Memory({initial: 1024, shared: false})
let module = data.module;
initialised = maplibre.default(module, memory).catch(err => {
// Propagate to main `onerror`:
setTimeout(() => {
throw err;
});
// Rethrow to keep promise rejected and prevent execution of further commands:
throw err;
});
} else if (type === 'call') {
const data = message.data;
// WARNING: Do not modify data passed from Rust!
const procedure_ptr = message.data[0];
const input = message.data[1];
const procedure_ptr = data.procedure_ptr;
const input = data.input;
const worker_entry = maplibre["singlethreaded_worker_entry"];
const process_data: (procedure_ptr: number, input: string) => Promise<void> = maplibre["singlethreaded_process_data"];
if (!worker_entry) {
if (!process_data) {
throw Error("singlethreaded_worker_entry is not defined. Maybe the Rust build used the wrong build configuration.")
}
await worker_entry(procedure_ptr, input);
};
await process_data(procedure_ptr, input);
} else if (type === 'kernel_config') {
const data = message.data;
const set_kernel_config: (config: string) => void = maplibre["set_kernel_config"];
if (!set_kernel_config) {
throw Error("set_kernel_config is not defined. Maybe the Rust build used the wrong build configuration.")
}
set_kernel_config(data.config)
}
}

View File

@ -1,7 +1,7 @@
#![deny(unused_imports)]
use maplibre::{
environment::OffscreenKernelEnvironment,
environment::{OffscreenKernel, OffscreenKernelConfig},
event_loop::EventLoop,
io::source_client::{HttpSourceClient, SourceClient},
kernel::{Kernel, KernelBuilder},
@ -49,10 +49,10 @@ pub fn wasm_bindgen_start() {
pub struct WHATWGOffscreenKernelEnvironment;
impl OffscreenKernelEnvironment for WHATWGOffscreenKernelEnvironment {
impl OffscreenKernel for WHATWGOffscreenKernelEnvironment {
type HttpClient = WHATWGFetchHttpClient;
fn create() -> Self {
fn create(config: OffscreenKernelConfig) -> Self {
WHATWGOffscreenKernelEnvironment
}
@ -90,6 +90,10 @@ pub async fn run_maplibre(new_worker: js_sys::Function) -> Result<(), JSError> {
.with_map_window_config(WinitMapWindowConfig::new("maplibre".to_string()))
.with_http_client(WHATWGFetchHttpClient::default());
let offscreen_kernel_config = OffscreenKernelConfig {
cache_directory: None,
};
#[cfg(target_feature = "atomics")]
{
kernel_builder = kernel_builder
@ -97,6 +101,7 @@ pub async fn run_maplibre(new_worker: js_sys::Function) -> Result<(), JSError> {
platform::multithreaded::pool_scheduler::WebWorkerPoolScheduler::new(
new_worker.clone(),
)?,
offscreen_kernel_config,
))
.with_scheduler(
platform::multithreaded::pool_scheduler::WebWorkerPoolScheduler::new(new_worker)?,
@ -106,7 +111,13 @@ pub async fn run_maplibre(new_worker: js_sys::Function) -> Result<(), JSError> {
#[cfg(not(target_feature = "atomics"))]
{
kernel_builder = kernel_builder
.with_apc(platform::singlethreaded::apc::PassingAsyncProcedureCall::new(new_worker, 4)?)
.with_apc(
platform::singlethreaded::apc::PassingAsyncProcedureCall::new(
new_worker,
4,
offscreen_kernel_config,
)?,
)
.with_scheduler(maplibre::io::scheduler::NopScheduler);
}

View File

@ -90,10 +90,14 @@ impl WorkerPool {
// With a worker spun up send it the module/memory so it can start
// instantiating the wasm module. Later it might receive further
// messages about code to run on the wasm module.
let array = js_sys::Array::new();
array.push(&wasm_bindgen::module());
array.push(&wasm_bindgen::memory());
worker.post_message(&array)?;
worker.post_message(
&js_sys::Object::from_entries(&js_sys::Array::of3(
&js_sys::Array::of2(&JsValue::from("type"), &js_sys::JsString::from("wasm_init")),
&js_sys::Array::of2(&JsValue::from("module"), &wasm_bindgen::module()),
&js_sys::Array::of2(&JsValue::from("memory"), &wasm_bindgen::memory()),
))
.expect("can not fail"),
)?;
self.state.push(worker);
Ok(())
@ -139,7 +143,13 @@ impl WorkerPool {
let worker = self.worker()?;
let work = Work { func: Box::new(f) };
let work_ptr = Box::into_raw(Box::new(work));
match worker.post_message(&JsValue::from(work_ptr as u32)) {
match worker.post_message(
&js_sys::Object::from_entries(&js_sys::Array::of2(
&js_sys::Array::of2(&JsValue::from("type"), &js_sys::JsString::from("call")),
&js_sys::Array::of2(&JsValue::from("work_ptr"), &JsValue::from(work_ptr as u32)),
))
.expect("can not fail"),
) {
Ok(()) => Ok(()),
Err(e) => {
unsafe {

View File

@ -6,8 +6,8 @@ use crate::{platform::multithreaded::pool::Work, JSError};
/// Entry point invoked by the worker.
#[wasm_bindgen]
pub async fn multithreaded_worker_entry(ptr: u32) -> Result<(), JSError> {
let work = unsafe { Box::from_raw(ptr as *mut Work) };
pub async fn multithreaded_process_data(work_ptr: *mut Work) -> Result<(), JSError> {
let work = unsafe { Box::from_raw(work_ptr) };
JsFuture::from(work.execute())
.await
.map_err(|_e| CallError::Schedule)?;

View File

@ -3,7 +3,7 @@ use std::{cell::RefCell, rc::Rc, vec::IntoIter};
use js_sys::{ArrayBuffer, Uint8Array};
use log::error;
use maplibre::{
environment::OffscreenKernelEnvironment,
environment::{OffscreenKernel, OffscreenKernelConfig},
io::{
apc::{
AsyncProcedure, AsyncProcedureCall, CallError, Context, Input, IntoMessage, Message,
@ -136,15 +136,18 @@ pub struct PassingAsyncProcedureCall {
}
impl PassingAsyncProcedureCall {
pub fn new(new_worker: js_sys::Function, initial_workers: usize) -> Result<Self, WebError> {
pub fn new(
new_worker: js_sys::Function,
initial_workers: usize,
config: OffscreenKernelConfig,
) -> Result<Self, WebError> {
let received = Rc::new(RefCell::new(vec![]));
let received_ref = received.clone();
let create_new_worker = || {
new_worker
.call1(
&JsValue::undefined(),
&JsValue::from(Rc::into_raw(received_ref.clone()) as u32),
&JsValue::from(Rc::into_raw(received.clone())),
)
.map_err(WebError::from)?
.dyn_into::<Worker>()
@ -156,8 +159,35 @@ impl PassingAsyncProcedureCall {
for _ in 0..initial_workers {
let worker: Worker = create_new_worker()?;
let array = js_sys::Array::of1(&wasm_bindgen::module());
worker.post_message(&array).map_err(WebError::from)?;
worker
.post_message(
&js_sys::Object::from_entries(&js_sys::Array::of2(
&js_sys::Array::of2(
&JsValue::from("type"),
&js_sys::JsString::from("wasm_init"),
),
&js_sys::Array::of2(&JsValue::from("module"), &wasm_bindgen::module()),
))
.expect("can not fail"),
)
.map_err(WebError::from)?;
worker
.post_message(
&js_sys::Object::from_entries(&js_sys::Array::of2(
&js_sys::Array::of2(
&JsValue::from("type"),
&js_sys::JsString::from("kernel_config"),
),
&js_sys::Array::of2(
&JsValue::from("config"),
&js_sys::JsString::from(serde_json::to_string(&config).expect("TODO")), // FIXME: Handle this error correctly
),
))
.expect("can not fail"),
)
.map_err(WebError::from)?;
workers.push(worker);
}
@ -169,7 +199,7 @@ impl PassingAsyncProcedureCall {
}
}
impl<K: OffscreenKernelEnvironment> AsyncProcedureCall<K> for PassingAsyncProcedureCall {
impl<K: OffscreenKernel> AsyncProcedureCall<K> for PassingAsyncProcedureCall {
type Context = UsedContext;
type ReceiveIterator<F: FnMut(&Message) -> bool> = IntoIter<Message>;
@ -213,10 +243,18 @@ impl<K: OffscreenKernelEnvironment> AsyncProcedureCall<K> for PassingAsyncProced
input: Input,
procedure: AsyncProcedure<K, UsedContext>,
) -> Result<(), CallError> {
let procedure_ptr = procedure as *mut AsyncProcedure<K, UsedContext> as u32; // FIXME: is u32 fine, define an overflow safe function?
let procedure_ptr = procedure as *mut AsyncProcedure<K, UsedContext>; // TODO: Verify how these poitner are converted to pointers
let input = serde_json::to_string(&input).map_err(|e| CallError::Serialize(Box::new(e)))?;
let message = js_sys::Array::of2(&JsValue::from(procedure_ptr), &JsValue::from(input));
let message = js_sys::Object::from_entries(&js_sys::Array::of3(
&js_sys::Array::of2(&JsValue::from("type"), &JsValue::from("call")),
&js_sys::Array::of2(
&JsValue::from("procedure_ptr"),
&JsValue::from(procedure_ptr),
),
&js_sys::Array::of2(&JsValue::from("input"), &JsValue::from(input)),
))
.expect("can not fail");
let worker = self
.workers

View File

@ -1,13 +1,12 @@
use std::{mem, rc::Rc};
use std::{mem, rc::Rc, sync::OnceLock};
use js_sys::ArrayBuffer;
use log::error;
use maplibre::{
benchmarking::io::{
apc::{AsyncProcedure, Input, Message},
source_client::{HttpSourceClient, SourceClient},
},
environment::OffscreenKernelEnvironment,
environment::OffscreenKernel,
io::apc::CallError,
};
use thiserror::Error;
@ -27,9 +26,20 @@ use crate::{
WHATWGFetchHttpClient,
};
/// Entry point invoked by the worker.
static CONFIG: OnceLock<String> = OnceLock::new();
fn kernel_config() -> &'static str {
CONFIG.get().map(move |t| t.as_str()).unwrap_or("{}")
}
#[wasm_bindgen]
pub async fn singlethreaded_worker_entry(procedure_ptr: u32, input: String) -> Result<(), JSError> {
pub fn set_kernel_config(config: String) {
CONFIG.set(config).expect("failed to set kernel config")
}
/// Entry point invoked by the worker. Processes data and sends the result back to the main thread.
#[wasm_bindgen]
pub async fn singlethreaded_process_data(procedure_ptr: u32, input: String) -> Result<(), JSError> {
let procedure: AsyncProcedure<UsedOffscreenKernelEnvironment, UsedContext> =
unsafe { mem::transmute(procedure_ptr) };
@ -53,7 +63,12 @@ pub async fn singlethreaded_worker_entry(procedure_ptr: u32, input: String) -> R
);
}
procedure(input, context, UsedOffscreenKernelEnvironment::create()).await?;
procedure(
input,
context,
UsedOffscreenKernelEnvironment::create(serde_json::from_str(&kernel_config()).unwrap()),
)
.await?;
Ok(())
}
@ -62,21 +77,14 @@ pub async fn singlethreaded_worker_entry(procedure_ptr: u32, input: String) -> R
#[error("unable to deserialize message sent by postMessage()")]
pub struct DeserializeMessage;
/// Entry point invoked by the main thread.
/// Entry point invoked by the main thread. Receives data on the main thread and makes it available
/// to the renderer.
#[wasm_bindgen]
pub unsafe fn singlethreaded_main_entry(
pub fn singlethreaded_receive_data(
received_ptr: *const ReceivedType,
in_transfer: js_sys::Array,
tag: u32,
buffer: js_sys::ArrayBuffer,
) -> Result<(), JSError> {
let tag = in_transfer
.get(0)
.as_f64()
.ok_or_else(|| CallError::Deserialize(Box::new(DeserializeMessage)))? as u32; // TODO: Is this cast fine?
let buffer: ArrayBuffer = in_transfer
.get(1)
.dyn_into()
.map_err(|_e| CallError::Deserialize(Box::new(DeserializeMessage)))?;
let tag = WebMessageTag::from_u32(tag).map_err(|e| CallError::Deserialize(Box::new(e)))?;
log::debug!(
@ -90,7 +98,7 @@ pub unsafe fn singlethreaded_main_entry(
);
// FIXME: Can we make this call safe? check if it was cloned before?
let received: Rc<ReceivedType> = Rc::from_raw(received_ptr);
let received: Rc<ReceivedType> = unsafe { Rc::from_raw(received_ptr) };
// MAJOR FIXME: Fix mutability
received