High-level API
For online data processing, we can access the in-memory data before it is stored in HDF5 files.
The idea is to start a watcher, that monitors a specific BLISS session. This watcher needs to be started before the scans start.
import os
os.environ["BEACON_HOST"] = "localhost:10001"
os.environ["TANGO_HOST"] = "localhost:10000"
watch_scans("demo_session")
Start data processing ...
START demo_session:tmp:scans:inhouse:id002106:sample:sample_0001:41_loopscan
timer:elapsed_time 0 17
timer:epoch 0 17
simulation_diode_sampling_controller:diode1 0 16
simulation_diode_sampling_controller:diode1 16 32
timer:elapsed_time 17 33
timer:epoch 17 33
simulation_diode_sampling_controller:diode1 32 48
timer:elapsed_time 33 49
timer:epoch 33 49
simulation_diode_sampling_controller:diode1 48 64
timer:elapsed_time 49 65
timer:epoch 49 65
simulation_diode_sampling_controller:diode1 64 80
timer:elapsed_time 65 81
timer:epoch 65 81
simulation_diode_sampling_controller:diode1 80 96
timer:elapsed_time 81 97
timer:epoch 81 97
simulation_diode_sampling_controller:diode1 96 112
timer:elapsed_time 97 113
timer:epoch 97 113
simulation_diode_sampling_controller:diode1 112 128
timer:elapsed_time 113 129
timer:epoch 113 129
simulation_diode_sampling_controller:diode1 128 144
timer:elapsed_time 129 145
timer:epoch 129 145
simulation_diode_sampling_controller:diode1 144 160
timer:elapsed_time 145 161
timer:epoch 145 161
simulation_diode_sampling_controller:diode1 160 176
timer:elapsed_time 161 177
timer:epoch 161 177
simulation_diode_sampling_controller:diode1 176 192
timer:elapsed_time 177 193
timer:epoch 177 193
simulation_diode_sampling_controller:diode1 192 200
timer:elapsed_time 193 200
timer:epoch 193 200
END demo_session:tmp:scans:inhouse:id002106:sample:sample_0001:41_loopscan
Data processing stopped # upon CTRL-C
Session watcher¶
from blissdata.data.scan import ScansWatcher
def watch_scans(session_name):
watcher = ScansWatcher(session_name)
watcher.set_observer(ScanObserver())
print("Start data processing ...", flush=True)
try:
watcher.run()
except KeyboardInterrupt:
pass
print("Data processing stopped", flush=True)
Scan watcher¶
from blissdata.data.scan import ScansObserver
class ScanObserver(ScansObserver):
def debug_print(self, channel_name, point_index, data):
print(channel_name, point_index, point_index+len(data), flush=True)
def on_scan_created(self, scan_db_name, scan_info):
print("\nSTART", scan_db_name, flush=True)
def on_scan_finished(self, scan_db_name, scan_info):
print("END", scan_db_name, flush=True)
def on_scalar_data_received(self, scan_db_name, channel_name, index, data_bunch):
self.debug_print(channel_name, index, data_bunch)
def on_ndim_data_received(self, scan_db_name, channel_name, dim, index, data_bunch):
self.debug_print(channel_name, index, data_bunch)
def on_lima_ref_received(self, scan_db_name, channel_name, dim, source_node, event_data):
self.debug_print(channel_name, event_data.first_index, event_data.data)