diff --git a/ci/feature-soundness-release.sh b/ci/feature-soundness-release.sh index 6de18c3ab..0fd501fba 100755 --- a/ci/feature-soundness-release.sh +++ b/ci/feature-soundness-release.sh @@ -18,15 +18,3 @@ cargo clippy --release --no-default-features --features default,ssr -- --deny=wa cargo clippy --release --no-default-features --features csr,default,ssr -- --deny=warnings cargo clippy --release --no-default-features --features hydration,ssr -- --deny=warnings cargo clippy --release --no-default-features --features default,hydration,ssr -- --deny=warnings -cargo clippy --release --no-default-features --features tokio -- --deny=warnings -cargo clippy --release --no-default-features --features csr,tokio -- --deny=warnings -cargo clippy --release --no-default-features --features default,tokio -- --deny=warnings -cargo clippy --release --no-default-features --features csr,default,tokio -- --deny=warnings -cargo clippy --release --no-default-features --features hydration,tokio -- --deny=warnings -cargo clippy --release --no-default-features --features default,hydration,tokio -- --deny=warnings -cargo clippy --release --no-default-features --features ssr,tokio -- --deny=warnings -cargo clippy --release --no-default-features --features csr,ssr,tokio -- --deny=warnings -cargo clippy --release --no-default-features --features default,ssr,tokio -- --deny=warnings -cargo clippy --release --no-default-features --features csr,default,ssr,tokio -- --deny=warnings -cargo clippy --release --no-default-features --features hydration,ssr,tokio -- --deny=warnings -cargo clippy --release --no-default-features --features default,hydration,ssr,tokio -- --deny=warnings diff --git a/ci/feature-soundness.sh b/ci/feature-soundness.sh index 5686b2fa3..b23b2592f 100755 --- a/ci/feature-soundness.sh +++ b/ci/feature-soundness.sh @@ -18,15 +18,3 @@ cargo clippy --no-default-features --features default,ssr -- --deny=warnings cargo clippy --no-default-features --features csr,default,ssr -- --deny=warnings cargo clippy --no-default-features --features hydration,ssr -- --deny=warnings cargo clippy --no-default-features --features default,hydration,ssr -- --deny=warnings -cargo clippy --no-default-features --features tokio -- --deny=warnings -cargo clippy --no-default-features --features csr,tokio -- --deny=warnings -cargo clippy --no-default-features --features default,tokio -- --deny=warnings -cargo clippy --no-default-features --features csr,default,tokio -- --deny=warnings -cargo clippy --no-default-features --features hydration,tokio -- --deny=warnings -cargo clippy --no-default-features --features default,hydration,tokio -- --deny=warnings -cargo clippy --no-default-features --features ssr,tokio -- --deny=warnings -cargo clippy --no-default-features --features csr,ssr,tokio -- --deny=warnings -cargo clippy --no-default-features --features default,ssr,tokio -- --deny=warnings -cargo clippy --no-default-features --features csr,default,ssr,tokio -- --deny=warnings -cargo clippy --no-default-features --features hydration,ssr,tokio -- --deny=warnings -cargo clippy --no-default-features --features default,hydration,ssr,tokio -- --deny=warnings diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 96ad3592f..bd384fb69 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -1595,6 +1595,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinned" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a829027bd95e54cfe13e3e258a1ae7b645960553fb82b75ff852c29688ee595b" +dependencies = [ + "futures 0.3.24", + "rustversion", + "thiserror", +] + [[package]] name = "pkg-config" version = "0.3.25" @@ -2115,18 +2126,18 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" -version = "1.0.34" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c1b05ca9d106ba7d2e31a9dab4a64e7be2cce415321966ea3132c49a656e252" +checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.34" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8f2591983642de85c921015f3f070c665a197ed69e417af436115e3a1407487" +checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" dependencies = [ "proc-macro2", "quote", @@ -2744,6 +2755,7 @@ dependencies = [ "num_cpus", "once_cell", "pin-project", + "pinned", "serde", "slab", "thiserror", diff --git a/examples/futures/Cargo.toml b/examples/futures/Cargo.toml index 7d2b90a09..1ce604b85 100644 --- a/examples/futures/Cargo.toml +++ b/examples/futures/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0" pulldown-cmark = { version = "0.9", default-features = false } wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" -yew = { path = "../../packages/yew", features = ["tokio", "csr"] } +yew = { path = "../../packages/yew", features = ["csr"] } gloo = "0.8" [dependencies.web-sys] diff --git a/examples/js_callback/Cargo.toml b/examples/js_callback/Cargo.toml index 9e3bceb03..0801247f8 100644 --- a/examples/js_callback/Cargo.toml +++ b/examples/js_callback/Cargo.toml @@ -7,7 +7,7 @@ license = "MIT OR Apache-2.0" [dependencies] wasm-bindgen = "0.2" -yew = { path = "../../packages/yew", features = ["csr", "tokio"] } +yew = { path = "../../packages/yew", features = ["csr"] } wasm-bindgen-futures = "0.4" js-sys = "0.3" once_cell = "1" diff --git a/examples/simple_ssr/Cargo.toml b/examples/simple_ssr/Cargo.toml index 4bb1b3b54..5265df686 100644 --- a/examples/simple_ssr/Cargo.toml +++ b/examples/simple_ssr/Cargo.toml @@ -32,4 +32,4 @@ clap = { version = "3.1.7", features = ["derive"] } [features] hydration = ["yew/hydration"] -ssr = ["yew/ssr", "yew/tokio"] +ssr = ["yew/ssr"] diff --git a/examples/ssr_router/Cargo.toml b/examples/ssr_router/Cargo.toml index 10104835c..ca20bb14b 100644 --- a/examples/ssr_router/Cargo.toml +++ b/examples/ssr_router/Cargo.toml @@ -34,5 +34,5 @@ hyper = { version = "0.14", features = ["server", "http1"] } jemallocator = "0.5" [features] -ssr = ["yew/ssr", "yew/tokio"] +ssr = ["yew/ssr"] hydration = ["yew/hydration"] diff --git a/packages/yew/Cargo.toml b/packages/yew/Cargo.toml index e3778ac91..1d5c6f72d 100644 --- a/packages/yew/Cargo.toml +++ b/packages/yew/Cargo.toml @@ -25,16 +25,23 @@ slab = "0.4" wasm-bindgen = "0.2" yew-macro = { version = "^0.19.0", path = "../yew-macro" } thiserror = "1.0" -futures = { version = "0.3", default-features = false, features = ["std", "async-await"] } +futures = { version = "0.3", default-features = false, features = ["std"] } html-escape = { version = "0.2.9", optional = true } implicit-clone = { version = "0.3", features = ["map"] } base64ct = { version = "1.5.0", features = ["std"], optional = true } bincode = { version = "1.3.3", optional = true } serde = { version = "1", features = ["derive"] } tracing = "0.1.36" -pin-project = "1.0.11" +prokio = "0.1.0" rustversion = "1" +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen-futures = "0.4" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +# We still need tokio as we have docs linked to it. +tokio = { version = "1.19", features = ["rt"] } + [dependencies.web-sys] version = "^0.3.59" features = [ @@ -70,16 +77,8 @@ features = [ "SubmitEvent" ] -[target.'cfg(target_arch = "wasm32")'.dependencies] -# we move it here so no promise-based spawn_local can present for -# non-wasm32 targets. -wasm-bindgen-futures = "0.4" - -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] -num_cpus = { version = "1.13", optional = true } -once_cell = "1" -tokio = { version = "1.21.1", features = ["rt", "time"], optional = true } -tokio-stream = { version = "0.1", features = ["time"], optional = true } +[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] +tokio = { version = "1.19", features = ["full"] } [dev-dependencies] wasm-bindgen-test = "0.3" @@ -95,15 +94,11 @@ features = [ ] [features] -tokio = ["dep:tokio", "dep:num_cpus", "dep:tokio-stream"] ssr = ["dep:html-escape", "dep:base64ct", "dep:bincode"] csr = [] hydration = ["csr", "dep:bincode"] default = [] -[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -tokio = { version = "1.19", features = ["full"] } - [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "documenting"] diff --git a/packages/yew/Makefile.toml b/packages/yew/Makefile.toml index da4903e1b..643c4b948 100644 --- a/packages/yew/Makefile.toml +++ b/packages/yew/Makefile.toml @@ -1,6 +1,6 @@ [tasks.native-test] command = "cargo" -args = ["test", "--features", "csr,ssr,hydration,tokio"] +args = ["test", "--features", "csr,ssr,hydration"] [tasks.wasm-test] command = "wasm-pack" diff --git a/packages/yew/src/html/component/scope.rs b/packages/yew/src/html/component/scope.rs index 0c8ae202e..bca6f17e2 100644 --- a/packages/yew/src/html/component/scope.rs +++ b/packages/yew/src/html/component/scope.rs @@ -248,9 +248,9 @@ impl Scope { /// /// This method will not notify the component when the stream has been fully exhausted. If /// you want this feature, you can add an EOF message variant for your component and use - /// [`StreamExt::chain`] and [`stream::once`] to chain an EOF message to the original stream. - /// If your stream is produced by another crate, you can use [`StreamExt::map`] to transform - /// the stream's item type to the component message type. + /// [`StreamExt::chain`] and [`stream::once`](futures::stream::once) to chain an EOF message to + /// the original stream. If your stream is produced by another crate, you can use + /// [`StreamExt::map`] to transform the stream's item type to the component message type. pub fn send_stream(&self, stream: S) where M: Into, diff --git a/packages/yew/src/lib.rs b/packages/yew/src/lib.rs index 878eda98d..257d62aa2 100644 --- a/packages/yew/src/lib.rs +++ b/packages/yew/src/lib.rs @@ -26,7 +26,6 @@ //! - `csr`: Enables Client-side Rendering support and [`Renderer`]. Only enable this feature if you //! are making a Yew application (not a library). //! - `ssr`: Enables Server-side Rendering support and [`ServerRenderer`]. -//! - `tokio`: Enables future-based APIs on non-wasm32 targets with tokio runtime. //! - `hydration`: Enables Hydration support. //! //! ## Example diff --git a/packages/yew/src/platform.rs b/packages/yew/src/platform.rs new file mode 100644 index 000000000..20c35fba8 --- /dev/null +++ b/packages/yew/src/platform.rs @@ -0,0 +1,48 @@ +//! Yew's compatibility between JavaScript Runtime and Native Runtimes. +//! +//! This module is also published under the name [prokio] on crates.io. +//! +//! # Rationale +//! +//! When designing components and libraries that works on both WebAssembly targets backed by +//! JavaScript Runtime and non-WebAssembly targets with Native Runtimes. Developers usually face +//! challenges that requires applying multiple feature flags throughout their application: +//! +//! 1. Select I/O and timers that works with the target runtime. +//! 2. Native Runtimes usually require `Send` futures and WebAssembly types are usually `!Send`. +//! +//! # Implementation +//! +//! To alleviate these issues, Yew implements a single-threaded runtime that executes `?Send` +//! (`Send` or `!Send`) futures. +//! +//! On platforms with multi-threading support, Yew spawns multiple independent runtimes +//! proportional to the CPU core number. When tasks are spawned with a runtime handle, it will +//! randomly select a worker thread from the internal pool. All tasks spawned with `spawn_local` +//! will run on the same thread as the thread the task was running. When the runtime runs in a +//! WebAssembly target, all tasks will be scheduled on the main thread. +//! +//! This runtime is designed in favour of IO-bounded workload with similar runtime cost. +//! When running I/O workloads, it would produce a slightly better performance as tasks are +//! never moved to another thread. However, If a worker thread is busy, +//! other threads will not be able to steal tasks scheduled on the busy thread. +//! When you have a CPU-bounded task where CPU time is significantly +//! more expensive, it should be spawned with a dedicated thread (or Web Worker) and communicates +//! with the application using channels. +//! +//! Yew platform provides the following components: +//! +//! 1. A Task Scheduler that is capable of running non-Send tasks. +//! 2. A Timer that is compatible with the scheduler backend. +//! 3. Task Synchronisation Mechanisms. +//! +//! # Runtime Backend +//! +//! The Yew runtime is implemented with different runtimes depending on the target platform and can +//! use all features (timers / IO / task synchronisation) from the selected native runtime: +//! +//! - `wasm-bindgen-futures` (WebAssembly targets) +//! - `tokio` (non-WebAssembly targets) + +#[doc(inline)] +pub use prokio::*; diff --git a/packages/yew/src/platform/fmt/buffer.rs b/packages/yew/src/platform/fmt/buffer.rs deleted file mode 100644 index 28b88c839..000000000 --- a/packages/yew/src/platform/fmt/buffer.rs +++ /dev/null @@ -1,212 +0,0 @@ -use std::cell::UnsafeCell; -use std::fmt::{self, Write}; -use std::marker::PhantomData; -use std::rc::Rc; -use std::task::{Poll, Waker}; - -use futures::stream::{FusedStream, Stream}; - -static BUF_SIZE: usize = 1024; - -enum BufStreamState { - Ready, - Pending(Waker), - Done, -} - -struct Inner { - buf: String, - state: BufStreamState, - - // This type is not send or sync. - _marker: PhantomData>, -} - -impl Inner { - #[inline] - const fn new() -> Self { - Self { - buf: String::new(), - state: BufStreamState::Ready, - _marker: PhantomData, - } - } - - #[inline] - fn wake(&mut self) { - if let BufStreamState::Pending(ref waker) = self.state { - waker.wake_by_ref(); - self.state = BufStreamState::Ready; - } - } - - #[inline] - fn buf_reserve(&mut self) { - if self.buf.is_empty() { - self.buf.reserve(BUF_SIZE); - } - } -} - -impl Write for Inner { - fn write_str(&mut self, s: &str) -> fmt::Result { - if s.is_empty() { - return Ok(()); - } - - self.wake(); - if s.len() < BUF_SIZE { - self.buf_reserve(); - } - - self.buf.write_str(s) - } - - fn write_char(&mut self, c: char) -> fmt::Result { - self.wake(); - self.buf_reserve(); - - self.buf.write_char(c) - } - - fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result { - self.wake(); - self.buf_reserve(); - - self.buf.write_fmt(args) - } -} - -/// An asynchronous [`String`] writer. -/// -/// This type implements [`fmt::Write`] and can be used with [`write!`] and [`writeln!`]. -pub(crate) struct BufWriter { - inner: Rc>, -} - -impl Write for BufWriter { - #[inline] - fn write_str(&mut self, s: &str) -> fmt::Result { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions that has access to the inner type. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - inner.write_str(s) - } - - #[inline] - fn write_char(&mut self, c: char) -> fmt::Result { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions that has access to the inner type. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - inner.write_char(c) - } - - #[inline] - fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions that has access to the inner type. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - inner.write_fmt(args) - } -} - -impl Drop for BufWriter { - fn drop(&mut self) { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions that has access to the inner type. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - inner.wake(); - inner.state = BufStreamState::Done; - } -} - -/// An asynchronous [`String`] reader. -pub(crate) struct BufReader { - inner: Rc>, -} - -impl Stream for BufReader { - type Item = String; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions that has access to the inner type. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - if !inner.buf.is_empty() { - let buf = std::mem::take(&mut inner.buf); - return Poll::Ready(Some(buf)); - } - - if let BufStreamState::Done = inner.state { - return Poll::Ready(None); - } - - inner.state = BufStreamState::Pending(cx.waker().clone()); - Poll::Pending - } -} - -impl FusedStream for BufReader { - fn is_terminated(&self) -> bool { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions that has access to the inner type. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &*self.inner.get() }; - - matches!( - (&inner.state, inner.buf.is_empty()), - (BufStreamState::Done, true) - ) - } -} - -/// Creates an asynchronous buffer that operates over String. -pub(crate) fn buffer() -> (BufWriter, BufReader) { - let inner = Rc::new(UnsafeCell::new(Inner::new())); - - let w = { - let inner = inner.clone(); - BufWriter { inner } - }; - - let r = BufReader { inner }; - - (w, r) -} diff --git a/packages/yew/src/platform/fmt/mod.rs b/packages/yew/src/platform/fmt/mod.rs deleted file mode 100644 index 9bba8a612..000000000 --- a/packages/yew/src/platform/fmt/mod.rs +++ /dev/null @@ -1,70 +0,0 @@ -//! Asynchronous utilities to work with `String`s. - -use std::future::Future; - -use futures::future::{self, MaybeDone}; -use futures::stream::{FusedStream, Stream}; -use futures::StreamExt; -use pin_project::pin_project; - -mod buffer; - -pub(crate) use buffer::{buffer, BufReader, BufWriter}; - -/// A buffered asynchronous [`String`] [`Stream`]. -/// -/// A BufStream combines a BufWriter - BufReader pair and a resolving future that writes to the -/// buffer and polls the future alongside the buffer. -#[pin_project] -pub(crate) struct BufStream -where - F: Future, -{ - #[pin] - resolver: MaybeDone, - inner: BufReader, -} - -impl BufStream -where - F: Future, -{ - /// Creates a `BufStream`. - pub fn new(f: C) -> Self - where - C: FnOnce(BufWriter) -> F, - { - let (w, r) = buffer(); - let resolver = future::maybe_done(f(w)); - - BufStream { inner: r, resolver } - } -} - -impl Stream for BufStream -where - F: Future, -{ - type Item = String; - - #[inline] - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let this = self.project(); - let _ = this.resolver.poll(cx); - - this.inner.poll_next_unpin(cx) - } -} - -impl FusedStream for BufStream -where - F: Future, -{ - #[inline] - fn is_terminated(&self) -> bool { - self.inner.is_terminated() - } -} diff --git a/packages/yew/src/platform/mod.rs b/packages/yew/src/platform/mod.rs deleted file mode 100644 index 65b803e3d..000000000 --- a/packages/yew/src/platform/mod.rs +++ /dev/null @@ -1,190 +0,0 @@ -//! Compatibility between JavaScript Runtime and Native Runtimes. -//! -//! When designing components and libraries that works on both WebAssembly targets backed by -//! JavaScript Runtime and non-WebAssembly targets with Native Runtimes. Developers usually face -//! challenges that requires applying multiple feature flags throughout their application: -//! -//! 1. Select I/O and timers that works with the target runtime. -//! 2. Native Runtimes usually require `Send` futures and WebAssembly usually use `!Send` -//! primitives for better performance during Client-side Rendering. -//! -//! To alleviate these issues, Yew implements a single-threaded runtime that executes `?Send` -//! (`Send` or `!Send`) futures. When your application starts with `yew::Renderer` or is rendered by -//! `yew::ServerRenderer`, it is executed within the Yew runtime. On systems with multi-threading -//! support, it spawns multiple independent runtimes in a worker pool proportional to the CPU -//! core number. The renderer will randomly select a worker thread from the internal pool. All tasks -//! spawned with `spawn_local` in the application will run on the same thread as the -//! rendering thread the renderer has selected. When the renderer runs in a WebAssembly target, all -//! tasks will be scheduled on the main thread. -//! -//! This runtime is designed in favour of IO-bounded workload with similar runtime cost. It produces -//! better performance by pinning tasks to a single worker thread. However, this means that if a -//! worker thread is back-logged, other threads will not be able to "help" by running tasks -//! scheduled on the busy thread. When you have a CPU-bounded task where CPU time is significantly -//! more expensive than rendering tasks, it should be spawned with a dedicated thread or -//! `yew-agent` and communicates with the application using channels or agent bridges. -//! -//! # Runtime Backend -//! -//! Yew runtime is implemented with different runtimes depending on the target platform and can use -//! all features (timers / IO / task synchronisation) from the selected native runtime: -//! -//! - `wasm-bindgen-futures` (WebAssembly targets) -//! - `tokio` (non-WebAssembly targets) -//! -//! # Compatibility with other async runtimes -//! -//! Yew's ServerRenderer can also be executed in applications using other async runtimes(e.g.: -//! `async-std`). Rendering tasks will enter Yew runtime and be executed with `tokio`. When the -//! rendering task finishes, the result is returned to the original runtime. This process is -//! transparent to the future that executes the renderer. The Yew application still needs to use -//! `tokio`'s timer, IO and task synchronisation primitives. - -use std::future::Future; -use std::io::Result; -use std::marker::PhantomData; - -#[cfg(feature = "ssr")] -pub(crate) mod fmt; - -pub mod pinned; -pub mod time; - -#[cfg(target_arch = "wasm32")] -#[path = "rt_wasm_bindgen/mod.rs"] -mod imp; -#[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))] -#[path = "rt_tokio/mod.rs"] -mod imp; -#[cfg(all(not(target_arch = "wasm32"), not(feature = "tokio")))] -#[path = "rt_none/mod.rs"] -mod imp; - -/// Spawns a task on current thread. -/// -/// # Panics -/// -/// This function will panic when not being executed from within a Yew Application. -#[inline(always)] -pub fn spawn_local(f: F) -where - F: Future + 'static, -{ - imp::spawn_local(f); -} - -/// A Runtime Builder. -#[derive(Debug)] -pub struct RuntimeBuilder { - worker_threads: usize, -} - -impl Default for RuntimeBuilder { - fn default() -> Self { - Self { - worker_threads: imp::get_default_runtime_size(), - } - } -} - -impl RuntimeBuilder { - /// Creates a new Runtime Builder. - pub fn new() -> Self { - Self::default() - } - - /// Sets the number of worker threads the Runtime will use. - /// - /// # Default - /// - /// The default number of worker threads is the number of available logical CPU cores. - /// - /// # Note - /// - /// This setting has no effect if current platform has no thread support (e.g.: WebAssembly). - pub fn worker_threads(&mut self, val: usize) -> &mut Self { - self.worker_threads = val; - - self - } - - /// Creates a Runtime. - pub fn build(&mut self) -> Result { - Ok(Runtime { - inner: imp::Runtime::new(self.worker_threads)?, - }) - } -} - -/// The Yew Runtime. -#[derive(Debug, Clone, Default)] -pub struct Runtime { - inner: imp::Runtime, -} - -impl Runtime { - /// Creates a runtime Builder. - pub fn builder() -> RuntimeBuilder { - RuntimeBuilder::new() - } - - /// Spawns a task with it pinned to a worker thread. - /// - /// This can be used to execute non-Send futures without blocking the current thread. - /// - /// [`spawn_local`] is available with tasks executed with `spawn_pinned`. - pub fn spawn_pinned(&self, create_task: F) - where - F: FnOnce() -> Fut, - F: Send + 'static, - Fut: Future + 'static, - { - self.inner.spawn_pinned(create_task); - } -} - -/// A Local Runtime Handle. -/// -/// This type can be used to acquire a runtime handle to spawn local tasks. -#[derive(Debug, Clone)] -pub struct LocalHandle { - inner: imp::LocalHandle, - // This type is not send or sync. - _marker: PhantomData<*const ()>, -} - -impl LocalHandle { - /// Creates a Handle to current Runtime worker. - /// - /// # Panics - /// - /// This method will panic if not called within Yew Runtime. - pub fn current() -> Self { - let inner = imp::LocalHandle::current(); - - Self { - inner, - _marker: PhantomData, - } - } - - /// Creates a Handle to current Runtime worker. - /// - /// This methods will return `None` if called from outside Yew Runtime. - pub fn try_current() -> Option { - let inner = imp::LocalHandle::try_current()?; - - Some(Self { - inner, - _marker: PhantomData, - }) - } - - /// Spawns a Future with current Runtime worker. - pub fn spawn_local(&self, f: F) - where - F: Future + 'static, - { - self.inner.spawn_local(f); - } -} diff --git a/packages/yew/src/platform/pinned/mod.rs b/packages/yew/src/platform/pinned/mod.rs deleted file mode 100644 index ee0eb9df2..000000000 --- a/packages/yew/src/platform/pinned/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! Task synchronisation primitives for pinned tasks. -//! -//! This module provides task synchronisation for `!Send` futures. - -pub mod mpsc; -pub mod oneshot; diff --git a/packages/yew/src/platform/pinned/mpsc.rs b/packages/yew/src/platform/pinned/mpsc.rs deleted file mode 100644 index ef920ac0c..000000000 --- a/packages/yew/src/platform/pinned/mpsc.rs +++ /dev/null @@ -1,401 +0,0 @@ -//! A multi-producer single-receiver channel. - -use std::cell::UnsafeCell; -use std::collections::VecDeque; -use std::marker::PhantomData; -use std::rc::Rc; -use std::task::{Poll, Waker}; - -use futures::sink::Sink; -use futures::stream::{FusedStream, Stream}; -use thiserror::Error; - -/// Error returned by [`try_next`](UnboundedReceiver::try_next). -#[derive(Error, Debug)] -#[error("queue is empty")] -pub struct TryRecvError { - _marker: PhantomData<()>, -} - -/// Error returned by [`send_now`](UnboundedSender::send_now). -#[derive(Error, Debug)] -#[error("failed to send")] -pub struct SendError { - /// The send value. - pub inner: T, -} - -/// Error returned by [`UnboundedSender`] when used as a [`Sink`](futures::sink::Sink). -#[derive(Error, Debug)] -#[error("failed to send")] -pub struct TrySendError { - _marker: PhantomData<()>, -} - -#[derive(Debug)] -struct Inner { - rx_waker: Option, - closed: bool, - sender_ctr: usize, - items: VecDeque, - - // This type is not send or sync. - _marker: PhantomData>, -} - -impl Inner { - fn close(&mut self) { - self.closed = true; - - if let Some(ref m) = self.rx_waker { - m.wake_by_ref(); - } - } -} - -/// The receiver of an unbounded mpsc channel. -#[derive(Debug)] -pub struct UnboundedReceiver { - inner: Rc>>, -} - -impl UnboundedReceiver { - /// Try to read the next value from the channel. - /// - /// This function will return: - /// - `Ok(Some(T))` if a value is ready. - /// - `Ok(None)` if the channel has become closed. - /// - `Err(TryRecvError)` if the channel is not closed and the channel is empty. - pub fn try_next(&self) -> std::result::Result, TryRecvError> { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions and hence uniquely owns the - // mutable reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - match (inner.items.pop_front(), inner.closed) { - (Some(m), _) => Ok(Some(m)), - (None, false) => Ok(None), - (None, true) => Err(TryRecvError { - _marker: PhantomData, - }), - } - } -} - -impl Stream for UnboundedReceiver { - type Item = T; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions and hence uniquely owns the - // mutable reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - match (inner.items.pop_front(), inner.closed) { - (Some(m), _) => Poll::Ready(Some(m)), - (None, false) => { - inner.rx_waker = Some(cx.waker().clone()); - Poll::Pending - } - (None, true) => Poll::Ready(None), - } - } -} - -impl FusedStream for UnboundedReceiver { - fn is_terminated(&self) -> bool { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions and hence uniquely owns the - // mutable reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &*self.inner.get() }; - inner.items.is_empty() && inner.closed - } -} - -impl Drop for UnboundedReceiver { - fn drop(&mut self) { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions and hence uniquely owns the - // mutable reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - inner.close(); - } -} - -/// The sender of an unbounded mpsc channel. -#[derive(Debug)] -pub struct UnboundedSender { - inner: Rc>>, -} - -impl UnboundedSender { - /// Sends a value to the unbounded receiver. - pub fn send_now(&self, item: T) -> Result<(), SendError> { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any function that have already acquired a mutable - // reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - if inner.closed { - return Err(SendError { inner: item }); - } - - inner.items.push_back(item); - - if let Some(ref m) = inner.rx_waker { - m.wake_by_ref(); - } - - Ok(()) - } - - /// Closes the channel. - /// - /// Every sender (dropped or not) is considered closed when this method is called. - pub fn close_now(&self) { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any function that have already acquired a mutable - // reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - inner.close(); - } -} - -impl Clone for UnboundedSender { - fn clone(&self) -> Self { - let self_ = Self { - inner: self.inner.clone(), - }; - - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions and hence uniquely owns the - // mutable reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - inner.sender_ctr += 1; - - self_ - } -} - -impl Drop for UnboundedSender { - fn drop(&mut self) { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions and hence uniquely owns the - // mutable reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - let sender_ctr = { - inner.sender_ctr -= 1; - inner.sender_ctr - }; - - if sender_ctr == 0 { - inner.close(); - } - } -} - -impl Sink for &'_ UnboundedSender { - type Error = TrySendError; - - fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.send_now(item).map_err(|_| TrySendError { - _marker: PhantomData, - }) - } - - fn poll_ready( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - let inner = unsafe { &*self.inner.get() }; - match inner.closed { - false => Poll::Ready(Ok(())), - true => Poll::Ready(Err(TrySendError { - _marker: PhantomData, - })), - } - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.close_now(); - - Poll::Ready(Ok(())) - } -} - -/// Creates an unbounded channel. -/// -/// # Note -/// -/// This channel has an infinite buffer and can run out of memory if the channel is not actively -/// drained. -pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { - let inner = Rc::new(UnsafeCell::new(Inner { - rx_waker: None, - closed: false, - - sender_ctr: 1, - items: VecDeque::new(), - _marker: PhantomData, - })); - - ( - UnboundedSender { - inner: inner.clone(), - }, - UnboundedReceiver { inner }, - ) -} - -#[cfg(not(target_arch = "wasm32"))] -#[cfg(feature = "tokio")] -#[cfg(test)] -mod tests { - use std::time::Duration; - - use futures::sink::SinkExt; - use futures::stream::StreamExt; - use tokio::task::LocalSet; - use tokio::test; - - use super::*; - use crate::platform::spawn_local; - use crate::platform::time::sleep; - - #[test] - async fn mpsc_works() { - let local_set = LocalSet::new(); - - local_set - .run_until(async { - let (tx, mut rx) = unbounded::(); - - spawn_local(async move { - for i in 0..10 { - (&tx).send(i).await.expect("failed to send."); - sleep(Duration::from_millis(1)).await; - } - }); - - for i in 0..10 { - let received = rx.next().await.expect("failed to receive"); - - assert_eq!(i, received); - } - - assert_eq!(rx.next().await, None); - }) - .await; - } - - #[test] - async fn mpsc_drops_receiver() { - let (tx, rx) = unbounded::(); - drop(rx); - - (&tx).send(0).await.expect_err("should fail to send."); - } - - #[test] - async fn mpsc_multi_sender() { - let local_set = LocalSet::new(); - - local_set - .run_until(async { - let (tx, mut rx) = unbounded::(); - - spawn_local(async move { - let tx2 = tx.clone(); - - for i in 0..10 { - if i % 2 == 0 { - (&tx).send(i).await.expect("failed to send."); - } else { - (&tx2).send(i).await.expect("failed to send."); - } - - sleep(Duration::from_millis(1)).await; - } - - drop(tx2); - - for i in 10..20 { - (&tx).send(i).await.expect("failed to send."); - - sleep(Duration::from_millis(1)).await; - } - }); - - for i in 0..20 { - let received = rx.next().await.expect("failed to receive"); - - assert_eq!(i, received); - } - - assert_eq!(rx.next().await, None); - }) - .await; - } - - #[test] - async fn mpsc_drops_sender() { - let (tx, mut rx) = unbounded::(); - drop(tx); - - assert_eq!(rx.next().await, None); - } -} diff --git a/packages/yew/src/platform/pinned/oneshot.rs b/packages/yew/src/platform/pinned/oneshot.rs deleted file mode 100644 index 84db209d9..000000000 --- a/packages/yew/src/platform/pinned/oneshot.rs +++ /dev/null @@ -1,225 +0,0 @@ -//! A one-time send - receive channel. - -use std::cell::UnsafeCell; -use std::future::Future; -use std::marker::PhantomData; -use std::rc::Rc; -use std::task::{Poll, Waker}; - -use thiserror::Error; - -/// Error returned by awaiting the [`Receiver`]. -#[derive(Debug, Error)] -#[error("channel has been closed.")] -pub struct RecvError { - _marker: PhantomData<()>, -} - -#[derive(Debug)] -struct Inner { - rx_waker: Option, - closed: bool, - item: Option, - - // This type is not send or sync. - _marker: PhantomData>, -} - -/// The receiver of a oneshot channel. -#[derive(Debug)] -pub struct Receiver { - inner: Rc>>, -} - -impl Future for Receiver { - type Output = Result; - - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions and hence uniquely owns the - // mutable reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - // Implementation Note: - // - // It might be neater to use a match pattern here. - // However, this will slow down the polling process by 10%. - - if let Some(m) = inner.item.take() { - return Poll::Ready(Ok(m)); - } - - if inner.closed { - return Poll::Ready(Err(RecvError { - _marker: PhantomData, - })); - } - - inner.rx_waker = Some(cx.waker().clone()); - Poll::Pending - } -} - -impl Drop for Receiver { - fn drop(&mut self) { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions and hence uniquely owns the - // mutable reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - inner.closed = true; - } -} - -/// The sender of a oneshot channel. -#[derive(Debug)] -pub struct Sender { - inner: Rc>>, -} - -impl Sender { - /// Send an item to the other side of the channel, consumes the sender. - pub fn send(self, item: T) -> Result<(), T> { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions and hence uniquely owns the - // mutable reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - if inner.closed { - return Err(item); - } - - inner.item = Some(item); - - if let Some(ref m) = inner.rx_waker { - m.wake_by_ref(); - } - - Ok(()) - } -} - -impl Drop for Sender { - fn drop(&mut self) { - // SAFETY: - // - // We can acquire a mutable reference without checking as: - // - // - This type is !Sync and !Send. - // - This function is not used by any other functions and hence uniquely owns the - // mutable reference. - // - The mutable reference is dropped at the end of this function. - let inner = unsafe { &mut *self.inner.get() }; - - inner.closed = true; - - if inner.item.is_none() { - if let Some(ref m) = inner.rx_waker { - m.wake_by_ref(); - } - } - } -} - -/// Creates a oneshot channel. -pub fn channel() -> (Sender, Receiver) { - let inner = Rc::new(UnsafeCell::new(Inner { - rx_waker: None, - closed: false, - item: None, - - _marker: PhantomData, - })); - - ( - Sender { - inner: inner.clone(), - }, - Receiver { inner }, - ) -} - -#[cfg(not(target_arch = "wasm32"))] -#[cfg(feature = "tokio")] -#[cfg(test)] -mod tests { - use std::sync::Arc; - use std::time::Duration; - - use tokio::sync::Barrier; - use tokio::task::LocalSet; - use tokio::test; - - use super::*; - use crate::platform::spawn_local; - use crate::platform::time::sleep; - - #[test] - async fn oneshot_works() { - let (tx, rx) = channel(); - - tx.send(0).expect("failed to send."); - - assert_eq!(rx.await.expect("failed to receive."), 0); - } - - #[test] - async fn oneshot_drops_sender() { - let local_set = LocalSet::new(); - - local_set - .run_until(async { - let (tx, rx) = channel::(); - - spawn_local(async move { - sleep(Duration::from_millis(1)).await; - - drop(tx); - }); - rx.await.expect_err("successful to receive."); - }) - .await; - } - - #[test] - async fn oneshot_drops_receiver() { - let local_set = LocalSet::new(); - - local_set - .run_until(async { - let (tx, rx) = channel::(); - - let bar = Arc::new(Barrier::new(2)); - - { - let bar = bar.clone(); - spawn_local(async move { - sleep(Duration::from_millis(1)).await; - - drop(rx); - - bar.wait().await; - }); - } - - bar.wait().await; - - tx.send(0).expect_err("successful to send."); - }) - .await; - } -} diff --git a/packages/yew/src/platform/rt_none/mod.rs b/packages/yew/src/platform/rt_none/mod.rs deleted file mode 100644 index 2d2fc5586..000000000 --- a/packages/yew/src/platform/rt_none/mod.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::future::Future; -use std::io; -use std::marker::PhantomData; - -pub(crate) mod time; - -pub(crate) fn get_default_runtime_size() -> usize { - 0 -} - -static NO_RUNTIME_NOTICE: &str = r#"No runtime configured for this platform, \ - features that requires a runtime can't be used. \ - Either compile with `target_arch = "wasm32", or enable the `tokio` feature."#; - -fn panic_no_runtime() -> ! { - panic!("{}", NO_RUNTIME_NOTICE); -} - -#[inline(always)] -pub(super) fn spawn_local(_f: F) -where - F: Future + 'static, -{ - panic_no_runtime(); -} - -#[derive(Debug, Clone)] -pub(crate) struct Runtime {} - -impl Default for Runtime { - fn default() -> Self { - panic_no_runtime(); - } -} - -impl Runtime { - pub fn new(_size: usize) -> io::Result { - panic_no_runtime(); - } - - pub fn spawn_pinned(&self, _create_task: F) - where - F: FnOnce() -> Fut, - F: Send + 'static, - Fut: Future + 'static, - { - panic_no_runtime(); - } -} - -#[derive(Debug, Clone)] -pub(crate) struct LocalHandle { - // This type is not send or sync. - _marker: PhantomData<*const ()>, -} - -impl LocalHandle { - pub fn try_current() -> Option { - panic_no_runtime(); - } - - pub fn current() -> Self { - panic_no_runtime(); - } - - pub fn spawn_local(&self, _f: F) - where - F: Future + 'static, - { - panic_no_runtime(); - } -} diff --git a/packages/yew/src/platform/rt_none/time.rs b/packages/yew/src/platform/rt_none/time.rs deleted file mode 100644 index d15cbb18f..000000000 --- a/packages/yew/src/platform/rt_none/time.rs +++ /dev/null @@ -1,13 +0,0 @@ -use std::time::Duration; - -use futures::stream::LocalBoxStream; - -use super::panic_no_runtime; - -pub(crate) async fn sleep(_dur: Duration) { - panic_no_runtime(); -} - -pub(crate) fn interval(_dur: Duration) -> LocalBoxStream<'static, ()> { - panic_no_runtime(); -} diff --git a/packages/yew/src/platform/rt_tokio/local_worker.rs b/packages/yew/src/platform/rt_tokio/local_worker.rs deleted file mode 100644 index 90e561845..000000000 --- a/packages/yew/src/platform/rt_tokio/local_worker.rs +++ /dev/null @@ -1,205 +0,0 @@ -//! We use a local worker implementation that does not produce a JoinHandle for spawn_pinned. -//! This avoids the cost to acquire a JoinHandle. -//! -//! See: [tokio-rs/tokio#4819](https://github.com/tokio-rs/tokio/issues/4819) -//! -//! We will not be able to produce a meaningful JoinHandle until WebAssembly targets support -//! unwinding. - -use std::cell::RefCell; -use std::future::Future; -use std::marker::PhantomData; -use std::sync::Arc; -use std::{io, thread}; - -static DEFAULT_WORKER_NAME: &str = "yew-runtime-worker"; - -use std::sync::atomic::{AtomicUsize, Ordering}; - -use futures::channel::mpsc::UnboundedSender; -use futures::stream::StreamExt; -use tokio::task::{spawn_local, LocalSet}; - -type SpawnTask = Box; - -thread_local! { - static TASK_COUNT: RefCell>> = RefCell::new(None); - static LOCAL_SET: LocalSet = LocalSet::new(); -} - -pub(crate) struct LocalWorker { - task_count: Arc, - tx: UnboundedSender, -} - -impl LocalWorker { - pub fn new() -> io::Result { - let (tx, mut rx) = futures::channel::mpsc::unbounded::(); - - let task_count: Arc = Arc::default(); - - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - { - let task_count = task_count.clone(); - thread::Builder::new() - .name(DEFAULT_WORKER_NAME.into()) - .spawn(move || { - TASK_COUNT.with(move |m| { - *m.borrow_mut() = Some(task_count); - }); - - LOCAL_SET.with(|local_set| { - local_set.block_on(&rt, async move { - while let Some(m) = rx.next().await { - m(); - } - }); - }); - })?; - } - - Ok(Self { task_count, tx }) - } - - pub fn task_count(&self) -> usize { - self.task_count.load(Ordering::Acquire) - } - - pub fn spawn_pinned(&self, f: F) - where - F: 'static + Send + FnOnce() -> Fut, - Fut: 'static + Future, - { - let guard = LocalJobCountGuard::new(self.task_count.clone()); - - // We ignore the result upon a failure, this can never happen unless the runtime is - // exiting which all instances of Runtime will be dropped at that time and hence cannot - // spawn pinned tasks. - let _ = self.tx.unbounded_send(Box::new(move || { - spawn_local(async move { - let _guard = guard; - - f().await; - }); - })); - } -} - -pub struct LocalJobCountGuard(Arc); - -impl LocalJobCountGuard { - fn new(inner: Arc) -> Self { - inner.fetch_add(1, Ordering::AcqRel); - LocalJobCountGuard(inner) - } -} - -impl Drop for LocalJobCountGuard { - fn drop(&mut self) { - self.0.fetch_sub(1, Ordering::AcqRel); - } -} - -#[derive(Debug, Clone)] -pub(crate) struct LocalHandle { - // This type is not send or sync. - _marker: PhantomData<*const ()>, - task_count: Arc, -} - -impl LocalHandle { - pub fn try_current() -> Option { - // We cache the handle to prevent borrowing RefCell. - thread_local! { - static LOCAL_HANDLE: Option = TASK_COUNT - .with(|m| m.borrow().clone()) - .map(|task_count| LocalHandle { task_count, _marker: PhantomData }); - } - - LOCAL_HANDLE.with(|m| m.clone()) - } - - pub fn current() -> Self { - Self::try_current().expect("outside of Yew runtime.") - } - - pub fn spawn_local(&self, f: F) - where - F: Future + 'static, - { - let guard = LocalJobCountGuard::new(self.task_count.clone()); - - LOCAL_SET.with(move |local_set| { - local_set.spawn_local(async move { - let _guard = guard; - - f.await; - }) - }); - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use futures::channel::oneshot; - use tokio::test; - use tokio::time::timeout; - use yew::platform::Runtime; - - use super::*; - - #[test] - async fn test_local_handle_exists() { - assert!(LocalHandle::try_current().is_none()); - - let runtime = Runtime::default(); - let (tx, rx) = oneshot::channel(); - - runtime.spawn_pinned(move || async move { - tx.send(LocalHandle::try_current().is_some()) - .expect("failed to send"); - }); - - timeout(Duration::from_secs(5), rx) - .await - .expect("task timed out") - .expect("failed to receive"); - } - - #[test] - async fn test_local_handle_spawns_on_same_worker() { - assert!(LocalHandle::try_current().is_none()); - - let runtime = Runtime::default(); - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - - runtime.spawn_pinned(move || async move { - let handle = LocalHandle::current(); - - tx1.send(std::thread::current().id()) - .expect("failed to send"); - - handle.spawn_local(async move { - tx2.send(std::thread::current().id()) - .expect("failed to send"); - }) - }); - - let result1 = timeout(Duration::from_secs(5), rx1) - .await - .expect("task timed out") - .expect("failed to receive"); - let result2 = timeout(Duration::from_secs(5), rx2) - .await - .expect("task timed out") - .expect("failed to receive"); - - assert_eq!(result1, result2); - } -} diff --git a/packages/yew/src/platform/rt_tokio/mod.rs b/packages/yew/src/platform/rt_tokio/mod.rs deleted file mode 100644 index 39beec947..000000000 --- a/packages/yew/src/platform/rt_tokio/mod.rs +++ /dev/null @@ -1,181 +0,0 @@ -use std::future::Future; -use std::sync::Arc; -use std::{fmt, io}; - -use once_cell::sync::Lazy; - -pub(crate) mod time; - -mod local_worker; - -pub(crate) use local_worker::LocalHandle; -use local_worker::LocalWorker; - -pub(crate) fn get_default_runtime_size() -> usize { - // We use num_cpus as std::thread::available_parallelism() does not take - // system resource constraint (e.g.: cgroups) into consideration. - num_cpus::get() -} - -#[inline(always)] -pub(super) fn spawn_local(f: F) -where - F: Future + 'static, -{ - match LocalHandle::try_current() { - Some(m) => { - // If within a Yew runtime, use a local handle increases the local task count. - m.spawn_local(f); - } - None => { - tokio::task::spawn_local(f); - } - } -} - -#[derive(Clone)] -pub(crate) struct Runtime { - workers: Arc>, -} - -impl fmt::Debug for Runtime { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Runtime") - .field("workers", &"Vec") - .finish() - } -} - -impl Default for Runtime { - fn default() -> Self { - static DEFAULT_RT: Lazy = Lazy::new(|| { - Runtime::new(get_default_runtime_size()).expect("failed to create runtime.") - }); - - DEFAULT_RT.clone() - } -} - -impl Runtime { - pub fn new(size: usize) -> io::Result { - assert!(size > 0, "must have more than 1 worker."); - - let mut workers = Vec::with_capacity(size); - - for _ in 0..size { - let worker = LocalWorker::new()?; - workers.push(worker); - } - - Ok(Self { - workers: workers.into(), - }) - } - - fn find_least_busy_local_worker(&self) -> &LocalWorker { - let mut workers = self.workers.iter(); - - let mut worker = workers.next().expect("must have more than 1 worker."); - let mut task_count = worker.task_count(); - - for current_worker in workers { - if task_count == 0 { - // We don't have to search until the end. - break; - } - - let current_worker_task_count = current_worker.task_count(); - - if current_worker_task_count < task_count { - task_count = current_worker_task_count; - worker = current_worker; - } - } - - worker - } - - pub fn spawn_pinned(&self, create_task: F) - where - F: FnOnce() -> Fut, - F: Send + 'static, - Fut: Future + 'static, - { - let worker = self.find_least_busy_local_worker(); - worker.spawn_pinned(create_task); - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - use std::time::Duration; - - use futures::channel::oneshot; - use tokio::sync::Barrier; - use tokio::test; - use tokio::time::timeout; - - use super::*; - - #[test] - async fn test_spawn_pinned_least_busy() { - let runtime = Runtime::new(2).expect("failed to create runtime."); - - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - - let bar = Arc::new(Barrier::new(2)); - - { - let bar = bar.clone(); - runtime.spawn_pinned(move || async move { - bar.wait().await; - tx1.send(std::thread::current().id()) - .expect("failed to send!"); - }); - } - - runtime.spawn_pinned(move || async move { - bar.wait().await; - tx2.send(std::thread::current().id()) - .expect("failed to send!"); - }); - - let result1 = timeout(Duration::from_secs(5), rx1) - .await - .expect("task timed out") - .expect("failed to receive"); - let result2 = timeout(Duration::from_secs(5), rx2) - .await - .expect("task timed out") - .expect("failed to receive"); - - // first task and second task are not on the same thread. - assert_ne!(result1, result2); - } - - #[test] - async fn test_spawn_local_within_send() { - let runtime = Runtime::default(); - - let (tx, rx) = oneshot::channel(); - - runtime.spawn_pinned(move || async move { - tokio::task::spawn(async move { - // tokio::task::spawn_local cannot spawn tasks outside of a local context. - // - // yew::platform::spawn_local can spawn tasks within a Send task as long as running - // under a Yew Runtime. - spawn_local(async move { - tx.send(()).expect("failed to send!"); - }) - }); - }); - - timeout(Duration::from_secs(5), rx) - .await - .expect("task timed out") - .expect("failed to receive"); - } -} diff --git a/packages/yew/src/platform/rt_tokio/time.rs b/packages/yew/src/platform/rt_tokio/time.rs deleted file mode 100644 index bcab607c3..000000000 --- a/packages/yew/src/platform/rt_tokio/time.rs +++ /dev/null @@ -1,14 +0,0 @@ -use std::future::Future; -use std::time::Duration; - -use futures::stream::{Stream, StreamExt}; -use tokio_stream::wrappers::IntervalStream; - -#[inline(always)] -pub(crate) fn sleep(dur: Duration) -> impl Future { - tokio::time::sleep(dur) -} - -pub(crate) fn interval(dur: Duration) -> impl Stream { - IntervalStream::new(tokio::time::interval(dur)).then(|_| async {}) -} diff --git a/packages/yew/src/platform/rt_wasm_bindgen/mod.rs b/packages/yew/src/platform/rt_wasm_bindgen/mod.rs deleted file mode 100644 index fb5949800..000000000 --- a/packages/yew/src/platform/rt_wasm_bindgen/mod.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::future::Future; -use std::io; -use std::marker::PhantomData; - -pub(crate) mod time; - -pub(crate) fn get_default_runtime_size() -> usize { - 0 -} - -pub(super) use wasm_bindgen_futures::spawn_local; - -#[derive(Debug, Clone, Default)] -pub(crate) struct Runtime {} - -impl Runtime { - pub fn new(_size: usize) -> io::Result { - Ok(Self {}) - } - - pub fn spawn_pinned(&self, create_task: F) - where - F: FnOnce() -> Fut, - F: Send + 'static, - Fut: Future + 'static, - { - spawn_local(create_task()) - } -} - -#[derive(Debug, Clone)] -pub(crate) struct LocalHandle { - // This type is not send or sync. - _marker: PhantomData<*const ()>, -} - -impl LocalHandle { - pub fn try_current() -> Option { - Some(Self { - _marker: PhantomData, - }) - } - - pub fn current() -> Self { - Self { - _marker: PhantomData, - } - } - - pub fn spawn_local(&self, f: F) - where - F: Future + 'static, - { - spawn_local(f); - } -} diff --git a/packages/yew/src/platform/rt_wasm_bindgen/time.rs b/packages/yew/src/platform/rt_wasm_bindgen/time.rs deleted file mode 100644 index c34dea609..000000000 --- a/packages/yew/src/platform/rt_wasm_bindgen/time.rs +++ /dev/null @@ -1,75 +0,0 @@ -use std::cell::Cell; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; -use std::time::Duration; - -use futures::stream; -use futures::stream::Stream; -use gloo::timers::callback::Timeout; - -#[inline(always)] -pub(crate) fn sleep(dur: Duration) -> impl Future { - pub struct Sleep { - inner: Option, - dur_left: Option, - timeout_registered: Rc>, - } - - impl Future for Sleep { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - static I32_MAX_U128: u128 = 2_147_483_647; - static I32_MAX_U32: u32 = 2_147_483_647; - - // If polling before the registered timeout is reached, return Pending. - if self.timeout_registered.get() { - return Poll::Pending; - } - - // set_timeout can only accept maximum of i32, so we wrap around if it gets longer. - let next_timeout = match self.dur_left.map(|m| (m, u32::try_from(m))) { - Some((m_u128, Err(_))) => { - self.dur_left = Some(m_u128 - I32_MAX_U128); - I32_MAX_U32 - } - Some((m_u128, _)) if m_u128 > I32_MAX_U128 => { - self.dur_left = Some(m_u128 - I32_MAX_U128); - I32_MAX_U32 - } - Some((_, Ok(m_u32))) => { - self.dur_left = None; - m_u32 - } - None => return Poll::Ready(()), - }; - - let waker = cx.waker().clone(); - self.timeout_registered.set(true); - let timeout_registered = self.timeout_registered.clone(); - - self.inner = Some(Timeout::new(next_timeout, move || { - timeout_registered.set(false); - waker.wake(); - })); - - Poll::Pending - } - } - - Sleep { - inner: None, - dur_left: Some(dur.as_millis()), - timeout_registered: Cell::new(false).into(), - } -} - -pub(crate) fn interval(dur: Duration) -> impl Stream { - stream::unfold((), move |_: ()| async move { - sleep(dur).await; - - Some(((), ())) - }) -} diff --git a/packages/yew/src/platform/time.rs b/packages/yew/src/platform/time.rs deleted file mode 100644 index 2b359172d..000000000 --- a/packages/yew/src/platform/time.rs +++ /dev/null @@ -1,20 +0,0 @@ -//! Utilities for bridging time and tasks. - -use std::future::Future; -use std::time::Duration; - -use futures::stream::Stream; - -use crate::platform::imp::time as imp; - -/// Waits until duration has elapsed. -#[inline(always)] -pub fn sleep(dur: Duration) -> impl Future { - imp::sleep(dur) -} - -/// Creates a Stream that yields an item after every period has elapsed. -#[inline(always)] -pub fn interval(period: Duration) -> impl Stream { - imp::interval(period) -} diff --git a/packages/yew/src/scheduler.rs b/packages/yew/src/scheduler.rs index f3dcdcc3d..c7f37dbca 100644 --- a/packages/yew/src/scheduler.rs +++ b/packages/yew/src/scheduler.rs @@ -4,7 +4,7 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; -/// Alias for Rc> +/// Alias for `Rc>` pub type Shared = Rc>; /// A routine which could be run. diff --git a/packages/yew/src/utils/mod.rs b/packages/yew/src/utils/mod.rs index 29d537af5..2091349ec 100644 --- a/packages/yew/src/utils/mod.rs +++ b/packages/yew/src/utils/mod.rs @@ -4,7 +4,7 @@ use std::marker::PhantomData; use yew::html::ChildrenRenderer; -/// Map IntoIterator> to Iterator +/// Map `IntoIterator>` to `Iterator` pub fn into_node_iter(it: IT) -> impl Iterator where IT: IntoIterator, diff --git a/tools/benchmark-ssr/Cargo.toml b/tools/benchmark-ssr/Cargo.toml index f6753048b..17c03ffbd 100644 --- a/tools/benchmark-ssr/Cargo.toml +++ b/tools/benchmark-ssr/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -yew = { path = "../../packages/yew", features = ["tokio", "ssr"] } +yew = { path = "../../packages/yew", features = ["ssr"] } function_router = { path = "../../examples/function_router" } tokio = { version = "1.19", features = ["full"] } jemallocator = "0.5.0"