* Make private bridges.

* Add worker agent.

* Add task type.

* Memorised Task.

* Add station.

* Add Station Agent.

* Subscription.

* Station hooks.

* Reactor Agents.

* Add more description.

* Restart station when closed.

* Remove Station restart.

* Send finish message to bridges.

* Adds a method to read whether a bridge has received finish message for a subscription.

* Update Reactor Agent.

* Decouple macros from it.

* Reactor Macro.

* Reactivate Task.

* Slightly adjust API.

* Add documentation for reactor agents.

* Remove Station.

* Create Task macro.

* Migrate Example.

* Simplify Task Agent Design.

* Implement Sink.

* Agent -> Task in Example.

* Switch to Registrable.

* AgentScopeExt.

* Finish AgentScopeExt.

* Prelude.

* Adjust prelude.

* Fix imports.

* Yew Agent.

* Switch to upstreamed version of gloo.

* Add stub reset.

* Remove example.

* Add Reset for Worker agents.

* merge fix-ci into "agent-v2"

* Switch to released version of gloo.

* Adds Runtime.

* A LocalRuntime.

* Add note.

* Add SSR benchmark.

* Only create default runtime if no custom runtime is set.

* Use jemalloc for benchmarking.

* Remove once_cell for web assembly.

* Add time.

* Fix wasm_bindgen.

* Adjust inlining.

* merge local-runtime into "agent-v2"

* Add reset.

* Simplify task agent.

* SSR for tasks.

* Optimise benchmark output.

* Optimise BufWriter.

* Add json output.

* Add Benchmark Workflow.

* merge local-runtime into "agent-v2"

* Makes Prepared States to be Rc'ed.

* Move example.

* Update example.

* Implement prepared state for memorised tasks.

* Make prepared states work on none runtime as well.

* Finished prepared output.

* Remove local set from tests.

* Fix Workflow syntax.

* Exclude benchmark from doc tests.

* Tidy up the code.

* Remove HashSet.

* Fix rustfmt.

* Some optimisation.

* Use postcard.

* Remove allocations.

* Weak Ref.

* Adjust feature flags.

* Adds a pinned channel implementation.

* Make Send bound explicit.

* Migrate to pinned channel.

* Implement on immutable reference.

* Rename agent channel method.

* Fix Sink close.

* Fix closing.

* Remove old platform.

* Migrate to new macro.

* Port Oneshot Agent.

* Migrate reactor to gloo-worker.

* Implement ScopeExt for Reactor.

* Remove unneeded checks.

* Update example note.

* Fix doc tests.

* Add an example for reactor agent.

* Rename Prime to PrimeReactor.

* Update Crate Information.

* Remove unused dependencies.

* Remove unused dependencies.

* Update documentation.

* Rename Bridge to Runner.

* Update documentation.

* Update documentation.

* Update State name.

* Merge outputs state for subscriptions.

* Update documentation.

* Fix doc link.

* Make code link code link.

* Make CODEC -> C.

* Update Debug Implementation to type_name.

* Fix readme.
This commit is contained in:
Kaede Hoshikawa 2023-09-18 17:30:02 +09:00 committed by GitHub
parent 73f4bb91cf
commit eec07583f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 2450 additions and 277 deletions

183
Cargo.lock generated
View File

@ -123,6 +123,15 @@ dependencies = [
"yew", "yew",
] ]
[[package]]
name = "atomic-polyfill"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3ff7eb3f316534d83a8a2c3d1674ace8a5a71198eba31e2e2b597833f699b28"
dependencies = [
"critical-section",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@ -417,6 +426,12 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
[[package]]
name = "cobs"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15"
[[package]] [[package]]
name = "colorchoice" name = "colorchoice"
version = "1.0.0" version = "1.0.0"
@ -531,6 +546,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "critical-section"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52"
[[package]] [[package]]
name = "crypto-common" name = "crypto-common"
version = "0.1.6" version = "0.1.6"
@ -1147,23 +1168,6 @@ dependencies = [
"web-sys", "web-sys",
] ]
[[package]]
name = "gloo-worker"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09110b5555bcafe508cee0fb94308af9aac7a85f980d3c88b270d117c6c6911d"
dependencies = [
"anymap2",
"bincode",
"gloo-console",
"gloo-utils",
"js-sys",
"serde",
"slab",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "gloo-worker" name = "gloo-worker"
version = "0.2.1" version = "0.2.1"
@ -1181,6 +1185,37 @@ dependencies = [
"web-sys", "web-sys",
] ]
[[package]]
name = "gloo-worker"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdec38f5350e6f71425895382d3f0e5e45ad78b69c9905f097a171b80c73112c"
dependencies = [
"bincode",
"futures 0.3.28",
"gloo-utils",
"gloo-worker-macros",
"js-sys",
"pinned",
"serde",
"thiserror",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "gloo-worker-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "956caa58d4857bc9941749d55e4bd3000032d8212762586fa5705632967140e7"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 2.0.27",
]
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.20" version = "0.3.20"
@ -1200,6 +1235,15 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "hash32"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67"
dependencies = [
"byteorder",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.3" version = "0.12.3"
@ -1237,6 +1281,20 @@ dependencies = [
"http", "http",
] ]
[[package]]
name = "heapless"
version = "0.7.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db04bc24a18b9ea980628ecf00e6c0264f3c1426dac36c00cb49b6fbad8b0743"
dependencies = [
"atomic-polyfill",
"hash32",
"rustc_version",
"serde",
"spin",
"stable_deref_trait",
]
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.4.1" version = "0.4.1"
@ -2004,6 +2062,17 @@ dependencies = [
"yew", "yew",
] ]
[[package]]
name = "postcard"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9ee729232311d3cd113749948b689627618133b1c5012b77342c1950b25eaeb"
dependencies = [
"cobs",
"heapless",
"serde",
]
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.17" version = "0.2.17"
@ -2020,6 +2089,22 @@ dependencies = [
"syn 2.0.27", "syn 2.0.27",
] ]
[[package]]
name = "primes"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68a61082d8bceecd71a3870e9162002bb75f7ba9c7aa8b76227e887782fef9c8"
[[package]]
name = "proc-macro-crate"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919"
dependencies = [
"once_cell",
"toml_edit",
]
[[package]] [[package]]
name = "proc-macro-error" name = "proc-macro-error"
version = "1.0.4" version = "1.0.4"
@ -2452,6 +2537,9 @@ name = "spin"
version = "0.9.8" version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]] [[package]]
name = "ssr_router" name = "ssr_router"
@ -2473,6 +2561,12 @@ dependencies = [
"yew", "yew",
] ]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.10.0" version = "0.10.0"
@ -2759,6 +2853,23 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "toml_datetime"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
[[package]]
name = "toml_edit"
version = "0.19.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
dependencies = [
"indexmap 2.0.0",
"toml_datetime",
"winnow",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.13" version = "0.4.13"
@ -3397,6 +3508,15 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "winnow"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acaaa1190073b2b101e15083c38ee8ec891b5e05cbee516521e94ec008f61e64"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "winreg" name = "winreg"
version = "0.10.1" version = "0.10.1"
@ -3438,9 +3558,24 @@ dependencies = [
name = "yew-agent" name = "yew-agent"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"gloo-worker 0.1.2", "futures 0.3.28",
"gloo-worker 0.3.0",
"serde", "serde",
"wasm-bindgen",
"yew", "yew",
"yew-agent-macro",
]
[[package]]
name = "yew-agent-macro"
version = "0.1.0"
dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.27",
"trybuild",
"yew-agent",
] ]
[[package]] [[package]]
@ -3494,6 +3629,7 @@ name = "yew-worker-fib"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"js-sys", "js-sys",
"postcard",
"serde", "serde",
"wasm-bindgen", "wasm-bindgen",
"web-sys", "web-sys",
@ -3501,6 +3637,17 @@ dependencies = [
"yew-agent", "yew-agent",
] ]
[[package]]
name = "yew-worker-prime"
version = "0.1.0"
dependencies = [
"futures 0.3.28",
"primes",
"serde",
"yew",
"yew-agent",
]
[[package]] [[package]]
name = "zxcvbn" name = "zxcvbn"
version = "2.2.2" version = "2.2.2"

View File

@ -57,7 +57,8 @@ As an example, check out the TodoMVC example here: <https://examples.yew.rs/todo
| [timer_functional](timer_functional) | [F] | Demonstrates the use of the interval and timeout services using function components | | [timer_functional](timer_functional) | [F] | Demonstrates the use of the interval and timeout services using function components |
| [todomvc](todomvc) | [S] | Implementation of [TodoMVC](http://todomvc.com/). | | [todomvc](todomvc) | [S] | Implementation of [TodoMVC](http://todomvc.com/). |
| [two_apps](two_apps) | [S] | Runs two separate Yew apps which can communicate with each other. | | [two_apps](two_apps) | [S] | Runs two separate Yew apps which can communicate with each other. |
| [web_worker_fib](web_worker_fib) | [S] | Calculate Fibonacci numbers in a web worker thread using [`gloo-worker`](https://docs.rs/gloo-worker/latest/gloo_worker/). | | [web_worker_fib](web_worker_fib) | [F] | Calculate Fibonacci numbers in a web worker thread using [`yew-agent`](https://docs.rs/yew-agent/latest/yew_agent/). |
| [web_worker_prime](web_worker_prime) | [F] | Calculate Prime numbers in a web worker thread using [`yew-agent`](https://docs.rs/yew-agent/latest/yew_agent/). |
| [webgl](webgl) | [S] | Controls a [WebGL canvas](https://developer.mozilla.org/en-US/docs/Web/API/WebGL_API/Tutorial/Getting_started_with_WebGL) from Yew. | | [webgl](webgl) | [S] | Controls a [WebGL canvas](https://developer.mozilla.org/en-US/docs/Web/API/WebGL_API/Tutorial/Getting_started_with_WebGL) from Yew. |
[CT]: ## "Component Type" [CT]: ## "Component Type"

View File

@ -10,4 +10,5 @@ yew-agent = { path = "../../packages/yew-agent" }
wasm-bindgen = "0.2" wasm-bindgen = "0.2"
js-sys = "0.3" js-sys = "0.3"
web-sys = { version = "0.3", features = [ "HtmlInputElement" ] } web-sys = { version = "0.3", features = [ "HtmlInputElement" ] }
serde = "1" serde = { version = "1", features = ["derive"] }
postcard = "1.0.0"

View File

@ -6,7 +6,7 @@ Calculate fibrillation value of a number in the worker thread, without blocking
## Concepts ## Concepts
The example illustrates how to use `gloo-worker` to send tasks to a worker thread in a Yew application. The example illustrates how to use `yew-agent` to send tasks to a worker thread in a Yew application.
## Thanks to ## Thanks to

View File

@ -1,13 +1,15 @@
<!doctype html> <!doctype html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="utf-8"> <meta charset="utf-8">
<title>Yew • Web Worker Fibonacci</title> <title>Yew • Web Worker Fibonacci</title>
<link data-trunk rel="rust" href="Cargo.toml" data-bin="app" data-type="main" /> <link data-trunk rel="rust" href="Cargo.toml" data-bin="app" data-type="main" data-weak-refs />
<link data-trunk rel="rust" href="Cargo.toml" data-bin="worker" data-type="worker" /> <link data-trunk rel="rust" href="Cargo.toml" data-bin="worker" data-type="worker" data-weak-refs />
</head> </head>
<body>
</body>
<body>
</body>
</html> </html>

View File

@ -1,59 +1,39 @@
use js_sys::Uint8Array;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use yew_agent::{HandlerId, Public, WorkerLink}; use wasm_bindgen::JsValue;
use yew_agent::prelude::*;
use yew_agent::Codec;
pub struct Worker { /// Example to use a custom codec.
link: WorkerLink<Self>, pub struct Postcard;
}
#[derive(Serialize, Deserialize)] impl Codec for Postcard {
pub struct WorkerInput { fn encode<I>(input: I) -> JsValue
pub n: u32, where
} I: Serialize,
{
#[derive(Serialize, Deserialize)] let buf = postcard::to_vec::<_, 32>(&input).expect("can't serialize a worker message");
pub struct WorkerOutput { Uint8Array::from(buf.as_slice()).into()
pub value: u32,
}
impl yew_agent::Worker for Worker {
type Input = WorkerInput;
type Message = ();
type Output = WorkerOutput;
type Reach = Public<Self>;
fn create(link: WorkerLink<Self>) -> Self {
Self { link }
} }
fn update(&mut self, _msg: Self::Message) { fn decode<O>(input: JsValue) -> O
// no messaging where
O: for<'de> Deserialize<'de>,
{
let data = Uint8Array::from(input).to_vec();
postcard::from_bytes(&data).expect("can't deserialize a worker message")
} }
}
fn handle_input(&mut self, msg: Self::Input, id: HandlerId) { #[oneshot]
// this runs in a web worker pub async fn FibonacciTask(n: u32) -> u32 {
// and does not block the main fn fib(n: u32) -> u32 {
// browser thread! if n <= 1 {
1
let n = msg.n; } else {
fib(n - 1) + fib(n - 2)
fn fib(n: u32) -> u32 {
if n <= 1 {
1
} else {
fib(n - 1) + fib(n - 2)
}
} }
let output = Self::Output { value: fib(n) };
self.link.respond(id, output);
} }
fn name_of_resource() -> &'static str { fib(n)
"worker.js"
}
fn resource_path_is_relative() -> bool {
true
}
} }

View File

@ -1,6 +1,6 @@
use yew_agent::PublicWorker; use yew_agent::Registrable;
use yew_worker_fib::agent::Worker; use yew_worker_fib::agent::{FibonacciTask, Postcard};
fn main() { fn main() {
Worker::register(); FibonacciTask::registrar().encoding::<Postcard>().register();
} }

View File

@ -3,82 +3,78 @@
pub mod agent; pub mod agent;
use std::rc::Rc;
use web_sys::HtmlInputElement; use web_sys::HtmlInputElement;
use yew::platform::spawn_local;
use yew::prelude::*; use yew::prelude::*;
use yew_agent::{Bridge, Bridged}; use yew_agent::oneshot::{use_oneshot_runner, OneshotProvider};
use crate::agent::{Worker, WorkerInput, WorkerOutput}; use crate::agent::{FibonacciTask, Postcard};
pub struct App { #[function_component]
clicker_value: u32, fn Main() -> Html {
input_ref: NodeRef, let input_value = use_state_eq(|| 44);
worker: Box<dyn Bridge<Worker>>, let output = use_state(|| "Try out some fibonacci calculations!".to_string());
fibonacci_output: String, let fib_task = use_oneshot_runner::<FibonacciTask>();
}
pub enum Message { let clicker_value = use_state_eq(|| 0);
Click,
RunWorker,
WorkerMsg(WorkerOutput),
}
impl Component for App { let calculate = {
type Message = Message; let input_value = *input_value;
type Properties = (); let output = output.clone();
move |_e: MouseEvent| {
let fib_agent = fib_task.clone();
let output = output.clone();
fn create(ctx: &Context<Self>) -> Self { spawn_local(async move {
let cb = { // start the worker
let link = ctx.link().clone(); let output_value = fib_agent.run(input_value).await;
move |e| link.send_message(Self::Message::WorkerMsg(e))
};
let worker = Worker::bridge(Rc::new(cb));
Self { output.set(format!("Fibonacci value: {}", output_value));
clicker_value: 0, });
input_ref: NodeRef::default(),
worker,
fibonacci_output: String::from("Try out some fibonacci calculations!"),
} }
} };
fn update(&mut self, _ctx: &Context<Self>, msg: Self::Message) -> bool { let on_input_change = {
match msg { let input_value = input_value.clone();
Self::Message::Click => { move |e: InputEvent| {
self.clicker_value += 1; input_value.set(
} e.target_unchecked_into::<HtmlInputElement>()
Self::Message::RunWorker => { .value()
if let Some(input) = self.input_ref.cast::<HtmlInputElement>() { .parse()
// start the worker off! .expect("failed to parse"),
self.worker.send(WorkerInput { );
n: input.value_as_number() as u32,
});
}
}
Self::Message::WorkerMsg(output) => {
// the worker is done!
self.fibonacci_output = format!("Fibonacci value: {}", output.value);
}
} }
};
true let inc_clicker = {
} let clicker_value = clicker_value.clone();
fn view(&self, ctx: &Context<Self>) -> Html { move |_e: MouseEvent| {
html! { clicker_value.set(*clicker_value + 1);
<>
<h1>{ "Web worker demo" }</h1>
<p>{ "Submit a value to calculate, then increase the counter on the main thread!"} </p>
<p>{ "Large numbers will take some time!" }</p>
<h3>{ "Output: " } { &self.fibonacci_output }</h3>
<br />
<input ref={self.input_ref.clone()} type="number" value="44" max="50"/>
<button onclick={ctx.link().callback(|_| Message::RunWorker)}>{ "submit" }</button>
<br /> <br />
<h3>{ "Main thread value: " } { self.clicker_value }</h3>
<button onclick={ctx.link().callback(|_| Message::Click)}>{ "click!" }</button>
</>
} }
};
html! {
<>
<h1>{ "Web worker demo" }</h1>
<p>{ "Submit a value to calculate, then increase the counter on the main thread!"} </p>
<p>{ "Large numbers will take some time!" }</p>
<h3>{ "Output: " } { &*output }</h3>
<br />
<input type="number" value={input_value.to_string()} max="50" oninput={on_input_change} />
<button onclick={calculate}>{ "submit" }</button>
<br /> <br />
<h3>{ "Main thread value: " } { *clicker_value }</h3>
<button onclick={inc_clicker}>{ "click!" }</button>
</>
}
}
#[function_component]
pub fn App() -> Html {
html! {
<OneshotProvider<FibonacciTask, Postcard> path="/worker.js">
<Main />
</OneshotProvider<FibonacciTask, Postcard>>
} }
} }

View File

@ -0,0 +1,11 @@
[package]
name = "yew-worker-prime"
version = "0.1.0"
edition = "2021"
[dependencies]
yew-agent = { path = "../../packages/yew-agent" }
yew = { path = "../../packages/yew", features = ["csr"] }
futures = "0.3.25"
primes = "0.3.0"
serde = { version = "1.0.147", features = ["derive"] }

View File

@ -0,0 +1,17 @@
# Web Worker Prime
[![Demo](https://img.shields.io/website?label=demo&url=https%3A%2F%2Fexamples.yew.rs%2Fweb_worker_prime)](https://examples.yew.rs/web_worker_prime)
Calculate primes until stop button is pressed, without blocking the main thread.
## Concepts
The example illustrates how to use reactor agents to offload CPU bound tasks to a worker thread in a Yew application.
## Running
Run this application with the trunk development server:
```bash
trunk serve --open
```

View File

@ -0,0 +1,15 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Yew • Web Worker Prime</title>
<link data-trunk rel="rust" href="Cargo.toml" data-bin="app" data-type="main" data-weak-refs />
<link data-trunk rel="rust" href="Cargo.toml" data-bin="worker" data-type="worker" data-weak-refs />
</head>
<body>
</body>
</html>

View File

@ -0,0 +1,38 @@
use std::time::Duration;
use futures::sink::SinkExt;
use futures::{FutureExt, StreamExt};
use serde::{Deserialize, Serialize};
use yew::platform::time::sleep;
use yew_agent::prelude::*;
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ControlSignal {
Start,
Stop,
}
#[reactor]
pub async fn PrimeReactor(mut scope: ReactorScope<ControlSignal, u64>) {
while let Some(m) = scope.next().await {
if m == ControlSignal::Start {
'inner: for i in 1.. {
// This is not the most efficient way to calculate prime,
// but this example is here to demonstrate how primes can be
// sent to the application in an ascending order.
if primes::is_prime(i) {
scope.send(i).await.unwrap();
}
futures::select! {
m = scope.next() => {
if m == Some(ControlSignal::Stop) {
break 'inner;
}
},
_ = sleep(Duration::from_millis(100)).fuse() => {},
}
}
}
}
}

View File

@ -0,0 +1,3 @@
fn main() {
yew::Renderer::<yew_worker_prime::App>::new().render();
}

View File

@ -0,0 +1,6 @@
use yew_agent::Registrable;
use yew_worker_prime::agent::PrimeReactor;
fn main() {
PrimeReactor::registrar().register();
}

View File

@ -0,0 +1,64 @@
pub mod agent;
use agent::{ControlSignal, PrimeReactor};
use yew::prelude::*;
use yew_agent::reactor::{use_reactor_subscription, ReactorProvider};
#[function_component]
fn Main() -> Html {
let prime_sub = use_reactor_subscription::<PrimeReactor>();
let started = use_state_eq(|| false);
let skip_len = use_state_eq(|| 0);
let result_s = prime_sub
.iter()
// Skip results in previous runs.
.skip(*skip_len)
.fold("".to_string(), |mut output, item| {
if !output.is_empty() {
output.push_str(", ");
}
output.push_str(&item.to_string());
output
});
let start_prime_calc = use_callback(
(prime_sub.clone(), started.setter(), skip_len.setter()),
|_input, (prime_sub, started_setter, skip_len)| {
skip_len.set(prime_sub.len());
prime_sub.send(ControlSignal::Start);
started_setter.set(true);
},
);
let stop_prime_calc = use_callback(
(prime_sub, started.setter()),
|_input, (prime_sub, started_setter)| {
prime_sub.send(ControlSignal::Stop);
started_setter.set(false);
},
);
html! {
<>
<h1>{"Find Prime"}</h1>
<p>{"This page demonstrates how to calculate prime in a web worker."}</p>
if *started {
<button onclick={stop_prime_calc}>{"Stop"}</button>
} else {
<button onclick={start_prime_calc}>{"Start"}</button>
}
<div id="result">{result_s}</div>
</>
}
}
#[function_component]
pub fn App() -> Html {
html! {
<ReactorProvider<PrimeReactor> path="/worker.js">
<Main />
</ReactorProvider<PrimeReactor>>
}
}

View File

@ -0,0 +1,25 @@
[package]
name = "yew-agent-macro"
version = "0.1.0"
edition = "2021"
rust-version = "1.64.0"
authors = ["Kaede Hoshikawa <futursolo@icloud.com>"]
repository = "https://github.com/yewstack/yew"
homepage = "https://yew.rs"
documentation = "https://docs.rs/yew/"
readme = "../../README.md"
description = "Macro Support for Yew Agents"
license = "MIT OR Apache-2.0"
[lib]
proc-macro = true
[dependencies]
proc-macro2 = "1"
quote = "1"
syn = { version = "2", features = ["full", "extra-traits"] }
[dev-dependencies]
rustversion = "1"
trybuild = "1"
yew-agent = { path = "../yew-agent" }

View File

@ -0,0 +1,241 @@
use proc_macro2::{Span, TokenStream};
use quote::ToTokens;
use syn::parse::{Parse, ParseStream};
use syn::punctuated::Punctuated;
use syn::token::Comma;
use syn::{Attribute, FnArg, Generics, Ident, Item, ItemFn, Signature, Type, Visibility};
pub trait AgentFnType {
type RecvType;
type OutputType;
fn attr_name() -> &'static str;
fn agent_type_name() -> &'static str;
fn parse_recv_type(sig: &Signature) -> syn::Result<Self::RecvType>;
fn parse_output_type(sig: &Signature) -> syn::Result<Self::OutputType>;
fn extract_fn_arg_type(arg: &FnArg) -> syn::Result<Type> {
let ty = match arg {
FnArg::Typed(arg) => arg.ty.clone(),
FnArg::Receiver(_) => {
return Err(syn::Error::new_spanned(
arg,
format!("{} agents can't accept a receiver", Self::agent_type_name()),
));
}
};
Ok(*ty)
}
fn assert_no_left_argument<I, T>(rest_inputs: I, expected_len: usize) -> syn::Result<()>
where
I: ExactSizeIterator + IntoIterator<Item = T>,
T: ToTokens,
{
// Checking after param parsing may make it a little inefficient
// but that's a requirement for better error messages in case of receivers
// `>0` because first one is already consumed.
if rest_inputs.len() > 0 {
let params: TokenStream = rest_inputs
.into_iter()
.map(|it| it.to_token_stream())
.collect();
return Err(syn::Error::new_spanned(
params,
format!(
"{} agent can accept at most {} argument{}",
Self::agent_type_name(),
expected_len,
if expected_len > 1 { "s" } else { "" }
),
));
}
Ok(())
}
}
#[derive(Clone)]
pub struct AgentFn<F>
where
F: AgentFnType + 'static,
{
pub recv_type: F::RecvType,
pub output_type: F::OutputType,
pub generics: Generics,
pub vis: Visibility,
pub attrs: Vec<Attribute>,
pub name: Ident,
pub agent_name: Option<Ident>,
pub is_async: bool,
pub func: ItemFn,
}
impl<F> Parse for AgentFn<F>
where
F: AgentFnType + 'static,
{
fn parse(input: ParseStream) -> syn::Result<Self> {
let parsed: Item = input.parse()?;
let func = match parsed {
Item::Fn(m) => m,
item => {
return Err(syn::Error::new_spanned(
item,
format!(
"`{}` attribute can only be applied to functions",
F::attr_name()
),
))
}
};
let ItemFn {
attrs, vis, sig, ..
} = func.clone();
if sig.generics.lifetimes().next().is_some() {
return Err(syn::Error::new_spanned(
sig.generics,
format!(
"{} agents can't have generic lifetime parameters",
F::agent_type_name()
),
));
}
if sig.constness.is_some() {
return Err(syn::Error::new_spanned(
sig.constness,
format!("const functions can't be {} agents", F::agent_type_name()),
));
}
if sig.abi.is_some() {
return Err(syn::Error::new_spanned(
sig.abi,
format!("extern functions can't be {} agents", F::agent_type_name()),
));
}
let recv_type = F::parse_recv_type(&sig)?;
let output_type = F::parse_output_type(&sig)?;
let is_async = sig.asyncness.is_some();
Ok(Self {
recv_type,
output_type,
generics: sig.generics,
is_async,
vis,
attrs,
name: sig.ident,
agent_name: None,
func,
})
}
}
impl<F> AgentFn<F>
where
F: AgentFnType + 'static,
{
/// Filters attributes that should be copied to agent definition.
pub fn filter_attrs_for_agent_struct(&self) -> Vec<Attribute> {
self.attrs
.iter()
.filter_map(|m| {
m.path()
.get_ident()
.and_then(|ident| match ident.to_string().as_str() {
"doc" | "allow" => Some(m.clone()),
_ => None,
})
})
.collect()
}
/// Filters attributes that should be copied to the agent impl block.
pub fn filter_attrs_for_agent_impl(&self) -> Vec<Attribute> {
self.attrs
.iter()
.filter_map(|m| {
m.path()
.get_ident()
.and_then(|ident| match ident.to_string().as_str() {
"allow" => Some(m.clone()),
_ => None,
})
})
.collect()
}
pub fn phantom_generics(&self) -> Punctuated<Ident, Comma> {
self.generics
.type_params()
.map(|ty_param| ty_param.ident.clone()) // create a new Punctuated sequence without any type bounds
.collect::<Punctuated<_, Comma>>()
}
pub fn merge_agent_name(&mut self, name: AgentName) -> syn::Result<()> {
if let Some(ref m) = name.agent_name {
if m == &self.name {
return Err(syn::Error::new_spanned(
m,
format!(
"the {} must not have the same name as the function",
F::agent_type_name()
),
));
}
}
self.agent_name = name.agent_name;
Ok(())
}
pub fn inner_fn_ident(&self) -> Ident {
if self.agent_name.is_some() {
self.name.clone()
} else {
Ident::new("inner", Span::mixed_site())
}
}
pub fn agent_name(&self) -> Ident {
self.agent_name.clone().unwrap_or_else(|| self.name.clone())
}
pub fn print_inner_fn(&self) -> ItemFn {
let mut func = self.func.clone();
func.sig.ident = self.inner_fn_ident();
func.vis = Visibility::Inherited;
func
}
}
pub struct AgentName {
agent_name: Option<Ident>,
}
impl Parse for AgentName {
fn parse(input: ParseStream) -> syn::Result<Self> {
if input.is_empty() {
return Ok(Self { agent_name: None });
}
let agent_name = input.parse()?;
Ok(Self {
agent_name: Some(agent_name),
})
}
}

View File

@ -0,0 +1,30 @@
use proc_macro::TokenStream;
use syn::parse_macro_input;
mod agent_fn;
mod oneshot;
mod reactor;
use agent_fn::{AgentFn, AgentName};
use oneshot::{oneshot_impl, OneshotFn};
use reactor::{reactor_impl, ReactorFn};
#[proc_macro_attribute]
pub fn reactor(attr: TokenStream, item: TokenStream) -> TokenStream {
let item = parse_macro_input!(item as AgentFn<ReactorFn>);
let attr = parse_macro_input!(attr as AgentName);
reactor_impl(attr, item)
.unwrap_or_else(|err| err.to_compile_error())
.into()
}
#[proc_macro_attribute]
pub fn oneshot(attr: TokenStream, item: TokenStream) -> TokenStream {
let item = parse_macro_input!(item as AgentFn<OneshotFn>);
let attr = parse_macro_input!(attr as AgentName);
oneshot_impl(attr, item)
.unwrap_or_else(|err| err.to_compile_error())
.into()
}

View File

@ -0,0 +1,130 @@
use proc_macro2::{Span, TokenStream};
use quote::quote;
use syn::{parse_quote, Ident, ReturnType, Signature, Type};
use crate::agent_fn::{AgentFn, AgentFnType, AgentName};
pub struct OneshotFn {}
impl AgentFnType for OneshotFn {
type OutputType = Type;
type RecvType = Type;
fn attr_name() -> &'static str {
"oneshot"
}
fn agent_type_name() -> &'static str {
"oneshot"
}
fn parse_recv_type(sig: &Signature) -> syn::Result<Self::RecvType> {
let mut inputs = sig.inputs.iter();
let arg = inputs
.next()
.ok_or_else(|| syn::Error::new_spanned(&sig.ident, "expected 1 argument"))?;
let ty = Self::extract_fn_arg_type(arg)?;
Self::assert_no_left_argument(inputs, 1)?;
Ok(ty)
}
fn parse_output_type(sig: &Signature) -> syn::Result<Self::OutputType> {
let ty = match &sig.output {
ReturnType::Default => {
parse_quote! { () }
}
ReturnType::Type(_, ty) => *ty.clone(),
};
Ok(ty)
}
}
pub fn oneshot_impl(name: AgentName, mut agent_fn: AgentFn<OneshotFn>) -> syn::Result<TokenStream> {
agent_fn.merge_agent_name(name)?;
let struct_attrs = agent_fn.filter_attrs_for_agent_struct();
let oneshot_impl_attrs = agent_fn.filter_attrs_for_agent_impl();
let phantom_generics = agent_fn.phantom_generics();
let oneshot_name = agent_fn.agent_name();
let fn_name = agent_fn.inner_fn_ident();
let inner_fn = agent_fn.print_inner_fn();
let AgentFn {
recv_type: input_type,
generics,
output_type,
vis,
is_async,
..
} = agent_fn;
let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();
let fn_generics = ty_generics.as_turbofish();
let in_ident = Ident::new("_input", Span::mixed_site());
let fn_call = if is_async {
quote! { #fn_name #fn_generics (#in_ident).await }
} else {
quote! { #fn_name #fn_generics (#in_ident) }
};
let crate_name = quote! { yew_agent };
let quoted = quote! {
#(#struct_attrs)*
#[allow(unused_parens)]
#vis struct #oneshot_name #generics #where_clause {
inner: ::std::pin::Pin<::std::boxed::Box<dyn ::std::future::Future<Output = #output_type>>>,
_marker: ::std::marker::PhantomData<(#phantom_generics)>,
}
// we cannot disable any lints here because it will be applied to the function body
// as well.
#(#oneshot_impl_attrs)*
impl #impl_generics ::#crate_name::oneshot::Oneshot for #oneshot_name #ty_generics #where_clause {
type Input = #input_type;
fn create(#in_ident: Self::Input) -> Self {
#inner_fn
Self {
inner: ::std::boxed::Box::pin(
async move {
#fn_call
}
),
_marker: ::std::marker::PhantomData,
}
}
}
impl #impl_generics ::std::future::Future for #oneshot_name #ty_generics #where_clause {
type Output = #output_type;
fn poll(mut self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>) -> ::std::task::Poll<Self::Output> {
::std::future::Future::poll(::std::pin::Pin::new(&mut self.inner), cx)
}
}
impl #impl_generics ::#crate_name::Registrable for #oneshot_name #ty_generics #where_clause {
type Registrar = ::#crate_name::oneshot::OneshotRegistrar<Self>;
fn registrar() -> Self::Registrar {
::#crate_name::oneshot::OneshotRegistrar::<Self>::new()
}
}
impl #impl_generics ::#crate_name::Spawnable for #oneshot_name #ty_generics #where_clause {
type Spawner = ::#crate_name::oneshot::OneshotSpawner<Self>;
fn spawner() -> Self::Spawner {
::#crate_name::oneshot::OneshotSpawner::<Self>::new()
}
}
};
Ok(quoted)
}

View File

@ -0,0 +1,135 @@
use proc_macro2::{Span, TokenStream};
use quote::quote;
use syn::{Ident, ReturnType, Signature, Type};
use crate::agent_fn::{AgentFn, AgentFnType, AgentName};
pub struct ReactorFn {}
impl AgentFnType for ReactorFn {
type OutputType = ();
type RecvType = Type;
fn attr_name() -> &'static str {
"reactor"
}
fn agent_type_name() -> &'static str {
"reactor"
}
fn parse_recv_type(sig: &Signature) -> syn::Result<Self::RecvType> {
let mut inputs = sig.inputs.iter();
let arg = inputs
.next()
.ok_or_else(|| syn::Error::new_spanned(&sig.ident, "expected 1 argument"))?;
let ty = Self::extract_fn_arg_type(arg)?;
Self::assert_no_left_argument(inputs, 1)?;
Ok(ty)
}
fn parse_output_type(sig: &Signature) -> syn::Result<Self::OutputType> {
match &sig.output {
ReturnType::Default => {}
ReturnType::Type(_, ty) => {
return Err(syn::Error::new_spanned(
ty,
"reactor agents cannot return any value",
))
}
}
Ok(())
}
}
pub fn reactor_impl(name: AgentName, mut agent_fn: AgentFn<ReactorFn>) -> syn::Result<TokenStream> {
agent_fn.merge_agent_name(name)?;
if !agent_fn.is_async {
return Err(syn::Error::new_spanned(
&agent_fn.name,
"reactor agents must be asynchronous",
));
}
let struct_attrs = agent_fn.filter_attrs_for_agent_struct();
let reactor_impl_attrs = agent_fn.filter_attrs_for_agent_impl();
let phantom_generics = agent_fn.phantom_generics();
let reactor_name = agent_fn.agent_name();
let fn_name = agent_fn.inner_fn_ident();
let inner_fn = agent_fn.print_inner_fn();
let AgentFn {
recv_type,
generics,
vis,
..
} = agent_fn;
let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();
let fn_generics = ty_generics.as_turbofish();
let scope_ident = Ident::new("_scope", Span::mixed_site());
let fn_call = quote! { #fn_name #fn_generics (#scope_ident).await };
let crate_name = quote! { yew_agent };
let quoted = quote! {
#(#struct_attrs)*
#[allow(unused_parens)]
#vis struct #reactor_name #generics #where_clause {
inner: ::std::pin::Pin<::std::boxed::Box<dyn ::std::future::Future<Output = ()>>>,
_marker: ::std::marker::PhantomData<(#phantom_generics)>,
}
// we cannot disable any lints here because it will be applied to the function body
// as well.
#(#reactor_impl_attrs)*
impl #impl_generics ::#crate_name::reactor::Reactor for #reactor_name #ty_generics #where_clause {
type Scope = #recv_type;
fn create(#scope_ident: Self::Scope) -> Self {
#inner_fn
Self {
inner: ::std::boxed::Box::pin(
async move {
#fn_call
}
),
_marker: ::std::marker::PhantomData,
}
}
}
impl #impl_generics ::std::future::Future for #reactor_name #ty_generics #where_clause {
type Output = ();
fn poll(mut self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>) -> ::std::task::Poll<Self::Output> {
::std::future::Future::poll(::std::pin::Pin::new(&mut self.inner), cx)
}
}
impl #impl_generics ::#crate_name::Registrable for #reactor_name #ty_generics #where_clause {
type Registrar = ::#crate_name::reactor::ReactorRegistrar<Self>;
fn registrar() -> Self::Registrar {
::#crate_name::reactor::ReactorRegistrar::<Self>::new()
}
}
impl #impl_generics ::#crate_name::Spawnable for #reactor_name #ty_generics #where_clause {
type Spawner = ::#crate_name::reactor::ReactorSpawner<Self>;
fn spawner() -> Self::Spawner {
::#crate_name::reactor::ReactorSpawner::<Self>::new()
}
}
};
Ok(quoted)
}

View File

@ -9,10 +9,15 @@ edition = "2021"
readme = "../../README.md" readme = "../../README.md"
description = "Agents for Yew" description = "Agents for Yew"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
rust-version = "1.64.0"
[dependencies] [dependencies]
yew = { version = "0.20.0", path = "../yew" } yew = { version = "0.20.0", path = "../yew" }
gloo-worker = "0.1" gloo-worker = { version = "0.3", features = ["futures"] }
wasm-bindgen = "0.2"
serde = { version = "1", features = ["derive"] }
futures = "0.3"
yew-agent-macro = { version = "0.1", path = "../yew-agent-macro" }
[dev-dependencies] [dev-dependencies]
serde = "1.0.164" serde = "1.0.164"

View File

@ -1,132 +0,0 @@
use std::cell::RefCell;
use std::rc::Rc;
use yew::prelude::*;
use crate::*;
/// State handle for [`use_bridge`] hook
pub struct UseBridgeHandle<T>
where
T: Bridged,
{
inner: Rc<RefCell<Box<dyn Bridge<T>>>>,
}
impl<T> UseBridgeHandle<T>
where
T: Bridged,
{
/// Send a message to an worker.
pub fn send(&self, msg: T::Input) {
let mut bridge = self.inner.borrow_mut();
bridge.send(msg);
}
}
/// A hook to bridge to an [`Worker`].
///
/// This hooks will only bridge the worker once over the entire component lifecycle.
///
/// Takes a callback as the only argument. The callback will be updated on every render to make
/// sure captured values (if any) are up to date.
///
/// # Examples
///
/// ```
/// # mod example {
/// use serde::{Deserialize, Serialize};
/// use yew::prelude::*;
/// use yew_agent::{use_bridge, UseBridgeHandle};
///
/// // This would usually live in the same file as your worker
/// #[derive(Serialize, Deserialize)]
/// pub enum WorkerResponseType {
/// IncrementCounter,
/// }
/// # mod my_worker_mod {
/// # use yew_agent::{HandlerId, Public, WorkerLink};
/// # use super::WorkerResponseType;
/// # pub struct MyWorker {
/// # pub link: WorkerLink<Self>,
/// # }
///
/// # impl yew_agent::Worker for MyWorker {
/// # type Input = ();
/// # type Output = WorkerResponseType;
/// # type Reach = Public<Self>;
/// # type Message = ();
/// #
/// # fn create(link: WorkerLink<Self>) -> Self {
/// # MyWorker { link }
/// # }
/// #
/// # fn update(&mut self, _msg: Self::Message) {
/// # // do nothing
/// # }
/// #
/// # fn handle_input(&mut self, _msg: Self::Input, id: HandlerId) {
/// # self.link.respond(id, WorkerResponseType::IncrementCounter);
/// # }
/// # }
/// # }
/// use my_worker_mod::MyWorker; // note that <MyWorker as yew_agent::Worker>::Output == WorkerResponseType
/// #[function_component(UseBridge)]
/// fn bridge() -> Html {
/// let counter = use_state(|| 0);
///
/// // a scoped block to clone the state in
/// {
/// let counter = counter.clone();
/// // response will be of type MyWorker::Output, i.e. WorkerResponseType
/// let bridge: UseBridgeHandle<MyWorker> = use_bridge(move |response| match response {
/// WorkerResponseType::IncrementCounter => {
/// counter.set(*counter + 1);
/// }
/// });
/// }
///
/// html! {
/// <div>
/// {*counter}
/// </div>
/// }
/// }
/// # }
/// ```
#[hook]
pub fn use_bridge<T, F>(on_output: F) -> UseBridgeHandle<T>
where
T: Bridged,
F: Fn(T::Output) + 'static,
{
let on_output = Rc::new(on_output);
let on_output_clone = on_output.clone();
let on_output_ref = use_mut_ref(move || on_output_clone);
// Refresh the callback on every render.
{
let mut on_output_ref = on_output_ref.borrow_mut();
*on_output_ref = on_output;
}
let bridge = use_mut_ref(move || {
T::bridge({
Rc::new(move |output| {
let on_output = on_output_ref.borrow().clone();
on_output(output);
})
})
});
UseBridgeHandle { inner: bridge }
}
impl<T: Worker> Clone for UseBridgeHandle<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}

View File

@ -1,7 +1,112 @@
//! This module contains Yew's web worker implementation. //! This module contains Yew's web worker implementation.
//!
//! ## Types
//!
//! There're a couple kinds of agents:
//!
//! #### Oneshot
//!
//! A kind of agent that for each input, a single output is returned.
//!
//! #### Reactor
//!
//! A kind of agent that can send many inputs and receive many outputs over a single bridge.
//!
//! #### Worker
//!
//! The low-level implementation of agents that provides an actor model and communicates with
//! multiple bridges.
//!
//! ## Reachability
//!
//! When an agent is spawned, each agent is associated with a reachability.
//!
//! #### Private
//!
//! Each time a bridge is created, a new instance
//! of agent is spawned. This allows parallel computing between agents.
//!
//! #### Public
//!
//! Public agents are shared among all children of a provider.
//! Only 1 instance will be spawned for each public agents provider.
//!
//! ### Provider
//!
//! Each Agent requires a provider to provide communications and maintain bridges.
//! All hooks must be called within a provider.
//!
//! ## Communications with Agents
//!
//! Hooks provides means to communicate with agent instances.
//!
//! #### Bridge
//!
//! See: [`use_worker_bridge`](worker::use_worker_bridge),
//! [`use_reactor_bridge`](reactor::use_reactor_bridge)
//!
//! A bridge takes a callback to receive outputs from agents
//! and provides a handle to send inputs to agents.
//!
//! #### Subscription
//!
//! See: [`use_worker_subscription`](worker::use_worker_subscription),
//! [`use_reactor_subscription`](reactor::use_reactor_subscription)
//!
//! Similar to bridges, a subscription produces a handle to send inputs to agents. However, instead
//! of notifying the receiver with a callback, it collect all outputs into a slice.
//!
//! #### Runner
//!
//! See: [`use_oneshot_runner`](oneshot::use_oneshot_runner)
//!
//! Unlike other agents, oneshot bridges provide a `use_oneshot_runner` hook to execute oneshot
//! agents on demand.
mod hooks; #![deny(
clippy::all,
missing_docs,
missing_debug_implementations,
bare_trait_objects,
anonymous_parameters,
elided_lifetimes_in_paths
)]
extern crate self as yew_agent;
pub mod oneshot;
pub mod reactor;
pub mod worker;
#[doc(inline)] #[doc(inline)]
pub use gloo_worker::*; pub use gloo_worker::{Bincode, Codec, Registrable, Spawnable};
pub use hooks::{use_bridge, UseBridgeHandle};
mod reach;
pub mod scope_ext;
pub use reach::Reach;
mod utils;
#[doc(hidden)]
pub mod __vendored {
pub use futures;
}
pub mod prelude {
//! Prelude module to be imported when working with `yew-agent`.
//!
//! This module re-exports the frequently used types from the crate.
pub use crate::oneshot::{oneshot, use_oneshot_runner, UseOneshotRunnerHandle};
pub use crate::reach::Reach;
pub use crate::reactor::{
reactor, use_reactor_bridge, use_reactor_subscription, ReactorEvent, ReactorScope,
UseReactorBridgeHandle, UseReactorSubscriptionHandle,
};
pub use crate::scope_ext::{AgentScopeExt, ReactorBridgeHandle, WorkerBridgeHandle};
pub use crate::worker::{
use_worker_bridge, use_worker_subscription, UseWorkerBridgeHandle,
UseWorkerSubscriptionHandle, WorkerScope,
};
pub use crate::{Registrable, Spawnable};
}

View File

@ -0,0 +1,54 @@
use yew::prelude::*;
use super::provider::OneshotProviderState;
use super::Oneshot;
/// Hook handle for [`use_oneshot_runner`]
#[derive(Debug)]
pub struct UseOneshotRunnerHandle<T>
where
T: Oneshot + 'static,
{
state: OneshotProviderState<T>,
}
impl<T> UseOneshotRunnerHandle<T>
where
T: Oneshot + 'static,
{
/// Runs an oneshot agent.
pub async fn run(&self, input: T::Input) -> T::Output {
self.state.create_bridge().run(input).await
}
}
impl<T> Clone for UseOneshotRunnerHandle<T>
where
T: Oneshot + 'static,
{
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
}
}
}
impl<T> PartialEq for UseOneshotRunnerHandle<T>
where
T: Oneshot,
{
fn eq(&self, rhs: &Self) -> bool {
self.state == rhs.state
}
}
/// A hook to create a runner to an oneshot agent.
#[hook]
pub fn use_oneshot_runner<T>() -> UseOneshotRunnerHandle<T>
where
T: Oneshot + 'static,
{
let state = use_context::<OneshotProviderState<T>>().expect("failed to find worker context");
UseOneshotRunnerHandle { state }
}

View File

@ -0,0 +1,12 @@
//! This module provides task agent implementation.
mod hooks;
mod provider;
#[doc(inline)]
pub use gloo_worker::oneshot::{Oneshot, OneshotBridge, OneshotRegistrar, OneshotSpawner};
pub use hooks::{use_oneshot_runner, UseOneshotRunnerHandle};
pub use provider::OneshotProvider;
pub(crate) use provider::OneshotProviderState;
/// A procedural macro to create oneshot agents.
pub use yew_agent_macro::oneshot;

View File

@ -0,0 +1,130 @@
use core::fmt;
use std::any::type_name;
use std::cell::RefCell;
use std::rc::Rc;
use serde::{Deserialize, Serialize};
use yew::prelude::*;
use super::{Oneshot, OneshotBridge, OneshotSpawner};
use crate::utils::get_next_id;
use crate::worker::WorkerProviderProps;
use crate::{Bincode, Codec, Reach};
pub(crate) struct OneshotProviderState<T>
where
T: Oneshot + 'static,
{
id: usize,
spawn_bridge_fn: Rc<dyn Fn() -> OneshotBridge<T>>,
reach: Reach,
held_bridge: Rc<RefCell<Option<OneshotBridge<T>>>>,
}
impl<T> fmt::Debug for OneshotProviderState<T>
where
T: Oneshot,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(type_name::<Self>())
}
}
impl<T> OneshotProviderState<T>
where
T: Oneshot,
{
fn get_held_bridge(&self) -> OneshotBridge<T> {
let mut held_bridge = self.held_bridge.borrow_mut();
match held_bridge.as_mut() {
Some(m) => m.fork(),
None => {
let bridge = (self.spawn_bridge_fn)();
*held_bridge = Some(bridge.fork());
bridge
}
}
}
/// Creates a bridge, uses "fork" for public agents.
pub fn create_bridge(&self) -> OneshotBridge<T> {
match self.reach {
Reach::Public => {
let held_bridge = self.get_held_bridge();
held_bridge.fork()
}
Reach::Private => (self.spawn_bridge_fn)(),
}
}
}
impl<T> Clone for OneshotProviderState<T>
where
T: Oneshot,
{
fn clone(&self) -> Self {
Self {
id: self.id,
spawn_bridge_fn: self.spawn_bridge_fn.clone(),
reach: self.reach,
held_bridge: self.held_bridge.clone(),
}
}
}
impl<T> PartialEq for OneshotProviderState<T>
where
T: Oneshot,
{
fn eq(&self, rhs: &Self) -> bool {
self.id == rhs.id
}
}
/// The Oneshot Agent Provider.
///
/// This component provides its children access to an oneshot agent.
#[function_component]
pub fn OneshotProvider<T, C = Bincode>(props: &WorkerProviderProps) -> Html
where
T: Oneshot + 'static,
T::Input: Serialize + for<'de> Deserialize<'de> + 'static,
T::Output: Serialize + for<'de> Deserialize<'de> + 'static,
C: Codec + 'static,
{
let WorkerProviderProps {
children,
path,
lazy,
reach,
} = props.clone();
// Creates a spawning function so Codec is can be erased from contexts.
let spawn_bridge_fn: Rc<dyn Fn() -> OneshotBridge<T>> = {
let path = path.clone();
Rc::new(move || OneshotSpawner::<T>::new().encoding::<C>().spawn(&path))
};
let state = {
use_memo((path, lazy, reach), move |(_path, lazy, reach)| {
let state = OneshotProviderState::<T> {
id: get_next_id(),
spawn_bridge_fn,
reach: *reach,
held_bridge: Rc::default(),
};
if *reach == Reach::Public && !*lazy {
state.get_held_bridge();
}
state
})
};
html! {
<ContextProvider<OneshotProviderState<T>> context={(*state).clone()}>
{children}
</ContextProvider<OneshotProviderState<T>>>
}
}

View File

@ -0,0 +1,8 @@
/// The reachability of an agent.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub enum Reach {
/// Public Reachability.
Public,
/// Private Reachability.
Private,
}

View File

@ -0,0 +1,280 @@
use std::any::type_name;
use std::fmt;
use std::ops::Deref;
use std::rc::Rc;
use futures::sink::SinkExt;
use futures::stream::{SplitSink, StreamExt};
use wasm_bindgen::UnwrapThrowExt;
use yew::platform::pinned::RwLock;
use yew::platform::spawn_local;
use yew::prelude::*;
use super::provider::ReactorProviderState;
use super::{Reactor, ReactorBridge, ReactorScoped};
use crate::utils::{BridgeIdState, OutputsAction, OutputsState};
type ReactorTx<R> =
Rc<RwLock<SplitSink<ReactorBridge<R>, <<R as Reactor>::Scope as ReactorScoped>::Input>>>;
/// A type that represents events from a reactor.
pub enum ReactorEvent<R>
where
R: Reactor,
{
/// The reactor agent has sent an output.
Output(<R::Scope as ReactorScoped>::Output),
/// The reactor agent has exited.
Finished,
}
impl<R> fmt::Debug for ReactorEvent<R>
where
R: Reactor,
<R::Scope as ReactorScoped>::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Output(m) => f.debug_tuple("ReactorEvent::Output").field(&m).finish(),
Self::Finished => f.debug_tuple("ReactorEvent::Finished").finish(),
}
}
}
/// Hook handle for the [`use_reactor_bridge`] hook.
pub struct UseReactorBridgeHandle<R>
where
R: 'static + Reactor,
{
tx: ReactorTx<R>,
ctr: UseReducerDispatcher<BridgeIdState>,
}
impl<R> fmt::Debug for UseReactorBridgeHandle<R>
where
R: 'static + Reactor,
<R::Scope as ReactorScoped>::Input: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>())
.field("inner", &self.tx)
.finish()
}
}
impl<R> Clone for UseReactorBridgeHandle<R>
where
R: 'static + Reactor,
{
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
ctr: self.ctr.clone(),
}
}
}
impl<R> UseReactorBridgeHandle<R>
where
R: 'static + Reactor,
{
/// Send an input to a reactor agent.
pub fn send(&self, msg: <R::Scope as ReactorScoped>::Input) {
let tx = self.tx.clone();
spawn_local(async move {
let mut tx = tx.write().await;
let _ = tx.send(msg).await;
});
}
/// Reset the bridge.
///
/// Disconnect the old bridge and re-connects the agent with a new bridge.
pub fn reset(&self) {
self.ctr.dispatch(());
}
}
impl<R> PartialEq for UseReactorBridgeHandle<R>
where
R: 'static + Reactor,
{
fn eq(&self, rhs: &Self) -> bool {
self.ctr == rhs.ctr
}
}
/// A hook to bridge to a [`Reactor`].
///
/// This hooks will only bridge the reactor once over the entire component lifecycle.
///
/// Takes a callback as the argument.
///
/// The callback will be updated on every render to make sure captured values (if any) are up to
/// date.
#[hook]
pub fn use_reactor_bridge<R, F>(on_output: F) -> UseReactorBridgeHandle<R>
where
R: 'static + Reactor,
F: Fn(ReactorEvent<R>) + 'static,
{
let ctr = use_reducer(BridgeIdState::default);
let worker_state = use_context::<ReactorProviderState<R>>()
.expect_throw("cannot find a provider for current agent.");
let on_output = Rc::new(on_output);
let on_output_ref = {
let on_output = on_output.clone();
use_mut_ref(move || on_output)
};
// Refresh the callback on every render.
{
let mut on_output_ref = on_output_ref.borrow_mut();
*on_output_ref = on_output;
}
let tx = use_memo((worker_state, ctr.inner), |(state, _ctr)| {
let bridge = state.create_bridge();
let (tx, mut rx) = bridge.split();
spawn_local(async move {
while let Some(m) = rx.next().await {
let on_output = on_output_ref.borrow().clone();
on_output(ReactorEvent::<R>::Output(m));
}
let on_output = on_output_ref.borrow().clone();
on_output(ReactorEvent::<R>::Finished);
});
RwLock::new(tx)
});
UseReactorBridgeHandle {
tx: tx.clone(),
ctr: ctr.dispatcher(),
}
}
/// Hook handle for the [`use_reactor_subscription`] hook.
pub struct UseReactorSubscriptionHandle<R>
where
R: 'static + Reactor,
{
bridge: UseReactorBridgeHandle<R>,
outputs: Vec<Rc<<R::Scope as ReactorScoped>::Output>>,
finished: bool,
ctr: usize,
}
impl<R> UseReactorSubscriptionHandle<R>
where
R: 'static + Reactor,
{
/// Send an input to a reactor agent.
pub fn send(&self, msg: <R::Scope as ReactorScoped>::Input) {
self.bridge.send(msg);
}
/// Returns whether the current bridge has received a finish message.
pub fn finished(&self) -> bool {
self.finished
}
/// Reset the subscription.
///
/// This disconnects the old bridge and re-connects the agent with a new bridge.
/// Existing outputs stored in the subscription will also be cleared.
pub fn reset(&self) {
self.bridge.reset();
}
}
impl<R> Clone for UseReactorSubscriptionHandle<R>
where
R: 'static + Reactor,
{
fn clone(&self) -> Self {
Self {
bridge: self.bridge.clone(),
outputs: self.outputs.clone(),
ctr: self.ctr,
finished: self.finished,
}
}
}
impl<R> fmt::Debug for UseReactorSubscriptionHandle<R>
where
R: 'static + Reactor,
<R::Scope as ReactorScoped>::Input: fmt::Debug,
<R::Scope as ReactorScoped>::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>())
.field("bridge", &self.bridge)
.field("outputs", &self.outputs)
.finish()
}
}
impl<R> Deref for UseReactorSubscriptionHandle<R>
where
R: 'static + Reactor,
{
type Target = [Rc<<R::Scope as ReactorScoped>::Output>];
fn deref(&self) -> &Self::Target {
&self.outputs
}
}
impl<R> PartialEq for UseReactorSubscriptionHandle<R>
where
R: 'static + Reactor,
{
fn eq(&self, rhs: &Self) -> bool {
self.bridge == rhs.bridge && self.ctr == rhs.ctr
}
}
/// A hook to subscribe to the outputs of a [Reactor] agent.
///
/// All outputs sent to current bridge will be collected into a slice.
#[hook]
pub fn use_reactor_subscription<R>() -> UseReactorSubscriptionHandle<R>
where
R: 'static + Reactor,
{
let outputs = use_reducer(OutputsState::<<R::Scope as ReactorScoped>::Output>::default);
let bridge = {
let outputs = outputs.clone();
use_reactor_bridge::<R, _>(move |output| {
outputs.dispatch(match output {
ReactorEvent::Output(m) => OutputsAction::Push(m.into()),
ReactorEvent::Finished => OutputsAction::Close,
})
})
};
{
let outputs = outputs.clone();
use_effect_with(bridge.clone(), move |_| {
outputs.dispatch(OutputsAction::Reset);
|| {}
});
}
UseReactorSubscriptionHandle {
bridge,
outputs: outputs.inner.clone(),
ctr: outputs.ctr,
finished: outputs.closed,
}
}

View File

@ -0,0 +1,53 @@
//! This module contains the reactor agent implementation.
//!
//! Reactor agents are agents that receive multiple inputs and send multiple outputs over a single
//! bridge. A reactor is defined as an async function that takes a [ReactorScope]
//! as the argument.
//!
//! The reactor scope is a stream that produces inputs from the bridge and a
//! sink that implements an additional send method to send outputs to the connected bridge.
//! When the bridge disconnects, the output stream and input sink will be closed.
//!
//! # Example
//!
//! ```
//! # use serde::{Serialize, Deserialize};
//! # #[derive(Serialize, Deserialize)]
//! # pub struct ReactorInput {}
//! # #[derive(Serialize, Deserialize)]
//! # pub struct ReactorOutput {}
//! #
//! use futures::sink::SinkExt;
//! use futures::stream::StreamExt;
//! use yew_agent::reactor::{reactor, ReactorScope};
//! #[reactor(MyReactor)]
//! pub async fn my_reactor(mut scope: ReactorScope<ReactorInput, ReactorOutput>) {
//! while let Some(input) = scope.next().await {
//! // handles each input.
//! // ...
//! # let output = ReactorOutput { /* ... */ };
//!
//! // sends output
//! if scope.send(output).await.is_err() {
//! // sender closed, the bridge is disconnected
//! break;
//! }
//! }
//! }
//! ```
mod hooks;
mod provider;
#[doc(inline)]
pub use gloo_worker::reactor::{
Reactor, ReactorBridge, ReactorRegistrar, ReactorScope, ReactorScoped, ReactorSpawner,
};
pub use hooks::{
use_reactor_bridge, use_reactor_subscription, ReactorEvent, UseReactorBridgeHandle,
UseReactorSubscriptionHandle,
};
pub use provider::ReactorProvider;
pub(crate) use provider::ReactorProviderState;
/// A procedural macro to create reactor agents.
pub use yew_agent_macro::reactor;

View File

@ -0,0 +1,133 @@
use std::any::type_name;
use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;
use gloo_worker::reactor::ReactorScoped;
use serde::{Deserialize, Serialize};
use yew::prelude::*;
use super::{Reactor, ReactorBridge, ReactorSpawner};
use crate::utils::get_next_id;
use crate::worker::WorkerProviderProps;
use crate::{Bincode, Codec, Reach};
pub(crate) struct ReactorProviderState<T>
where
T: Reactor + 'static,
{
id: usize,
spawn_bridge_fn: Rc<dyn Fn() -> ReactorBridge<T>>,
reach: Reach,
held_bridge: Rc<RefCell<Option<ReactorBridge<T>>>>,
}
impl<T> fmt::Debug for ReactorProviderState<T>
where
T: Reactor,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(type_name::<Self>())
}
}
impl<T> ReactorProviderState<T>
where
T: Reactor,
{
fn get_held_bridge(&self) -> ReactorBridge<T> {
let mut held_bridge = self.held_bridge.borrow_mut();
match held_bridge.as_mut() {
Some(m) => m.fork(),
None => {
let bridge = (self.spawn_bridge_fn)();
*held_bridge = Some(bridge.fork());
bridge
}
}
}
/// Creates a bridge, uses "fork" for public agents.
pub fn create_bridge(&self) -> ReactorBridge<T> {
match self.reach {
Reach::Public => {
let held_bridge = self.get_held_bridge();
held_bridge.fork()
}
Reach::Private => (self.spawn_bridge_fn)(),
}
}
}
impl<T> Clone for ReactorProviderState<T>
where
T: Reactor,
{
fn clone(&self) -> Self {
Self {
id: self.id,
spawn_bridge_fn: self.spawn_bridge_fn.clone(),
reach: self.reach,
held_bridge: self.held_bridge.clone(),
}
}
}
impl<T> PartialEq for ReactorProviderState<T>
where
T: Reactor,
{
fn eq(&self, rhs: &Self) -> bool {
self.id == rhs.id
}
}
/// The Reactor Agent Provider.
///
/// This component provides its children access to a reactor agent.
#[function_component]
pub fn ReactorProvider<R, C = Bincode>(props: &WorkerProviderProps) -> Html
where
R: 'static + Reactor,
<<R as Reactor>::Scope as ReactorScoped>::Input:
Serialize + for<'de> Deserialize<'de> + 'static,
<<R as Reactor>::Scope as ReactorScoped>::Output:
Serialize + for<'de> Deserialize<'de> + 'static,
C: Codec + 'static,
{
let WorkerProviderProps {
children,
path,
lazy,
reach,
} = props.clone();
// Creates a spawning function so Codec is can be erased from contexts.
let spawn_bridge_fn: Rc<dyn Fn() -> ReactorBridge<R>> = {
let path = path.clone();
Rc::new(move || ReactorSpawner::<R>::new().encoding::<C>().spawn(&path))
};
let state = {
use_memo((path, lazy, reach), move |(_path, lazy, reach)| {
let state = ReactorProviderState::<R> {
id: get_next_id(),
spawn_bridge_fn,
reach: *reach,
held_bridge: Rc::default(),
};
if *reach == Reach::Public && !*lazy {
state.get_held_bridge();
}
state
})
};
html! {
<ContextProvider<ReactorProviderState<R>> context={(*state).clone()}>
{children}
</ContextProvider<ReactorProviderState<R>>>
}
}

View File

@ -0,0 +1,145 @@
//! This module contains extensions to the component scope for agent access.
use std::any::type_name;
use std::fmt;
use std::rc::Rc;
use futures::stream::SplitSink;
use futures::{SinkExt, StreamExt};
use wasm_bindgen::UnwrapThrowExt;
use yew::html::Scope;
use yew::platform::pinned::RwLock;
use yew::platform::spawn_local;
use yew::prelude::*;
use crate::oneshot::{Oneshot, OneshotProviderState};
use crate::reactor::{Reactor, ReactorBridge, ReactorEvent, ReactorProviderState, ReactorScoped};
use crate::worker::{Worker, WorkerBridge, WorkerProviderState};
/// A Worker Bridge Handle.
#[derive(Debug)]
pub struct WorkerBridgeHandle<W>
where
W: Worker,
{
inner: WorkerBridge<W>,
}
impl<W> WorkerBridgeHandle<W>
where
W: Worker,
{
/// Sends a message to the worker agent.
pub fn send(&self, input: W::Input) {
self.inner.send(input)
}
}
type ReactorTx<R> =
Rc<RwLock<SplitSink<ReactorBridge<R>, <<R as Reactor>::Scope as ReactorScoped>::Input>>>;
/// A Reactor Bridge Handle.
pub struct ReactorBridgeHandle<R>
where
R: Reactor + 'static,
{
tx: ReactorTx<R>,
}
impl<R> fmt::Debug for ReactorBridgeHandle<R>
where
R: Reactor + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>()).finish_non_exhaustive()
}
}
impl<R> ReactorBridgeHandle<R>
where
R: Reactor + 'static,
{
/// Sends a message to the reactor agent.
pub fn send(&self, input: <R::Scope as ReactorScoped>::Input) {
let tx = self.tx.clone();
spawn_local(async move {
let mut tx = tx.write().await;
let _ = tx.send(input).await;
});
}
}
/// An extension to [`Scope`](yew::html::Scope) that provides communication mechanism to agents.
///
/// You can access them on `ctx.link()`
pub trait AgentScopeExt {
/// Bridges to a Worker Agent.
fn bridge_worker<W>(&self, callback: Callback<W::Output>) -> WorkerBridgeHandle<W>
where
W: Worker + 'static;
/// Bridges to a Reactor Agent.
fn bridge_reactor<R>(&self, callback: Callback<ReactorEvent<R>>) -> ReactorBridgeHandle<R>
where
R: Reactor + 'static,
<R::Scope as ReactorScoped>::Output: 'static;
/// Runs an oneshot in an Oneshot Agent.
fn run_oneshot<T>(&self, input: T::Input, callback: Callback<T::Output>)
where
T: Oneshot + 'static;
}
impl<COMP> AgentScopeExt for Scope<COMP>
where
COMP: Component,
{
fn bridge_worker<W>(&self, callback: Callback<W::Output>) -> WorkerBridgeHandle<W>
where
W: Worker + 'static,
{
let inner = self
.context::<WorkerProviderState<W>>((|_| {}).into())
.expect_throw("failed to bridge to agent.")
.0
.create_bridge(callback);
WorkerBridgeHandle { inner }
}
fn bridge_reactor<R>(&self, callback: Callback<ReactorEvent<R>>) -> ReactorBridgeHandle<R>
where
R: Reactor + 'static,
<R::Scope as ReactorScoped>::Output: 'static,
{
let (tx, mut rx) = self
.context::<ReactorProviderState<R>>((|_| {}).into())
.expect_throw("failed to bridge to agent.")
.0
.create_bridge()
.split();
spawn_local(async move {
while let Some(m) = rx.next().await {
callback.emit(ReactorEvent::<R>::Output(m));
}
callback.emit(ReactorEvent::<R>::Finished);
});
let tx = Rc::new(RwLock::new(tx));
ReactorBridgeHandle { tx }
}
fn run_oneshot<T>(&self, input: T::Input, callback: Callback<T::Output>)
where
T: Oneshot + 'static,
{
let (inner, _) = self
.context::<OneshotProviderState<T>>((|_| {}).into())
.expect_throw("failed to bridge to agent.");
spawn_local(async move { callback.emit(inner.create_bridge().run(input).await) });
}
}

View File

@ -0,0 +1,83 @@
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use yew::Reducible;
/// Gets a unique worker id
pub(crate) fn get_next_id() -> usize {
static CTR: AtomicUsize = AtomicUsize::new(0);
CTR.fetch_add(1, Ordering::SeqCst)
}
#[derive(Default, PartialEq)]
pub(crate) struct BridgeIdState {
pub inner: usize,
}
impl Reducible for BridgeIdState {
type Action = ();
fn reduce(self: Rc<Self>, _: Self::Action) -> Rc<Self> {
Self {
inner: self.inner + 1,
}
.into()
}
}
pub(crate) enum OutputsAction<T> {
Push(Rc<T>),
Close,
Reset,
}
pub(crate) struct OutputsState<T> {
pub ctr: usize,
pub inner: Vec<Rc<T>>,
pub closed: bool,
}
impl<T> Clone for OutputsState<T> {
fn clone(&self) -> Self {
Self {
ctr: self.ctr,
inner: self.inner.clone(),
closed: self.closed,
}
}
}
impl<T> Reducible for OutputsState<T> {
type Action = OutputsAction<T>;
fn reduce(mut self: Rc<Self>, action: Self::Action) -> Rc<Self> {
{
let this = Rc::make_mut(&mut self);
this.ctr += 1;
match action {
OutputsAction::Push(m) => this.inner.push(m),
OutputsAction::Close => {
this.closed = true;
}
OutputsAction::Reset => {
this.closed = false;
this.inner = Vec::new();
}
}
}
self
}
}
impl<T> Default for OutputsState<T> {
fn default() -> Self {
Self {
ctr: 0,
inner: Vec::new(),
closed: false,
}
}
}

View File

@ -0,0 +1,219 @@
use std::any::type_name;
use std::fmt;
use std::ops::Deref;
use std::rc::Rc;
use wasm_bindgen::prelude::*;
use yew::prelude::*;
use crate::utils::{BridgeIdState, OutputsAction, OutputsState};
use crate::worker::provider::WorkerProviderState;
use crate::worker::{Worker, WorkerBridge};
/// Hook handle for the [`use_worker_bridge`] hook.
pub struct UseWorkerBridgeHandle<T>
where
T: Worker,
{
inner: WorkerBridge<T>,
ctr: UseReducerDispatcher<BridgeIdState>,
}
impl<T> UseWorkerBridgeHandle<T>
where
T: Worker,
{
/// Send an input to a worker agent.
pub fn send(&self, msg: T::Input) {
self.inner.send(msg);
}
/// Reset the bridge.
///
/// Disconnect the old bridge and re-connects the agent with a new bridge.
pub fn reset(&self) {
self.ctr.dispatch(());
}
}
impl<T> Clone for UseWorkerBridgeHandle<T>
where
T: Worker,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
ctr: self.ctr.clone(),
}
}
}
impl<T> fmt::Debug for UseWorkerBridgeHandle<T>
where
T: Worker,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>())
.field("inner", &self.inner)
.finish()
}
}
impl<T> PartialEq for UseWorkerBridgeHandle<T>
where
T: Worker,
{
fn eq(&self, rhs: &Self) -> bool {
self.inner == rhs.inner
}
}
/// A hook to bridge to a [`Worker`].
///
/// This hooks will only bridge the worker once over the entire component lifecycle.
///
/// Takes a callback as the argument.
///
/// The callback will be updated on every render to make sure captured values (if any) are up to
/// date.
#[hook]
pub fn use_worker_bridge<T, F>(on_output: F) -> UseWorkerBridgeHandle<T>
where
T: Worker + 'static,
F: Fn(T::Output) + 'static,
{
let ctr = use_reducer(BridgeIdState::default);
let worker_state = use_context::<WorkerProviderState<T>>()
.expect_throw("cannot find a provider for current agent.");
let on_output = Rc::new(on_output);
let on_output_clone = on_output.clone();
let on_output_ref = use_mut_ref(move || on_output_clone);
// Refresh the callback on every render.
{
let mut on_output_ref = on_output_ref.borrow_mut();
*on_output_ref = on_output;
}
let bridge = use_memo((worker_state, ctr.inner), |(state, _ctr)| {
state.create_bridge(Callback::from(move |output| {
let on_output = on_output_ref.borrow().clone();
on_output(output);
}))
});
UseWorkerBridgeHandle {
inner: (*bridge).clone(),
ctr: ctr.dispatcher(),
}
}
/// Hook handle for the [`use_worker_subscription`] hook.
pub struct UseWorkerSubscriptionHandle<T>
where
T: Worker,
{
bridge: UseWorkerBridgeHandle<T>,
outputs: Vec<Rc<T::Output>>,
ctr: usize,
}
impl<T> UseWorkerSubscriptionHandle<T>
where
T: Worker,
{
/// Send an input to a worker agent.
pub fn send(&self, msg: T::Input) {
self.bridge.send(msg);
}
/// Reset the subscription.
///
/// This disconnects the old bridge and re-connects the agent with a new bridge.
/// Existing outputs stored in the subscription will also be cleared.
pub fn reset(&self) {
self.bridge.reset();
}
}
impl<T> Clone for UseWorkerSubscriptionHandle<T>
where
T: Worker,
{
fn clone(&self) -> Self {
Self {
bridge: self.bridge.clone(),
outputs: self.outputs.clone(),
ctr: self.ctr,
}
}
}
impl<T> fmt::Debug for UseWorkerSubscriptionHandle<T>
where
T: Worker,
T::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>())
.field("bridge", &self.bridge)
.field("outputs", &self.outputs)
.finish()
}
}
impl<T> Deref for UseWorkerSubscriptionHandle<T>
where
T: Worker,
{
type Target = [Rc<T::Output>];
fn deref(&self) -> &[Rc<T::Output>] {
&self.outputs
}
}
impl<T> PartialEq for UseWorkerSubscriptionHandle<T>
where
T: Worker,
{
fn eq(&self, rhs: &Self) -> bool {
self.bridge == rhs.bridge && self.ctr == rhs.ctr
}
}
/// A hook to subscribe to the outputs of a [Worker] agent.
///
/// All outputs sent to current bridge will be collected into a slice.
#[hook]
pub fn use_worker_subscription<T>() -> UseWorkerSubscriptionHandle<T>
where
T: Worker + 'static,
{
let outputs = use_reducer(OutputsState::default);
let bridge = {
let outputs = outputs.clone();
use_worker_bridge::<T, _>(move |output| {
outputs.dispatch(OutputsAction::Push(Rc::new(output)))
})
};
{
let outputs_dispatcher = outputs.dispatcher();
use_effect_with(bridge.clone(), move |_| {
outputs_dispatcher.dispatch(OutputsAction::Reset);
|| {}
});
}
UseWorkerSubscriptionHandle {
bridge,
outputs: outputs.inner.clone(),
ctr: outputs.ctr,
}
}

View File

@ -0,0 +1,77 @@
//! This module contains the worker agent implementation.
//!
//! This is a low-level implementation that uses an actor model.
//!
//! # Example
//!
//! ```
//! # mod example {
//! use serde::{Deserialize, Serialize};
//! use yew::prelude::*;
//! use yew_agent::worker::{use_worker_bridge, UseWorkerBridgeHandle};
//!
//! // This would usually live in the same file as your worker
//! #[derive(Serialize, Deserialize)]
//! pub enum WorkerResponseType {
//! IncrementCounter,
//! }
//! # mod my_worker_mod {
//! # use yew_agent::worker::{HandlerId, WorkerScope};
//! # use super::WorkerResponseType;
//! # pub struct MyWorker {}
//! #
//! # impl yew_agent::worker::Worker for MyWorker {
//! # type Input = ();
//! # type Output = WorkerResponseType;
//! # type Message = ();
//! #
//! # fn create(scope: &WorkerScope<Self>) -> Self {
//! # MyWorker {}
//! # }
//! #
//! # fn update(&mut self, scope: &WorkerScope<Self>, _msg: Self::Message) {
//! # // do nothing
//! # }
//! #
//! # fn received(&mut self, scope: &WorkerScope<Self>, _msg: Self::Input, id: HandlerId) {
//! # scope.respond(id, WorkerResponseType::IncrementCounter);
//! # }
//! # }
//! # }
//! use my_worker_mod::MyWorker; // note that <MyWorker as yew_agent::Worker>::Output == WorkerResponseType
//! #[function_component(UseWorkerBridge)]
//! fn bridge() -> Html {
//! let counter = use_state(|| 0);
//!
//! // a scoped block to clone the state in
//! {
//! let counter = counter.clone();
//! // response will be of type MyWorker::Output, i.e. WorkerResponseType
//! let bridge: UseWorkerBridgeHandle<MyWorker> = use_worker_bridge(move |response| match response {
//! WorkerResponseType::IncrementCounter => {
//! counter.set(*counter + 1);
//! }
//! });
//! }
//!
//! html! {
//! <div>
//! {*counter}
//! </div>
//! }
//! }
//! # }
//! ```
mod hooks;
mod provider;
#[doc(inline)]
pub use gloo_worker::{
HandlerId, Worker, WorkerBridge, WorkerDestroyHandle, WorkerRegistrar, WorkerScope,
};
pub use hooks::{
use_worker_bridge, use_worker_subscription, UseWorkerBridgeHandle, UseWorkerSubscriptionHandle,
};
pub(crate) use provider::WorkerProviderState;
pub use provider::{WorkerProvider, WorkerProviderProps};

View File

@ -0,0 +1,159 @@
use std::any::type_name;
use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;
use gloo_worker::Spawnable;
use serde::{Deserialize, Serialize};
use yew::prelude::*;
use super::{Worker, WorkerBridge};
use crate::reach::Reach;
use crate::utils::get_next_id;
use crate::{Bincode, Codec};
/// Properties for [WorkerProvider].
#[derive(Debug, Properties, PartialEq, Clone)]
pub struct WorkerProviderProps {
/// The path to an agent.
pub path: AttrValue,
/// The reachability of an agent.
///
/// Default: [`Public`](Reach::Public).
#[prop_or(Reach::Public)]
pub reach: Reach,
/// Lazily spawn the agent.
///
/// The agent will be spawned when the first time a hook requests a bridge.
///
/// Does not affect private agents.
///
/// Default: `true`
#[prop_or(true)]
pub lazy: bool,
/// Children of the provider.
#[prop_or_default]
pub children: Html,
}
pub(crate) struct WorkerProviderState<W>
where
W: Worker,
{
id: usize,
spawn_bridge_fn: Rc<dyn Fn() -> WorkerBridge<W>>,
reach: Reach,
held_bridge: Rc<RefCell<Option<WorkerBridge<W>>>>,
}
impl<W> fmt::Debug for WorkerProviderState<W>
where
W: Worker,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(type_name::<Self>())
}
}
impl<W> WorkerProviderState<W>
where
W: Worker,
W::Output: 'static,
{
fn get_held_bridge(&self) -> WorkerBridge<W> {
let mut held_bridge = self.held_bridge.borrow_mut();
match held_bridge.as_mut() {
Some(m) => m.clone(),
None => {
let bridge = (self.spawn_bridge_fn)();
*held_bridge = Some(bridge.clone());
bridge
}
}
}
/// Creates a bridge, uses "fork" for public agents.
pub fn create_bridge(&self, cb: Callback<W::Output>) -> WorkerBridge<W> {
match self.reach {
Reach::Public => {
let held_bridge = self.get_held_bridge();
held_bridge.fork(Some(move |m| cb.emit(m)))
}
Reach::Private => (self.spawn_bridge_fn)(),
}
}
}
impl<W> Clone for WorkerProviderState<W>
where
W: Worker,
{
fn clone(&self) -> Self {
Self {
id: self.id,
spawn_bridge_fn: self.spawn_bridge_fn.clone(),
reach: self.reach,
held_bridge: self.held_bridge.clone(),
}
}
}
impl<W> PartialEq for WorkerProviderState<W>
where
W: Worker,
{
fn eq(&self, rhs: &Self) -> bool {
self.id == rhs.id
}
}
/// The Worker Agent Provider.
///
/// This component provides its children access to a worker agent.
#[function_component]
pub fn WorkerProvider<W, C = Bincode>(props: &WorkerProviderProps) -> Html
where
W: Worker + 'static,
W::Input: Serialize + for<'de> Deserialize<'de> + 'static,
W::Output: Serialize + for<'de> Deserialize<'de> + 'static,
C: Codec + 'static,
{
let WorkerProviderProps {
children,
path,
lazy,
reach,
} = props.clone();
// Creates a spawning function so Codec is can be erased from contexts.
let spawn_bridge_fn: Rc<dyn Fn() -> WorkerBridge<W>> = {
let path = path.clone();
Rc::new(move || W::spawner().encoding::<C>().spawn(&path))
};
let state = {
use_memo((path, lazy, reach), move |(_path, lazy, reach)| {
let state = WorkerProviderState::<W> {
id: get_next_id(),
spawn_bridge_fn,
reach: *reach,
held_bridge: Rc::default(),
};
if *reach == Reach::Public && !*lazy {
state.get_held_bridge();
}
state
})
};
html! {
<ContextProvider<WorkerProviderState<W>> context={(*state).clone()}>
{children}
</ContextProvider<WorkerProviderState<W>>>
}
}