class Scan:
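    # NOTE: this helper is used below as a plain decorator (without
    # @staticmethod); that works because, inside the class body, it is just a
    # function in the local namespace at class-definition time.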
def needs_write_permission(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self._write_permission:
return func(self, *args, **kwargs)
else:
raise NoWritePermission(f"Scan {self.key} is read-only")
return wrapper
@classmethod
def _load(cls, data_store, key):
scan = cls()
scan._write_permission = False
scan._data_store = data_store
prefix = scan._data_store._scan_model.make_key("")
if not key.startswith(prefix):
raise RuntimeError(f"Scan key should be prefixed by '{prefix}'")
id_model = cls._get_identity_model_cls(scan)
cls._expose_identity_model_fields(scan, id_model)
scan._streams = {}
scan._last_entry_id = b"0-0"
try:
            # pk is just the unprefixed version of the key
pk = key[len(prefix) :]
scan._model = scan._data_store._scan_model.get(pk)
except model.NotFoundError as e:
raise ScanNotFoundError(
"Scan has been deleted from Redis, or key is wrong"
) from e
except ValidationError as e:
raise ScanValidationError(
"Scan exists in Redis but is invalid, most likely the scan model version on the publisher side is different"
) from e
else:
scan._state = ScanState.CREATED
scan.update(block=False)
return scan
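    # A minimal usage sketch (hypothetical; clients normally reach this
    # through a DataStore entry point rather than calling _load directly):
    #
    #   scan = Scan._load(data_store, key)
    #   print(scan.state, scan.info)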
@classmethod
    def _create(cls, data_store, identity, info=None):
        # use None as sentinel to avoid a shared mutable default argument
        if info is None:
            info = {}
        scan = cls()
scan._write_permission = True
scan._data_store = data_store
id_model = cls._get_identity_model_cls(scan)
scan._model = scan._data_store._scan_model(
id=id_model(**identity),
info=info,
state_stream=scan._data_store._stream_model(),
data_streams={},
)
cls._expose_identity_model_fields(scan, id_model)
scan._streams = {}
scan._writer_streams = {}
scan._state = ScanState.CREATED
scan._model.info = Scan._filter_nan_values(scan._model.info)
def _create_scan(pipe: redis.client.Pipeline) -> None:
scan._model.save(pipeline=pipe)
pipe.xadd(scan_creation_stream, {"key": scan.key}, maxlen=2048)
pipe.xadd(scan._model.state_stream.key(), {"state": scan.state.value})
scan._data_store._redis.transaction(_create_scan)
scan.json_info = "" # TODO to be removed, used to check info modification between state transitions
return scan
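    # A minimal usage sketch (hypothetical identity fields; the actual fields
    # are defined by the data store's scan identity model):
    #
    #   scan = Scan._create(
    #       data_store,
    #       identity={"name": "ascan", "number": 42},
    #       info={"npoints": 11},
    #   )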
@staticmethod
def _get_identity_model_cls(scan):
"""Get the scan identity class."""
# pydantic v2
# id.field: <class 'pydantic.fields.FieldInfo'>
# id.field.annotation: <class 'redis_om.model.model.ModelMeta'>
#
# pydantic v1
# id.field: <class 'pydantic.fields.ModelField'>
# id.field.annotation: <class 'redis_om.model.model.ModelMeta'>
return scan._data_store._scan_model.id.field.annotation
@staticmethod
def _expose_identity_model_fields(scan, id_model):
"""Expose scan identity fields as properties of the scan instance."""
# id_model: <class 'redis_om.model.model.ModelMeta'>
#
# pydantic v2: id_model.model_fields or id_model.__fields__
# pydantic v1: id_model.__fields__
try:
prop_names = list(id_model.model_fields)
except AttributeError:
prop_names = list(id_model.__fields__)
for prop_name in prop_names:
if prop_name == "pk":
continue
def get_id_field(self, field=prop_name):
return getattr(self._model.id, field, None)
add_property(scan, prop_name, get_id_field)
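    # For example, if the identity model defines hypothetical fields "name"
    # and "number", the scan instance gains read-only properties scan.name and
    # scan.number that delegate to scan._model.id.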
@classmethod
def _load_rw(cls, data_store, key):
scan = Scan._load(data_store, key)
scan._write_permission = True
scan._writer_streams = {}
scan.json_info = ""
return scan
@staticmethod
def _filter_nan_values(obj):
# json_constant_map = {
# "-Infinity": float("-Infinity"),
# "Infinity": float("Infinity"),
# "NaN": None,
# }
class NumpyEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, np.ndarray):
return o.tolist()
elif isinstance(o, np.number):
return o.item()
else:
return json.JSONEncoder.default(self, o)
def format_bytes(nbytes):
suffixes = ["B", "KB", "MB", "GB", "TB", "PB"]
exp = int(math.log(nbytes, 1024))
return f"{nbytes/1024**exp:.4g}{suffixes[exp]}"
        # Inspired by https://stackoverflow.com/a/65317610
        # This is the only solution found to replace NaN values with None
        # ("null" in JSON, which is valid). Other solutions would require a new
        # dependency, or overriding methods that are not meant to be overridden
        # in the json module.
def json_nan_to_none(obj):
json_string = NumpyEncoder().encode(obj)
json_size = len(json_string)
if json_size > 2**20:
raise RuntimeError(
f"Scan JSON metadata is taking {format_bytes(json_size)} (limit 1MB)"
)
return json.loads(json_string, parse_constant=lambda constant: None)
# OR to define specific value for each constant
# return json.loads(json_string, parse_constant=lambda constant: json_constant_map[constant])
return json_nan_to_none(obj)
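    # Example of the resulting conversion (NaN and +/-Infinity are both mapped
    # to None by the parse_constant hook):
    #
    #   Scan._filter_nan_values({"a": float("nan"), "b": np.float64(1.5)})
    #   # -> {"a": None, "b": 1.5}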
@property
def key(self):
return self._model.key()
@property
def info(self):
return self._model.info
@info.setter
@needs_write_permission
def info(self, info):
self._model.info = info
@property
def state(self):
return self._state
@property
def streams(self):
return self._streams.copy()
def _refresh_streams(self):
if self.state >= ScanState.PREPARED:
            # instantiate Stream objects for models that don't already have one
for name in self._model.data_streams.keys() - self._streams.keys():
model = self._model.data_streams[name]
event_stream = EventStream.open(self._data_store, name, model)
self._streams[name] = self._wrap_event_stream(event_stream)
def __str__(self):
return f'{type(self).__name__}(key:"{self.key}")'
def update(self, block=True, timeout=0) -> bool:
"""Update scan state and its content.
If the scan is already in a terminal state, False is returned immediately.
Otherwise it depends on 'block' and 'timeout' in seconds (timeout=0: wait forever).
Return a True if the scan state has changed.
Raise ScanNotFoundError if the scan is deleted."""
        # Updating a scan in RW mode makes no sense: there should be a single writer, which never needs to read.
assert not self._write_permission
        # Ensure the scan is not deleted, nor about to be:
        # -1: scan has no planned expiration
        # -2: scan already expired
        #  n: scan expires in n seconds
ttl = self._data_store._redis.ttl(self.key)
if ttl != -1 and ttl <= 10:
raise ScanNotFoundError("Scan has been deleted from Redis")
if self.state == ScanState.CLOSED:
return False
        if not block:
            timeout = None  # non-blocking XREAD
        else:
            timeout = int(timeout * 1000)  # Redis expects milliseconds
        # Because of the expiration delay, the scan can't have disappeared since the
        # check made at the beginning of this function. Therefore scan.state_stream
        # exists and we won't get stuck on a non-existing stream.
result = self._data_store._redis.xread(
{self._model.state_stream.key(): self._last_entry_id}, block=timeout
)
if not result:
if timeout == 0:
raise RuntimeError(
"Redis blocking XREAD returned empty value, this is very unexpected !"
)
else:
return False
        # Entries contain the state; only the last one is meaningful
last_entry = result[0][1][-1]
self._last_entry_id = last_entry[0].decode()
self._state = ScanState(int(last_entry[1][b"state"]))
# refresh json local copy on state change
try:
self._model = self._data_store._scan_model.get(self._model.pk)
except model.NotFoundError as e:
raise ScanNotFoundError("Scan has been deleted from Redis") from e
except ValidationError as e:
raise ScanValidationError(
"Scan exists in Redis but is invalid, most likely the scan model version on the publisher side is different"
) from e
self._refresh_streams()
return True
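    # A minimal monitoring sketch on a read-only scan:
    #
    #   while scan.state is not ScanState.CLOSED:
    #       if scan.update(block=True, timeout=1.0):
    #           print("scan state:", scan.state.name)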
def _wrap_event_stream(self, event_stream):
plugin_name = None
if "plugin" not in event_stream.info:
# map legacy blissdata to plugins
if event_stream.encoding["type"] == "json":
format = event_stream.info.get("format", "")
if format == "lima_v1":
plugin_name = "lima"
elif format == "lima_v2":
plugin_name = "lima2"
elif format == "subscan":
plugin_name = "scan_sequence"
            elif self.info.get("save", False) and "data_path" in event_stream.info:
                # Old file-backed streams were not self-contained, as they
                # depended on scan.path to locate their file. They need to be
                # resolved manually.
                return Hdf5BackedStream(event_stream, self.path)
else:
plugin_name = event_stream.info["plugin"]
if plugin_name is None:
return Stream(event_stream)
else:
try:
plugin = loaded_plugins[plugin_name]
except KeyError:
try:
plugin_entry = discovered_plugins[plugin_name]
except KeyError:
# plugin is missing, but don't raise immediately to
# allow clients to access the rest of the scan.
# Return a broken stream object that will raise if used.
logger.warning(
f"Missing blissdata plugin '{plugin_name}', cannot load stream '{event_stream.name}'"
)
return MissingPluginStream(event_stream, plugin_name)
else:
plugin = plugin_entry.load()
loaded_plugins[plugin_name] = plugin
return plugin.stream_cls(event_stream)
@needs_write_permission
def create_stream(self, recipe):
"""Create a new data stream for the scan and return the associated Stream"""
name = recipe.name
encoder = recipe.encoder
info = recipe.info
        if name in self._model.data_streams:
            raise RuntimeError(f'Stream "{name}" already exists.')
        # local name 'stream_model' avoids shadowing the 'model' module
        stream_model = self._data_store._stream_model(encoding=encoder.info(), info=info)
        event_stream = EventStream.create(self._data_store, name, stream_model)
        stream = self._wrap_event_stream(event_stream)
        self._model.data_streams[name] = stream_model
        self._writer_streams[name] = stream
return stream
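    # A minimal usage sketch (hypothetical recipe object providing the name,
    # encoder and info of the stream to create):
    #
    #   stream = scan.create_stream(recipe)
    #   scan.prepare()
    #   # ...publish events through 'stream'; it is sealed automatically by
    #   # _close_writer_streams() on scan.stop()/scan.close()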
    def get_writer_stream(self, name: str):
        """Load a Stream in read-write mode from an already prepared Scan.
        The intended use is to distribute data publication over multiple
        processes.
        IMPORTANT: The streams of a single Scan can be published from several
        places, but each individual stream must be published from a single
        place (if that ever happens, Redis will reject the writes because of
        inconsistent indices).
        Stream sealing, on the other hand, can be done multiple times, from
        multiple places."""
        stream_model = self._model.data_streams[name]
        rw_event_stream = EventStream.create(self._data_store, name, stream_model)
        return self._wrap_event_stream(rw_event_stream)
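    # A minimal distributed-writer sketch (assumes the scan key and stream
    # name are shared with the worker process out-of-band):
    #
    #   scan = Scan._load_rw(data_store, key)
    #   stream = scan.get_writer_stream("channel_name")
    #   # ...publish events...
    #   stream.seal()  # each writer seals its own stream (see docstring above)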
@needs_write_permission
def _close_writer_streams(self):
"""Seal streams that are not sealed yet.
In the case of multiple processes/threads writing to the scan's streams,
it is each writer's responsibility to seal its stream. Then the scan owner
can wait for streams to be sealed and close the scan smoothly.
Eventually, it may timeout and force the closure of all streams, making
the still running writers to fail.
"""
for writer_stream in self._writer_streams.values():
writer_stream.seal()
self._writer_streams = {}
@needs_write_permission
def prepare(self):
if self.state is ScanState.CREATED:
self._set_state(ScanState.PREPARED)
else:
raise UnauthorizeStateTransition(self.state, ScanState.PREPARED)
@needs_write_permission
def start(self):
if self.state is ScanState.PREPARED:
self._set_state(ScanState.STARTED)
else:
raise UnauthorizeStateTransition(self.state, ScanState.STARTED)
@needs_write_permission
def stop(self):
if self.state is ScanState.STARTED:
self._close_writer_streams()
self._set_state(ScanState.STOPPED)
else:
raise UnauthorizeStateTransition(self.state, ScanState.STOPPED)
@needs_write_permission
def close(self):
self._close_writer_streams()
self._set_state(ScanState.CLOSED)
@needs_write_permission
def _set_state(self, state):
prev_state = self._state
self._state = state
self._model.info = Scan._filter_nan_values(self._model.info)
def update_scan_state(pipe: redis.client.Pipeline) -> None:
json_info = json.dumps(self._model.info)
if json_info != self.json_info:
assert (
prev_state is ScanState.CREATED and state is ScanState.PREPARED
) or state is ScanState.CLOSED, f"Scan info changed between states {ScanState(prev_state).name} and {ScanState(state).name}"
self.json_info = json_info
self._model.save(pipeline=pipe)
pipe.xadd(self._model.state_stream.key(), {"state": self.state.value})
self._data_store._redis.transaction(update_scan_state)
self._refresh_streams()
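    # A minimal end-to-end writer lifecycle sketch (identity and recipe are
    # hypothetical; state transitions are enforced by prepare/start/stop):
    #
    #   scan = Scan._create(data_store, identity)
    #   stream = scan.create_stream(recipe)
    #   scan.prepare()
    #   scan.start()
    #   # ...publish data...
    #   scan.stop()
    #   scan.close()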
|