Remove a Box<>

This commit is contained in:
Maximilian Ammann 2022-09-15 16:25:18 +02:00
parent b4269f7c7e
commit fb250e0d28
12 changed files with 105 additions and 108 deletions

View File

@ -8,10 +8,11 @@ use maplibre::{
use maplibre_winit::winit::{WinitEnvironment, WinitMapWindowConfig};
pub async fn run_headed() {
MapBuilder::<WinitEnvironment<_, _, _, TokioAsyncProcedureCall>>::new()
let client = ReqwestHttpClient::new(None);
MapBuilder::<WinitEnvironment<_, _, _, TokioAsyncProcedureCall<_>>>::new()
.with_map_window_config(WinitMapWindowConfig::new("maplibre".to_string()))
.with_http_client(ReqwestHttpClient::new(None))
.with_apc(TokioAsyncProcedureCall::new())
.with_http_client(client.clone())
.with_apc(TokioAsyncProcedureCall::new(client))
.with_scheduler(TokioScheduler::new())
.build()
.initialize()

View File

@ -14,12 +14,13 @@ use maplibre_winit::winit::WinitEnvironment;
use tile_grid::{extent_wgs84_to_merc, Extent, GridIterator};
pub async fn run_headless(tile_size: u32, min: LatLon, max: LatLon) {
let mut map = MapBuilder::<HeadlessEnvironment<_, _, _, TokioAsyncProcedureCall>>::new()
let client = ReqwestHttpClient::new(None);
let mut map = MapBuilder::<HeadlessEnvironment<_, _, _, TokioAsyncProcedureCall<_>>>::new()
.with_map_window_config(HeadlessMapWindowConfig {
size: WindowSize::new(tile_size, tile_size).unwrap(),
})
.with_http_client(ReqwestHttpClient::new(None))
.with_apc(TokioAsyncProcedureCall::new())
.with_http_client(client.clone())
.with_apc(TokioAsyncProcedureCall::new(client)) // TODO: avoid passing client here
.with_scheduler(TokioScheduler::new())
.with_renderer_settings(RendererSettings {
texture_format: TextureFormat::Rgba8UnormSrgb,

View File

@ -1,6 +1,6 @@
use instant::Instant;
use maplibre::environment::{DefaultTransferables, Environment};
use maplibre::io::apc::{AsyncProcedureCall, Transferable};
use maplibre::io::apc::{AsyncProcedureCall, Message};
use maplibre::io::scheduler::Scheduler;
use maplibre::io::transferables::Transferables;
use maplibre::{

View File

@ -12,7 +12,7 @@ use std::pin::Pin;
/// `TessellatedLayer` contains the result of the tessellation for a specific layer, otherwise
/// `UnavailableLayer` if the layer doesn't exist.
#[derive(Clone)]
pub enum Transferable<T: Transferables> {
pub enum Message<T: Transferables> {
TileTessellated(T::TileTessellated),
UnavailableLayer(T::UnavailableLayer),
TessellatedLayer(T::TessellatedLayer),
@ -23,24 +23,18 @@ pub enum Input {
TileRequest(TileRequest),
}
pub trait Context<T: Transferables, HC: HttpClient> {
fn send(&self, data: Transferable<T>);
pub trait Context<T: Transferables, HC: HttpClient>: 'static {
fn send(&self, data: Message<T>);
fn source_client(&self) -> &SourceClient<HC>;
}
pub type AsyncProcedure<T, HC> =
fn(input: Input, context: Box<dyn Context<T, HC>>) -> Pin<Box<dyn Future<Output = ()>>>;
pub type AsyncProcedure<C> = fn(input: Input, context: C) -> Pin<Box<dyn Future<Output = ()>>>;
pub trait AsyncProcedureCall<T: Transferables, HC: HttpClient>: 'static {
type Context: Context<T, HC> + Send;
fn receive(&mut self) -> Option<Box<Transferable<T>>>; // FIXME remove box
fn receive(&mut self) -> Option<Message<T>>;
fn schedule(
&self,
input: Input,
procedure: AsyncProcedure<T, HC>,
http_client: HttpSourceClient<HC>,
);
fn schedule(&self, input: Input, procedure: AsyncProcedure<Self::Context>);
}

View File

@ -30,8 +30,6 @@ pub trait PipelineProcessor: Downcast {
}
}
impl_downcast!(PipelineProcessor);
/// Context which is available to each step within a [`DataPipeline`]
pub struct PipelineContext {
processor: Box<dyn PipelineProcessor>,

View File

@ -1,5 +1,5 @@
use crate::environment::DefaultTransferables;
use crate::io::apc::{AsyncProcedure, AsyncProcedureCall, Context, Input, Transferable};
use crate::io::apc::{AsyncProcedure, AsyncProcedureCall, Context, Input, Message};
use crate::io::source_client::{HttpSourceClient, SourceClient};
use crate::io::transferables::Transferables;
use crate::{Environment, HttpClient};
@ -12,7 +12,7 @@ use tokio_util::task::LocalPoolHandle;
// FIXME: Make this generic using the Schedule
#[derive(Clone)]
pub struct TokioContext<T: Transferables, HC: HttpClient> {
sender: Sender<Transferable<T>>,
sender: Sender<Message<T>>,
source_client: SourceClient<HC>,
}
@ -20,9 +20,8 @@ impl<T: Transferables, HC: HttpClient> Context<T, HC> for TokioContext<T, HC>
where
T: Clone,
{
fn send(&self, data: Transferable<T>) {
fn send(&self, data: Message<T>) {
self.sender.send(data).unwrap();
log::debug!("sent");
}
fn source_client(&self) -> &SourceClient<HC> {
@ -30,47 +29,44 @@ where
}
}
pub struct TokioAsyncProcedureCall {
pub struct TokioAsyncProcedureCall<HC: HttpClient> {
channel: (
Sender<Transferable<DefaultTransferables>>,
Receiver<Transferable<DefaultTransferables>>,
Sender<Message<DefaultTransferables>>,
Receiver<Message<DefaultTransferables>>,
),
pool: LocalPoolHandle,
http_client: HC,
}
impl TokioAsyncProcedureCall {
pub fn new() -> Self {
impl<HC: HttpClient> TokioAsyncProcedureCall<HC> {
pub fn new(http_client: HC) -> Self {
Self {
channel: mpsc::channel(),
pool: LocalPoolHandle::new(4),
http_client,
}
}
}
impl<HC: HttpClient> AsyncProcedureCall<DefaultTransferables, HC> for TokioAsyncProcedureCall {
impl<HC: HttpClient> AsyncProcedureCall<DefaultTransferables, HC> for TokioAsyncProcedureCall<HC> {
type Context = TokioContext<DefaultTransferables, HC>;
fn receive(&mut self) -> Option<Box<Transferable<DefaultTransferables>>> {
fn receive(&mut self) -> Option<Message<DefaultTransferables>> {
let transferred = self.channel.1.try_recv().ok()?;
log::debug!("received");
Some(Box::new(transferred))
Some(transferred)
}
fn schedule(
&self,
input: Input,
procedure: AsyncProcedure<DefaultTransferables, HC>,
http_client: HttpSourceClient<HC>,
) {
fn schedule(&self, input: Input, procedure: AsyncProcedure<Self::Context>) {
let sender = self.channel.0.clone();
let client = self.http_client.clone(); // FIXME: do not clone each time
self.pool.spawn_pinned(move || async move {
(procedure)(
input,
Box::new(TokioContext {
TokioContext {
sender,
source_client: SourceClient::Http(http_client),
}),
source_client: SourceClient::Http(HttpSourceClient::new(client)),
},
)
.await;
});

View File

@ -1,6 +1,7 @@
//! [Stages](Stage) for requesting and preparing data
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{mpsc, Arc, Mutex};
@ -8,7 +9,7 @@ use geozero::{mvt::tile, GeozeroDatasource};
use request_stage::RequestStage;
use crate::environment::DefaultTransferables;
use crate::io::apc::{AsyncProcedureCall, Context, Transferable};
use crate::io::apc::{AsyncProcedureCall, Context, Message};
use crate::io::transferables::Transferables;
use crate::io::transferables::{
DefaultTessellatedLayer, DefaultTileTessellated, DefaultUnavailableLayer, TessellatedLayer,
@ -47,24 +48,26 @@ pub fn register_stages<E: Environment>(
schedule.add_stage("populate_tile_store", PopulateTileStore::<E>::new(apc));
}
pub struct HeadedPipelineProcessor<E: Environment> {
context: Box<dyn Context<E::Transferables, E::HttpClient>>, // TODO: remove box
pub struct HeadedPipelineProcessor<T: Transferables, HC: HttpClient, C: Context<T, HC>> {
context: C,
phantom_t: PhantomData<T>,
phantom_hc: PhantomData<HC>,
}
impl<E: Environment> PipelineProcessor for HeadedPipelineProcessor<E> {
impl<'c, T: Transferables, HC: HttpClient, C: Context<T, HC>> PipelineProcessor
for HeadedPipelineProcessor<T, HC, C>
{
fn tile_finished(&mut self, coords: &WorldTileCoords) {
self.context.send(Transferable::TileTessellated(
<E::Transferables as Transferables>::TileTessellated::new(*coords),
))
self.context
.send(Message::TileTessellated(T::TileTessellated::new(*coords)))
}
fn layer_unavailable(&mut self, coords: &WorldTileCoords, layer_name: &str) {
self.context.send(Transferable::UnavailableLayer(
<E::Transferables as Transferables>::UnavailableLayer::new(
self.context
.send(Message::UnavailableLayer(T::UnavailableLayer::new(
*coords,
layer_name.to_owned(),
),
))
)))
}
fn layer_tesselation_finished(
@ -74,14 +77,13 @@ impl<E: Environment> PipelineProcessor for HeadedPipelineProcessor<E> {
feature_indices: Vec<u32>,
layer_data: tile::Layer,
) {
self.context.send(Transferable::TessellatedLayer(
<E::Transferables as Transferables>::TessellatedLayer::new(
self.context
.send(Message::TessellatedLayer(T::TessellatedLayer::new(
*coords,
buffer,
feature_indices,
layer_data,
),
))
)))
}
fn layer_indexing_finished(

View File

@ -1,6 +1,6 @@
//! Receives data from async threads and populates the [`crate::io::tile_repository::TileRepository`].
use crate::io::apc::{AsyncProcedureCall, Transferable};
use crate::io::apc::{AsyncProcedureCall, Message};
use crate::io::transferables::{TessellatedLayer, TileTessellated, UnavailableLayer};
use crate::{context::MapContext, io::tile_repository::StoredLayer, schedule::Stage, Environment};
use std::borrow::BorrowMut;
@ -27,15 +27,15 @@ impl<E: Environment> Stage for PopulateTileStore<E> {
) {
if let Ok(mut apc) = self.apc.deref().try_borrow_mut() {
if let Some(result) = apc.receive() {
match *result {
Transferable::TileTessellated(tranferred) => {
match result {
Message::TileTessellated(tranferred) => {
let coords = tranferred.coords();
tile_repository.success(coords);
tracing::trace!("Tile at {} finished loading", coords);
log::warn!("Tile at {} finished loading", coords);
}
// FIXME: deduplicate
Transferable::UnavailableLayer(tranferred) => {
Message::UnavailableLayer(tranferred) => {
let layer: StoredLayer = tranferred.to_stored_layer();
tracing::debug!(
"Layer {} at {} reached main thread",
@ -44,7 +44,7 @@ impl<E: Environment> Stage for PopulateTileStore<E> {
);
tile_repository.put_tessellated_layer(layer);
}
Transferable::TessellatedLayer(data) => {
Message::TessellatedLayer(data) => {
let layer: StoredLayer = data.to_stored_layer();
tracing::debug!(
"Layer {} at {} reached main thread",

View File

@ -69,9 +69,9 @@ impl<E: Environment> Stage for RequestStage<E> {
}
}
pub fn schedule<E: Environment>(
pub fn schedule<E: Environment, C: Context<E::Transferables, E::HttpClient>>(
input: Input,
context: Box<dyn Context<E::Transferables, E::HttpClient>>, // TODO: remove box
context: C,
) -> Pin<Box<(dyn Future<Output = ()> + 'static)>> {
// FIXME: improve input handling
let input = match input {
@ -88,8 +88,11 @@ pub fn schedule<E: Environment>(
Ok(data) => {
let data = data.into_boxed_slice();
let mut pipeline_context =
PipelineContext::new(HeadedPipelineProcessor::<E> { context });
let mut pipeline_context = PipelineContext::new(HeadedPipelineProcessor {
context,
phantom_t: Default::default(),
phantom_hc: Default::default(),
});
let pipeline = build_vector_tile_pipeline();
pipeline.process((input, data), &mut pipeline_context);
}
@ -144,8 +147,13 @@ impl<E: Environment> RequestStage<E> {
coords: *coords,
layers: layers.clone(),
}),
schedule::<E>,
self.http_source_client.clone(),
schedule::<
E,
<E::AsyncProcedureCall as AsyncProcedureCall<
E::Transferables,
E::HttpClient,
>>::Context,
>,
);
}

View File

@ -1,7 +1,7 @@
use crate::platform::sync::pool_scheduler::WebWorkerPoolScheduler;
use maplibre::environment::DefaultTransferables;
use maplibre::environment::Environment;
use maplibre::io::apc::{AsyncProcedure, AsyncProcedureCall, Context, Transferable};
use maplibre::io::apc::{AsyncProcedure, AsyncProcedureCall, Context, Message};
use maplibre::io::scheduler::Scheduler;
use maplibre::io::source_client::{HttpClient, HttpSourceClient, SourceClient};
use maplibre::io::transferables::Transferables;
@ -12,7 +12,7 @@ use std::sync::mpsc::{Receiver, Sender};
#[derive(Clone)]
pub struct AtomicContext<T: Transferables, HC: HttpClient> {
sender: Sender<Transferable<T>>,
sender: Sender<Message<T>>,
source_client: SourceClient<HC>,
}
@ -20,7 +20,7 @@ impl<T: Transferables, HC: HttpClient> Context<T, HC> for AtomicContext<T, HC>
where
T: Clone,
{
fn send(&self, data: Transferable<T>) {
fn send(&self, data: Message<T>) {
self.sender.send(data).unwrap();
}
@ -31,8 +31,8 @@ where
pub struct AtomicAsyncProcedureCall {
channel: (
Sender<Transferable<DefaultTransferables>>,
Receiver<Transferable<DefaultTransferables>>,
Sender<Message<DefaultTransferables>>,
Receiver<Message<DefaultTransferables>>,
),
scheduler: WebWorkerPoolScheduler,
}
@ -49,7 +49,7 @@ impl AtomicAsyncProcedureCall {
impl<HC: HttpClient> AsyncProcedureCall<DefaultTransferables, HC> for AtomicAsyncProcedureCall {
type Context = AtomicContext<DefaultTransferables, HC>;
fn receive(&self) -> Option<Transferable<DefaultTransferables>> {
fn receive(&self) -> Option<Message<DefaultTransferables>> {
let transferred = self.channel.1.try_recv().ok()?;
Some(transferred)
}

View File

@ -4,7 +4,7 @@ use crate::platform::unsync::transferables::{
use crate::{MapType, WHATWGFetchHttpClient};
use js_sys::Uint8Array;
use maplibre::environment::Environment;
use maplibre::io::apc::{AsyncProcedure, AsyncProcedureCall, Context, Input, Transferable};
use maplibre::io::apc::{AsyncProcedure, AsyncProcedureCall, Context, Input, Message};
use maplibre::io::scheduler::Scheduler;
use maplibre::io::source_client::{HttpClient, HttpSourceClient, SourceClient};
use maplibre::io::transferables::Transferables;
@ -25,19 +25,23 @@ use wasm_bindgen::JsCast;
use wasm_bindgen::JsValue;
use web_sys::{DedicatedWorkerGlobalScope, Worker};
type UsedTransferables = LinearTransferables;
type UsedHttpClient = WHATWGFetchHttpClient;
type UsedContext = PassingContext;
#[derive(Clone)]
pub struct PassingContext<HC: HttpClient> {
source_client: SourceClient<HC>,
pub struct PassingContext {
source_client: SourceClient<UsedHttpClient>,
}
impl<HC: HttpClient> Context<LinearTransferables, HC> for PassingContext<HC> {
fn send(&self, data: Transferable<LinearTransferables>) {
impl Context<UsedTransferables, UsedHttpClient> for PassingContext {
fn send(&self, data: Message<UsedTransferables>) {
// TODO: send back to main thread via postMessage
let (tag, serialized): (u32, &[u8]) = match &data {
Transferable::TileTessellated(data) => (1, bytemuck::bytes_of(data)),
Transferable::UnavailableLayer(data) => (2, bytemuck::bytes_of(data)),
Transferable::TessellatedLayer(data) => (3, bytemuck::bytes_of(data.data.as_ref())),
Message::TileTessellated(data) => (1, bytemuck::bytes_of(data)),
Message::UnavailableLayer(data) => (2, bytemuck::bytes_of(data)),
Message::TessellatedLayer(data) => (3, bytemuck::bytes_of(data.data.as_ref())),
};
let serialized_array_buffer = js_sys::ArrayBuffer::new(serialized.len() as u32);
@ -53,7 +57,7 @@ impl<HC: HttpClient> Context<LinearTransferables, HC> for PassingContext<HC> {
global.post_message(&array).unwrap();
}
fn source_client(&self) -> &SourceClient<HC> {
fn source_client(&self) -> &SourceClient<UsedHttpClient> {
&self.source_client
}
}
@ -62,7 +66,7 @@ pub struct PassingAsyncProcedureCall {
new_worker: Box<dyn Fn() -> Worker>,
workers: Vec<Worker>,
received: Vec<Box<Transferable<LinearTransferables>>>,
received: Vec<Message<UsedTransferables>>,
}
impl PassingAsyncProcedureCall {
@ -89,21 +93,15 @@ impl PassingAsyncProcedureCall {
}
}
impl<HC: HttpClient> AsyncProcedureCall<LinearTransferables, HC> for PassingAsyncProcedureCall {
type Context = PassingContext<HC>;
impl AsyncProcedureCall<UsedTransferables, UsedHttpClient> for PassingAsyncProcedureCall {
type Context = UsedContext;
fn receive(&mut self) -> Option<Box<Transferable<LinearTransferables>>> {
fn receive(&mut self) -> Option<Message<UsedTransferables>> {
self.received.pop()
}
fn schedule(
&self,
input: Input,
procedure: AsyncProcedure<LinearTransferables, HC>,
http_client: HttpSourceClient<HC>, // FIXME
) {
let procedure_ptr =
procedure as *mut AsyncProcedure<LinearTransferables, WHATWGFetchHttpClient> as u32; // TODO: is u32 fine?
fn schedule(&self, input: Input, procedure: AsyncProcedure<Self::Context>) {
let procedure_ptr = procedure as *mut AsyncProcedure<Self::Context> as u32; // TODO: is u32 fine?
let input = serde_json::to_string(&input).unwrap();
let array = js_sys::Array::new();
@ -119,8 +117,7 @@ impl<HC: HttpClient> AsyncProcedureCall<LinearTransferables, HC> for PassingAsyn
pub async fn unsync_worker_entry(procedure_ptr: u32, input: String) -> Result<(), JsValue> {
log::info!("worker_entry unsync");
let procedure: AsyncProcedure<LinearTransferables, WHATWGFetchHttpClient> =
unsafe { std::mem::transmute(procedure_ptr) };
let procedure: AsyncProcedure<UsedContext> = unsafe { std::mem::transmute(procedure_ptr) };
let input = serde_json::from_str::<Input>(&input).unwrap();
@ -128,7 +125,7 @@ pub async fn unsync_worker_entry(procedure_ptr: u32, input: String) -> Result<()
source_client: SourceClient::Http(HttpSourceClient::new(WHATWGFetchHttpClient::new())),
};
(procedure)(input, Box::new(context)).await;
(procedure)(input, context).await;
Ok(())
}
@ -143,7 +140,7 @@ pub fn unsync_main_entry(
let mut map = unsafe { Rc::from_raw(map_ptr) };
let transferred = match tag {
3 => Some(Transferable::<LinearTransferables>::TessellatedLayer(
3 => Some(Message::<UsedTransferables>::TessellatedLayer(
LinearTessellatedLayer {
data: unsafe {
let mut uninit = Box::<InnerData>::new_zeroed();
@ -154,13 +151,13 @@ pub fn unsync_main_entry(
},
},
)),
1 => Some(Transferable::<LinearTransferables>::TileTessellated(
*bytemuck::from_bytes::<<LinearTransferables as Transferables>::TileTessellated>(
1 => Some(Message::<UsedTransferables>::TileTessellated(
*bytemuck::from_bytes::<<UsedTransferables as Transferables>::TileTessellated>(
&data.to_vec(),
),
)),
2 => Some(Transferable::<LinearTransferables>::UnavailableLayer(
*bytemuck::from_bytes::<<LinearTransferables as Transferables>::UnavailableLayer>(
2 => Some(Message::<UsedTransferables>::UnavailableLayer(
*bytemuck::from_bytes::<<UsedTransferables as Transferables>::UnavailableLayer>(
&data.to_vec(),
),
)),
@ -178,7 +175,7 @@ pub fn unsync_main_entry(
.deref()
.borrow_mut()
.received
.push(Box::new(transferred)); // FIXME: remove box
.push(transferred);
mem::forget(map);

View File

@ -17,14 +17,14 @@ unsafe impl TransparentWrapper<WorldTileCoords> for WrapperWorldTileCoords {}
unsafe impl bytemuck::Zeroable for WrapperWorldTileCoords {}
unsafe impl bytemuck::Pod for WrapperWorldTileCoords {}
#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone)]
#[repr(transparent)]
pub struct LongVertexShader([ShaderVertex; 15000]);
unsafe impl TransparentWrapper<[ShaderVertex; 15000]> for LongVertexShader {}
unsafe impl bytemuck::Zeroable for LongVertexShader {}
unsafe impl bytemuck::Pod for LongVertexShader {}
#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone)]
#[repr(transparent)]
pub struct LongIndices([IndexDataType; 40000]);
unsafe impl TransparentWrapper<[IndexDataType; 40000]> for LongIndices {}
@ -74,7 +74,7 @@ impl UnavailableLayer for LinearUnavailableLayer {
}
}
#[derive(Copy, Clone, Pod, Zeroable, Debug)]
#[derive(Copy, Clone, Pod, Zeroable)]
#[repr(C)]
pub struct InnerData {
pub coords: WrapperWorldTileCoords,