Skip to content

High-level API

For online data processing, we can access the in-memory data before it is stored in HDF5 files.

The idea is to start a watcher that monitors a specific BLISS session. The watcher must be started before the scans start.

import os

# Connection settings are passed through environment variables;
# both hosts point at localhost in this example.
os.environ.update(
    BEACON_HOST="localhost:10001",
    TANGO_HOST="localhost:10000",
)

# Start watching the scans of the "demo_session" BLISS session.
watch_scans("demo_session")
Start data processing ...

START demo_session:tmp:scans:inhouse:id002106:sample:sample_0001:41_loopscan
timer:elapsed_time 0 17
timer:epoch 0 17
simulation_diode_sampling_controller:diode1 0 16
simulation_diode_sampling_controller:diode1 16 32
timer:elapsed_time 17 33
timer:epoch 17 33
simulation_diode_sampling_controller:diode1 32 48
timer:elapsed_time 33 49
timer:epoch 33 49
simulation_diode_sampling_controller:diode1 48 64
timer:elapsed_time 49 65
timer:epoch 49 65
simulation_diode_sampling_controller:diode1 64 80
timer:elapsed_time 65 81
timer:epoch 65 81
simulation_diode_sampling_controller:diode1 80 96
timer:elapsed_time 81 97
timer:epoch 81 97
simulation_diode_sampling_controller:diode1 96 112
timer:elapsed_time 97 113
timer:epoch 97 113
simulation_diode_sampling_controller:diode1 112 128
timer:elapsed_time 113 129
timer:epoch 113 129
simulation_diode_sampling_controller:diode1 128 144
timer:elapsed_time 129 145
timer:epoch 129 145
simulation_diode_sampling_controller:diode1 144 160
timer:elapsed_time 145 161
timer:epoch 145 161
simulation_diode_sampling_controller:diode1 160 176
timer:elapsed_time 161 177
timer:epoch 161 177
simulation_diode_sampling_controller:diode1 176 192
timer:elapsed_time 177 193
timer:epoch 177 193
simulation_diode_sampling_controller:diode1 192 200
timer:elapsed_time 193 200
timer:epoch 193 200
END demo_session:tmp:scans:inhouse:id002106:sample:sample_0001:41_loopscan

Data processing stopped  # upon CTRL-C

Session watcher

from blissdata.data.scan import ScansWatcher

def watch_scans(session_name):
    """Watch the scans of a session and print their events until interrupted.

    A ``ScansWatcher`` is created for ``session_name`` and given a
    ``ScanObserver`` instance as its observer. The watcher is then run until
    a ``KeyboardInterrupt`` (CTRL-C) is raised, which is treated as the
    normal way to stop.

    Args:
        session_name: Name of the session to monitor, passed unchanged to
            ``ScansWatcher``.
    """
    scans_watcher = ScansWatcher(session_name)
    observer = ScanObserver()
    scans_watcher.set_observer(observer)

    print("Start data processing ...", flush=True)
    try:
        scans_watcher.run()
    except KeyboardInterrupt:
        # CTRL-C is the expected stop signal, not an error: fall through
        # to the closing message.
        pass
    print("Data processing stopped", flush=True)

Scan watcher

from blissdata.data.scan import ScansObserver

class ScanObserver(ScansObserver):
    """Observer that prints a trace of the scan events it receives.

    Scan creation and completion are reported with the scan's database name.
    Each data callback reports the channel name together with the index
    range covered by the received data.
    """

    def debug_print(self, channel_name, point_index, data):
        """Print the channel name and the index range of a data bunch.

        Args:
            channel_name: Name printed at the start of the line.
            point_index: Index of the first point of ``data``.
            data: Sized container of received points; only its length is used.
        """
        stop_index = point_index + len(data)
        print(channel_name, point_index, stop_index, flush=True)

    def on_scan_created(self, scan_db_name, scan_info):
        """Print a START line for the scan, preceded by an empty line."""
        print("\nSTART", scan_db_name, flush=True)

    def on_scan_finished(self, scan_db_name, scan_info):
        """Print an END line for the scan."""
        print("END", scan_db_name, flush=True)

    def on_scalar_data_received(self, scan_db_name, channel_name, index, data_bunch):
        """Report the index range of a received bunch of scalar data."""
        self.debug_print(channel_name, index, data_bunch)

    def on_ndim_data_received(self, scan_db_name, channel_name, dim, index, data_bunch):
        """Report the index range of a received bunch of n-dimensional data."""
        self.debug_print(channel_name, index, data_bunch)

    def on_lima_ref_received(self, scan_db_name, channel_name, dim, source_node, event_data):
        """Report the index range carried by a received Lima reference event.

        The range starts at ``event_data.first_index`` and spans
        ``len(event_data.data)`` entries.
        """
        first_index = event_data.first_index
        references = event_data.data
        self.debug_print(channel_name, first_index, references)