Vendoring gloo-workers into yew-agent, support module type web worker (#3859)

This commit is contained in:
Luca Cappelletti 2025-05-20 05:45:46 +02:00 committed by GitHub
parent 433a0f2eca
commit 17cbf2f026
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 2030 additions and 88 deletions

7
Cargo.lock generated
View File

@ -4115,10 +4115,15 @@ dependencies = [
name = "yew-agent"
version = "0.3.0"
dependencies = [
"bincode",
"futures 0.3.31",
"gloo-worker",
"js-sys",
"pinned",
"serde",
"thiserror 1.0.69",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"yew",
"yew-agent-macro",
]

View File

@ -13,11 +13,28 @@ rust-version = "1.76.0"
[dependencies]
yew = { version = "0.21.0", path = "../yew" }
gloo-worker = { version = "0.5", features = ["futures"] }
wasm-bindgen = "0.2"
js-sys = "0.3"
pinned = "0.1.0"
thiserror = "1.0.37"
bincode = { version = "1.3.3" }
wasm-bindgen-futures = "0.4"
serde = { version = "1", features = ["derive"] }
futures = "0.3"
yew-agent-macro = { version = "0.2", path = "../yew-agent-macro" }
[dependencies.web-sys]
version = "0.3"
features = [
"Blob",
"BlobPropertyBag",
"DedicatedWorkerGlobalScope",
"MessageEvent",
"Url",
"Worker",
"WorkerOptions",
"WorkerType"
]
[dev-dependencies]
serde = "1.0.218"

View File

@ -0,0 +1,66 @@
# Yew Agent
This module contains Yew's web worker implementation.
## Types
There're a couple kinds of agents:
### Oneshot
A kind of agent that for each input, a single output is returned.
#### Reactor
A kind of agent that can send many inputs and receive many outputs over a single bridge.
#### Worker
The low-level implementation of agents that provides an actor model and communicates with
multiple bridges.
## Reachability
When an agent is spawned, each agent is associated with a reachability.
### Private
Each time a bridge is created, a new instance
of agent is spawned. This allows parallel computing between agents.
#### Public
Public agents are shared among all children of a provider.
Only 1 instance will be spawned for each public agents provider.
### Provider
Each Agent requires a provider to provide communications and maintain bridges.
All hooks must be called within a provider.
## Communications with Agents
Hooks provides means to communicate with agent instances.
### Bridge
See: [`use_worker_bridge`](worker::use_worker_bridge),
[`use_reactor_bridge`](reactor::use_reactor_bridge)
A bridge takes a callback to receive outputs from agents
and provides a handle to send inputs to agents.
#### Subscription
See: [`use_worker_subscription`](worker::use_worker_subscription),
[`use_reactor_subscription`](reactor::use_reactor_subscription)
Similar to bridges, a subscription produces a handle to send inputs to agents. However, instead
of notifying the receiver with a callback, it collect all outputs into a slice.
#### Runner
See: [`use_oneshot_runner`](oneshot::use_oneshot_runner)
Unlike other agents, oneshot bridges provide a `use_oneshot_runner` hook to execute oneshot
agents on demand.

View File

@ -0,0 +1,40 @@
//! Submodule providing the `Codec` trait and its default implementation using `bincode`.
use js_sys::Uint8Array;
use serde::{Deserialize, Serialize};
use wasm_bindgen::JsValue;
/// Message Encoding and Decoding Format
pub trait Codec {
/// Encode an input to JsValue
fn encode<I>(input: I) -> JsValue
where
I: Serialize;
/// Decode a message to a type
fn decode<O>(input: JsValue) -> O
where
O: for<'de> Deserialize<'de>;
}
/// Default message encoding with [bincode].
#[derive(Debug)]
pub struct Bincode;
impl Codec for Bincode {
fn encode<I>(input: I) -> JsValue
where
I: Serialize,
{
let buf = bincode::serialize(&input).expect("can't serialize an worker message");
Uint8Array::from(buf.as_slice()).into()
}
fn decode<O>(input: JsValue) -> O
where
O: for<'de> Deserialize<'de>,
{
let data = Uint8Array::from(input).to_vec();
bincode::deserialize(&data).expect("can't deserialize an worker message")
}
}

View File

@ -1,68 +1,4 @@
//! This module contains Yew's web worker implementation.
//!
//! ## Types
//!
//! There're a couple kinds of agents:
//!
//! #### Oneshot
//!
//! A kind of agent that for each input, a single output is returned.
//!
//! #### Reactor
//!
//! A kind of agent that can send many inputs and receive many outputs over a single bridge.
//!
//! #### Worker
//!
//! The low-level implementation of agents that provides an actor model and communicates with
//! multiple bridges.
//!
//! ## Reachability
//!
//! When an agent is spawned, each agent is associated with a reachability.
//!
//! #### Private
//!
//! Each time a bridge is created, a new instance
//! of agent is spawned. This allows parallel computing between agents.
//!
//! #### Public
//!
//! Public agents are shared among all children of a provider.
//! Only 1 instance will be spawned for each public agents provider.
//!
//! ### Provider
//!
//! Each Agent requires a provider to provide communications and maintain bridges.
//! All hooks must be called within a provider.
//!
//! ## Communications with Agents
//!
//! Hooks provides means to communicate with agent instances.
//!
//! #### Bridge
//!
//! See: [`use_worker_bridge`](worker::use_worker_bridge),
//! [`use_reactor_bridge`](reactor::use_reactor_bridge)
//!
//! A bridge takes a callback to receive outputs from agents
//! and provides a handle to send inputs to agents.
//!
//! #### Subscription
//!
//! See: [`use_worker_subscription`](worker::use_worker_subscription),
//! [`use_reactor_subscription`](reactor::use_reactor_subscription)
//!
//! Similar to bridges, a subscription produces a handle to send inputs to agents. However, instead
//! of notifying the receiver with a callback, it collect all outputs into a slice.
//!
//! #### Runner
//!
//! See: [`use_oneshot_runner`](oneshot::use_oneshot_runner)
//!
//! Unlike other agents, oneshot bridges provide a `use_oneshot_runner` hook to execute oneshot
//! agents on demand.
#![doc = include_str!("../README.md")]
#![deny(
clippy::all,
missing_docs,
@ -74,12 +10,13 @@
extern crate self as yew_agent;
pub mod codec;
pub mod oneshot;
pub mod reactor;
pub mod worker;
#[doc(inline)]
pub use gloo_worker::{Bincode, Codec, Registrable, Spawnable};
pub use codec::{Bincode, Codec};
pub mod traits;
pub use traits::{Registrable, Spawnable};
mod reach;
pub mod scope_ext;

View File

@ -0,0 +1,73 @@
use futures::stream::StreamExt;
use pinned::mpsc;
use pinned::mpsc::UnboundedReceiver;
use super::traits::Oneshot;
use super::worker::OneshotWorker;
use crate::codec::Codec;
use crate::worker::{WorkerBridge, WorkerSpawner};
/// A connection manager for components interaction with oneshot workers.
#[derive(Debug)]
pub struct OneshotBridge<N>
where
N: Oneshot + 'static,
{
inner: WorkerBridge<OneshotWorker<N>>,
rx: UnboundedReceiver<N::Output>,
}
impl<N> OneshotBridge<N>
where
N: Oneshot + 'static,
{
#[inline(always)]
pub(crate) fn new(
inner: WorkerBridge<OneshotWorker<N>>,
rx: UnboundedReceiver<N::Output>,
) -> Self {
Self { inner, rx }
}
#[inline(always)]
pub(crate) fn register_callback<CODEC>(
spawner: &mut WorkerSpawner<OneshotWorker<N>, CODEC>,
) -> UnboundedReceiver<N::Output>
where
CODEC: Codec,
{
let (tx, rx) = mpsc::unbounded();
spawner.callback(move |output| {
let _ = tx.send_now(output);
});
rx
}
/// Forks the bridge.
///
/// This method creates a new bridge that can be used to execute tasks on the same worker
/// instance.
pub fn fork(&self) -> Self {
let (tx, rx) = mpsc::unbounded();
let inner = self.inner.fork(Some(move |output| {
let _ = tx.send_now(output);
}));
Self { inner, rx }
}
/// Run the the current oneshot worker once in the current worker instance.
pub async fn run(&mut self, input: N::Input) -> N::Output {
// &mut self guarantees that the bridge will be
// exclusively borrowed during the time the oneshot worker is running.
self.inner.send(input);
// For each bridge, there can only be 1 active task running on the worker instance.
// The next output will be the output for the input that we just sent.
self.rx
.next()
.await
.expect("failed to receive result from worker")
}
}

View File

@ -1,12 +1,19 @@
//! This module provides task agent implementation.
mod bridge;
mod hooks;
mod provider;
mod registrar;
mod spawner;
mod traits;
mod worker;
#[doc(inline)]
pub use gloo_worker::oneshot::{Oneshot, OneshotBridge, OneshotRegistrar, OneshotSpawner};
pub use bridge::OneshotBridge;
pub use hooks::{use_oneshot_runner, UseOneshotRunnerHandle};
pub use provider::OneshotProvider;
pub(crate) use provider::OneshotProviderState;
pub use registrar::OneshotRegistrar;
pub use spawner::OneshotSpawner;
pub use traits::Oneshot;
/// A procedural macro to create oneshot agents.
pub use yew_agent_macro::oneshot;

View File

@ -97,13 +97,19 @@ where
children,
path,
lazy,
module,
reach,
} = props.clone();
// Creates a spawning function so Codec is can be erased from contexts.
let spawn_bridge_fn: Rc<dyn Fn() -> OneshotBridge<T>> = {
let path = path.clone();
Rc::new(move || OneshotSpawner::<T>::new().encoding::<C>().spawn(&path))
Rc::new(move || {
OneshotSpawner::<T>::new()
.as_module(module)
.encoding::<C>()
.spawn(&path)
})
};
let state = {

View File

@ -0,0 +1,71 @@
use std::fmt;
use serde::de::Deserialize;
use serde::ser::Serialize;
use super::traits::Oneshot;
use super::worker::OneshotWorker;
use crate::codec::{Bincode, Codec};
use crate::traits::Registrable;
use crate::worker::WorkerRegistrar;
/// A registrar for oneshot workers.
pub struct OneshotRegistrar<T, CODEC = Bincode>
where
T: Oneshot + 'static,
CODEC: Codec + 'static,
{
inner: WorkerRegistrar<OneshotWorker<T>, CODEC>,
}
impl<T, CODEC> Default for OneshotRegistrar<T, CODEC>
where
T: Oneshot + 'static,
CODEC: Codec + 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<N, CODEC> OneshotRegistrar<N, CODEC>
where
N: Oneshot + 'static,
CODEC: Codec + 'static,
{
/// Creates a new Oneshot Registrar.
pub fn new() -> Self {
Self {
inner: OneshotWorker::<N>::registrar().encoding::<CODEC>(),
}
}
/// Sets the encoding.
pub fn encoding<C>(&self) -> OneshotRegistrar<N, C>
where
C: Codec + 'static,
{
OneshotRegistrar {
inner: self.inner.encoding::<C>(),
}
}
/// Registers the worker.
pub fn register(&self)
where
N::Input: Serialize + for<'de> Deserialize<'de>,
N::Output: Serialize + for<'de> Deserialize<'de>,
{
self.inner.register()
}
}
impl<T, CODEC> fmt::Debug for OneshotRegistrar<T, CODEC>
where
T: Oneshot + 'static,
CODEC: Codec + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("OneshotRegistrar<_>").finish()
}
}

View File

@ -0,0 +1,77 @@
use serde::de::Deserialize;
use serde::ser::Serialize;
use super::bridge::OneshotBridge;
use super::traits::Oneshot;
use super::worker::OneshotWorker;
use crate::codec::{Bincode, Codec};
use crate::worker::WorkerSpawner;
/// A spawner to create oneshot workers.
#[derive(Debug, Default)]
pub struct OneshotSpawner<N, CODEC = Bincode>
where
N: Oneshot + 'static,
CODEC: Codec,
{
inner: WorkerSpawner<OneshotWorker<N>, CODEC>,
}
impl<N, CODEC> OneshotSpawner<N, CODEC>
where
N: Oneshot + 'static,
CODEC: Codec,
{
/// Creates a [OneshotSpawner].
pub const fn new() -> Self {
Self {
inner: WorkerSpawner::<OneshotWorker<N>, CODEC>::new(),
}
}
/// Sets a new message encoding.
pub const fn encoding<C>(&self) -> OneshotSpawner<N, C>
where
C: Codec,
{
OneshotSpawner {
inner: WorkerSpawner::<OneshotWorker<N>, C>::new(),
}
}
/// Indicates that [`spawn`](WorkerSpawner#method.spawn) should expect a
/// `path` to a loader shim script (e.g. when using Trunk, created by using
/// the [`data-loader-shim`](https://trunkrs.dev/assets/#link-asset-types)
/// asset type) and one does not need to be generated. `false` by default.
pub fn with_loader(mut self, with_loader: bool) -> Self {
self.inner.with_loader(with_loader);
self
}
/// Determines whether the worker will be spawned with
/// [`options.type`](https://developer.mozilla.org/en-US/docs/Web/API/Worker/Worker#type)
/// set to `module`. `true` by default.
///
/// This option should be un-set if the worker was created with the
/// `--target no-modules` flag of `wasm-bindgen`. If using Trunk, see the
/// [`data-bindgen-target`](https://trunkrs.dev/assets/#link-asset-types)
/// asset type.
pub fn as_module(mut self, as_module: bool) -> Self {
self.inner.as_module(as_module);
self
}
/// Spawns a Oneshot Worker.
pub fn spawn(mut self, path: &str) -> OneshotBridge<N>
where
N::Input: Serialize + for<'de> Deserialize<'de>,
N::Output: Serialize + for<'de> Deserialize<'de>,
{
let rx = OneshotBridge::register_callback(&mut self.inner);
let inner = self.inner.spawn(path);
OneshotBridge::new(inner, rx)
}
}

View File

@ -0,0 +1,10 @@
use std::future::Future;
/// A future-based worker that for each input, one output is produced.
pub trait Oneshot: Future {
/// Incoming message type.
type Input;
/// Creates an oneshot worker.
fn create(input: Self::Input) -> Self;
}

View File

@ -0,0 +1,64 @@
use super::traits::Oneshot;
use crate::worker::{HandlerId, Worker, WorkerDestroyHandle, WorkerScope};
pub(crate) enum Message<T>
where
T: Oneshot,
{
Finished {
handler_id: HandlerId,
output: T::Output,
},
}
pub(crate) struct OneshotWorker<T>
where
T: 'static + Oneshot,
{
running_tasks: usize,
destruct_handle: Option<WorkerDestroyHandle<Self>>,
}
impl<T> Worker for OneshotWorker<T>
where
T: 'static + Oneshot,
{
type Input = T::Input;
type Message = Message<T>;
type Output = T::Output;
fn create(_scope: &WorkerScope<Self>) -> Self {
Self {
running_tasks: 0,
destruct_handle: None,
}
}
fn update(&mut self, scope: &WorkerScope<Self>, msg: Self::Message) {
let Message::Finished { handler_id, output } = msg;
self.running_tasks -= 1;
scope.respond(handler_id, output);
if self.running_tasks == 0 {
self.destruct_handle = None;
}
}
fn received(&mut self, scope: &WorkerScope<Self>, input: Self::Input, handler_id: HandlerId) {
self.running_tasks += 1;
scope.send_future(async move {
let output = T::create(input).await;
Message::Finished { handler_id, output }
});
}
fn destroy(&mut self, _scope: &WorkerScope<Self>, destruct: WorkerDestroyHandle<Self>) {
if self.running_tasks > 0 {
self.destruct_handle = Some(destruct);
}
}
}

View File

@ -0,0 +1,153 @@
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::sink::Sink;
use futures::stream::{FusedStream, Stream};
use pinned::mpsc;
use pinned::mpsc::{UnboundedReceiver, UnboundedSender};
use thiserror::Error;
use super::messages::{ReactorInput, ReactorOutput};
use super::scope::ReactorScoped;
use super::traits::Reactor;
use super::worker::ReactorWorker;
use crate::worker::{WorkerBridge, WorkerSpawner};
use crate::Codec;
/// A connection manager for components interaction with oneshot workers.
///
/// As this type implements [Stream] + [Sink], it can be splitted with [`StreamExt::split`].
pub struct ReactorBridge<R>
where
R: Reactor + 'static,
{
inner: WorkerBridge<ReactorWorker<R>>,
rx: UnboundedReceiver<<R::Scope as ReactorScoped>::Output>,
}
impl<R> fmt::Debug for ReactorBridge<R>
where
R: Reactor,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("ReactorBridge<_>")
}
}
impl<R> ReactorBridge<R>
where
R: Reactor + 'static,
{
#[inline(always)]
pub(crate) fn new(
inner: WorkerBridge<ReactorWorker<R>>,
rx: UnboundedReceiver<<R::Scope as ReactorScoped>::Output>,
) -> Self {
Self { inner, rx }
}
pub(crate) fn output_callback(
tx: &UnboundedSender<<R::Scope as ReactorScoped>::Output>,
output: ReactorOutput<<R::Scope as ReactorScoped>::Output>,
) {
match output {
ReactorOutput::Output(m) => {
let _ = tx.send_now(m);
}
ReactorOutput::Finish => {
tx.close_now();
}
}
}
#[inline(always)]
pub(crate) fn register_callback<CODEC>(
spawner: &mut WorkerSpawner<ReactorWorker<R>, CODEC>,
) -> UnboundedReceiver<<R::Scope as ReactorScoped>::Output>
where
CODEC: Codec,
{
let (tx, rx) = mpsc::unbounded();
spawner.callback(move |output| Self::output_callback(&tx, output));
rx
}
/// Forks the bridge.
///
/// This method creates a new bridge connected to a new reactor on the same worker instance.
pub fn fork(&self) -> Self {
let (tx, rx) = mpsc::unbounded();
let inner = self
.inner
.fork(Some(move |output| Self::output_callback(&tx, output)));
Self { inner, rx }
}
/// Sends an input to the current reactor.
pub fn send_input(&self, msg: <R::Scope as ReactorScoped>::Input) {
self.inner.send(ReactorInput::Input(msg));
}
}
impl<R> Stream for ReactorBridge<R>
where
R: Reactor + 'static,
{
type Item = <R::Scope as ReactorScoped>::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.rx).poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.rx.size_hint()
}
}
impl<R> FusedStream for ReactorBridge<R>
where
R: Reactor + 'static,
{
fn is_terminated(&self) -> bool {
self.rx.is_terminated()
}
}
/// An error type for bridge sink.
#[derive(Error, Clone, PartialEq, Eq, Debug)]
pub enum ReactorBridgeSinkError {
/// A bridge is an RAII Guard, it can only be closed by dropping the value.
#[error("attempting to close the bridge via the sink")]
AttemptClosure,
}
impl<R> Sink<<R::Scope as ReactorScoped>::Input> for ReactorBridge<R>
where
R: Reactor + 'static,
{
type Error = ReactorBridgeSinkError;
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Err(ReactorBridgeSinkError::AttemptClosure))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(
self: Pin<&mut Self>,
item: <R::Scope as ReactorScoped>::Input,
) -> Result<(), Self::Error> {
self.send_input(item);
Ok(())
}
}

View File

@ -0,0 +1,17 @@
use serde::{Deserialize, Serialize};
/// The Bridge Input.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum ReactorInput<I> {
/// An input message.
Input(I),
}
/// The Bridge Output.
#[derive(Debug, Serialize, Deserialize)]
pub enum ReactorOutput<O> {
/// An output message has been received.
Output(O),
/// Reactor for current bridge has exited.
Finish,
}

View File

@ -36,18 +36,26 @@
//! }
//! ```
mod bridge;
mod hooks;
mod messages;
mod provider;
mod registrar;
mod scope;
mod spawner;
mod traits;
mod worker;
#[doc(inline)]
pub use gloo_worker::reactor::{
Reactor, ReactorBridge, ReactorRegistrar, ReactorScope, ReactorScoped, ReactorSpawner,
};
pub use bridge::{ReactorBridge, ReactorBridgeSinkError};
pub use hooks::{
use_reactor_bridge, use_reactor_subscription, ReactorEvent, UseReactorBridgeHandle,
UseReactorSubscriptionHandle,
};
pub use provider::ReactorProvider;
pub(crate) use provider::ReactorProviderState;
pub use registrar::ReactorRegistrar;
pub use scope::{ReactorScope, ReactorScoped};
pub use spawner::ReactorSpawner;
pub use traits::Reactor;
/// A procedural macro to create reactor agents.
pub use yew_agent_macro::reactor;

View File

@ -3,11 +3,10 @@ use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;
use gloo_worker::reactor::ReactorScoped;
use serde::{Deserialize, Serialize};
use yew::prelude::*;
use super::{Reactor, ReactorBridge, ReactorSpawner};
use super::{Reactor, ReactorBridge, ReactorScoped, ReactorSpawner};
use crate::utils::get_next_id;
use crate::worker::WorkerProviderProps;
use crate::{Bincode, Codec, Reach};
@ -100,13 +99,19 @@ where
children,
path,
lazy,
module,
reach,
} = props.clone();
// Creates a spawning function so Codec is can be erased from contexts.
let spawn_bridge_fn: Rc<dyn Fn() -> ReactorBridge<R>> = {
let path = path.clone();
Rc::new(move || ReactorSpawner::<R>::new().encoding::<C>().spawn(&path))
Rc::new(move || {
ReactorSpawner::<R>::new()
.as_module(module)
.encoding::<C>()
.spawn(&path)
})
};
let state = {

View File

@ -0,0 +1,72 @@
use std::fmt;
use serde::de::Deserialize;
use serde::ser::Serialize;
use super::scope::ReactorScoped;
use super::traits::Reactor;
use super::worker::ReactorWorker;
use crate::codec::{Bincode, Codec};
use crate::traits::Registrable;
use crate::worker::WorkerRegistrar;
/// A registrar for reactor workers.
pub struct ReactorRegistrar<R, CODEC = Bincode>
where
R: Reactor + 'static,
CODEC: Codec + 'static,
{
inner: WorkerRegistrar<ReactorWorker<R>, CODEC>,
}
impl<R, CODEC> Default for ReactorRegistrar<R, CODEC>
where
R: Reactor + 'static,
CODEC: Codec + 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<R, CODEC> ReactorRegistrar<R, CODEC>
where
R: Reactor + 'static,
CODEC: Codec + 'static,
{
/// Creates a new reactor registrar.
pub fn new() -> Self {
Self {
inner: ReactorWorker::<R>::registrar().encoding::<CODEC>(),
}
}
/// Sets the encoding.
pub fn encoding<C>(&self) -> ReactorRegistrar<R, C>
where
C: Codec + 'static,
{
ReactorRegistrar {
inner: self.inner.encoding::<C>(),
}
}
/// Registers the worker.
pub fn register(&self)
where
<R::Scope as ReactorScoped>::Input: Serialize + for<'de> Deserialize<'de>,
<R::Scope as ReactorScoped>::Output: Serialize + for<'de> Deserialize<'de>,
{
self.inner.register()
}
}
impl<R, CODEC> fmt::Debug for ReactorRegistrar<R, CODEC>
where
R: Reactor + 'static,
CODEC: Codec + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReactorRegistrar<_>").finish()
}
}

View File

@ -0,0 +1,91 @@
use std::convert::Infallible;
use std::fmt;
use std::pin::Pin;
use futures::stream::{FusedStream, Stream};
use futures::task::{Context, Poll};
use futures::Sink;
/// A handle to communicate with bridges.
pub struct ReactorScope<I, O> {
input_stream: Pin<Box<dyn FusedStream<Item = I>>>,
output_sink: Pin<Box<dyn Sink<O, Error = Infallible>>>,
}
impl<I, O> fmt::Debug for ReactorScope<I, O> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReactorScope<_>").finish()
}
}
impl<I, O> Stream for ReactorScope<I, O> {
type Item = I;
#[inline(always)]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.input_stream).poll_next(cx)
}
#[inline(always)]
fn size_hint(&self) -> (usize, Option<usize>) {
self.input_stream.size_hint()
}
}
impl<I, O> FusedStream for ReactorScope<I, O> {
#[inline(always)]
fn is_terminated(&self) -> bool {
self.input_stream.is_terminated()
}
}
/// A helper trait to extract the input and output type from a [ReactorStream].
pub trait ReactorScoped: Stream + FusedStream {
/// The Input Message.
type Input;
/// The Output Message.
type Output;
/// Creates a ReactorReceiver.
fn new<IS, OS>(input_stream: IS, output_sink: OS) -> Self
where
IS: Stream<Item = Self::Input> + FusedStream + 'static,
OS: Sink<Self::Output, Error = Infallible> + 'static;
}
impl<I, O> ReactorScoped for ReactorScope<I, O> {
type Input = I;
type Output = O;
#[inline]
fn new<IS, OS>(input_stream: IS, output_sink: OS) -> Self
where
IS: Stream<Item = Self::Input> + FusedStream + 'static,
OS: Sink<Self::Output, Error = Infallible> + 'static,
{
Self {
input_stream: Box::pin(input_stream),
output_sink: Box::pin(output_sink),
}
}
}
impl<I, O> Sink<O> for ReactorScope<I, O> {
type Error = Infallible;
fn start_send(mut self: Pin<&mut Self>, item: O) -> Result<(), Self::Error> {
Pin::new(&mut self.output_sink).start_send(item)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.output_sink).poll_close(cx)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.output_sink).poll_flush(cx)
}
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.output_sink).poll_flush(cx)
}
}

View File

@ -0,0 +1,78 @@
use serde::de::Deserialize;
use serde::ser::Serialize;
use super::bridge::ReactorBridge;
use super::scope::ReactorScoped;
use super::traits::Reactor;
use super::worker::ReactorWorker;
use crate::codec::{Bincode, Codec};
use crate::worker::WorkerSpawner;
/// A spawner to create oneshot workers.
#[derive(Debug, Default)]
pub struct ReactorSpawner<R, CODEC = Bincode>
where
R: Reactor + 'static,
CODEC: Codec,
{
inner: WorkerSpawner<ReactorWorker<R>, CODEC>,
}
impl<R, CODEC> ReactorSpawner<R, CODEC>
where
R: Reactor + 'static,
CODEC: Codec,
{
/// Creates a ReactorSpawner.
pub const fn new() -> Self {
Self {
inner: WorkerSpawner::<ReactorWorker<R>, CODEC>::new(),
}
}
/// Sets a new message encoding.
pub const fn encoding<C>(&self) -> ReactorSpawner<R, C>
where
C: Codec,
{
ReactorSpawner {
inner: WorkerSpawner::<ReactorWorker<R>, C>::new(),
}
}
/// Indicates that [`spawn`](WorkerSpawner#method.spawn) should expect a
/// `path` to a loader shim script (e.g. when using Trunk, created by using
/// the [`data-loader-shim`](https://trunkrs.dev/assets/#link-asset-types)
/// asset type) and one does not need to be generated. `false` by default.
pub fn with_loader(mut self, with_loader: bool) -> Self {
self.inner.with_loader(with_loader);
self
}
/// Determines whether the worker will be spawned with
/// [`options.type`](https://developer.mozilla.org/en-US/docs/Web/API/Worker/Worker#type)
/// set to `module`. `true` by default.
///
/// This option should be un-set if the worker was created with the
/// `--target no-modules` flag of `wasm-bindgen`. If using Trunk, see the
/// [`data-bindgen-target`](https://trunkrs.dev/assets/#link-asset-types)
/// asset type.
pub fn as_module(mut self, as_module: bool) -> Self {
self.inner.as_module(as_module);
self
}
/// Spawns a reactor worker.
pub fn spawn(mut self, path: &str) -> ReactorBridge<R>
where
<R::Scope as ReactorScoped>::Input: Serialize + for<'de> Deserialize<'de>,
<R::Scope as ReactorScoped>::Output: Serialize + for<'de> Deserialize<'de>,
{
let rx = ReactorBridge::register_callback(&mut self.inner);
let inner = self.inner.spawn(path);
ReactorBridge::new(inner, rx)
}
}

View File

@ -0,0 +1,12 @@
use std::future::Future;
use super::scope::ReactorScoped;
/// A reactor worker.
pub trait Reactor: Future<Output = ()> {
/// The Reactor Scope
type Scope: ReactorScoped;
/// Creates a reactor worker.
fn create(scope: Self::Scope) -> Self;
}

View File

@ -0,0 +1,117 @@
use std::collections::HashMap;
use std::convert::Infallible;
use futures::sink;
use futures::stream::StreamExt;
use pinned::mpsc;
use pinned::mpsc::UnboundedSender;
use wasm_bindgen_futures::spawn_local;
use super::messages::{ReactorInput, ReactorOutput};
use super::scope::ReactorScoped;
use super::traits::Reactor;
use crate::worker::{HandlerId, Worker, WorkerDestroyHandle, WorkerScope};
pub(crate) enum Message {
ReactorExited(HandlerId),
}
pub(crate) struct ReactorWorker<R>
where
R: 'static + Reactor,
{
senders: HashMap<HandlerId, UnboundedSender<<R::Scope as ReactorScoped>::Input>>,
destruct_handle: Option<WorkerDestroyHandle<Self>>,
}
impl<R> Worker for ReactorWorker<R>
where
R: 'static + Reactor,
{
type Input = ReactorInput<<R::Scope as ReactorScoped>::Input>;
type Message = Message;
type Output = ReactorOutput<<R::Scope as ReactorScoped>::Output>;
fn create(_scope: &WorkerScope<Self>) -> Self {
Self {
senders: HashMap::new(),
destruct_handle: None,
}
}
fn update(&mut self, scope: &WorkerScope<Self>, msg: Self::Message) {
match msg {
Self::Message::ReactorExited(id) => {
scope.respond(id, ReactorOutput::Finish);
self.senders.remove(&id);
}
}
// All reactors have closed themselves, the worker can now close.
if self.destruct_handle.is_some() && self.senders.is_empty() {
self.destruct_handle = None;
}
}
fn connected(&mut self, scope: &WorkerScope<Self>, id: HandlerId) {
let from_bridge = {
let (tx, rx) = mpsc::unbounded();
self.senders.insert(id, tx);
rx
};
let to_bridge = {
let scope_ = scope.clone();
let (tx, mut rx) = mpsc::unbounded();
spawn_local(async move {
while let Some(m) = rx.next().await {
scope_.respond(id, ReactorOutput::Output(m));
}
});
sink::unfold((), move |_, item: <R::Scope as ReactorScoped>::Output| {
let tx = tx.clone();
async move {
let _ = tx.send_now(item);
Ok::<(), Infallible>(())
}
})
};
let reactor_scope = ReactorScoped::new(from_bridge, to_bridge);
let reactor = R::create(reactor_scope);
scope.send_future(async move {
reactor.await;
Message::ReactorExited(id)
});
}
fn received(&mut self, _scope: &WorkerScope<Self>, input: Self::Input, id: HandlerId) {
match input {
Self::Input::Input(input) => {
if let Some(m) = self.senders.get_mut(&id) {
let _result = m.send_now(input);
}
}
}
}
fn disconnected(&mut self, _scope: &WorkerScope<Self>, id: HandlerId) {
// We close this channel, but drop it when the reactor has exited itself.
if let Some(m) = self.senders.get_mut(&id) {
m.close_now();
}
}
fn destroy(&mut self, _scope: &WorkerScope<Self>, destruct: WorkerDestroyHandle<Self>) {
if !self.senders.is_empty() {
self.destruct_handle = Some(destruct);
}
}
}

View File

@ -0,0 +1,19 @@
//! Submodule providing the `Spawnable` and `Registrable` traits.
/// A Worker that can be spawned by a spawner.
pub trait Spawnable {
/// Spawner Type.
type Spawner;
/// Creates a spawner.
fn spawner() -> Self::Spawner;
}
/// A trait to enable public workers being registered in a web worker.
pub trait Registrable {
/// Registrar Type.
type Registrar;
/// Creates a registrar for the current worker.
fn registrar() -> Self::Registrar;
}

View File

@ -1,8 +1,14 @@
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use wasm_bindgen::UnwrapThrowExt;
use yew::Reducible;
/// Convenience function to avoid repeating expect logic.
pub fn window() -> web_sys::Window {
web_sys::window().expect_throw("Can't find the global Window")
}
/// Gets a unique worker id
pub(crate) fn get_next_id() -> usize {
static CTR: AtomicUsize = AtomicUsize::new(0);

View File

@ -0,0 +1,176 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::marker::PhantomData;
use std::rc::{Rc, Weak};
use serde::{Deserialize, Serialize};
use super::handler_id::HandlerId;
use super::messages::ToWorker;
use super::native_worker::NativeWorkerExt;
use super::traits::Worker;
use super::{Callback, Shared};
use crate::codec::Codec;
pub(crate) type ToWorkerQueue<W> = Vec<ToWorker<W>>;
pub(crate) type CallbackMap<W> = HashMap<HandlerId, Weak<dyn Fn(<W as Worker>::Output)>>;
struct WorkerBridgeInner<W>
where
W: Worker,
{
// When worker is loaded, queue becomes None.
pending_queue: Shared<Option<ToWorkerQueue<W>>>,
callbacks: Shared<CallbackMap<W>>,
post_msg: Rc<dyn Fn(ToWorker<W>)>,
}
impl<W> fmt::Debug for WorkerBridgeInner<W>
where
W: Worker,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("WorkerBridgeInner<_>")
}
}
impl<W> WorkerBridgeInner<W>
where
W: Worker,
{
/// Send a message to the worker, queuing the message if necessary
fn send_message(&self, msg: ToWorker<W>) {
let mut pending_queue = self.pending_queue.borrow_mut();
match pending_queue.as_mut() {
Some(m) => {
m.push(msg);
}
None => {
(self.post_msg)(msg);
}
}
}
}
impl<W> Drop for WorkerBridgeInner<W>
where
W: Worker,
{
fn drop(&mut self) {
let destroy = ToWorker::Destroy;
self.send_message(destroy);
}
}
/// A connection manager for components interaction with workers.
pub struct WorkerBridge<W>
where
W: Worker,
{
inner: Rc<WorkerBridgeInner<W>>,
id: HandlerId,
_worker: PhantomData<W>,
_cb: Option<Rc<dyn Fn(W::Output)>>,
}
impl<W> WorkerBridge<W>
where
W: Worker,
{
fn init(&self) {
self.inner.send_message(ToWorker::Connected(self.id));
}
pub(crate) fn new<CODEC>(
id: HandlerId,
native_worker: web_sys::Worker,
pending_queue: Rc<RefCell<Option<ToWorkerQueue<W>>>>,
callbacks: Rc<RefCell<CallbackMap<W>>>,
callback: Option<Callback<W::Output>>,
) -> Self
where
CODEC: Codec,
W::Input: Serialize + for<'de> Deserialize<'de>,
{
let post_msg = move |msg: ToWorker<W>| native_worker.post_packed_message::<_, CODEC>(msg);
let self_ = Self {
inner: WorkerBridgeInner {
pending_queue,
callbacks,
post_msg: Rc::new(post_msg),
}
.into(),
id,
_worker: PhantomData,
_cb: callback,
};
self_.init();
self_
}
/// Send a message to the current worker.
pub fn send(&self, msg: W::Input) {
let msg = ToWorker::ProcessInput(self.id, msg);
self.inner.send_message(msg);
}
/// Forks the bridge with a different callback.
///
/// This creates a new [HandlerID] that helps the worker to differentiate bridges.
pub fn fork<F>(&self, cb: Option<F>) -> Self
where
F: 'static + Fn(W::Output),
{
let cb = cb.map(|m| Rc::new(m) as Rc<dyn Fn(W::Output)>);
let handler_id = HandlerId::new();
if let Some(cb_weak) = cb.as_ref().map(Rc::downgrade) {
self.inner
.callbacks
.borrow_mut()
.insert(handler_id, cb_weak);
}
let self_ = Self {
inner: self.inner.clone(),
id: handler_id,
_worker: PhantomData,
_cb: cb,
};
self_.init();
self_
}
}
impl<W> Drop for WorkerBridge<W>
where
W: Worker,
{
fn drop(&mut self) {
let disconnected = ToWorker::Disconnected(self.id);
self.inner.send_message(disconnected);
}
}
impl<W> fmt::Debug for WorkerBridge<W>
where
W: Worker,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("WorkerBridge<_>")
}
}
impl<W> PartialEq for WorkerBridge<W>
where
W: Worker,
{
fn eq(&self, rhs: &Self) -> bool {
self.id == rhs.id
}
}

View File

@ -0,0 +1,17 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use serde::{Deserialize, Serialize};
/// Identifier to send output to bridges.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)]
pub struct HandlerId(usize);
impl HandlerId {
pub(crate) fn new() -> Self {
static CTR: AtomicUsize = AtomicUsize::new(0);
let id = CTR.fetch_add(1, Ordering::SeqCst);
HandlerId(id)
}
}

View File

@ -0,0 +1,132 @@
use wasm_bindgen::prelude::*;
use super::messages::ToWorker;
use super::native_worker::{DedicatedWorker, WorkerSelf};
use super::scope::{WorkerDestroyHandle, WorkerScope};
use super::traits::Worker;
use super::Shared;
pub(crate) struct WorkerState<W>
where
W: Worker,
{
worker: Option<(W, WorkerScope<W>)>,
to_destroy: bool,
}
impl<W> WorkerState<W>
where
W: Worker,
{
pub fn new() -> Self {
WorkerState {
worker: None,
to_destroy: false,
}
}
}
/// Internal Worker lifecycle events
pub(crate) enum WorkerLifecycleEvent<W: Worker> {
/// Request to create the scope
Create(WorkerScope<W>),
/// Internal Worker message
Message(W::Message),
/// External Messages from bridges
Remote(ToWorker<W>),
/// Destroy the Worker
Destroy,
}
pub(crate) struct WorkerRunnable<W: Worker> {
pub state: Shared<WorkerState<W>>,
pub event: WorkerLifecycleEvent<W>,
}
impl<W> WorkerRunnable<W>
where
W: Worker + 'static,
{
pub fn run(self) {
let mut state = self.state.borrow_mut();
// We should block all event other than message after a worker is destroyed.
match self.event {
WorkerLifecycleEvent::Create(scope) => {
if state.to_destroy {
return;
}
state.worker = Some((W::create(&scope), scope));
}
WorkerLifecycleEvent::Message(msg) => {
if let Some((worker, scope)) = state.worker.as_mut() {
worker.update(scope, msg);
}
}
WorkerLifecycleEvent::Remote(ToWorker::Connected(id)) => {
if state.to_destroy {
return;
}
let (worker, scope) = state
.worker
.as_mut()
.expect_throw("worker was not created to process connected messages");
worker.connected(scope, id);
}
WorkerLifecycleEvent::Remote(ToWorker::ProcessInput(id, inp)) => {
if state.to_destroy {
return;
}
let (worker, scope) = state
.worker
.as_mut()
.expect_throw("worker was not created to process inputs");
worker.received(scope, inp, id);
}
WorkerLifecycleEvent::Remote(ToWorker::Disconnected(id)) => {
if state.to_destroy {
return;
}
let (worker, scope) = state
.worker
.as_mut()
.expect_throw("worker was not created to process disconnected messages");
worker.disconnected(scope, id);
}
WorkerLifecycleEvent::Remote(ToWorker::Destroy) => {
if state.to_destroy {
return;
}
state.to_destroy = true;
let (worker, scope) = state
.worker
.as_mut()
.expect_throw("trying to destroy not existent worker");
let destruct = WorkerDestroyHandle::new(scope.clone());
worker.destroy(scope, destruct);
}
WorkerLifecycleEvent::Destroy => {
state
.worker
.take()
.expect_throw("worker is not initialised or already destroyed");
DedicatedWorker::worker_self().close();
}
}
}
}

View File

@ -0,0 +1,32 @@
use serde::{Deserialize, Serialize};
use super::handler_id::HandlerId;
use super::traits::Worker;
/// Serializable messages to worker
#[derive(Serialize, Deserialize, Debug)]
pub(crate) enum ToWorker<W>
where
W: Worker,
{
/// Client is connected
Connected(HandlerId),
/// Incoming message to Worker
ProcessInput(HandlerId, W::Input),
/// Client is disconnected
Disconnected(HandlerId),
/// Worker should be terminated
Destroy,
}
/// Serializable messages sent by worker to consumer
#[derive(Serialize, Deserialize, Debug)]
pub(crate) enum FromWorker<W>
where
W: Worker,
{
/// Worker sends this message when `wasm` bundle has loaded.
WorkerLoaded,
/// Outgoing message to consumer
ProcessOutput(HandlerId, W::Output),
}

View File

@ -63,15 +63,35 @@
//! # }
//! ```
mod bridge;
mod handler_id;
mod hooks;
mod lifecycle;
mod messages;
mod native_worker;
mod provider;
mod registrar;
mod scope;
mod spawner;
mod traits;
#[doc(inline)]
pub use gloo_worker::{
HandlerId, Worker, WorkerBridge, WorkerDestroyHandle, WorkerRegistrar, WorkerScope,
};
use std::cell::RefCell;
use std::rc::Rc;
pub use bridge::WorkerBridge;
pub use handler_id::HandlerId;
pub use hooks::{
use_worker_bridge, use_worker_subscription, UseWorkerBridgeHandle, UseWorkerSubscriptionHandle,
};
pub(crate) use provider::WorkerProviderState;
pub use provider::{WorkerProvider, WorkerProviderProps};
pub use registrar::WorkerRegistrar;
pub use scope::{WorkerDestroyHandle, WorkerScope};
pub use spawner::WorkerSpawner;
pub use traits::Worker;
/// Alias for `Rc<RefCell<T>>`
type Shared<T> = Rc<RefCell<T>>;
/// Alias for `Rc<dyn Fn(IN)>`
type Callback<IN> = Rc<dyn Fn(IN)>;

View File

@ -0,0 +1,68 @@
use serde::{Deserialize, Serialize};
use wasm_bindgen::closure::Closure;
use wasm_bindgen::prelude::*;
use wasm_bindgen::{JsCast, JsValue};
pub(crate) use web_sys::Worker as DedicatedWorker;
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
use crate::codec::Codec;
pub(crate) trait WorkerSelf {
type GlobalScope;
fn worker_self() -> Self::GlobalScope;
}
impl WorkerSelf for DedicatedWorker {
type GlobalScope = DedicatedWorkerGlobalScope;
fn worker_self() -> Self::GlobalScope {
JsValue::from(js_sys::global()).into()
}
}
pub(crate) trait NativeWorkerExt {
fn set_on_packed_message<T, CODEC, F>(&self, handler: F)
where
T: Serialize + for<'de> Deserialize<'de>,
CODEC: Codec,
F: 'static + Fn(T);
fn post_packed_message<T, CODEC>(&self, data: T)
where
T: Serialize + for<'de> Deserialize<'de>,
CODEC: Codec;
}
macro_rules! worker_ext_impl {
($($type:path),+) => {$(
impl NativeWorkerExt for $type {
fn set_on_packed_message<T, CODEC, F>(&self, handler: F)
where
T: Serialize + for<'de> Deserialize<'de>,
CODEC: Codec,
F: 'static + Fn(T)
{
let handler = move |message: MessageEvent| {
let msg = CODEC::decode(message.data());
handler(msg);
};
let closure = Closure::wrap(Box::new(handler) as Box<dyn Fn(MessageEvent)>).into_js_value();
self.set_onmessage(Some(closure.as_ref().unchecked_ref()));
}
fn post_packed_message<T, CODEC>(&self, data: T)
where
T: Serialize + for<'de> Deserialize<'de>,
CODEC: Codec
{
self.post_message(&CODEC::encode(data))
.expect_throw("failed to post message");
}
}
)+};
}
worker_ext_impl! {
DedicatedWorker, DedicatedWorkerGlobalScope
}

View File

@ -3,14 +3,13 @@ use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;
use gloo_worker::Spawnable;
use serde::{Deserialize, Serialize};
use yew::prelude::*;
use super::{Worker, WorkerBridge};
use crate::reach::Reach;
use crate::utils::get_next_id;
use crate::{Bincode, Codec};
use crate::{Bincode, Codec, Spawnable};
/// Properties for [WorkerProvider].
#[derive(Debug, Properties, PartialEq, Clone)]
@ -24,6 +23,11 @@ pub struct WorkerProviderProps {
#[prop_or(Reach::Public)]
pub reach: Reach,
/// Whether the agent should be created
/// with type `Module`.
#[prop_or(false)]
pub module: bool,
/// Lazily spawn the agent.
///
/// The agent will be spawned when the first time a hook requests a bridge.
@ -112,13 +116,14 @@ where
children,
path,
lazy,
module,
reach,
} = props.clone();
// Creates a spawning function so Codec is can be erased from contexts.
let spawn_bridge_fn: Rc<dyn Fn() -> WorkerBridge<W>> = {
let path = path.clone();
Rc::new(move || W::spawner().encoding::<C>().spawn(&path))
Rc::new(move || W::spawner().as_module(module).encoding::<C>().spawn(&path))
};
let state = {

View File

@ -0,0 +1,67 @@
use std::fmt;
use std::marker::PhantomData;
use serde::de::Deserialize;
use serde::ser::Serialize;
use super::lifecycle::WorkerLifecycleEvent;
use super::messages::{FromWorker, ToWorker};
use super::native_worker::{DedicatedWorker, NativeWorkerExt, WorkerSelf};
use super::scope::WorkerScope;
use super::traits::Worker;
use crate::codec::{Bincode, Codec};
/// A Worker Registrar.
pub struct WorkerRegistrar<W, CODEC = Bincode>
where
W: Worker,
CODEC: Codec,
{
_marker: PhantomData<(W, CODEC)>,
}
impl<W: Worker> fmt::Debug for WorkerRegistrar<W> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("WorkerRegistrar<_>")
}
}
impl<W, CODEC> WorkerRegistrar<W, CODEC>
where
W: Worker + 'static,
CODEC: Codec,
{
pub(crate) fn new() -> Self {
Self {
_marker: PhantomData,
}
}
/// Sets a new message encoding.
pub fn encoding<C>(&self) -> WorkerRegistrar<W, C>
where
C: Codec,
{
WorkerRegistrar::new()
}
/// Executes an worker in the current environment.
pub fn register(&self)
where
CODEC: Codec,
W::Input: Serialize + for<'de> Deserialize<'de>,
W::Output: Serialize + for<'de> Deserialize<'de>,
{
let scope = WorkerScope::<W>::new::<CODEC>();
let upd = WorkerLifecycleEvent::Create(scope.clone());
scope.send(upd);
let handler = move |msg: ToWorker<W>| {
let upd = WorkerLifecycleEvent::Remote(msg);
scope.send(upd);
};
let loaded: FromWorker<W> = FromWorker::WorkerLoaded;
let worker = DedicatedWorker::worker_self();
worker.set_on_packed_message::<_, CODEC, _>(handler);
worker.post_packed_message::<_, CODEC>(loaded);
}
}

View File

@ -0,0 +1,170 @@
use std::cell::RefCell;
use std::fmt;
use std::future::Future;
use std::rc::Rc;
use serde::de::Deserialize;
use serde::ser::Serialize;
use wasm_bindgen_futures::spawn_local;
use super::handler_id::HandlerId;
use super::lifecycle::{WorkerLifecycleEvent, WorkerRunnable, WorkerState};
use super::messages::FromWorker;
use super::native_worker::{DedicatedWorker, NativeWorkerExt, WorkerSelf};
use super::traits::Worker;
use super::Shared;
use crate::codec::Codec;
/// A handle that closes the worker when it is dropped.
pub struct WorkerDestroyHandle<W>
where
W: Worker + 'static,
{
scope: WorkerScope<W>,
}
impl<W: Worker> fmt::Debug for WorkerDestroyHandle<W> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("WorkerDestroyHandle<_>")
}
}
impl<W> WorkerDestroyHandle<W>
where
W: Worker,
{
pub(crate) fn new(scope: WorkerScope<W>) -> Self {
Self { scope }
}
}
impl<W> Drop for WorkerDestroyHandle<W>
where
W: Worker,
{
fn drop(&mut self) {
self.scope.send(WorkerLifecycleEvent::Destroy);
}
}
/// This struct holds a reference to a component and to a global scheduler.
pub struct WorkerScope<W: Worker> {
state: Shared<WorkerState<W>>,
post_msg: Rc<dyn Fn(FromWorker<W>)>,
}
impl<W: Worker> fmt::Debug for WorkerScope<W> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("WorkerScope<_>")
}
}
impl<W: Worker> Clone for WorkerScope<W> {
fn clone(&self) -> Self {
WorkerScope {
state: self.state.clone(),
post_msg: self.post_msg.clone(),
}
}
}
impl<W> WorkerScope<W>
where
W: Worker + 'static,
{
/// Create worker scope
pub(crate) fn new<CODEC>() -> Self
where
CODEC: Codec,
W::Output: Serialize + for<'de> Deserialize<'de>,
{
let post_msg = move |msg: FromWorker<W>| {
DedicatedWorker::worker_self().post_packed_message::<_, CODEC>(msg)
};
let state = Rc::new(RefCell::new(WorkerState::new()));
WorkerScope {
post_msg: Rc::new(post_msg),
state,
}
}
/// Schedule message for sending to worker
pub(crate) fn send(&self, event: WorkerLifecycleEvent<W>) {
let state = self.state.clone();
// We can implement a custom scheduler,
// but it's easier to borrow the one from wasm-bindgen-futures.
spawn_local(async move {
WorkerRunnable { state, event }.run();
});
}
/// Send response to a worker bridge.
pub fn respond(&self, id: HandlerId, output: W::Output) {
let msg = FromWorker::<W>::ProcessOutput(id, output);
(self.post_msg)(msg);
}
/// Send a message to the worker
pub fn send_message<T>(&self, msg: T)
where
T: Into<W::Message>,
{
self.send(WorkerLifecycleEvent::Message(msg.into()));
}
/// Create a callback which will send a message to the worker when invoked.
pub fn callback<F, IN, M>(&self, function: F) -> Rc<dyn Fn(IN)>
where
M: Into<W::Message>,
F: Fn(IN) -> M + 'static,
{
let scope = self.clone();
let closure = move |input| {
let output = function(input).into();
scope.send(WorkerLifecycleEvent::Message(output));
};
Rc::new(closure)
}
/// This method creates a callback which returns a Future which
/// returns a message to be sent back to the worker
///
/// # Panics
/// If the future panics, then the promise will not resolve, and
/// will leak.
pub fn callback_future<FN, FU, IN, M>(&self, function: FN) -> Rc<dyn Fn(IN)>
where
M: Into<W::Message>,
FU: Future<Output = M> + 'static,
FN: Fn(IN) -> FU + 'static,
{
let scope = self.clone();
let closure = move |input: IN| {
let future: FU = function(input);
scope.send_future(future);
};
Rc::new(closure)
}
/// This method processes a Future that returns a message and sends it back to the worker.
///
/// # Panics
/// If the future panics, then the promise will not resolve, and will leak.
pub fn send_future<F, M>(&self, future: F)
where
M: Into<W::Message>,
F: Future<Output = M> + 'static,
{
let scope = self.clone();
let js_future = async move {
let message: W::Message = future.await.into();
let cb = scope.callback(|m: W::Message| m);
(*cb)(message);
};
wasm_bindgen_futures::spawn_local(js_future);
}
}

View File

@ -0,0 +1,217 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::marker::PhantomData;
use std::rc::{Rc, Weak};
use js_sys::Array;
use serde::de::Deserialize;
use serde::ser::Serialize;
use web_sys::{Blob, BlobPropertyBag, Url, WorkerOptions, WorkerType};
use super::bridge::{CallbackMap, WorkerBridge};
use super::handler_id::HandlerId;
use super::messages::FromWorker;
use super::native_worker::{DedicatedWorker, NativeWorkerExt};
use super::traits::Worker;
use super::{Callback, Shared};
use crate::codec::{Bincode, Codec};
use crate::utils::window;
/// A spawner to create workers.
#[derive(Clone)]
pub struct WorkerSpawner<W, CODEC = Bincode>
where
W: Worker,
CODEC: Codec,
{
_marker: PhantomData<(W, CODEC)>,
callback: Option<Callback<W::Output>>,
with_loader: bool,
as_module: bool,
}
impl<W, CODEC> fmt::Debug for WorkerSpawner<W, CODEC>
where
W: Worker,
CODEC: Codec,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("WorkerScope<_>")
}
}
impl<W, CODEC> Default for WorkerSpawner<W, CODEC>
where
W: Worker + 'static,
CODEC: Codec,
{
fn default() -> Self {
Self::new()
}
}
impl<W, CODEC> WorkerSpawner<W, CODEC>
where
W: Worker + 'static,
CODEC: Codec,
{
/// Creates a [WorkerSpawner].
pub const fn new() -> Self {
Self {
_marker: PhantomData,
callback: None,
with_loader: false,
as_module: false,
}
}
/// Sets a new message encoding.
pub fn encoding<C>(&mut self) -> WorkerSpawner<W, C>
where
C: Codec,
{
WorkerSpawner {
_marker: PhantomData,
callback: self.callback.clone(),
with_loader: self.with_loader,
as_module: self.as_module,
}
}
/// Sets a callback.
pub fn callback<F>(&mut self, cb: F) -> &mut Self
where
F: 'static + Fn(W::Output),
{
self.callback = Some(Rc::new(cb));
self
}
/// Indicates that [`spawn`](WorkerSpawner#method.spawn) should expect a
/// `path` to a loader shim script (e.g. when using Trunk, created by using
/// the [`data-loader-shim`](https://trunkrs.dev/assets/#link-asset-types)
/// asset type) and one does not need to be generated. `false` by default.
pub fn with_loader(&mut self, with_loader: bool) -> &mut Self {
self.with_loader = with_loader;
self
}
/// Determines whether the worker will be spawned with
/// [`options.type`](https://developer.mozilla.org/en-US/docs/Web/API/Worker/Worker#type)
/// set to `module`. `true` by default.
///
/// This option should be un-set if the worker was created with the
/// `--target no-modules` flag of `wasm-bindgen`. If using Trunk, see the
/// [`data-bindgen-target`](https://trunkrs.dev/assets/#link-asset-types)
/// asset type.
pub fn as_module(&mut self, as_module: bool) -> &mut Self {
self.as_module = as_module;
self
}
/// Spawns a Worker.
pub fn spawn(&self, path: &str) -> WorkerBridge<W>
where
W::Input: Serialize + for<'de> Deserialize<'de>,
W::Output: Serialize + for<'de> Deserialize<'de>,
{
let worker = self.create_worker(path).expect("failed to spawn worker");
self.spawn_inner(worker)
}
fn create_worker(&self, path: &str) -> Option<DedicatedWorker> {
let path = if self.with_loader {
std::borrow::Cow::Borrowed(path)
} else {
let js_shim_url = Url::new_with_base(
path,
&window().location().href().expect("failed to read href."),
)
.expect("failed to create url for javascript entrypoint")
.to_string();
let wasm_url = js_shim_url.replace(".js", "_bg.wasm");
let array = Array::new();
let shim = if self.as_module {
format!(r#"import init from '{js_shim_url}';await init();"#)
} else {
format!(r#"importScripts("{js_shim_url}");wasm_bindgen("{wasm_url}");"#)
};
array.push(&shim.into());
let blob_property = BlobPropertyBag::new();
blob_property.set_type("application/javascript");
let blob = Blob::new_with_str_sequence_and_options(&array, &blob_property).unwrap();
let url = Url::create_object_url_with_blob(&blob).unwrap();
std::borrow::Cow::Owned(url)
};
let path = path.as_ref();
if self.as_module {
let options = WorkerOptions::new();
options.set_type(WorkerType::Module);
DedicatedWorker::new_with_options(path, &options).ok()
} else {
DedicatedWorker::new(path).ok()
}
}
fn spawn_inner(&self, worker: DedicatedWorker) -> WorkerBridge<W>
where
W::Input: Serialize + for<'de> Deserialize<'de>,
W::Output: Serialize + for<'de> Deserialize<'de>,
{
let pending_queue = Rc::new(RefCell::new(Some(Vec::new())));
let handler_id = HandlerId::new();
let mut callbacks = HashMap::new();
if let Some(m) = self.callback.as_ref().map(Rc::downgrade) {
callbacks.insert(handler_id, m);
}
let callbacks: Shared<CallbackMap<W>> = Rc::new(RefCell::new(callbacks));
let handler = {
let pending_queue = pending_queue.clone();
let callbacks = callbacks.clone();
let worker = worker.clone();
move |msg: FromWorker<W>| match msg {
FromWorker::WorkerLoaded => {
if let Some(pending_queue) = pending_queue.borrow_mut().take() {
for to_worker in pending_queue.into_iter() {
worker.post_packed_message::<_, CODEC>(to_worker);
}
}
}
FromWorker::ProcessOutput(id, output) => {
let mut callbacks = callbacks.borrow_mut();
if let Some(m) = callbacks.get(&id) {
if let Some(m) = Weak::upgrade(m) {
m(output);
} else {
callbacks.remove(&id);
}
}
}
}
};
worker.set_on_packed_message::<_, CODEC, _>(handler);
WorkerBridge::<W>::new::<CODEC>(
handler_id,
worker,
pending_queue,
callbacks,
self.callback.clone(),
)
}
}

View File

@ -0,0 +1,87 @@
use super::handler_id::HandlerId;
use super::registrar::WorkerRegistrar;
use super::scope::{WorkerDestroyHandle, WorkerScope};
use super::spawner::WorkerSpawner;
use crate::traits::{Registrable, Spawnable};
/// Declares the behaviour of a worker.
pub trait Worker: Sized {
/// Update message type.
type Message;
/// Incoming message type.
type Input;
/// Outgoing message type.
type Output;
/// Creates an instance of a worker.
fn create(scope: &WorkerScope<Self>) -> Self;
/// Receives an update.
///
/// This method is called when the worker send messages to itself via
/// [`WorkerScope::send_message`].
fn update(&mut self, scope: &WorkerScope<Self>, msg: Self::Message);
/// New bridge created.
///
/// When a new bridge is created by [`WorkerSpawner::spawn`](crate::WorkerSpawner)
/// or [`WorkerBridge::fork`](crate::WorkerBridge::fork),
/// the worker will be notified the [`HandlerId`] of the created bridge via this method.
fn connected(&mut self, scope: &WorkerScope<Self>, id: HandlerId) {
let _scope = scope;
let _id = id;
}
/// Receives an input from a connected bridge.
///
/// When a bridge sends an input via [`WorkerBridge::send`](crate::WorkerBridge::send), the
/// worker will receive the input via this method.
fn received(&mut self, scope: &WorkerScope<Self>, msg: Self::Input, id: HandlerId);
/// Existing bridge destroyed.
///
/// When a bridge is dropped, the worker will be notified with this method.
fn disconnected(&mut self, scope: &WorkerScope<Self>, id: HandlerId) {
let _scope = scope;
let _id = id;
}
/// Destroys the current worker.
///
/// When all bridges are dropped, the method will be invoked.
///
/// This method is provided a destroy handle where when it is dropped, the worker is closed.
/// If the worker is closed immediately, then it can ignore the destroy handle.
/// Otherwise hold the destroy handle until the clean up task is finished.
///
/// # Note
///
/// This method will only be called after all bridges are disconnected.
/// Attempting to send messages after this method is called will have no effect.
fn destroy(&mut self, scope: &WorkerScope<Self>, destruct: WorkerDestroyHandle<Self>) {
let _scope = scope;
let _destruct = destruct;
}
}
impl<W> Spawnable for W
where
W: Worker + 'static,
{
type Spawner = WorkerSpawner<Self>;
fn spawner() -> WorkerSpawner<Self> {
WorkerSpawner::new()
}
}
impl<W> Registrable for W
where
W: Worker + 'static,
{
type Registrar = WorkerRegistrar<Self>;
fn registrar() -> WorkerRegistrar<Self> {
WorkerRegistrar::new()
}
}

View File

@ -92,7 +92,7 @@ pub struct NodeRef(Rc<RefCell<NodeRefInner>>);
impl PartialEq for NodeRef {
fn eq(&self, other: &Self) -> bool {
self.0.as_ptr() == other.0.as_ptr()
std::ptr::eq(self.0.as_ptr(), other.0.as_ptr())
}
}