Skip to content

Getting started

Scans and Streams

Blissdata is like a store where everything is either a scan or a stream. Each scan represents an acquisition, containing streams and metadata. The streams themselves contain the actual data from the acquisition.

Each of these entities is uniquely identified by a key within Redis, which resembles the following:

  • esrf:scan:01HCYP7ARJ4NSG9VGPGDX6SN2X
  • esrf:stream:01HCYP7ATAAVKRDS8BV51Y7MM5

You can refer to the ULIDs specification for more details.

Connecting to a database

Prior to access the scans, you should connect to a blissdata instance. This is done by creating a DataStore object, providing him a Redis URL.

from blissdata.redis_engine.store import DataStore

data_store = DataStore("redis://localhost:6379")
Now you have an entry point to talk with a blissdata instance.

Note

It is recommended to reuse DataStore instance as most as possible as it keeps previously open sockets. This prevents from establishing new connections every time.

Looking for scans

As we just saw, scans are uniquely identified within Redis. Therefore, your first step is to obtain the key for the Scan you're interested in. Here are the possible options:

  • get_next_scan(since=None, block=True, timeout=0):

    The scan doesn't exist yet and you are waiting for it.

    from blissdata.redis_engine.exceptions import NoScanAvailable
    
    # Wait 10 seconds for a new scan
    try:
        timestamp, key = data_store.get_next_scan(timeout=10)
    except NoScanAvailable:
        raise Exception("No scan started within 10 seconds")
    scan_1 = data_store.load_scan(key)
    
    # -------------- PROCESS scan_1 --------------
    
    # Wait for the immediate next scan
    timestamp, key = data_store.get_next_scan(since=timestamp)
    scan_2 = data_store.load_scan(key)
    
    # -------------- PROCESS scan_2 --------------
    
    # Get the next scan immediately or raise an Exception
    try:
        timestamp, key = data_store.get_next_scan(since=timestamp, block=False)
    except NoScanAvailable:
        raise Exception("No scan started while scan_2 processing")
    scan_3 = data_store.load_scan(key)
    

    Tip

    Reusing the previous timestamp ensure you don't miss any new scan while processing a previous one.

  • get_last_scan():

    The scan just ran.

    from blissdata.redis_engine.exceptions import NoScanAvailable
    
    try:
        timestamp, key = data_store.get_last_scan()
    except NoScanAvailable:
        raise Exception("There is no scan at all !")
    scan = data_store.load_scan(key)
    

  • search_existing_scans(session, proposal, collection, dataset, name, number):

    The scan already exists and you want to find it among existing ones.

    timestamp, keys = data_store.search_existing_scans(session="my_session", proposal=..., ...)
    # load each scan from key, or simply uses keys to count how many scans correspond to your search...
    

    Tip

    timestamp is not used there, but you can provide it to get_next_scan(since=timestamp) to make it relative to a previous research.

Scan structure

Each scan is actually stored as a JSON inside Redis, but for the sake of simplicity, we never edit or read those JSONs by hand. Instead we rely on Redis-OM which helps wrapping JSONs into nice python objects.

scan = data_store.load_scan(key)
print(scan.name)
print(scan.number)
This commands makes a local copy of the remote JSON. Once loaded, the Scan object comes with several properties like its name, number, session, info, state, etc. Because of the copy, there is no remote access when looking at scan properties, thus you should not be concerned about performance or atomicity when using it.

Scan state and updates

Scans content is not frozen, or Blissdata would be pointless. Therefore we need to update our local copy at some point. This is done through the update method (think of git fetch for example). Scan's updates are not made on a continuous basis, instead they are pushed step by step, or state by state to be very precise.

We previously mentioned the state property in the Scan. This an important one as it acts as a backbone for the scan's life cycle. Every state change comes with a JSON update and reciprocal, JSON cannot changes without a new state.

When we call scan.update we are waiting for a new state to be available:

# different ways to call update, each returns a boolean (False if nothing changed)
scan.update() # default is blocking
scan.update(block=False)
scan.update(timeout=3)

Note

Scan's state is actually implemented by a dedicated Redis stream and is not part of the JSON.

The state machine used for scans contains the following states:

CREATED -> PREPARED -> STARTED -> STOPPED -> CLOSED

from blissdata.redis_engine.scan import ScanState

while scan.state < ScanState.PREPARED:
    scan.update()

if scan.state == ScanState.CLOSED:
    print(scan.state.name) # 'CLOSED'

Streams

As we saw, Scans are there to pack descriptive information and the streams together. But streams are the actual workhorse for carrying acquisition data. Each Stream object corresponds to a Redis stream. In addition, they may redirect queries to file after buffer eviction, or resolve references to external data sources (e.g. Lima images).

From a Scan object, you simply access the streams as a dictionary:

...
my_scan = data_store.load_scan(key)
my_stream = scan.streams["my_stream_name"]
print(my_stream.name)
print(my_stream.info)

Inside any stream, data is made of "points". Depending on the type of stream, one point can be a scalar value, a multidimensional array, a JSON value. Encoders and decoders are provided to serialize the data in a strongly typed manner.

In order to serve any access pattern, streams can be read through either random or sequential access.

Random access

Streams behave like arrays, where you access content based on the index, not on the time.

stream[3] # single point
stream[-1] # last point
stream[5:10] # points 5 to 10 (not included)
stream[50:100:10] # points 50 to 100 (not included), by steps of 10
stream[3:-5] # third to the fifth from the end (not included)
stream[3:4] # single point list

# omitting values
stream[:50] # from the beginning
stream[50:] # until the end
stream[:] # everything
For more details you may have a look at Python slicing.

Querying slices always returns a list, which can be empty. Querying indexes in the other hand can raise an IndexError. In Blissdata there exists more specific versions, all deriving from IndexError:

  • IndexNotYetThereError: Too early, this index is not yet acquired.
  • IndexWontBeThereError: Too bad, there will never be such index as the stream ended before reaching it.
  • IndexNoMoreThereError: Too late, it has been removed to free space in Redis (Note: Streams with a file fallback never raise this error, but transparently switch to file instead).

Sequential access

The sequential API is driven by the data publisher. A read() method with blocking mode and timeout allows to synchronize with the data being pushed into Redis.

Each stream comes with a cursor() method returning a Cursor object. You can create one or many to act as reading heads over the stream. Using cursors permitted to keep the stream object stateless (no current position), thus it is easier to share across the code.

A cursor has a position property and a read() method. There is no return on a cursor, the position can only increase.

my_cursor = stream.cursor()
my_cursor.position
    0
index, data = my_cursor.read(count=5)
index, data = my_cursor.read(block=False)
index, data = my_cursor.read(timeout=3.5)
index, data = my_cursor.read(count=-1)

Examples

Scan creation and streaming

#!/usr/bin/env python
import numpy as np

from blissdata.redis_engine.store import DataStore
from blissdata.redis_engine.scan import ScanState
from blissdata.redis_engine.encoding.numeric import NumericStreamEncoder
from blissdata.redis_engine.encoding.json import JsonStreamEncoder

# Configure blissdata for a Redis database
data_store = DataStore("redis://localhost:6379")

# The identity model we use is ESRFIdentityModel in blissdata.redis_engine.models
# It defines the json fields indexed by RediSearch
scan_id = {
    "name": "my_scan",
    "number": 1,
    "data_policy": "no_policy",
    "session": "example_session",
}

# ------------------------------------------------------------------ #
# create scan in the database
scan = data_store.create_scan(scan_id)
# ------------------------------------------------------------------ #

def run_scan(scan):
    """Create example streams for the scan and publish data"""
    # declare some streams in the scan
    encoder = NumericStreamEncoder(dtype=np.float64)
    scalar_stream = scan.create_stream("scalars", encoder, info={"unit": "mV", "embed the info": "you want"})

    encoder = NumericStreamEncoder(dtype=np.int32, shape=(4096, ))
    vector_stream = scan.create_stream("vectors", encoder, info={})

    encoder = NumericStreamEncoder(dtype=np.uint16, shape=(1024, 100, ))
    array_stream = scan.create_stream("arrays", encoder, info={})

    encoder = JsonStreamEncoder()
    json_stream = scan.create_stream("jsons", encoder, info={})

    # gather some metadata before running the scan
    scan.info["some_metadata"] = {"a": 42, "b": 561}

    # ------------------------------------------------------------------ #
    scan.prepare() # upload initial metadata and stream declarations
    # ------------------------------------------------------------------ #

    # Scan is ready to start, eventually wait for other processes.
    # Sharing stream keys to external publishers can be done there.

    # ------------------------------------------------------------------ #
    scan.start()
    # ------------------------------------------------------------------ #

    # publish data into streams (can be done in parallel)
    for i in range(1000):
        scalar_stream.send(float(i))
        vector_stream.send(np.full((4096, ), i, dtype=np.int32))
        array_stream.send(np.full((1024, 100, ), i, dtype=np.uint16))
        json_stream.send({"index": i})

    # close streams
    scalar_stream.seal()
    vector_stream.seal()
    array_stream.seal()
    json_stream.seal()

    # ------------------------------------------------------------------ #
    scan.stop()
    # ------------------------------------------------------------------ #

    # gather end of scan metadata
    scan.info["end_metadata"] = {"c": 123, "d": "xyz"}

    # ------------------------------------------------------------------ #
    scan.close() # upload final metadata
    # ------------------------------------------------------------------ #

try:
    run_scan(scan)
finally:
    if scan.state < ScanState.CLOSED:
        scan.close()