Move yew-agent to gloo (#2326)

* move agent to gloo

* rename: agent -> worker

* make it compile

* use gloo-worker from crates.io & fmt
This commit is contained in:
Muhammad Hamza 2022-01-05 19:40:27 +05:00 committed by GitHub
parent ac3af0a9bc
commit 3ca7c0f90a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 34 additions and 1208 deletions

View File

@ -1,5 +1,5 @@
use agents::native_worker::Worker;
use yew_agent::PublicAgent;
use yew_agent::PublicWorker;
fn main() {
wasm_logger::init(wasm_logger::Config::default());

View File

@ -1,5 +1,6 @@
pub mod native_worker;
use std::rc::Rc;
use yew::{html, Component, Context, Html};
use yew_agent::{Bridge, Bridged};
@ -17,9 +18,9 @@ impl Component for Model {
type Properties = ();
fn create(ctx: &Context<Self>) -> Self {
let link = ctx.link();
let callback = link.callback(|_| Msg::DataReceived);
let worker = native_worker::Worker::bridge(callback);
let link = ctx.link().clone();
let callback = move |_| link.send_message(Msg::DataReceived);
let worker = native_worker::Worker::bridge(Rc::new(callback));
Self { worker }
}

View File

@ -1,6 +1,6 @@
use gloo_timers::callback::Interval;
use serde::{Deserialize, Serialize};
use yew_agent::{Agent, AgentLink, HandlerId, Public};
use yew_agent::{HandlerId, Public, WorkerLink};
#[derive(Serialize, Deserialize, Debug)]
pub enum Request {
@ -17,17 +17,17 @@ pub enum Msg {
}
pub struct Worker {
link: AgentLink<Worker>,
link: WorkerLink<Worker>,
_interval: Interval,
}
impl Agent for Worker {
impl yew_agent::Worker for Worker {
type Reach = Public<Self>;
type Message = Msg;
type Input = Request;
type Output = Response;
fn create(link: AgentLink<Self>) -> Self {
fn create(link: WorkerLink<Self>) -> Self {
let duration = 3;
let interval = {

View File

@ -1,8 +1,8 @@
use serde::{Deserialize, Serialize};
use yew_agent::{Agent, AgentLink, HandlerId, Public};
use yew_agent::{HandlerId, Public, WorkerLink};
pub struct Worker {
link: AgentLink<Self>,
link: WorkerLink<Self>,
}
#[derive(Serialize, Deserialize)]
@ -15,13 +15,13 @@ pub struct WorkerOutput {
pub value: u32,
}
impl Agent for Worker {
impl yew_agent::Worker for Worker {
type Reach = Public<Self>;
type Message = ();
type Input = WorkerInput;
type Output = WorkerOutput;
fn create(link: AgentLink<Self>) -> Self {
fn create(link: WorkerLink<Self>) -> Self {
Self { link }
}

View File

@ -1,4 +1,5 @@
use crate::agent::{Worker, WorkerInput, WorkerOutput};
use std::rc::Rc;
use web_sys::HtmlInputElement;
use yew::prelude::*;
@ -22,7 +23,11 @@ impl Component for Model {
type Properties = ();
fn create(ctx: &Context<Self>) -> Self {
let worker = Worker::bridge(ctx.link().callback(Self::Message::WorkerMsg));
let cb = {
let link = ctx.link().clone();
move |e| link.send_message(Self::Message::WorkerMsg(e))
};
let worker = Worker::bridge(Rc::new(cb));
Self {
clicker_value: 0,

View File

@ -5,7 +5,7 @@ pub mod agent;
pub mod app;
use app::Model;
use wasm_bindgen::prelude::*;
use yew_agent::PublicAgent;
use yew_agent::PublicWorker;
#[wasm_bindgen(start)]
pub fn start() {

View File

@ -7,28 +7,6 @@ readme = "../../README.md"
description = "Agents for Yew"
license = "MIT OR Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anymap2 = "0.13"
bincode = "1"
gloo-console = "0.2"
gloo-utils = "0.1"
js-sys = "0.3"
serde = { version = "1", features = ["derive"] }
slab = "0.4"
wasm-bindgen = "0.2"
yew = { version = "0.19.3", path = "../yew" }
wasm-bindgen-futures = "0.4"
[dependencies.web-sys]
version = "0.3"
features = [
"Blob",
"BlobPropertyBag",
"DedicatedWorkerGlobalScope",
"MessageEvent",
"Url",
"Worker",
"WorkerOptions",
]
gloo-worker = "0.1"

View File

@ -16,16 +16,16 @@ impl<T> UseBridgeHandle<T>
where
T: Bridged,
{
/// Send a message to an agent.
/// Send a message to an worker.
pub fn send(&self, msg: T::Input) {
let mut bridge = self.inner.borrow_mut();
bridge.send(msg);
}
}
/// A hook to bridge to an Agent.
/// A hook to bridge to an [`Worker`].
///
/// This hooks will only bridge the agent once over the entire component lifecycle.
/// This hooks will only bridge the worker once over the entire component lifecycle.
///
/// Takes a callback as the only argument. The callback will be updated on every render to make
/// sure captured values (if any) are up to date.
@ -46,16 +46,18 @@ where
}
let bridge = use_mut_ref(move || {
T::bridge(Callback::from(move |output| {
let on_output = on_output_ref.borrow().clone();
on_output(output);
}))
T::bridge({
Rc::new(move |output| {
let on_output = on_output_ref.borrow().clone();
on_output(output);
})
})
});
UseBridgeHandle { inner: bridge }
}
impl<T: Agent> Clone for UseBridgeHandle<T> {
impl<T: Worker> Clone for UseBridgeHandle<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),

View File

@ -1,119 +1,7 @@
//! This module contains Yew's web worker implementation.
mod hooks;
mod link;
mod pool;
mod worker;
#[doc(inline)]
pub use gloo_worker::*;
pub use hooks::{use_bridge, UseBridgeHandle};
pub use link::AgentLink;
pub(crate) use link::*;
pub(crate) use pool::*;
pub use pool::{Dispatched, Dispatcher};
pub use worker::{Private, PrivateAgent, Public, PublicAgent};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{Deref, DerefMut};
use yew::callback::Callback;
/// Declares the behavior of the agent.
pub trait Agent: Sized + 'static {
/// Reach capability of the agent.
type Reach: Discoverer<Agent = Self>;
/// Type of an input message.
type Message;
/// Incoming message type.
type Input;
/// Outgoing message type.
type Output;
/// Creates an instance of an agent.
fn create(link: AgentLink<Self>) -> Self;
/// This method called on every update message.
fn update(&mut self, msg: Self::Message);
/// This method called on when a new bridge created.
fn connected(&mut self, _id: HandlerId) {}
/// This method called on every incoming message.
fn handle_input(&mut self, msg: Self::Input, id: HandlerId);
/// This method called on when a new bridge destroyed.
fn disconnected(&mut self, _id: HandlerId) {}
/// This method called when the agent is destroyed.
fn destroy(&mut self) {}
/// Represents the name of loading resorce for remote workers which
/// have to live in a separate files.
fn name_of_resource() -> &'static str {
"main.js"
}
/// Indicates whether the name of the resource is relative.
///
/// The default implementation returns `false`, which will cause the result
/// returned by [`Self::name_of_resource`] to be interpreted as an absolute
/// URL. If `true` is returned, it will be interpreted as a relative URL.
fn resource_path_is_relative() -> bool {
false
}
/// Signifies if resource is a module.
/// This has pending browser support.
fn is_module() -> bool {
false
}
}
/// Id of responses handler.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)]
pub struct HandlerId(usize, bool);
impl HandlerId {
fn new(id: usize, respondable: bool) -> Self {
HandlerId(id, respondable)
}
fn raw_id(self) -> usize {
self.0
}
/// Indicates if a handler id corresponds to callback in the Agent runtime.
pub fn is_respondable(self) -> bool {
self.1
}
}
/// Determine a visibility of an agent.
#[doc(hidden)]
pub trait Discoverer {
type Agent: Agent;
/// Spawns an agent and returns `Bridge` implementation.
fn spawn_or_join(
_callback: Option<Callback<<Self::Agent as Agent>::Output>>,
) -> Box<dyn Bridge<Self::Agent>>;
}
/// Bridge to a specific kind of worker.
pub trait Bridge<AGN: Agent> {
/// Send a message to an agent.
fn send(&mut self, msg: AGN::Input);
}
/// This trait allows registering or getting the address of a worker.
pub trait Bridged: Agent + Sized + 'static {
/// Creates a messaging bridge between a worker and the component.
fn bridge(callback: Callback<Self::Output>) -> Box<dyn Bridge<Self>>;
}
impl<T> Bridged for T
where
T: Agent,
<T as Agent>::Reach: Discoverer<Agent = T>,
{
fn bridge(callback: Callback<Self::Output>) -> Box<dyn Bridge<Self>> {
Self::Reach::spawn_or_join(Some(callback))
}
}

View File

@ -1,258 +0,0 @@
use super::*;
use std::cell::RefCell;
use std::fmt;
use std::future::Future;
use std::rc::Rc;
use wasm_bindgen_futures::spawn_local;
use yew::callback::Callback;
use yew::html::ImplicitClone;
use yew::scheduler::{self, Runnable, Shared};
/// Defines communication from Worker to Consumers
pub(crate) trait Responder<AGN: Agent> {
/// Implementation for communication channel from Worker to Consumers
fn respond(&self, id: HandlerId, output: AGN::Output);
}
/// Link to agent's scope for creating callbacks.
pub struct AgentLink<AGN: Agent> {
scope: AgentScope<AGN>,
responder: Rc<dyn Responder<AGN>>,
}
impl<AGN: Agent> AgentLink<AGN> {
/// Create link for a scope.
pub(crate) fn connect<T>(scope: &AgentScope<AGN>, responder: T) -> Self
where
T: Responder<AGN> + 'static,
{
AgentLink {
scope: scope.clone(),
responder: Rc::new(responder),
}
}
/// Send response to an agent.
pub fn respond(&self, id: HandlerId, output: AGN::Output) {
self.responder.respond(id, output);
}
/// Send a message to the agent
pub fn send_message<T>(&self, msg: T)
where
T: Into<AGN::Message>,
{
self.scope.send(AgentLifecycleEvent::Message(msg.into()));
}
/// Send an input to self
pub fn send_input<T>(&self, input: T)
where
T: Into<AGN::Input>,
{
let handler_id = HandlerId::new(0, false);
self.scope
.send(AgentLifecycleEvent::Input(input.into(), handler_id));
}
/// Create a callback which will send a message to the agent when invoked.
pub fn callback<F, IN, M>(&self, function: F) -> Callback<IN>
where
M: Into<AGN::Message>,
F: Fn(IN) -> M + 'static,
{
let scope = self.scope.clone();
let closure = move |input| {
let output = function(input).into();
scope.send(AgentLifecycleEvent::Message(output));
};
closure.into()
}
/// This method creates a [`Callback`] which returns a Future which
/// returns a message to be sent back to the agent
///
/// # Panics
/// If the future panics, then the promise will not resolve, and
/// will leak.
pub fn callback_future<FN, FU, IN, M>(&self, function: FN) -> Callback<IN>
where
M: Into<AGN::Message>,
FU: Future<Output = M> + 'static,
FN: Fn(IN) -> FU + 'static,
{
let link = self.clone();
let closure = move |input: IN| {
let future: FU = function(input);
link.send_future(future);
};
closure.into()
}
/// This method processes a Future that returns a message and sends it back to the agent.
///
/// # Panics
/// If the future panics, then the promise will not resolve, and will leak.
pub fn send_future<F, M>(&self, future: F)
where
M: Into<AGN::Message>,
F: Future<Output = M> + 'static,
{
let link: AgentLink<AGN> = self.clone();
let js_future = async move {
let message: AGN::Message = future.await.into();
let cb = link.callback(|m: AGN::Message| m);
cb.emit(message);
};
spawn_local(js_future);
}
}
impl<AGN: Agent> fmt::Debug for AgentLink<AGN> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("AgentLink<_>")
}
}
impl<AGN: Agent> Clone for AgentLink<AGN> {
fn clone(&self) -> Self {
AgentLink {
scope: self.scope.clone(),
responder: self.responder.clone(),
}
}
}
/// This struct holds a reference to a component and to a global scheduler.
pub(crate) struct AgentScope<AGN: Agent> {
state: Shared<AgentState<AGN>>,
}
impl<AGN: Agent> fmt::Debug for AgentScope<AGN> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("AgentScope<_>")
}
}
impl<AGN: Agent> Clone for AgentScope<AGN> {
fn clone(&self) -> Self {
AgentScope {
state: self.state.clone(),
}
}
}
impl<AGN: Agent> AgentScope<AGN> {
/// Create agent scope
pub fn new() -> Self {
let state = Rc::new(RefCell::new(AgentState::new()));
AgentScope { state }
}
/// Schedule message for sending to agent
pub fn send(&self, event: AgentLifecycleEvent<AGN>) {
scheduler::push(Box::new(AgentRunnable {
state: self.state.clone(),
event,
}));
}
}
impl<AGN: Agent> Default for AgentScope<AGN> {
fn default() -> Self {
Self::new()
}
}
impl<AGN: Agent> ImplicitClone for AgentScope<AGN> {}
struct AgentState<AGN> {
agent: Option<AGN>,
// TODO(#939): Use agent field to control create message this flag
destroyed: bool,
}
impl<AGN> AgentState<AGN> {
fn new() -> Self {
AgentState {
agent: None,
destroyed: false,
}
}
}
/// Internal Agent lifecycle events
#[derive(Debug)]
pub(crate) enum AgentLifecycleEvent<AGN: Agent> {
/// Request to create link
Create(AgentLink<AGN>),
/// Internal Agent message
Message(AGN::Message),
/// Client connected
Connected(HandlerId),
/// Received message from Client
Input(AGN::Input, HandlerId),
/// Client disconnected
Disconnected(HandlerId),
/// Request to destroy agent
Destroy,
}
struct AgentRunnable<AGN: Agent> {
state: Shared<AgentState<AGN>>,
event: AgentLifecycleEvent<AGN>,
}
impl<AGN> Runnable for AgentRunnable<AGN>
where
AGN: Agent,
{
fn run(self: Box<Self>) {
let mut state = self.state.borrow_mut();
if state.destroyed {
return;
}
match self.event {
AgentLifecycleEvent::Create(link) => {
state.agent = Some(AGN::create(link));
}
AgentLifecycleEvent::Message(msg) => {
state
.agent
.as_mut()
.expect("agent was not created to process messages")
.update(msg);
}
AgentLifecycleEvent::Connected(id) => {
state
.agent
.as_mut()
.expect("agent was not created to send a connected message")
.connected(id);
}
AgentLifecycleEvent::Input(inp, id) => {
state
.agent
.as_mut()
.expect("agent was not created to process inputs")
.handle_input(inp, id);
}
AgentLifecycleEvent::Disconnected(id) => {
state
.agent
.as_mut()
.expect("agent was not created to send a disconnected message")
.disconnected(id);
}
AgentLifecycleEvent::Destroy => {
let mut agent = state
.agent
.take()
.expect("trying to destroy not existent agent");
agent.destroy();
state.destroyed = true;
}
}
}
}

View File

@ -1,89 +0,0 @@
use super::*;
use gloo_console as console;
use slab::Slab;
use yew::scheduler::Shared;
pub(crate) type Last = bool;
/// Type alias to a sharable Slab that owns optional callbacks that emit messages of the type of the specified Agent.
pub(crate) type SharedOutputSlab<AGN> = Shared<Slab<Option<Callback<<AGN as Agent>::Output>>>>;
/// The slab contains the callback, the id is used to look up the callback,
/// and the output is the message that will be sent via the callback.
pub(crate) fn locate_callback_and_respond<AGN: Agent>(
slab: &SharedOutputSlab<AGN>,
id: HandlerId,
output: AGN::Output,
) {
let callback = {
let slab = slab.borrow();
match slab.get(id.raw_id()).cloned() {
Some(callback) => callback,
None => {
console::warn!(format!(
"Id of handler does not exist in the slab: {}.",
id.raw_id()
));
return;
}
}
};
match callback {
Some(callback) => callback.emit(output),
None => console::warn!(format!("The Id of the handler: {}, while present in the slab, is not associated with a callback.", id.raw_id())),
}
}
/// A newtype around a bridge to indicate that it is distinct from a normal bridge
pub struct Dispatcher<T>(pub(crate) Box<dyn Bridge<T>>);
impl<T> fmt::Debug for Dispatcher<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Dispatcher<_>")
}
}
impl<T> Deref for Dispatcher<T> {
type Target = dyn Bridge<T>;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl<T> DerefMut for Dispatcher<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.deref_mut()
}
}
/// This trait allows the creation of a dispatcher to an existing agent that will not send replies when messages are sent.
pub trait Dispatched: Agent + Sized + 'static {
/// Creates a dispatcher to the agent that will not send messages back.
///
/// # Note
/// Dispatchers don't have `HandlerId`s and therefore `Agent::handle` will be supplied `None`
/// for the `id` parameter, and `connected` and `disconnected` will not be called.
///
/// # Important
/// Because the Agents using Context or Public reaches use the number of existing bridges to
/// keep track of if the agent itself should exist, creating dispatchers will not guarantee that
/// an Agent will exist to service requests sent from Dispatchers. You **must** keep at least one
/// bridge around if you wish to use a dispatcher. If you are using agents in a write-only manner,
/// then it is suggested that you create a bridge that handles no-op responses as high up in the
/// component hierarchy as possible - oftentimes the root component for simplicity's sake.
fn dispatcher() -> Dispatcher<Self>;
}
#[doc(hidden)]
pub trait Dispatchable {}
impl<T> Dispatched for T
where
T: Agent,
<T as Agent>::Reach: Discoverer<Agent = T>,
<T as Agent>::Reach: Dispatchable,
{
fn dispatcher() -> Dispatcher<T> {
Dispatcher(Self::Reach::spawn_or_join(None))
}
}

View File

@ -1,165 +0,0 @@
mod private;
mod public;
mod queue;
pub use private::{Private, PrivateAgent};
pub use public::{Public, PublicAgent};
use super::*;
use js_sys::{Array, Reflect, Uint8Array};
use serde::{Deserialize, Serialize};
use wasm_bindgen::{closure::Closure, JsCast, JsValue, UnwrapThrowExt};
use web_sys::{
Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, Worker, WorkerOptions,
};
pub(crate) struct WorkerResponder {}
impl<AGN> Responder<AGN> for WorkerResponder
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
fn respond(&self, id: HandlerId, output: AGN::Output) {
let msg = FromWorker::ProcessOutput(id, output);
let data = msg.pack();
worker_self().post_message_vec(data);
}
}
/// Message packager, based on serde::Serialize/Deserialize
pub trait Packed {
/// Pack serializable message into Vec<u8>
fn pack(&self) -> Vec<u8>;
/// Unpack deserializable message of byte slice
fn unpack(data: &[u8]) -> Self;
}
impl<T: Serialize + for<'de> Deserialize<'de>> Packed for T {
fn pack(&self) -> Vec<u8> {
bincode::serialize(&self).expect("can't serialize an agent message")
}
fn unpack(data: &[u8]) -> Self {
bincode::deserialize(data).expect("can't deserialize an agent message")
}
}
/// Serializable messages to worker
#[derive(Serialize, Deserialize, Debug)]
enum ToWorker<T> {
/// Client is connected
Connected(HandlerId),
/// Incoming message to Worker
ProcessInput(HandlerId, T),
/// Client is disconnected
Disconnected(HandlerId),
/// Worker should be terminated
Destroy,
}
/// Serializable messages sent by worker to consumer
#[derive(Serialize, Deserialize, Debug)]
enum FromWorker<T> {
/// Worker sends this message when `wasm` bundle has loaded.
WorkerLoaded,
/// Outgoing message to consumer
ProcessOutput(HandlerId, T),
}
fn send_to_remote<AGN>(worker: &Worker, msg: ToWorker<AGN::Input>)
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
let msg = msg.pack();
worker.post_message_vec(msg);
}
fn worker_new(name_of_resource: &str, resource_is_relative: bool, is_module: bool) -> Worker {
let origin = gloo_utils::document()
.location()
.unwrap_throw()
.origin()
.unwrap_throw();
let pathname = gloo_utils::window().location().pathname().unwrap_throw();
let prefix = if resource_is_relative {
pathname
.rfind(|c| c == '/')
.map(|i| &pathname[..i])
.unwrap_or_default()
} else {
""
};
let script_url = format!("{}{}/{}", origin, prefix, name_of_resource);
let wasm_url = format!(
"{}{}/{}",
origin,
prefix,
name_of_resource.replace(".js", "_bg.wasm")
);
let array = Array::new();
array.push(
&format!(
r#"importScripts("{}");wasm_bindgen("{}");"#,
script_url, wasm_url
)
.into(),
);
let blob = Blob::new_with_str_sequence_and_options(
&array,
BlobPropertyBag::new().type_("application/javascript"),
)
.unwrap();
let url = Url::create_object_url_with_blob(&blob).unwrap();
if is_module {
let options = WorkerOptions::new();
Reflect::set(
options.as_ref(),
&JsValue::from_str("type"),
&JsValue::from_str("module"),
)
.unwrap();
Worker::new_with_options(&url, &options).expect("failed to spawn worker")
} else {
Worker::new(&url).expect("failed to spawn worker")
}
}
fn worker_self() -> DedicatedWorkerGlobalScope {
JsValue::from(js_sys::global()).into()
}
trait WorkerExt {
fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec<u8>));
fn post_message_vec(&self, data: Vec<u8>);
}
macro_rules! worker_ext_impl {
($($type:ident),+) => {$(
impl WorkerExt for $type {
fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec<u8>)) {
let handler = move |message: MessageEvent| {
let data = Uint8Array::from(message.data()).to_vec();
handler(data);
};
let closure = Closure::wrap(Box::new(handler) as Box<dyn Fn(MessageEvent)>);
self.set_onmessage(Some(closure.as_ref().unchecked_ref()));
closure.forget();
}
fn post_message_vec(&self, data: Vec<u8>) {
self.post_message(&Uint8Array::from(data.as_slice()))
.expect("failed to post message");
}
}
)+};
}
worker_ext_impl! {
Worker, DedicatedWorkerGlobalScope
}

View File

@ -1,210 +0,0 @@
use super::*;
use queue::Queue;
use std::cell::RefCell;
use std::fmt;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use web_sys::Worker;
use yew::callback::Callback;
thread_local! {
static QUEUE: Queue<usize> = Queue::new();
}
static PRIVATE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
const SINGLETON_ID: HandlerId = HandlerId(0, true);
/// Create a new instance for every bridge.
#[allow(missing_debug_implementations)]
pub struct Private<AGN> {
_agent: PhantomData<AGN>,
}
/// A trait to enable private agents being registered in a web worker.
pub trait PrivateAgent {
/// Executes an agent in the current environment.
/// Uses in `main` function of a worker.
fn register();
}
impl<AGN> PrivateAgent for AGN
where
AGN: Agent<Reach = Private<AGN>>,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
fn register() {
let scope = AgentScope::<AGN>::new();
let responder = WorkerResponder {};
let link = AgentLink::connect(&scope, responder);
let upd = AgentLifecycleEvent::Create(link);
scope.send(upd);
let handler = move |data: Vec<u8>| {
let msg = ToWorker::<AGN::Input>::unpack(&data);
match msg {
ToWorker::Connected(_id) => {
let upd = AgentLifecycleEvent::Connected(SINGLETON_ID);
scope.send(upd);
}
ToWorker::ProcessInput(_id, value) => {
let upd = AgentLifecycleEvent::Input(value, SINGLETON_ID);
scope.send(upd);
}
ToWorker::Disconnected(_id) => {
let upd = AgentLifecycleEvent::Disconnected(SINGLETON_ID);
scope.send(upd);
}
ToWorker::Destroy => {
let upd = AgentLifecycleEvent::Destroy;
scope.send(upd);
// Terminates web worker
worker_self().close();
}
}
};
let loaded: FromWorker<AGN::Output> = FromWorker::WorkerLoaded;
let loaded = loaded.pack();
let worker = worker_self();
worker.set_onmessage_closure(handler);
worker.post_message_vec(loaded);
}
}
impl<AGN> Discoverer for Private<AGN>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
type Agent = AGN;
fn spawn_or_join(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let id = PRIVATE_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let callback = callback.expect("Callback required for Private agents");
let handler = move |data: Vec<u8>, worker: &Worker| {
let msg = FromWorker::<AGN::Output>::unpack(&data);
match msg {
FromWorker::WorkerLoaded => {
QUEUE.with(|queue| {
queue.insert_loaded_agent(id);
if let Some(msgs) = queue.remove_msg_queue(&id) {
for msg in msgs {
worker.post_message_vec(msg)
}
}
});
}
FromWorker::ProcessOutput(id, output) => {
assert_eq!(id.raw_id(), SINGLETON_ID.raw_id());
callback.emit(output);
}
}
};
let name_of_resource = AGN::name_of_resource();
let is_relative = AGN::resource_path_is_relative();
let handler_cell = Rc::new(RefCell::new(Some(handler)));
let worker = {
let handler_cell = handler_cell.clone();
let worker = worker_new(name_of_resource, is_relative, AGN::is_module());
let worker_clone = worker.clone();
worker.set_onmessage_closure(move |data: Vec<u8>| {
if let Some(handler) = handler_cell.borrow().as_ref() {
handler(data, &worker_clone)
}
});
worker
};
let bridge = PrivateBridge {
handler_cell,
worker,
_agent: PhantomData,
id,
};
bridge.send_message(ToWorker::Connected(SINGLETON_ID));
Box::new(bridge)
}
}
/// A connection manager for components interaction with workers.
pub struct PrivateBridge<AGN, HNDL>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
HNDL: Fn(Vec<u8>, &Worker),
{
handler_cell: Rc<RefCell<Option<HNDL>>>,
worker: Worker,
_agent: PhantomData<AGN>,
id: usize,
}
impl<AGN, HNDL> PrivateBridge<AGN, HNDL>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
HNDL: Fn(Vec<u8>, &Worker),
{
/// Send a message to the worker, queuing the message if necessary
fn send_message(&self, msg: ToWorker<AGN::Input>) {
QUEUE.with(|queue| {
if queue.is_worker_loaded(&self.id) {
send_to_remote::<AGN>(&self.worker, msg);
} else {
queue.add_msg_to_queue(msg.pack(), self.id);
}
});
}
}
impl<AGN, HNDL> fmt::Debug for PrivateBridge<AGN, HNDL>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
HNDL: Fn(Vec<u8>, &Worker),
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("PrivateBridge<_>")
}
}
impl<AGN, HNDL> Bridge<AGN> for PrivateBridge<AGN, HNDL>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
HNDL: Fn(Vec<u8>, &Worker),
{
fn send(&mut self, msg: AGN::Input) {
let msg = ToWorker::ProcessInput(SINGLETON_ID, msg);
self.send_message(msg);
}
}
impl<AGN, HNDL> Drop for PrivateBridge<AGN, HNDL>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
HNDL: Fn(Vec<u8>, &Worker),
{
fn drop(&mut self) {
let disconnected = ToWorker::Disconnected(SINGLETON_ID);
send_to_remote::<AGN>(&self.worker, disconnected);
let destroy = ToWorker::Destroy;
send_to_remote::<AGN>(&self.worker, destroy);
self.handler_cell.borrow_mut().take();
QUEUE.with(|queue| {
queue.remove_agent(&self.id);
});
}
}

View File

@ -1,274 +0,0 @@
use super::WorkerExt;
use super::*;
use anymap2::{self, AnyMap};
use queue::Queue;
use slab::Slab;
use std::any::TypeId;
use std::cell::RefCell;
use std::fmt;
use std::marker::PhantomData;
use std::rc::Rc;
use web_sys::Worker;
use yew::callback::Callback;
use yew::scheduler::Shared;
thread_local! {
static REMOTE_AGENTS_POOL: RefCell<AnyMap> = RefCell::new(AnyMap::new());
static QUEUE: Queue<TypeId> = Queue::new();
}
/// Create a single instance in a tab.
#[allow(missing_debug_implementations)]
pub struct Public<AGN> {
_agent: PhantomData<AGN>,
}
impl<AGN> Discoverer for Public<AGN>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
type Agent = AGN;
fn spawn_or_join(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let bridge = REMOTE_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
match pool.entry::<RemoteAgent<AGN>>() {
anymap2::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback),
anymap2::Entry::Vacant(entry) => {
let slab: Shared<Slab<Option<Callback<AGN::Output>>>> =
Rc::new(RefCell::new(Slab::new()));
let handler = {
let slab = slab.clone();
move |data: Vec<u8>, worker: &Worker| {
let msg = FromWorker::<AGN::Output>::unpack(&data);
match msg {
FromWorker::WorkerLoaded => {
QUEUE.with(|queue| {
queue.insert_loaded_agent(TypeId::of::<AGN>());
if let Some(msgs) =
queue.remove_msg_queue(&TypeId::of::<AGN>())
{
for msg in msgs {
worker.post_message_vec(msg)
}
}
});
}
FromWorker::ProcessOutput(id, output) => {
locate_callback_and_respond::<AGN>(&slab, id, output);
}
}
}
};
let name_of_resource = AGN::name_of_resource();
let is_relative = AGN::resource_path_is_relative();
let worker = {
let worker = worker_new(name_of_resource, is_relative, AGN::is_module());
let worker_clone = worker.clone();
worker.set_onmessage_closure(move |data: Vec<u8>| {
handler(data, &worker_clone);
});
worker
};
let launched = RemoteAgent::new(worker, slab);
entry.insert(launched).create_bridge(callback)
}
}
});
Box::new(bridge)
}
}
impl<AGN> Dispatchable for Public<AGN>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
}
/// A connection manager for components interaction with workers.
pub struct PublicBridge<AGN>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
worker: Worker,
id: HandlerId,
_agent: PhantomData<AGN>,
}
impl<AGN> fmt::Debug for PublicBridge<AGN>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("PublicBridge<_>")
}
}
impl<AGN> PublicBridge<AGN>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
/// Send a message to the worker, queuing the message if necessary
fn send_message(&self, msg: ToWorker<AGN::Input>) {
QUEUE.with(|queue| {
if queue.is_worker_loaded(&TypeId::of::<AGN>()) {
send_to_remote::<AGN>(&self.worker, msg);
} else {
queue.add_msg_to_queue(msg.pack(), TypeId::of::<AGN>());
}
});
}
}
impl<AGN> Bridge<AGN> for PublicBridge<AGN>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
fn send(&mut self, msg: AGN::Input) {
let msg = ToWorker::ProcessInput(self.id, msg);
self.send_message(msg);
}
}
impl<AGN> Drop for PublicBridge<AGN>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
fn drop(&mut self) {
let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
let terminate_worker = {
if let Some(launched) = pool.get_mut::<RemoteAgent<AGN>>() {
launched.remove_bridge(self)
} else {
false
}
};
if terminate_worker {
pool.remove::<RemoteAgent<AGN>>();
}
terminate_worker
});
let disconnected = ToWorker::Disconnected(self.id);
self.send_message(disconnected);
if terminate_worker {
let destroy = ToWorker::Destroy;
self.send_message(destroy);
QUEUE.with(|queue| {
queue.remove_agent(&TypeId::of::<AGN>());
});
}
}
}
/// A trait to enable public agents being registered in a web worker.
pub trait PublicAgent {
/// Executes an agent in the current environment.
/// Uses in `main` function of a worker.
fn register();
}
impl<AGN> PublicAgent for AGN
where
AGN: Agent<Reach = Public<AGN>>,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
fn register() {
let scope = AgentScope::<AGN>::new();
let responder = WorkerResponder {};
let link = AgentLink::connect(&scope, responder);
let upd = AgentLifecycleEvent::Create(link);
scope.send(upd);
let handler = move |data: Vec<u8>| {
let msg = ToWorker::<AGN::Input>::unpack(&data);
match msg {
ToWorker::Connected(id) => {
let upd = AgentLifecycleEvent::Connected(id);
scope.send(upd);
}
ToWorker::ProcessInput(id, value) => {
let upd = AgentLifecycleEvent::Input(value, id);
scope.send(upd);
}
ToWorker::Disconnected(id) => {
let upd = AgentLifecycleEvent::Disconnected(id);
scope.send(upd);
}
ToWorker::Destroy => {
let upd = AgentLifecycleEvent::Destroy;
scope.send(upd);
// Terminates web worker
worker_self().close();
}
}
};
let loaded: FromWorker<AGN::Output> = FromWorker::WorkerLoaded;
let loaded = loaded.pack();
let worker = worker_self();
worker.set_onmessage_closure(handler);
worker.post_message_vec(loaded);
}
}
struct RemoteAgent<AGN>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
worker: Worker,
slab: SharedOutputSlab<AGN>,
}
impl<AGN> RemoteAgent<AGN>
where
AGN: Agent,
<AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
<AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
pub fn new(worker: Worker, slab: SharedOutputSlab<AGN>) -> Self {
RemoteAgent { worker, slab }
}
fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> PublicBridge<AGN> {
let respondable = callback.is_some();
let mut slab = self.slab.borrow_mut();
let id: usize = slab.insert(callback);
let id = HandlerId::new(id, respondable);
let bridge = PublicBridge {
worker: self.worker.clone(),
id,
_agent: PhantomData,
};
bridge.send_message(ToWorker::Connected(bridge.id));
bridge
}
fn remove_bridge(&mut self, bridge: &PublicBridge<AGN>) -> Last {
let mut slab = self.slab.borrow_mut();
let _ = slab.remove(bridge.id.raw_id());
slab.is_empty()
}
}

View File

@ -1,52 +0,0 @@
use std::cell::RefCell;
use std::collections::{hash_map, HashMap, HashSet};
use std::hash::Hash;
/// Thread-local instance used to queue worker messages
pub struct Queue<T: Eq + Hash> {
loaded_agents: RefCell<HashSet<T>>,
msg_queue: RefCell<HashMap<T, Vec<Vec<u8>>>>,
}
impl<T: Eq + Hash> Queue<T> {
pub fn new() -> Queue<T> {
Queue {
loaded_agents: RefCell::new(HashSet::new()),
msg_queue: RefCell::new(HashMap::new()),
}
}
#[inline]
pub fn remove_msg_queue(&self, id: &T) -> Option<Vec<Vec<u8>>> {
self.msg_queue.borrow_mut().remove(id)
}
#[inline]
pub fn insert_loaded_agent(&self, id: T) {
self.loaded_agents.borrow_mut().insert(id);
}
#[inline]
pub fn is_worker_loaded(&self, id: &T) -> bool {
self.loaded_agents.borrow().contains(id)
}
pub fn add_msg_to_queue(&self, msg: Vec<u8>, id: T) {
let mut queue = self.msg_queue.borrow_mut();
match queue.entry(id) {
hash_map::Entry::Vacant(record) => {
record.insert(vec![msg]);
}
hash_map::Entry::Occupied(ref mut record) => {
record.get_mut().push(msg);
}
}
}
/// This is called by a worker's `Drop` implementation in order to remove the worker from the list
/// of loaded workers.
pub fn remove_agent(&self, id: &T) {
self.loaded_agents.borrow_mut().remove(id);
self.msg_queue.borrow_mut().remove(id);
}
}

View File

@ -13,7 +13,7 @@ impl<T> ImplicitClone for Rc<T> {}
impl ImplicitClone for NodeRef {}
impl<Comp: Component> ImplicitClone for Scope<Comp> {}
// TODO there are still a few missing like AgentScope
// TODO there are still a few missing
/// A trait similar to `Into<T>` which allows conversion to a value of a `Properties` struct.
pub trait IntoPropValue<T> {