Skip to content

Low-level API

For online data processing, we can access the in-memory data before it is stored in HDF5 files. More details, including implementation logic, can be found here.

The idea is to start a watcher that monitors a specific BLISS session. This watcher must be running before the scans begin.

import os

# Set the Beacon and Tango host addresses through the environment.
# The values shown are local example addresses.
os.environ.update(
    BEACON_HOST="localhost:10001",
    TANGO_HOST="localhost:10000",
)


The output looks like this:

Start data processing ...

START 26_loopscan
 URI: ['/tmp/scans/inhouse/id002106/id00/sample/sample_0001/sample_0001.h5::/26.1']
Counter({<EventType.NEW_DATA: 2>: 39,
         <EventType.NEW_NODE: 1>: 5,
         <EventType.PREPARED_SCAN: 4>: 1,
         <EventType.END_SCAN: 3>: 1})
END 26_loopscan

Data processing stopped  # upon CTRL-C

Session watcher

import gevent
from bliss.data.node import get_session_node

def session_watcher(session_name, **kw):
    """Watch a session and spawn one scan watcher per new scan node.

    Iterates over the session's new events (waiting for more) until a
    ``KeyboardInterrupt`` (CTRL-C) is received, then stops the scan
    watchers that are still registered.

    Args:
        session_name: Name passed to ``get_session_node`` to look up the
            session node.
        **kw: Accepted for call compatibility; not used by this function.
    """
    session = get_session_node(session_name)
    scan_types = ("scan", "scan_group")
    # Maps a scan node's db_name to the greenlet that watches that scan.
    scans = {}
    print("Start data processing ...", flush=True)

    try:
        for ev in session.walk_on_new_events(exclude_children=scan_types, wait=True):
            if ev.type == ev.type.END_SCAN:
                # NOTE(review): presumed intent is to forget the watcher of a
                # finished scan so it is not killed later - confirm.
                scans.pop(ev.node.db_name, None)
            elif ev.type == ev.type.NEW_NODE and ev.node.type == "scan":
                scans[ev.node.db_name] = gevent.spawn(scan_watcher, ev.node.db_name)
    except KeyboardInterrupt:
        # NOTE(review): presumed intent is to stop the watchers that are
        # still registered when the user interrupts - confirm.
        for greenlet in scans.values():
            greenlet.kill()
    print("Data processing stopped", flush=True)

Scan watcher

from pprint import pprint
from collections import Counter
from bliss.data.node import get_node
from nexus_writer_service.utils.scan_utils import scan_uris

def scan_watcher(scan_name):
    """Follow one scan's events until it ends and print a summary.

    Prints a START line, the scan's URIs, a per-event-type count of all
    events seen up to and including ``END_SCAN``, and an END line.

    Args:
        scan_name: Node name passed to ``get_node``; the session watcher
            passes the scan node's ``db_name``.
    """
    scan = get_node(scan_name)
    # NOTE(review): ``scan.name`` is assumed to be the short scan label
    # (e.g. "26_loopscan") - confirm against the node API.
    print("\nSTART", scan.name, flush=True)

    print(" URI:", scan_uris(scan))

    events = Counter()
    for ev in scan.walk_events(wait=True):
        events[ev.type] += 1
        if ev.type == ev.type.END_SCAN:
            # The scan is finished; stop waiting for further events.
            break

    pprint(events)
    print("END", scan.name, flush=True)