mirror of
https://github.com/yewstack/yew.git
synced 2025-12-08 21:26:25 +00:00
Configurable Runtime (#2772)
* Adds Runtime. * A LocalRuntime. * Add note. * Add SSR benchmark. * Only create default runtime if no custom runtime is set. * Use jemalloc for benchmarking. * Remove once_cell for web assembly. * Add time. * Fix wasm_bindgen. * Adjust inlining. * Optimise benchmark output. * Optimise BufWriter. * Add json output. * Add Benchmark Workflow. * Remove local set from tests. * Fix Workflow syntax. * Exclude benchmark from doc tests. * Adjust feature flags. * Adds a pinned channel implementation. * Make Send bound explicit. * Implement on immutable reference. * Fix Sink close. * run_pinned -> spawn_pinned. * Add tests. * Adjusts worker threads. * Fix workflow. * Remove futures-executor. * Cargo update. * Fix docs. * Update notice. * Fix docs. * Fix docs. * Switch to task spawning. * Use futures unordered instead of spawn_local. * Switch to join_all. * Remove LocalPoolHandle. * Fix docs. * Spawn a single task. * Fix merge failure. * Remove LocalRuntime. * Update documentation. * Merge local-runtime-handle into local-runtime. * Add some tests. * Fix clippy notice. * Fix comment. * Address various review comments. * Remove unused type. * Fix clippy. * Fix clippy.
This commit is contained in:
parent
72213eec08
commit
cffb7c5e7e
2
.github/workflows/post-size-cmp.yml
vendored
2
.github/workflows/post-size-cmp.yml
vendored
@ -8,7 +8,7 @@ on:
|
||||
- completed
|
||||
|
||||
jobs:
|
||||
size-cmp:
|
||||
post-size-cmp:
|
||||
name: Post Comment on Pull Request
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
|
||||
29
examples/Cargo.lock
generated
29
examples/Cargo.lock
generated
@ -545,6 +545,12 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fs_extra"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
|
||||
|
||||
[[package]]
|
||||
name = "function_memory_game"
|
||||
version = "0.1.0"
|
||||
@ -1155,6 +1161,27 @@ version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d"
|
||||
|
||||
[[package]]
|
||||
name = "jemalloc-sys"
|
||||
version = "0.5.1+5.3.0-patched"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b7c2b313609b95939cb0c5a5c6917fb9b7c9394562aa3ef44eb66ffa51736432"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"fs_extra",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jemallocator"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "16c2514137880c52b0b4822b563fadd38257c1f380858addb74a400889696ea6"
|
||||
dependencies = [
|
||||
"jemalloc-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.59"
|
||||
@ -1948,6 +1975,8 @@ dependencies = [
|
||||
"env_logger",
|
||||
"function_router",
|
||||
"futures 0.3.21",
|
||||
"hyper",
|
||||
"jemallocator",
|
||||
"log",
|
||||
"tokio",
|
||||
"tower",
|
||||
|
||||
@ -25,7 +25,7 @@ async fn render(
|
||||
|
||||
Box::new(
|
||||
stream::once(async move { index_html_before })
|
||||
.chain(renderer.render_stream().await)
|
||||
.chain(renderer.render_stream())
|
||||
.chain(stream::once(async move { index_html_after }))
|
||||
.map(|m| Result::<_, BoxedError>::Ok(m.into())),
|
||||
)
|
||||
@ -54,6 +54,5 @@ async fn main() {
|
||||
let routes = html.or(warp::fs::dir(opts.dir));
|
||||
|
||||
println!("You can view the website at: http://localhost:8080/");
|
||||
|
||||
warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
|
||||
}
|
||||
|
||||
@ -17,7 +17,7 @@ required-features = ["ssr"]
|
||||
yew = { path = "../../packages/yew" }
|
||||
function_router = { path = "../function_router" }
|
||||
log = "0.4"
|
||||
futures = "0.3"
|
||||
futures = { version = "0.3", features = ["std"], default-features = false }
|
||||
|
||||
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
||||
wasm-bindgen-futures = "0.4"
|
||||
@ -30,7 +30,9 @@ tower = { version = "0.4", features = ["make"] }
|
||||
tower-http = { version = "0.3", features = ["fs"] }
|
||||
env_logger = "0.9"
|
||||
clap = { version = "3.1.7", features = ["derive"] }
|
||||
hyper = { version = "0.14", features = ["server", "http1"] }
|
||||
jemallocator = "0.5"
|
||||
|
||||
[features]
|
||||
ssr = ["yew/ssr"]
|
||||
ssr = ["yew/ssr", "yew/tokio"]
|
||||
hydration = ["yew/hydration"]
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::convert::Infallible;
|
||||
use std::future::Future;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use axum::body::{Body, StreamBody};
|
||||
@ -13,8 +14,14 @@ use axum::{Extension, Router};
|
||||
use clap::Parser;
|
||||
use function_router::{ServerApp, ServerAppProps};
|
||||
use futures::stream::{self, StreamExt};
|
||||
use hyper::server::Server;
|
||||
use tower::ServiceExt;
|
||||
use tower_http::services::ServeDir;
|
||||
use yew::platform::Runtime;
|
||||
|
||||
// We use jemalloc as it produces better performance.
|
||||
#[global_allocator]
|
||||
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
||||
|
||||
/// A basic example
|
||||
#[derive(Parser, Debug)]
|
||||
@ -38,14 +45,38 @@ async fn render(
|
||||
|
||||
StreamBody::new(
|
||||
stream::once(async move { index_html_before })
|
||||
.chain(renderer.render_stream().await)
|
||||
.chain(renderer.render_stream())
|
||||
.chain(stream::once(async move { index_html_after }))
|
||||
.map(Result::<_, Infallible>::Ok),
|
||||
)
|
||||
}
|
||||
|
||||
// An executor to process requests on the Yew runtime.
|
||||
//
|
||||
// By spawning requests on the Yew runtime,
|
||||
// it processes request on the same thread as the rendering task.
|
||||
//
|
||||
// This increases performance in some environments (e.g.: in VM).
|
||||
#[derive(Clone, Default)]
|
||||
struct Executor {
|
||||
inner: Runtime,
|
||||
}
|
||||
|
||||
impl<F> hyper::rt::Executor<F> for Executor
|
||||
where
|
||||
F: Future + Send + 'static,
|
||||
{
|
||||
fn execute(&self, fut: F) {
|
||||
self.inner.spawn_pinned(move || async move {
|
||||
fut.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let exec = Executor::default();
|
||||
|
||||
env_logger::init();
|
||||
|
||||
let opts = Opt::parse();
|
||||
@ -86,7 +117,8 @@ async fn main() {
|
||||
|
||||
println!("You can view the website at: http://localhost:8080/");
|
||||
|
||||
axum::Server::bind(&"0.0.0.0:8080".parse().unwrap())
|
||||
Server::bind(&"127.0.0.1:8080".parse().unwrap())
|
||||
.executor(exec)
|
||||
.serve(app.into_make_service())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@ -41,6 +41,8 @@
|
||||
//! `tokio`'s timer, IO and task synchronisation primitives.
|
||||
|
||||
use std::future::Future;
|
||||
use std::io::Result;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
#[cfg(feature = "ssr")]
|
||||
pub(crate) mod io;
|
||||
@ -72,21 +74,118 @@ where
|
||||
imp::spawn_local(f);
|
||||
}
|
||||
|
||||
/// Runs a task with it pinned onto a local worker thread.
|
||||
///
|
||||
/// This can be used to execute non-Send futures without blocking the current thread.
|
||||
///
|
||||
/// It maintains an internal thread pool dedicated to executing local futures.
|
||||
///
|
||||
/// [`spawn_local`] is available with tasks executed with `run_pinned`.
|
||||
#[inline(always)]
|
||||
#[cfg(feature = "ssr")]
|
||||
pub(crate) async fn run_pinned<F, Fut>(create_task: F) -> Fut::Output
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
F: Send + 'static,
|
||||
Fut: Future + 'static,
|
||||
Fut::Output: Send + 'static,
|
||||
{
|
||||
imp::run_pinned(create_task).await
|
||||
/// A Runtime Builder.
|
||||
#[derive(Debug)]
|
||||
pub struct RuntimeBuilder {
|
||||
worker_threads: usize,
|
||||
}
|
||||
|
||||
impl Default for RuntimeBuilder {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
worker_threads: imp::get_default_runtime_size(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RuntimeBuilder {
|
||||
/// Creates a new Runtime Builder.
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Sets the number of worker threads the Runtime will use.
|
||||
///
|
||||
/// # Default
|
||||
///
|
||||
/// The default number of worker threads is the number of available logical CPU cores.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// This setting has no effect if current platform has no thread support (e.g.: WebAssembly).
|
||||
pub fn worker_threads(&mut self, val: usize) -> &mut Self {
|
||||
self.worker_threads = val;
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Creates a Runtime.
|
||||
pub fn build(&mut self) -> Result<Runtime> {
|
||||
Ok(Runtime {
|
||||
inner: imp::Runtime::new(self.worker_threads)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// The Yew Runtime.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct Runtime {
|
||||
inner: imp::Runtime,
|
||||
}
|
||||
|
||||
impl Runtime {
|
||||
/// Creates a runtime Builder.
|
||||
pub fn builder() -> RuntimeBuilder {
|
||||
RuntimeBuilder::new()
|
||||
}
|
||||
|
||||
/// Spawns a task with it pinned to a worker thread.
|
||||
///
|
||||
/// This can be used to execute non-Send futures without blocking the current thread.
|
||||
///
|
||||
/// [`spawn_local`] is available with tasks executed with `spawn_pinned`.
|
||||
pub fn spawn_pinned<F, Fut>(&self, create_task: F)
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
F: Send + 'static,
|
||||
Fut: Future<Output = ()> + 'static,
|
||||
{
|
||||
self.inner.spawn_pinned(create_task);
|
||||
}
|
||||
}
|
||||
|
||||
/// A Local Runtime Handle.
|
||||
///
|
||||
/// This type can be used to acquire a runtime handle to spawn local tasks.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LocalHandle {
|
||||
inner: imp::LocalHandle,
|
||||
// This type is not send or sync.
|
||||
_marker: PhantomData<*const ()>,
|
||||
}
|
||||
|
||||
impl LocalHandle {
|
||||
/// Creates a Handle to current Runtime worker.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This method will panic if not called within Yew Runtime.
|
||||
pub fn current() -> Self {
|
||||
let inner = imp::LocalHandle::current();
|
||||
|
||||
Self {
|
||||
inner,
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a Handle to current Runtime worker.
|
||||
///
|
||||
/// This methods will return `None` if called from outside Yew Runtime.
|
||||
pub fn try_current() -> Option<Self> {
|
||||
let inner = imp::LocalHandle::try_current()?;
|
||||
|
||||
Some(Self {
|
||||
inner,
|
||||
_marker: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawns a Future with current Runtime worker.
|
||||
pub fn spawn_local<F>(&self, f: F)
|
||||
where
|
||||
F: Future<Output = ()> + 'static,
|
||||
{
|
||||
self.inner.spawn_local(f);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,7 +1,13 @@
|
||||
use std::future::Future;
|
||||
use std::io;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
pub(crate) mod time;
|
||||
|
||||
pub(crate) fn get_default_runtime_size() -> usize {
|
||||
0
|
||||
}
|
||||
|
||||
static NO_RUNTIME_NOTICE: &str = r#"No runtime configured for this platform, \
|
||||
features that requires a runtime can't be used. \
|
||||
Either compile with `target_arch = "wasm32", or enable the `tokio` feature."#;
|
||||
@ -18,13 +24,49 @@ where
|
||||
panic_no_runtime();
|
||||
}
|
||||
|
||||
#[cfg(feature = "ssr")]
|
||||
pub(crate) async fn run_pinned<F, Fut>(_create_task: F) -> Fut::Output
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
F: Send + 'static,
|
||||
Fut: Future + 'static,
|
||||
Fut::Output: Send + 'static,
|
||||
{
|
||||
panic_no_runtime();
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct Runtime {}
|
||||
|
||||
impl Default for Runtime {
|
||||
fn default() -> Self {
|
||||
panic_no_runtime();
|
||||
}
|
||||
}
|
||||
|
||||
impl Runtime {
|
||||
pub fn new(_size: usize) -> io::Result<Self> {
|
||||
panic_no_runtime();
|
||||
}
|
||||
|
||||
pub fn spawn_pinned<F, Fut>(&self, _create_task: F)
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
F: Send + 'static,
|
||||
Fut: Future<Output = ()> + 'static,
|
||||
{
|
||||
panic_no_runtime();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct LocalHandle {
|
||||
// This type is not send or sync.
|
||||
_marker: PhantomData<*const ()>,
|
||||
}
|
||||
|
||||
impl LocalHandle {
|
||||
pub fn try_current() -> Option<Self> {
|
||||
panic_no_runtime();
|
||||
}
|
||||
|
||||
pub fn current() -> Self {
|
||||
panic_no_runtime();
|
||||
}
|
||||
|
||||
pub fn spawn_local<F>(&self, _f: F)
|
||||
where
|
||||
F: Future<Output = ()> + 'static,
|
||||
{
|
||||
panic_no_runtime();
|
||||
}
|
||||
}
|
||||
|
||||
205
packages/yew/src/platform/rt_tokio/local_worker.rs
Normal file
205
packages/yew/src/platform/rt_tokio/local_worker.rs
Normal file
@ -0,0 +1,205 @@
|
||||
//! We use a local worker implementation that does not produce a JoinHandle for spawn_pinned.
|
||||
//! This avoids the cost to acquire a JoinHandle.
|
||||
//!
|
||||
//! See: [tokio-rs/tokio#4819](https://github.com/tokio-rs/tokio/issues/4819)
|
||||
//!
|
||||
//! We will not be able to produce a meaningful JoinHandle until WebAssembly targets support
|
||||
//! unwinding.
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use std::{io, thread};
|
||||
|
||||
static DEFAULT_WORKER_NAME: &str = "yew-runtime-worker";
|
||||
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use futures::channel::mpsc::UnboundedSender;
|
||||
use futures::stream::StreamExt;
|
||||
use tokio::task::{spawn_local, LocalSet};
|
||||
|
||||
type SpawnTask = Box<dyn Send + FnOnce()>;
|
||||
|
||||
thread_local! {
|
||||
static TASK_COUNT: RefCell<Option<Arc<AtomicUsize>>> = RefCell::new(None);
|
||||
static LOCAL_SET: LocalSet = LocalSet::new();
|
||||
}
|
||||
|
||||
pub(crate) struct LocalWorker {
|
||||
task_count: Arc<AtomicUsize>,
|
||||
tx: UnboundedSender<SpawnTask>,
|
||||
}
|
||||
|
||||
impl LocalWorker {
|
||||
pub fn new() -> io::Result<Self> {
|
||||
let (tx, mut rx) = futures::channel::mpsc::unbounded::<SpawnTask>();
|
||||
|
||||
let task_count: Arc<AtomicUsize> = Arc::default();
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
{
|
||||
let task_count = task_count.clone();
|
||||
thread::Builder::new()
|
||||
.name(DEFAULT_WORKER_NAME.into())
|
||||
.spawn(move || {
|
||||
TASK_COUNT.with(move |m| {
|
||||
*m.borrow_mut() = Some(task_count);
|
||||
});
|
||||
|
||||
LOCAL_SET.with(|local_set| {
|
||||
local_set.block_on(&rt, async move {
|
||||
while let Some(m) = rx.next().await {
|
||||
m();
|
||||
}
|
||||
});
|
||||
});
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(Self { task_count, tx })
|
||||
}
|
||||
|
||||
pub fn task_count(&self) -> usize {
|
||||
self.task_count.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub fn spawn_pinned<F, Fut>(&self, f: F)
|
||||
where
|
||||
F: 'static + Send + FnOnce() -> Fut,
|
||||
Fut: 'static + Future<Output = ()>,
|
||||
{
|
||||
let guard = LocalJobCountGuard::new(self.task_count.clone());
|
||||
|
||||
// We ignore the result upon a failure, this can never happen unless the runtime is
|
||||
// exiting which all instances of Runtime will be dropped at that time and hence cannot
|
||||
// spawn pinned tasks.
|
||||
let _ = self.tx.unbounded_send(Box::new(move || {
|
||||
spawn_local(async move {
|
||||
let _guard = guard;
|
||||
|
||||
f().await;
|
||||
});
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LocalJobCountGuard(Arc<AtomicUsize>);
|
||||
|
||||
impl LocalJobCountGuard {
|
||||
fn new(inner: Arc<AtomicUsize>) -> Self {
|
||||
inner.fetch_add(1, Ordering::AcqRel);
|
||||
LocalJobCountGuard(inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for LocalJobCountGuard {
|
||||
fn drop(&mut self) {
|
||||
self.0.fetch_sub(1, Ordering::AcqRel);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct LocalHandle {
|
||||
// This type is not send or sync.
|
||||
_marker: PhantomData<*const ()>,
|
||||
task_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl LocalHandle {
|
||||
pub fn try_current() -> Option<Self> {
|
||||
// We cache the handle to prevent borrowing RefCell.
|
||||
thread_local! {
|
||||
static LOCAL_HANDLE: Option<LocalHandle> = TASK_COUNT
|
||||
.with(|m| m.borrow().clone())
|
||||
.map(|task_count| LocalHandle { task_count, _marker: PhantomData });
|
||||
}
|
||||
|
||||
LOCAL_HANDLE.with(|m| m.clone())
|
||||
}
|
||||
|
||||
pub fn current() -> Self {
|
||||
Self::try_current().expect("outside of Yew runtime.")
|
||||
}
|
||||
|
||||
pub fn spawn_local<F>(&self, f: F)
|
||||
where
|
||||
F: Future<Output = ()> + 'static,
|
||||
{
|
||||
let guard = LocalJobCountGuard::new(self.task_count.clone());
|
||||
|
||||
LOCAL_SET.with(move |local_set| {
|
||||
local_set.spawn_local(async move {
|
||||
let _guard = guard;
|
||||
|
||||
f.await;
|
||||
})
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::channel::oneshot;
|
||||
use tokio::test;
|
||||
use tokio::time::timeout;
|
||||
use yew::platform::Runtime;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
async fn test_local_handle_exists() {
|
||||
assert!(LocalHandle::try_current().is_none());
|
||||
|
||||
let runtime = Runtime::default();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
runtime.spawn_pinned(move || async move {
|
||||
tx.send(LocalHandle::try_current().is_some())
|
||||
.expect("failed to send");
|
||||
});
|
||||
|
||||
timeout(Duration::from_secs(5), rx)
|
||||
.await
|
||||
.expect("task timed out")
|
||||
.expect("failed to receive");
|
||||
}
|
||||
|
||||
#[test]
|
||||
async fn test_local_handle_spawns_on_same_worker() {
|
||||
assert!(LocalHandle::try_current().is_none());
|
||||
|
||||
let runtime = Runtime::default();
|
||||
let (tx1, rx1) = oneshot::channel();
|
||||
let (tx2, rx2) = oneshot::channel();
|
||||
|
||||
runtime.spawn_pinned(move || async move {
|
||||
let handle = LocalHandle::current();
|
||||
|
||||
tx1.send(std::thread::current().id())
|
||||
.expect("failed to send");
|
||||
|
||||
handle.spawn_local(async move {
|
||||
tx2.send(std::thread::current().id())
|
||||
.expect("failed to send");
|
||||
})
|
||||
});
|
||||
|
||||
let result1 = timeout(Duration::from_secs(5), rx1)
|
||||
.await
|
||||
.expect("task timed out")
|
||||
.expect("failed to receive");
|
||||
let result2 = timeout(Duration::from_secs(5), rx2)
|
||||
.await
|
||||
.expect("task timed out")
|
||||
.expect("failed to receive");
|
||||
|
||||
assert_eq!(result1, result2);
|
||||
}
|
||||
}
|
||||
@ -1,25 +1,20 @@
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::{fmt, io};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
pub(crate) mod time;
|
||||
|
||||
#[cfg(feature = "ssr")]
|
||||
pub(super) async fn run_pinned<F, Fut>(create_task: F) -> Fut::Output
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
F: Send + 'static,
|
||||
Fut: Future + 'static,
|
||||
Fut::Output: Send + 'static,
|
||||
{
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio_util::task::LocalPoolHandle;
|
||||
mod local_worker;
|
||||
|
||||
static POOL_HANDLE: Lazy<LocalPoolHandle> =
|
||||
Lazy::new(|| LocalPoolHandle::new(num_cpus::get() * 2));
|
||||
pub(crate) use local_worker::LocalHandle;
|
||||
use local_worker::LocalWorker;
|
||||
|
||||
POOL_HANDLE
|
||||
.spawn_pinned(create_task)
|
||||
.await
|
||||
.expect("future has panicked!")
|
||||
pub(crate) fn get_default_runtime_size() -> usize {
|
||||
// We use num_cpus as std::thread::available_parallelism() does not take
|
||||
// system resource constraint (e.g.: cgroups) into consideration.
|
||||
num_cpus::get()
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
@ -27,5 +22,151 @@ pub(super) fn spawn_local<F>(f: F)
|
||||
where
|
||||
F: Future<Output = ()> + 'static,
|
||||
{
|
||||
tokio::task::spawn_local(f);
|
||||
match LocalHandle::try_current() {
|
||||
Some(m) => {
|
||||
// If within a Yew runtime, use a local handle increases the local task count.
|
||||
m.spawn_local(f);
|
||||
}
|
||||
None => {
|
||||
tokio::task::spawn_local(f);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Runtime {
|
||||
workers: Arc<Vec<LocalWorker>>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Runtime {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Runtime")
|
||||
.field("workers", &"Vec<LocalWorker>")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Runtime {
|
||||
fn default() -> Self {
|
||||
static DEFAULT_RT: Lazy<Runtime> = Lazy::new(|| {
|
||||
Runtime::new(get_default_runtime_size()).expect("failed to create runtime.")
|
||||
});
|
||||
|
||||
DEFAULT_RT.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Runtime {
|
||||
pub fn new(size: usize) -> io::Result<Self> {
|
||||
assert!(size > 0, "must have more than 1 worker.");
|
||||
|
||||
let mut workers = Vec::with_capacity(size);
|
||||
|
||||
for _ in 0..size {
|
||||
let worker = LocalWorker::new()?;
|
||||
workers.push(worker);
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
workers: workers.into(),
|
||||
})
|
||||
}
|
||||
|
||||
fn find_least_busy_local_worker(&self) -> &LocalWorker {
|
||||
let mut workers = self.workers.iter();
|
||||
|
||||
let mut worker = workers.next().expect("must have more than 1 worker.");
|
||||
let mut task_count = worker.task_count();
|
||||
|
||||
for current_worker in workers {
|
||||
if task_count == 0 {
|
||||
// We don't have to search until the end.
|
||||
break;
|
||||
}
|
||||
|
||||
let current_worker_task_count = current_worker.task_count();
|
||||
|
||||
if current_worker_task_count < task_count {
|
||||
task_count = current_worker_task_count;
|
||||
worker = current_worker;
|
||||
}
|
||||
}
|
||||
|
||||
worker
|
||||
}
|
||||
|
||||
pub fn spawn_pinned<F, Fut>(&self, create_task: F)
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
F: Send + 'static,
|
||||
Fut: Future<Output = ()> + 'static,
|
||||
{
|
||||
let worker = self.find_least_busy_local_worker();
|
||||
worker.spawn_pinned(create_task);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::channel::oneshot;
|
||||
use tokio::test;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
async fn test_spawn_pinned_least_busy() {
|
||||
let runtime = Runtime::new(2).expect("failed to create runtime.");
|
||||
|
||||
let (tx1, rx1) = oneshot::channel();
|
||||
let (tx2, rx2) = oneshot::channel();
|
||||
|
||||
runtime.spawn_pinned(move || async move {
|
||||
tx1.send(std::thread::current().id())
|
||||
.expect("failed to send!");
|
||||
});
|
||||
|
||||
runtime.spawn_pinned(move || async move {
|
||||
tx2.send(std::thread::current().id())
|
||||
.expect("failed to send!");
|
||||
});
|
||||
|
||||
let result1 = timeout(Duration::from_secs(5), rx1)
|
||||
.await
|
||||
.expect("task timed out")
|
||||
.expect("failed to receive");
|
||||
let result2 = timeout(Duration::from_secs(5), rx2)
|
||||
.await
|
||||
.expect("task timed out")
|
||||
.expect("failed to receive");
|
||||
|
||||
// first task and second task are not on the same thread.
|
||||
assert_ne!(result1, result2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
async fn test_spawn_local_within_send() {
|
||||
let runtime = Runtime::new(1).expect("failed to create runtime.");
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
runtime.spawn_pinned(move || async move {
|
||||
tokio::task::spawn(async move {
|
||||
// tokio::task::spawn_local cannot spawn tasks outside of a local context.
|
||||
//
|
||||
// yew::platform::spawn_local can spawn tasks within a Send task as long as running
|
||||
// under a Yew Runtime.
|
||||
spawn_local(async move {
|
||||
tx.send(()).expect("failed to send!");
|
||||
})
|
||||
});
|
||||
});
|
||||
|
||||
timeout(Duration::from_secs(5), rx)
|
||||
.await
|
||||
.expect("task timed out")
|
||||
.expect("failed to receive");
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,17 +1,56 @@
|
||||
#[cfg(feature = "ssr")]
|
||||
use std::future::Future;
|
||||
use std::io;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
pub(crate) mod time;
|
||||
|
||||
pub(crate) fn get_default_runtime_size() -> usize {
|
||||
0
|
||||
}
|
||||
|
||||
pub(super) use wasm_bindgen_futures::spawn_local;
|
||||
|
||||
#[cfg(feature = "ssr")]
|
||||
pub(crate) async fn run_pinned<F, Fut>(create_task: F) -> Fut::Output
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
F: Send + 'static,
|
||||
Fut: Future + 'static,
|
||||
Fut::Output: Send + 'static,
|
||||
{
|
||||
create_task().await
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct Runtime {}
|
||||
|
||||
impl Runtime {
|
||||
pub fn new(_size: usize) -> io::Result<Self> {
|
||||
Ok(Self {})
|
||||
}
|
||||
|
||||
pub fn spawn_pinned<F, Fut>(&self, create_task: F)
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
F: Send + 'static,
|
||||
Fut: Future<Output = ()> + 'static,
|
||||
{
|
||||
spawn_local(create_task())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct LocalHandle {
|
||||
// This type is not send or sync.
|
||||
_marker: PhantomData<*const ()>,
|
||||
}
|
||||
|
||||
impl LocalHandle {
|
||||
pub fn try_current() -> Option<Self> {
|
||||
Some(Self {
|
||||
_marker: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn current() -> Self {
|
||||
Self {
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_local<F>(&self, f: F)
|
||||
where
|
||||
F: Future<Output = ()> + 'static,
|
||||
{
|
||||
spawn_local(f);
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,9 +5,17 @@ use tracing::Instrument;
|
||||
|
||||
use crate::html::{BaseComponent, Scope};
|
||||
use crate::platform::io::{self, DEFAULT_BUF_SIZE};
|
||||
use crate::platform::{run_pinned, spawn_local};
|
||||
use crate::platform::{spawn_local, LocalHandle, Runtime};
|
||||
|
||||
/// A Yew Server-side Renderer that renders on the current thread.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// This renderer does not spawn its own runtime and can only be used when:
|
||||
///
|
||||
/// - `wasm-bindgen` is selected as the backend of Yew runtime.
|
||||
/// - running within a [`Runtime`](crate::platform::Runtime).
|
||||
/// - running within a tokio [`LocalSet`](tokio::task::LocalSet).
|
||||
#[cfg(feature = "ssr")]
|
||||
#[derive(Debug)]
|
||||
pub struct LocalServerRenderer<COMP>
|
||||
@ -92,7 +100,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Renders Yew Applications into a string Stream
|
||||
/// Renders Yew Application into a string Stream
|
||||
#[tracing::instrument(
|
||||
level = tracing::Level::DEBUG,
|
||||
name = "render",
|
||||
@ -131,6 +139,7 @@ where
|
||||
create_props: Box<dyn Send + FnOnce() -> COMP::Properties>,
|
||||
hydratable: bool,
|
||||
capacity: usize,
|
||||
rt: Option<Runtime>,
|
||||
}
|
||||
|
||||
impl<COMP> fmt::Debug for ServerRenderer<COMP>
|
||||
@ -181,9 +190,17 @@ where
|
||||
create_props: Box::new(create_props),
|
||||
hydratable: true,
|
||||
capacity: DEFAULT_BUF_SIZE,
|
||||
rt: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the runtime the ServerRenderer will run the rendering task with.
|
||||
pub fn with_runtime(mut self, rt: Runtime) -> Self {
|
||||
self.rt = Some(rt);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the capacity of renderer buffer.
|
||||
///
|
||||
/// Default: `8192`
|
||||
@ -216,34 +233,43 @@ where
|
||||
|
||||
/// Renders Yew Application to a String.
|
||||
pub async fn render_to_string(self, w: &mut String) {
|
||||
let mut s = self.render_stream().await;
|
||||
let mut s = self.render_stream();
|
||||
|
||||
while let Some(m) = s.next().await {
|
||||
w.push_str(&m);
|
||||
}
|
||||
}
|
||||
|
||||
/// Renders Yew Applications into a string Stream.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// Unlike [`LocalServerRenderer::render_stream`], this method is `async fn`.
|
||||
pub async fn render_stream(self) -> impl Stream<Item = String> {
|
||||
// We use run_pinned to switch to our runtime.
|
||||
run_pinned(move || async move {
|
||||
let Self {
|
||||
create_props,
|
||||
hydratable,
|
||||
capacity,
|
||||
} = self;
|
||||
/// Renders Yew Application into a string Stream.
|
||||
pub fn render_stream(self) -> impl Send + Stream<Item = String> {
|
||||
let Self {
|
||||
create_props,
|
||||
hydratable,
|
||||
capacity,
|
||||
rt,
|
||||
} = self;
|
||||
|
||||
let (mut w, r) = io::buffer(capacity);
|
||||
let create_task = move || async move {
|
||||
let props = create_props();
|
||||
let scope = Scope::<COMP>::new(None);
|
||||
|
||||
LocalServerRenderer::<COMP>::with_props(props)
|
||||
.hydratable(hydratable)
|
||||
.capacity(capacity)
|
||||
.render_stream()
|
||||
})
|
||||
.await
|
||||
scope
|
||||
.render_into_stream(&mut w, props.into(), hydratable)
|
||||
.await;
|
||||
};
|
||||
|
||||
match rt {
|
||||
// If a runtime is specified, spawn to the specified runtime.
|
||||
Some(m) => m.spawn_pinned(create_task),
|
||||
None => match LocalHandle::try_current() {
|
||||
// If within a Yew Runtime, spawn to the current runtime.
|
||||
Some(m) => m.spawn_local(create_task()),
|
||||
// Outside of Yew Runtime, spawn to the default runtime.
|
||||
None => Runtime::default().spawn_pinned(create_task),
|
||||
},
|
||||
}
|
||||
|
||||
r
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user