xviz/python/xviz_avs/builder/future_instance.py
Timothy Wojtaszek 3cd43ae3af
Python module rename to xviz_avs (#588)
* Fix Protobuf writer to add PBE1 prefix to file
* Update protobuf & fix directory writer
* Fix FutureInstance builder and add tests
2020-05-12 22:00:14 -07:00

90 lines
3.3 KiB
Python

from xviz_avs.builder.base_builder import CATEGORY, PRIMITIVE_TYPES
from xviz_avs.builder.primitive import XVIZPrimitiveBuilder
from xviz_avs.v2.core_pb2 import FutureInstances, PrimitiveState
class XVIZFutureInstanceBuilder(XVIZPrimitiveBuilder):
def __init__(self, metadata, logger=None):
super().__init__(metadata, logger)
self._category = CATEGORY.FUTURE_INSTANCE # Override category
self.reset()
self._futures = {}
# Store entries in this list as (timestamp, type, primitive)
# which we convert to Protobuf messages upon get_data()
self._futures_list = {}
def reset(self):
super().reset()
self._ts = None
def timestamp(self, timestamp):
self._ts = timestamp
return self
def _get_primitives_type(self, primitives, primitive_type):
if primitive_type == PRIMITIVE_TYPES.CIRCLE:
return primitives.circles
elif primitive_type == PRIMITIVE_TYPES.IMAGE:
return primitives.images
elif primitive_type == PRIMITIVE_TYPES.POINT:
return primitives.points
elif primitive_type == PRIMITIVE_TYPES.POLYGON:
return primitives.polygons
elif primitive_type == PRIMITIVE_TYPES.POLYLINE:
return primitives.polylines
elif primitive_type == PRIMITIVE_TYPES.STADIUM:
return primitives.stadiums
elif primitive_type == PRIMITIVE_TYPES.TEXT:
return primitives.texts
else:
raise ValueError("FutureInstance type '{0}' is not recognized".format(primitive_type))
def _flush_futures_list(self):
# Since you cannot insert into a repeated message field
# we construct the protobuf message in order
for stream, entries in self._futures_list.items():
if stream not in self._futures:
self._futures[stream] = FutureInstances()
futures = self._futures[stream]
# sort on timestamp
entries.sort(key=lambda e: e[0])
last_ts = None
for entry in entries:
if last_ts is None or entry[0] != last_ts:
# Adding a new timestamp entry to the arrays
last_ts = entry[0]
futures.timestamps.append(entry[0])
future_prim = futures.primitives.add()
future_prim_type = self._get_primitives_type(future_prim, entry[1])
future_prim_type.append(entry[2])
else:
index = len(futures.timestamps)-1
future_prim = futures.primitives[index]
future_prim_type = self._get_primitives_type(future_prim, entry[1])
future_prim_type.append(entry[2])
def _flush(self):
if self._stream_id not in self._futures_list:
self._futures_list[self._stream_id] = []
primitive = self._format_primitive()
self._futures_list[self._stream_id].append((self._ts, self._type, primitive))
self.reset()
def get_data(self):
if self._type:
self._flush()
self._flush_futures_list()
if not self._futures:
return None
return self._futures