Data flow¶
In Bliss, data and metadata flow from the detectors to a Redis database; they are never kept locally in memory. During a scan, all data produced by the detectors included in that scan follow this rule. By default, data are kept for a maximum of 1 day and up to a maximum of 1 GB. In any case, the purpose of this database is not to store data but to publish live data to external processes such as online data analysis, final storage…
Data Client(s)¶
It is now possible to link online data analysis easily with a Bliss session. Any client can subscribe to receive data notifications from a session, as long as it can reach the beamline Beacon server.
Several clients already use this mechanism: Flint, External Writer and PyMca.
Simple example¶
Bliss provides a Python module to achieve data subscription. The following example listens to a session called demo_session and follows the scan activity:
from blissdata.data import node
session_node = node.get_session_node('demo_session')
for event, *values in session_node.walk_on_new_events(include_filter='scan'):
    if event == event.NEW_NODE:
        scan = values[0]
        print(f'Scan {scan.db_name} started')
    elif event == event.END_SCAN:
        scan = values[0]
        print(f'Scan {scan.db_name} ends')
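The dispatch logic of such a listener can be tried out without a running Beacon or Redis server by feeding it a hand-made event stream. The sketch below is only an illustration: the `Event` enum, the `follow_scans` helper and the scan name are hypothetical stand-ins, not part of Bliss or blissdata.

```python
from enum import Enum, auto


class Event(Enum):
    # Hypothetical stand-in for the event type yielded by walk_on_new_events()
    NEW_NODE = auto()
    END_SCAN = auto()


def follow_scans(events):
    """Yield one message per scan start or end found in (event, *values) tuples."""
    for event, *values in events:
        if event is Event.NEW_NODE:
            yield f"Scan {values[0]} started"
        elif event is Event.END_SCAN:
            yield f"Scan {values[0]} ended"


# Hand-made stream standing in for a live session (the scan name is made up)
fake_stream = [
    (Event.NEW_NODE, "demo_session:1_ascan"),
    (Event.END_SCAN, "demo_session:1_ascan"),
]
messages = list(follow_scans(fake_stream))
```

Keeping the handling code in a generator that accepts any iterable makes it possible to check the logic offline before connecting it to a real session.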
Future¶
In the near future, it may even be possible to change the scanning strategy based on the computation result of an online data analysis program, e.g. change the position range of a scanning motor during the scan…
Simple data-analysis interaction¶
This simple example demonstrates a possible communication between online data analysis and an alignment scan.
In the following example, the online data analysis program stops the alignment when it estimates that enough points have been taken.
from bliss.config import channels
from blissdata.data import node
max_value = 0.
# communication channel shared with the bliss session
stop_alignment = channels.Channel('stop_alignment', default_value=False)
session_node = node.get_session_node('demo_session')
for event, *values in session_node.walk_on_new_events():
    if event == event.NEW_NODE:
        # renamed from 'node' to avoid shadowing the imported module
        new_node = values[0]
        if new_node.type == 'scan':
            # reset max_value to 0
            # when a new scan starts.
            max_value = 0.
    elif event == event.NEW_DATA_IN_CHANNEL:
        channel = values[0]
        # only follow channels whose name contains 'gauss'
        if 'gauss' not in channel.name:
            continue
        data_value = channel.get(-1)  # last value
        if data_value > max_value:
            max_value = data_value
        elif data_value < (max_value / 2.):
            print(f"Stop Alignment scan max_value:{max_value} current_value:{data_value}")
            # stop the scan
            stop_alignment.value = True
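The stopping rule used above (track the running maximum, stop once a value falls below half of it) can be isolated as a pure function and checked on synthetic data. This is a minimal sketch, independent of Bliss: the function name `first_stop_index`, its `fraction` parameter and the noise-free Gaussian test signal are assumptions introduced for illustration.

```python
import math


def first_stop_index(values, fraction=0.5):
    """Return the index of the first value below fraction * running maximum.

    Returns None if the signal never drops that far.
    """
    max_value = 0.0
    for index, value in enumerate(values):
        if value > max_value:
            max_value = value
        elif value < max_value * fraction:
            return index
    return None


# Noise-free Gaussian sampled from -5 to 5 in steps of 0.1 (synthetic data)
positions = [-5 + 0.1 * i for i in range(101)]
signal = [math.exp(-(x ** 2) / 2) for x in positions]
stop_index = first_stop_index(signal)
```

With a noisy counter, a single low reading just after a new maximum could trigger the rule early; smoothing the values or requiring several consecutive low points are possible refinements.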
A simple hook to stop the scan:
from bliss.config import channels
from bliss.scanning import scan
class StopAlignmentHook(scan.WatchdogCallback):
    def __init__(self):
        super().__init__()
        self._stop_flag = channels.Channel('stop_alignment',
                                           value=False)
    def on_scan_data(self, *args):
        if self._stop_flag.value:
            raise StopIteration
Run it¶
Start the simple online data analysis program.
Then run the scan as follows:
DEMO_SESSION [1]: s = ascan(robz,-5,5,500,.001,sim_ct_gauss_noise,save=False,run=False)
DEMO_SESSION [2]: s.set_watchdog_callback(StopAlignmentHook())
DEMO_SESSION [3]: s.run()