Remove dynamic dispatch for schedule

This commit is contained in:
Maximilian Ammann 2022-05-31 21:45:44 +02:00
parent 06cca68e8e
commit 774708a787
5 changed files with 32 additions and 38 deletions

View File

@ -117,9 +117,7 @@ fn run_headless() {
.into_boxed_slice();
let processor = HeadlessPipelineProcessor::default();
let mut pipeline_context = PipelineContext {
processor: Box::new(processor),
};
let mut pipeline_context = PipelineContext::new(processor);
let pipeline = build_vector_tile_pipeline();
pipeline.process(
(
@ -133,15 +131,11 @@ fn run_headless() {
&mut pipeline_context,
);
let mut processor = pipeline_context.take_processor();
let mut processor = pipeline_context
.take_processor::<HeadlessPipelineProcessor>()
.unwrap();
while let Some(v) = processor
.as_any_mut()
.downcast_mut::<HeadlessPipelineProcessor>()
.unwrap()
.layers
.pop()
{
while let Some(v) = processor.layers.pop() {
map.map_schedule_mut()
.map_context
.tile_repository

View File

@ -24,24 +24,23 @@ where
pub fn schedule_method(&self) -> &SM {
&self.schedule_method
}
pub fn take(self) -> SM {
self.schedule_method
}
}
/// Can schedule a task from a future factory and a shared state.
// Should be object safe in order to be able to have a dyn object in MapContext
pub trait ScheduleMethod: 'static {
#[cfg(not(feature = "no-thread-safe-futures"))]
fn schedule(
fn schedule<T>(
&self,
future_factory: Box<(dyn (FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>>) + Send)>,
) -> Result<(), Error>;
future_factory: impl (FnOnce() -> T) + Send + 'static,
) -> Result<(), Error>
where
T: Future<Output = ()> + Send + 'static;
#[cfg(feature = "no-thread-safe-futures")]
fn schedule(
fn schedule<T>(
&self,
future_factory: Box<(dyn (FnOnce() -> Pin<Box<dyn Future<Output = ()>>>) + Send)>,
) -> Result<(), Error>;
future_factory: impl (FnOnce() -> T) + Send + 'static,
) -> Result<(), Error>
where
T: Future<Output = ()> + 'static;
}

View File

@ -13,12 +13,10 @@ impl TokioScheduleMethod {
}
impl ScheduleMethod for TokioScheduleMethod {
fn schedule(
&self,
future_factory: Box<
(dyn (FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + 'static),
>,
) -> Result<(), Error> {
fn schedule<T>(&self, future_factory: impl FnOnce() -> T + Send + 'static) -> Result<(), Error>
where
T: Future<Output = ()> + Send + 'static,
{
tokio::task::spawn((future_factory)());
Ok(())
}

View File

@ -46,11 +46,9 @@ pub fn register_stages<HC: HttpClient, SM: ScheduleMethod>(
geometry_index: Arc::new(Mutex::new(GeometryIndex::new())),
};
let scheduler = Box::new(scheduler.take());
schedule.add_stage(
"request",
RequestStage::new(shared_thread_state.clone(), http_source_client, scheduler),
RequestStage::new(shared_thread_state.clone(), http_source_client, *scheduler),
);
schedule.add_stage(
"populate_tile_store",

View File

@ -8,27 +8,29 @@ use crate::io::tile_repository::TileRepository;
use crate::io::TileRequest;
use crate::schedule::Stage;
use crate::stages::SharedThreadState;
use crate::{HttpClient, ScheduleMethod, Style};
use crate::{HttpClient, ScheduleMethod, Scheduler, Style};
use std::collections::HashSet;
pub struct RequestStage<HC>
pub struct RequestStage<SM, HC>
where
SM: ScheduleMethod,
HC: HttpClient,
{
shared_thread_state: SharedThreadState,
scheduler: Box<dyn ScheduleMethod>,
scheduler: Scheduler<SM>,
http_source_client: HttpSourceClient<HC>,
try_failed: bool,
}
impl<HC> RequestStage<HC>
impl<SM, HC> RequestStage<SM, HC>
where
SM: ScheduleMethod,
HC: HttpClient,
{
pub fn new(
shared_thread_state: SharedThreadState,
http_source_client: HttpSourceClient<HC>,
scheduler: Box<dyn ScheduleMethod>,
scheduler: Scheduler<SM>,
) -> Self {
Self {
shared_thread_state,
@ -39,8 +41,9 @@ where
}
}
impl<HC> Stage for RequestStage<HC>
impl<SM, HC> Stage for RequestStage<SM, HC>
where
SM: ScheduleMethod,
HC: HttpClient,
{
fn run(
@ -74,8 +77,9 @@ where
}
}
impl<HC> RequestStage<HC>
impl<SM, HC> RequestStage<SM, HC>
where
SM: ScheduleMethod,
HC: HttpClient,
{
/// Request tiles which are currently in view.
@ -136,6 +140,7 @@ where
let state = self.shared_thread_state.clone();
self.scheduler
.schedule_method()
.schedule(Box::new(move || {
Box::pin(async move {
match client.fetch(&coords).await {