mirror of
https://github.com/aurora-opensource/xviz.git
synced 2026-01-18 14:07:46 +00:00
* Fix Protobuf writer to add PBE1 prefix to file * Update protobuf & fix directory writer * Fix FutureInstance builder and add tests
90 lines
3.3 KiB
Python
90 lines
3.3 KiB
Python
from xviz_avs.builder.base_builder import CATEGORY, PRIMITIVE_TYPES
|
|
from xviz_avs.builder.primitive import XVIZPrimitiveBuilder
|
|
from xviz_avs.v2.core_pb2 import FutureInstances, PrimitiveState
|
|
|
|
|
|
class XVIZFutureInstanceBuilder(XVIZPrimitiveBuilder):
|
|
def __init__(self, metadata, logger=None):
|
|
super().__init__(metadata, logger)
|
|
self._category = CATEGORY.FUTURE_INSTANCE # Override category
|
|
|
|
self.reset()
|
|
self._futures = {}
|
|
|
|
# Store entries in this list as (timestamp, type, primitive)
|
|
# which we convert to Protobuf messages upon get_data()
|
|
self._futures_list = {}
|
|
|
|
def reset(self):
|
|
super().reset()
|
|
self._ts = None
|
|
|
|
def timestamp(self, timestamp):
|
|
self._ts = timestamp
|
|
return self
|
|
|
|
def _get_primitives_type(self, primitives, primitive_type):
|
|
if primitive_type == PRIMITIVE_TYPES.CIRCLE:
|
|
return primitives.circles
|
|
elif primitive_type == PRIMITIVE_TYPES.IMAGE:
|
|
return primitives.images
|
|
elif primitive_type == PRIMITIVE_TYPES.POINT:
|
|
return primitives.points
|
|
elif primitive_type == PRIMITIVE_TYPES.POLYGON:
|
|
return primitives.polygons
|
|
elif primitive_type == PRIMITIVE_TYPES.POLYLINE:
|
|
return primitives.polylines
|
|
elif primitive_type == PRIMITIVE_TYPES.STADIUM:
|
|
return primitives.stadiums
|
|
elif primitive_type == PRIMITIVE_TYPES.TEXT:
|
|
return primitives.texts
|
|
else:
|
|
raise ValueError("FutureInstance type '{0}' is not recognized".format(primitive_type))
|
|
|
|
def _flush_futures_list(self):
|
|
# Since you cannot insert into a repeated message field
|
|
# we construct the protobuf message in order
|
|
for stream, entries in self._futures_list.items():
|
|
if stream not in self._futures:
|
|
self._futures[stream] = FutureInstances()
|
|
|
|
futures = self._futures[stream]
|
|
|
|
# sort on timestamp
|
|
entries.sort(key=lambda e: e[0])
|
|
last_ts = None
|
|
|
|
for entry in entries:
|
|
if last_ts is None or entry[0] != last_ts:
|
|
# Adding a new timestamp entry to the arrays
|
|
last_ts = entry[0]
|
|
futures.timestamps.append(entry[0])
|
|
future_prim = futures.primitives.add()
|
|
future_prim_type = self._get_primitives_type(future_prim, entry[1])
|
|
future_prim_type.append(entry[2])
|
|
else:
|
|
index = len(futures.timestamps)-1
|
|
future_prim = futures.primitives[index]
|
|
future_prim_type = self._get_primitives_type(future_prim, entry[1])
|
|
future_prim_type.append(entry[2])
|
|
|
|
def _flush(self):
|
|
if self._stream_id not in self._futures_list:
|
|
self._futures_list[self._stream_id] = []
|
|
|
|
primitive = self._format_primitive()
|
|
self._futures_list[self._stream_id].append((self._ts, self._type, primitive))
|
|
|
|
self.reset()
|
|
|
|
def get_data(self):
|
|
if self._type:
|
|
self._flush()
|
|
|
|
self._flush_futures_list()
|
|
|
|
if not self._futures:
|
|
return None
|
|
|
|
return self._futures
|