class Stream(BaseStream):
def __init__(self, event_stream):
super().__init__(event_stream)
if event_stream.encoding["type"] == "numeric":
self._kind = "array"
elif event_stream.encoding["type"] == "json":
self._kind = "json"
else:
raise UnknownEncodingError(
f"Unknow stream encoding {event_stream.encoding}"
)
@staticmethod
def recipe(name, dtype, shape=None, info={}):
if dtype == "json":
if shape is None:
return StreamRecipe(name, info, JsonStreamEncoder())
else:
raise ValueError("JSON stream cannot have shape")
else:
if shape is None:
shape = ()
info = info.copy()
info["dtype"] = np.dtype(dtype).name
info["shape"] = shape
return StreamRecipe(name, info, NumericStreamEncoder(dtype, shape))
@property
def kind(self):
return self._kind
@property
def plugin(self):
None
def __len__(self):
return len(self._event_stream)
def __getitem__(self, key):
return self._event_stream[key]
def _need_last_only(self, last_only):
return last_only
def _build_view_from_events(self, index, events: EventRange, last_only):
# NOTE event_range is never empty
return View(events)