mirror of
https://github.com/maplibre/maplibre-rs.git
synced 2025-12-08 19:05:57 +00:00
Start implementing pipeline steps
This commit is contained in:
parent
1fd40c2fe9
commit
cdb7f1395e
@ -1,11 +1,78 @@
|
||||
use crate::coords::WorldTileCoords;
|
||||
use crate::io::{LayerTessellateMessage, TessellateMessage, TileRequestID, TileTessellateMessage};
|
||||
use crate::render::ShaderVertex;
|
||||
use crate::tessellation::{IndexDataType, OverAlignedVertexBuffer};
|
||||
use geozero::mvt::tile;
|
||||
use std::marker::PhantomData;
|
||||
use std::process::Output;
|
||||
use std::sync::mpsc;
|
||||
|
||||
pub trait PipelineProcessor {
|
||||
fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords);
|
||||
fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str);
|
||||
fn finished_layer_tesselation(
|
||||
&mut self,
|
||||
coords: &WorldTileCoords,
|
||||
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
|
||||
// Holds for each feature the count of indices.
|
||||
feature_indices: Vec<u32>,
|
||||
layer_data: tile::Layer,
|
||||
);
|
||||
}
|
||||
|
||||
pub struct HeadedPipelineProcessor {
|
||||
pub message_sender: mpsc::Sender<TessellateMessage>,
|
||||
}
|
||||
|
||||
impl PipelineProcessor for HeadedPipelineProcessor {
|
||||
fn finished_tile_tesselation(&mut self, request_id: TileRequestID, coords: &WorldTileCoords) {
|
||||
self.message_sender
|
||||
.send(TessellateMessage::Tile(TileTessellateMessage {
|
||||
request_id,
|
||||
coords: *coords,
|
||||
}))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn unavailable_layer(&mut self, coords: &WorldTileCoords, layer_name: &str) {
|
||||
self.message_sender.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::UnavailableLayer {
|
||||
coords: *coords,
|
||||
layer_name: layer_name.to_owned(),
|
||||
},
|
||||
));
|
||||
}
|
||||
fn finished_layer_tesselation(
|
||||
&mut self,
|
||||
coords: &WorldTileCoords,
|
||||
buffer: OverAlignedVertexBuffer<ShaderVertex, IndexDataType>,
|
||||
feature_indices: Vec<u32>,
|
||||
layer_data: tile::Layer,
|
||||
) {
|
||||
self.message_sender
|
||||
.send(TessellateMessage::Layer(
|
||||
LayerTessellateMessage::TessellatedLayer {
|
||||
coords: *coords,
|
||||
buffer,
|
||||
feature_indices,
|
||||
layer_data,
|
||||
},
|
||||
))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PipelineContext {
|
||||
pub processor: Box<dyn PipelineProcessor>,
|
||||
}
|
||||
|
||||
impl PipelineContext {}
|
||||
|
||||
pub trait Processable {
|
||||
type Input;
|
||||
type Output;
|
||||
|
||||
fn process(&self, input: Self::Input) -> Self::Output;
|
||||
fn process(&self, input: Self::Input, context: &mut PipelineContext) -> Self::Output;
|
||||
}
|
||||
|
||||
pub struct PipelineStep<P, N>
|
||||
@ -25,9 +92,9 @@ where
|
||||
type Input = P::Input;
|
||||
type Output = N::Output;
|
||||
|
||||
fn process(&self, input: Self::Input) -> Self::Output {
|
||||
let output = self.func.process(input);
|
||||
self.next.process(output)
|
||||
fn process(&self, input: Self::Input, context: &mut PipelineContext) -> Self::Output {
|
||||
let output = self.func.process(input, context);
|
||||
self.next.process(output, context)
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,45 +107,45 @@ impl<I> Processable for EndStep<I> {
|
||||
type Input = I;
|
||||
type Output = I;
|
||||
|
||||
fn process(&self, input: Self::Input) -> Self::Output {
|
||||
fn process(&self, input: Self::Input, _context: &mut PipelineContext) -> Self::Output {
|
||||
input
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, O> Processable for &fn(I) -> O {
|
||||
impl<I, O> Processable for &fn(input: I, context: &mut PipelineContext) -> O {
|
||||
type Input = I;
|
||||
type Output = O;
|
||||
|
||||
fn process(&self, input: Self::Input) -> Self::Output {
|
||||
(self)(input)
|
||||
fn process(&self, input: Self::Input, context: &mut PipelineContext) -> Self::Output {
|
||||
(self)(input, context)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, O> Processable for fn(I) -> O {
|
||||
impl<I, O> Processable for fn(input: I, context: &mut PipelineContext) -> O {
|
||||
type Input = I;
|
||||
type Output = O;
|
||||
|
||||
fn process(&self, input: Self::Input) -> Self::Output {
|
||||
(self)(input)
|
||||
fn process(&self, input: Self::Input, context: &mut PipelineContext) -> Self::Output {
|
||||
(self)(input, context)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FnProcessable<I: 'static, O: 'static> {
|
||||
func: &'static fn(I) -> O,
|
||||
func: &'static fn(I, context: &mut PipelineContext) -> O,
|
||||
}
|
||||
|
||||
impl<I, O> Processable for FnProcessable<I, O> {
|
||||
type Input = I;
|
||||
type Output = O;
|
||||
|
||||
fn process(&self, input: Self::Input) -> Self::Output {
|
||||
(self.func)(input)
|
||||
fn process(&self, input: Self::Input, context: &mut PipelineContext) -> Self::Output {
|
||||
(self.func)(input, context)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ClosureProcessable<F, I, O>
|
||||
where
|
||||
F: Fn(I) -> O,
|
||||
F: Fn(I, &mut PipelineContext) -> O,
|
||||
{
|
||||
func: F,
|
||||
phantom_i: PhantomData<I>,
|
||||
@ -87,78 +154,170 @@ where
|
||||
|
||||
impl<F, I, O> Processable for ClosureProcessable<F, I, O>
|
||||
where
|
||||
F: Fn(I) -> O,
|
||||
F: Fn(I, &mut PipelineContext) -> O,
|
||||
{
|
||||
type Input = I;
|
||||
type Output = O;
|
||||
|
||||
fn process(&self, input: Self::Input) -> Self::Output {
|
||||
(self.func)(input)
|
||||
fn process(&self, input: Self::Input, context: &mut PipelineContext) -> Self::Output {
|
||||
(self.func)(input, context)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Closure2Processable<I, O> {
|
||||
func: fn(I) -> O,
|
||||
func: fn(I, context: &mut PipelineContext) -> O,
|
||||
}
|
||||
|
||||
impl<I, O> Processable for Closure2Processable<I, O> {
|
||||
type Input = I;
|
||||
type Output = O;
|
||||
|
||||
fn process(&self, input: Self::Input) -> Self::Output {
|
||||
(self.func)(input)
|
||||
fn process(&self, input: Self::Input, context: &mut PipelineContext) -> Self::Output {
|
||||
(self.func)(input, context)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, O> Processable for dyn Fn(I) -> O {
|
||||
type Input = I;
|
||||
type Output = O;
|
||||
mod steps {
|
||||
use crate::io::pipeline::{PipelineContext, Processable};
|
||||
use crate::io::{TileRequest, TileRequestID};
|
||||
use crate::tessellation::zero_tessellator::ZeroTessellator;
|
||||
use crate::tessellation::IndexDataType;
|
||||
use geozero::GeozeroDatasource;
|
||||
use prost::Message;
|
||||
use std::collections::HashSet;
|
||||
|
||||
fn process(&self, input: Self::Input) -> Self::Output {
|
||||
(self)(input)
|
||||
pub struct ParseTileStep {}
|
||||
|
||||
impl Processable for ParseTileStep {
|
||||
type Input = (TileRequest, TileRequestID, Box<[u8]>);
|
||||
type Output = (TileRequest, TileRequestID, geozero::mvt::Tile);
|
||||
|
||||
fn process(
|
||||
&self,
|
||||
(tile_request, request_id, data): Self::Input,
|
||||
_context: &mut PipelineContext,
|
||||
) -> Self::Output {
|
||||
let tile = geozero::mvt::Tile::decode(data.as_ref()).expect("failed to load tile");
|
||||
(tile_request, request_id, tile)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IndexLayerStep {}
|
||||
|
||||
pub struct TessellateLayerStep {}
|
||||
|
||||
impl Processable for TessellateLayerStep {
|
||||
type Input = (TileRequest, TileRequestID, geozero::mvt::Tile);
|
||||
type Output = ();
|
||||
|
||||
fn process(
|
||||
&self,
|
||||
(tile_request, request_id, mut tile): Self::Input,
|
||||
context: &mut PipelineContext,
|
||||
) -> Self::Output {
|
||||
let coords = &tile_request.coords;
|
||||
|
||||
for layer in &mut tile.layers {
|
||||
let cloned_layer = layer.clone();
|
||||
let layer_name: &str = &cloned_layer.name;
|
||||
if !tile_request.layers.contains(layer_name) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tracing::info!("layer {} at {} ready", layer_name, coords);
|
||||
|
||||
let mut tessellator = ZeroTessellator::<IndexDataType>::default();
|
||||
if let Err(e) = layer.process(&mut tessellator) {
|
||||
context.processor.unavailable_layer(coords, layer_name);
|
||||
|
||||
tracing::error!(
|
||||
"layer {} at {} tesselation failed {:?}",
|
||||
layer_name,
|
||||
&coords,
|
||||
e
|
||||
);
|
||||
} else {
|
||||
context.processor.finished_layer_tesselation(
|
||||
coords,
|
||||
tessellator.buffer.into(),
|
||||
tessellator.feature_indices,
|
||||
cloned_layer,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
let available_layers: HashSet<_> = tile
|
||||
.layers
|
||||
.iter()
|
||||
.map(|layer| layer.name.clone())
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
for missing_layer in tile_request.layers.difference(&available_layers) {
|
||||
context.processor.unavailable_layer(coords, missing_layer);
|
||||
|
||||
tracing::info!(
|
||||
"requested layer {} at {} not found in tile",
|
||||
missing_layer,
|
||||
&coords
|
||||
);
|
||||
}
|
||||
|
||||
tracing::info!("tile tessellated at {} finished", &tile_request.coords);
|
||||
|
||||
context
|
||||
.processor
|
||||
.finished_tile_tesselation(request_id, &tile_request.coords);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::io::pipeline::{
|
||||
Closure2Processable, ClosureProcessable, EndStep, FnProcessable, PipelineStep, Processable,
|
||||
Closure2Processable, ClosureProcessable, EndStep, FnProcessable, HeadedPipelineProcessor,
|
||||
PipelineContext, PipelineStep, Processable,
|
||||
};
|
||||
use std::sync::mpsc;
|
||||
|
||||
fn add_one(input: u32) -> u8 {
|
||||
fn add_one(input: u32, context: &mut PipelineContext) -> u8 {
|
||||
input as u8 + 1
|
||||
}
|
||||
|
||||
fn add_two(input: u8) -> u32 {
|
||||
fn add_two(input: u8, context: &mut PipelineContext) -> u32 {
|
||||
input as u32 + 2
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test() {
|
||||
let mut context = PipelineContext {
|
||||
processor: Box::new(HeadedPipelineProcessor {
|
||||
message_sender: mpsc::channel().0,
|
||||
}),
|
||||
};
|
||||
let output: u32 = PipelineStep {
|
||||
func: FnProcessable {
|
||||
func: &(add_two as fn(u8) -> u32),
|
||||
func: &(add_two as fn(u8, &mut PipelineContext) -> u32),
|
||||
},
|
||||
next: EndStep::default(),
|
||||
}
|
||||
.process(5u8);
|
||||
.process(5u8, &mut context);
|
||||
|
||||
assert_eq!(output, 7);
|
||||
|
||||
let output = PipelineStep {
|
||||
func: &(add_one as fn(u32) -> u8),
|
||||
func: &(add_one as fn(u32, &mut PipelineContext) -> u8),
|
||||
next: PipelineStep {
|
||||
func: &(add_two as fn(u8) -> u32),
|
||||
func: &(add_two as fn(u8, &mut PipelineContext) -> u32),
|
||||
next: EndStep::default(),
|
||||
},
|
||||
}
|
||||
.process(5);
|
||||
.process(5u32, &mut context);
|
||||
|
||||
assert_eq!(output, 8);
|
||||
|
||||
let output: u32 = PipelineStep {
|
||||
func: ClosureProcessable {
|
||||
func: |input: u8| -> u32 {
|
||||
func: |input: u8, context| -> u32 {
|
||||
return input as u32 + 2;
|
||||
},
|
||||
phantom_i: Default::default(),
|
||||
@ -166,17 +325,17 @@ mod tests {
|
||||
},
|
||||
next: EndStep::default(),
|
||||
}
|
||||
.process(5u8);
|
||||
.process(5u8, &mut context);
|
||||
|
||||
assert_eq!(output, 7);
|
||||
|
||||
let output: u32 = PipelineStep {
|
||||
func: Closure2Processable {
|
||||
func: |input: u8| -> u32 { input as u32 + 2 },
|
||||
func: |input: u8, context| -> u32 { input as u32 + 2 },
|
||||
},
|
||||
next: EndStep::default(),
|
||||
}
|
||||
.process(5u8);
|
||||
.process(5u8, &mut context);
|
||||
|
||||
assert_eq!(output, 7);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user