Skip to content

Streams

Bases: BaseStream

Source code in blissdata/streams/base.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class Stream(BaseStream):
    def __init__(self, event_stream):
        super().__init__(event_stream)
        if event_stream.encoding["type"] == "numeric":
            self._kind = "array"
        elif event_stream.encoding["type"] == "json":
            self._kind = "json"
        else:
            raise UnknownEncodingError(
                f"Unknow stream encoding {event_stream.encoding}"
            )

    @staticmethod
    def recipe(name, dtype, shape=None, info={}):
        if dtype == "json":
            if shape is None:
                return StreamRecipe(name, info, JsonStreamEncoder())
            else:
                raise ValueError("JSON stream cannot have shape")
        else:
            if shape is None:
                shape = ()
            info = info.copy()
            info["dtype"] = np.dtype(dtype).name
            info["shape"] = shape
            return StreamRecipe(name, info, NumericStreamEncoder(dtype, shape))

    @property
    def kind(self):
        return self._kind

    @property
    def plugin(self):
        None

    def __len__(self):
        return len(self._event_stream)

    def __getitem__(self, key):
        return self._event_stream[key]

    def _need_last_only(self, last_only):
        return last_only

    def _build_view_from_events(self, index, events: EventRange, last_only):
        # NOTE event_range is never empty
        return View(events)

Bases: BaseView

Source code in blissdata/streams/base.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
class View(BaseView):
    def __init__(self, event_range: EventRange):
        self._events = event_range

    @property
    def index(self) -> int:
        return self._events.index

    def __len__(self) -> int:
        return len(self._events)

    def get_data(self, start=None, stop=None):
        trimmed_range = range(len(self))[start:stop]
        offset = self._events.nb_expired
        data_start = trimmed_range.start - offset
        data_stop = trimmed_range.stop - offset
        if data_start < 0:
            raise IndexNoMoreThereError
        else:
            return self._events.data[data_start:data_stop]