Low-level API
For online data processing, we can access the in-memory data before it is stored in HDF5 files. More details, including implementation logic, can be found here.
The idea is to start a watcher that monitors a specific BLISS session. The watcher must be started before the scans start.
import os

# Host addresses used by this example; both are set before the watcher starts.
os.environ.update(
    BEACON_HOST="localhost:10001",
    TANGO_HOST="localhost:10000",
)

# Watch the BLISS session named "demo_session".
session_watcher("demo_session")
The output looks like this:
Start data processing ...
START 26_loopscan
URI: ['/tmp/scans/inhouse/id002106/id00/sample/sample_0001/sample_0001.h5::/26.1']
Counter({<EventType.NEW_DATA: 2>: 39,
<EventType.NEW_NODE: 1>: 5,
<EventType.PREPARED_SCAN: 4>: 1,
<EventType.END_SCAN: 3>: 1})
END 26_loopscan
Data processing stopped # upon CTRL-C
Session watcher
import gevent
from blissdata.data.node import get_session_node
def session_watcher(session_name, **kw):
    """Watch a BLISS session and run one ``scan_watcher`` greenlet per new scan.

    New events of the session node are walked without descending into the
    children of "scan" and "scan_group" nodes. Each NEW_NODE event for a node
    of type "scan" spawns a ``scan_watcher`` greenlet, keyed by the node's
    ``db_name``. Each END_SCAN event waits for the matching greenlet.

    The loop runs until interrupted with CTRL-C (``KeyboardInterrupt``), at
    which point all scan watchers still registered are killed.

    :param str session_name: name of the BLISS session to monitor
    :param kw: accepted for call compatibility; not used by this function
    """
    session = get_session_node(session_name)
    scan_types = ("scan", "scan_group")
    scans = dict()
    print("Start data processing ...", flush=True)
    try:
        for ev in session.walk_on_new_events(exclude_children=scan_types, wait=True):
            if ev.type == ev.type.END_SCAN:
                # END_SCAN can arrive for a node that was never registered:
                # only nodes of type "scan" are tracked below, and a scan
                # whose NEW_NODE event preceded this watcher is unknown too.
                # Ignore those instead of failing with KeyError.
                watcher = scans.pop(ev.node.db_name, None)
                if watcher is not None:
                    # Block until the scan watcher has finished.
                    watcher.get()
            elif ev.type == ev.type.NEW_NODE and ev.node.type == "scan":
                scans[ev.node.db_name] = gevent.spawn(scan_watcher, ev.node.db_name)
    except KeyboardInterrupt:
        # Stop the watchers of scans that have not ended yet.
        for scan in scans.values():
            scan.kill()
    print("Data processing stopped", flush=True)
Scan watcher
from pprint import pprint
from collections import Counter
from blissdata.data.node import get_node
from nexus_writer_service.utils.scan_utils import scan_uris
def scan_watcher(scan_name):
    """Count the events of one scan by type and print a summary.

    Prints a START line and the scan URIs, then walks the scan's events
    until an END_SCAN event is seen. The per-type event counts are then
    pretty-printed, followed by an END line.

    :param str scan_name: name passed to ``get_node`` to look up the scan node
    """
    node = get_node(scan_name)
    print("\nSTART", node.name, flush=True)
    print(" URI:", scan_uris(node))

    tally = Counter()
    for event in node.walk_events(wait=True):
        kind = event.type
        tally.update((kind,))
        # The END_SCAN event itself is counted before leaving the loop.
        if kind == kind.END_SCAN:
            break

    pprint(tally)
    print("END", node.name, flush=True)