Getting started¶
Scans and Streams¶
Blissdata is like a store where everything is either a scan or a stream. Each scan represents an acquisition, containing streams and metadata. The streams themselves contain the actual data from the acquisition.
Each of these entities is uniquely identified within Redis by a key resembling the following:
- esrf:scan:01HCYP7ARJ4NSG9VGPGDX6SN2X
- esrf:stream:01HCYP7ATAAVKRDS8BV51Y7MM5
You can refer to the ULID specification for more details.
Connecting to a database¶
Prior to accessing scans, you need to connect to a Blissdata instance. This is done by creating a DataStore object and providing it with a Redis URL.
from blissdata.redis_engine.store import DataStore
data_store = DataStore("redis://localhost:6379")
Note
It is recommended to reuse the DataStore instance as much as possible, as it keeps previously opened sockets. This avoids establishing new connections every time.
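A minimal sketch of that recommendation: keep a single module-level DataStore and let every helper use it, instead of instantiating a new one per call (process_last_scan is an illustrative name, not a blissdata API; get_last_scan is introduced below):

# `data_store` is the module-level instance created above

def process_last_scan():
    # reuses the connections already opened by `data_store`
    timestamp, key = data_store.get_last_scan()
    return data_store.load_scan(key)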
Looking for scans¶
As we just saw, scans are uniquely identified within Redis. Therefore, your first step is to obtain the key for the Scan you’re interested in. Here are the possible options:
- get_next_scan(since=None, block=True, timeout=0):
  The scan doesn't exist yet and you are waiting for it.
from blissdata.redis_engine.exceptions import NoScanAvailable

# Wait 10 seconds for a new scan
try:
    timestamp, key = data_store.get_next_scan(timeout=10)
except NoScanAvailable:
    raise Exception("No scan started within 10 seconds")
scan_1 = data_store.load_scan(key)
# -------------- PROCESS scan_1 --------------

# Wait for the immediate next scan
timestamp, key = data_store.get_next_scan(since=timestamp)
scan_2 = data_store.load_scan(key)
# -------------- PROCESS scan_2 --------------

# Get the next scan immediately or raise an Exception
try:
    timestamp, key = data_store.get_next_scan(since=timestamp, block=False)
except NoScanAvailable:
    raise Exception("No scan started while processing scan_2")
scan_3 = data_store.load_scan(key)
Tip
Reusing the previous timestamp ensures you don't miss any new scan while processing the previous one.
- get_last_scan():
  The scan just ran.
from blissdata.redis_engine.exceptions import NoScanAvailable

try:
    timestamp, key = data_store.get_last_scan()
except NoScanAvailable:
    raise Exception("There is no scan at all!")
scan = data_store.load_scan(key)
- search_existing_scans(session, proposal, collection, dataset, name, number):
  The scan already exists and you want to find it among existing ones.
timestamp, keys = data_store.search_existing_scans(session="my_session", proposal=..., ...)
# load each scan from its key, or simply use `keys` to count how many scans match your search
Tip
timestamp is not used here, but you can provide it to get_next_scan(since=timestamp) to make that call relative to a previous search.
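For example, searching on a couple of fields and loading the results (the field values are only illustrative):

timestamp, keys = data_store.search_existing_scans(session="my_session", name="my_scan")
print(f"{len(keys)} scan(s) found")
scans = [data_store.load_scan(key) for key in keys]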
Scan structure¶
Each scan is actually stored as JSON inside Redis, but for the sake of simplicity we never read or edit that JSON by hand. Instead we rely on Redis-OM, which wraps the JSON into convenient Python objects.
scan = data_store.load_scan(key)
print(scan.name)
print(scan.number)
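Other fields are exposed in the same way; for example, with the identity used in the example at the end of this page (scan.session comes from the indexed identity fields, scan.info holds the writer's free-form metadata):

print(scan.session)  # identity field, indexed by RediSearch
print(scan.info)     # free-form metadata dictionary filled by the writer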
Scan state and updates¶
A scan's content is not frozen, or Blissdata would be pointless. Therefore we need to update our local copy at some point. This is done through the update method (think of git fetch, for example). Scan updates are not pushed on a continuous basis; instead they arrive step by step, or state by state to be precise.
We previously mentioned the state property of the Scan. It is an important one, as it acts as a backbone for the scan's life cycle. Every state change comes with a JSON update and, reciprocally, the JSON cannot change without a new state.
When we call scan.update we are waiting for a new state to be available:
# different ways to call update, each returns a boolean (False if nothing changed)
scan.update() # default is blocking
scan.update(block=False)
scan.update(timeout=3)
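Since update returns whether anything changed, it is easy to poll from a periodic refresh loop; refresh_display below is a placeholder for your own code, not part of blissdata:

# a sketch: react only when the scan actually changed
if scan.update(block=False):
    refresh_display(scan)  # placeholder callback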
Note
Scan’s state is actually implemented by a dedicated Redis stream and is not part of the JSON.
The state machine used for scans contains the following states:
CREATED -> PREPARED -> STARTED -> STOPPED -> CLOSED
from blissdata.redis_engine.scan import ScanState

while scan.state < ScanState.PREPARED:
    scan.update()

if scan.state == ScanState.CLOSED:
    print(scan.state.name)  # 'CLOSED'
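The same loop can follow a scan all the way through its life cycle:

while scan.state < ScanState.CLOSED:
    scan.update()
# CLOSED is terminal: the scan's JSON will not change anymore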
Streams¶
As we saw, scans pack descriptive information and streams together, but the streams are the actual workhorse carrying acquisition data. Each Stream object corresponds to a Redis stream. In addition, a stream may redirect queries to a file after buffer eviction, or resolve references to external data sources (e.g. Lima images).
From a Scan object, you simply access the streams as a dictionary:
...
my_scan = data_store.load_scan(key)
my_stream = my_scan.streams["my_stream_name"]
print(my_stream.name)
print(my_stream.info)
Inside any stream, data is made of "points". Depending on the type of stream, one point can be a scalar value, a multidimensional array, or a JSON value. Encoders and decoders are provided to serialize the data in a strongly typed manner.
In order to serve any access pattern, streams can be read through either random or sequential access.
Random access¶
Streams behave like arrays, where you access content based on the index, not on the time.
stream[3]          # a single point
stream[-1]         # the last point
stream[5:10]       # points 5 to 9 (10 excluded)
stream[50:100:10]  # points 50 to 99 (100 excluded), in steps of 10
stream[3:-5]       # from point 3 up to the fifth point from the end (excluded)
stream[3:4]        # a single point, returned in a list

# omitting values
stream[:50]        # from the beginning
stream[50:]        # until the end
stream[:]          # everything
Querying slices always returns a list, which can be empty. Querying indexes, on the other hand, can raise an IndexError. Blissdata provides more specific versions, all deriving from IndexError:
- IndexNotYetThereError: Too early, this index has not been acquired yet.
- IndexWontBeThereError: Too bad, there will never be such an index, as the stream ended before reaching it.
- IndexNoMoreThereError: Too late, it has been removed to free space in Redis. (Note: streams with a file fallback never raise this error, but transparently switch to the file instead.)
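A reader can handle each case explicitly. A minimal sketch, assuming these exceptions live in blissdata.redis_engine.exceptions alongside NoScanAvailable:

from blissdata.redis_engine.exceptions import (  # assumed location
    IndexNotYetThereError,
    IndexWontBeThereError,
    IndexNoMoreThereError,
)

try:
    point = stream[42]
except IndexNotYetThereError:
    pass  # not acquired yet: wait, then retry
except IndexWontBeThereError:
    pass  # stream was sealed before point 42: give up
except IndexNoMoreThereError:
    pass  # evicted from Redis and no file fallback available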
Sequential access¶
The sequential API is driven by the data publisher. A read() method with a blocking mode and a timeout lets you synchronize with the data being pushed into Redis.
Each stream comes with a cursor() method returning a Cursor object. You can create one or many of them to act as reading heads over the stream. Using cursors keeps the stream object itself stateless (no current position), so it is easier to share across the code.
A cursor has a position property and a read() method. There is no going back with a cursor: the position can only increase.
my_cursor = stream.cursor()
my_cursor.position  # a fresh cursor starts at position 0

index, data = my_cursor.read(count=5)
index, data = my_cursor.read(block=False)  # return immediately, even if no new data
index, data = my_cursor.read(timeout=3.5)  # block for at most 3.5 seconds
index, data = my_cursor.read(count=-1)
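Putting it together, a minimal consumer sketch. The stop condition is an assumption: we suppose a sealed and fully consumed stream ends the read with an EndOfStream exception, so check the exact end-of-stream convention of your blissdata version:

from blissdata.redis_engine.exceptions import EndOfStream  # assumed name and location

cursor = stream.cursor()
while True:
    try:
        index, batch = cursor.read()  # blocking by default
    except EndOfStream:  # assumption: raised once the stream is sealed and drained
        break
    for offset, point in enumerate(batch):
        print(index + offset, point)  # points arrive in acquisition order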
Examples¶
Scan creation and streaming¶
#!/usr/bin/env python
import numpy as np

from blissdata.redis_engine.store import DataStore
from blissdata.redis_engine.scan import ScanState
from blissdata.redis_engine.encoding.numeric import NumericStreamEncoder
from blissdata.redis_engine.encoding.json import JsonStreamEncoder

# Configure blissdata for a Redis database
data_store = DataStore("redis://localhost:6379")

# The identity model we use is ESRFIdentityModel in blissdata.redis_engine.models
# It defines the JSON fields indexed by RediSearch
scan_id = {
    "name": "my_scan",
    "number": 1,
    "data_policy": "no_policy",
    "session": "example_session",
}

# ------------------------------------------------------------------ #
# create scan in the database
scan = data_store.create_scan(scan_id)
# ------------------------------------------------------------------ #


def run_scan(scan):
    """Create example streams for the scan and publish data"""
    # declare some streams in the scan
    encoder = NumericStreamEncoder(dtype=np.float64)
    scalar_stream = scan.create_stream(
        "scalars", encoder, info={"unit": "mV", "embed the info": "you want"}
    )

    encoder = NumericStreamEncoder(dtype=np.int32, shape=(4096,))
    vector_stream = scan.create_stream("vectors", encoder, info={})

    encoder = NumericStreamEncoder(dtype=np.uint16, shape=(1024, 100))
    array_stream = scan.create_stream("arrays", encoder, info={})

    encoder = JsonStreamEncoder()
    json_stream = scan.create_stream("jsons", encoder, info={})

    # gather some metadata before running the scan
    scan.info["some_metadata"] = {"a": 42, "b": 561}

    # ------------------------------------------------------------------ #
    scan.prepare()  # upload initial metadata and stream declarations
    # ------------------------------------------------------------------ #

    # Scan is ready to start, possibly waiting for other processes.
    # Sharing stream keys with external publishers can be done here.

    # ------------------------------------------------------------------ #
    scan.start()
    # ------------------------------------------------------------------ #

    # publish data into streams (can be done in parallel)
    for i in range(1000):
        scalar_stream.send(float(i))
        vector_stream.send(np.full((4096,), i, dtype=np.int32))
        array_stream.send(np.full((1024, 100), i, dtype=np.uint16))
        json_stream.send({"index": i})

    # close streams
    scalar_stream.seal()
    vector_stream.seal()
    array_stream.seal()
    json_stream.seal()

    # ------------------------------------------------------------------ #
    scan.stop()
    # ------------------------------------------------------------------ #

    # gather end of scan metadata
    scan.info["end_metadata"] = {"c": 123, "d": "xyz"}

    # ------------------------------------------------------------------ #
    scan.close()  # upload final metadata
    # ------------------------------------------------------------------ #


try:
    run_scan(scan)
finally:
    if scan.state < ScanState.CLOSED:
        scan.close()
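For completeness, a matching reader-side sketch built only from calls shown earlier on this page; it waits for the writer above, follows the scan to the end of its life cycle, then reads the sealed streams through random access:

#!/usr/bin/env python
from blissdata.redis_engine.store import DataStore
from blissdata.redis_engine.scan import ScanState

data_store = DataStore("redis://localhost:6379")

# wait for the next scan to be published
timestamp, key = data_store.get_next_scan()
scan = data_store.load_scan(key)

# follow the scan to the end of its life cycle
while scan.state < ScanState.CLOSED:
    scan.update()

# streams are sealed by now, so slicing returns plain lists
scalars = scan.streams["scalars"][:]
jsons = scan.streams["jsons"][:]
print(len(scalars), "scalar points,", len(jsons), "json points")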