Streams#

As we saw, a scan packs descriptive information together with a set of streams. The streams themselves are the workhorses that carry the acquisition data.

Each stream is an indexed sequence of data points. Depending on the type of stream, one point can be a scalar value, a multidimensional array, a JSON file, etc.

Redis streams#

When talking about streams, one can think of UNIX-style streams (stdin, stdout, ...). The Redis approach is somewhat different, and it is the one Blissdata is based on.

In Redis, streams are not consumed. Instead, any number of readers may read the same content at different speeds. Each reader owns a cursor that it can advance and rewind. This is possible because a stream's content is stored in Redis, whereas it is only transient in UNIX-style streams.
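To make the "not consumed" idea concrete, here is a minimal in-memory sketch. It does not use Redis or the Blissdata API: `ToyStream` and `ToyReader` are hypothetical names invented for this illustration, and a plain Python list stands in for the stored content.

```python
class ToyStream:
    """In-memory stand-in for a stored stream (not the Blissdata API)."""

    def __init__(self):
        self._points = []

    def append(self, point):
        self._points.append(point)

    def read_from(self, position):
        """Return every point stored at or after `position`."""
        return self._points[position:]


class ToyReader:
    """A reader that owns its own cursor position."""

    def __init__(self, stream):
        self._stream = stream
        self.position = 0

    def read(self):
        new_points = self._stream.read_from(self.position)
        self.position += len(new_points)
        return new_points

    def rewind(self, position=0):
        self.position = position


stream = ToyStream()
for value in (10, 20, 30):
    stream.append(value)

fast, slow = ToyReader(stream), ToyReader(stream)
fast_first = fast.read()    # the three points written so far
stream.append(40)
fast_second = fast.read()   # only the point added since the last read
slow_all = slow.read()      # all four points: `fast` consumed nothing
fast.rewind()
fast_replay = fast.read()   # all four points again after rewinding
```

Because the content stays stored, the slow reader and the rewound reader both see the full history.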

Secondly, Redis streams do not only carry sequences of bytes; they can handle more complex payloads. You will see later that the supported payloads can be extended through plugins. This way, it is even possible to create streams that carry scans, streams that carry streams, etc. But don't worry, we will start with simple cases!

Stream sealing#

Streams have no predefined length. Instead, a stream is sealed by its publisher at some point, after which nothing more can be written to it. Readers can then use the seal as a stop condition when iterating over a stream (see the EndOfStream exception).
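The sealing behaviour can be sketched with a small toy model. This is an illustration only, assuming a single process and an in-memory list; `SealableStream` is a hypothetical class, and the `EndOfStream` defined here is a local stand-in for the exception named above, not the Blissdata one.

```python
class EndOfStream(Exception):
    """Local stand-in for the exception named in the text."""


class SealableStream:
    """Toy stream that can be sealed by its publisher."""

    def __init__(self):
        self._points = []
        self._sealed = False

    def append(self, point):
        if self._sealed:
            raise RuntimeError("stream is sealed, nothing more can be written")
        self._points.append(point)

    def seal(self):
        self._sealed = True

    def read(self, position):
        if position < len(self._points):
            return self._points[position]
        if self._sealed:
            raise EndOfStream  # no more data will ever come
        return None  # nothing new yet, but more may still arrive


stream = SealableStream()
for value in (1.0, 2.0, 3.0):
    stream.append(value)
stream.seal()

received = []
position = 0
try:
    while True:
        received.append(stream.read(position))
        position += 1
except EndOfStream:
    pass  # the seal acts as the stop condition
```

The reader does not need to know the length in advance: it simply reads until the seal is reached.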

Memory eviction#

The space available in Redis is inherently limited, whereas streams could last forever. Knowing this, we have no choice but to roll over memory. This is achieved by trimming streams over time, gradually removing the oldest part as more space is required (see Memory tracker).

Plugins provide a way to circumvent this limitation, for example by falling back on slower but permanent storage (assuming some process keeps archiving data in the background).
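As a rough illustration of trimming, the sketch below keeps only the most recent points while still counting every point ever written. It is a simplification under stated assumptions: `TrimmedStream` is a hypothetical class, and it trims on a fixed point count, whereas the real mechanism reacts to memory usage in Redis.

```python
from collections import deque


class TrimmedStream:
    """Toy stream that forgets its oldest points once `max_points` is reached."""

    def __init__(self, max_points):
        self._points = deque(maxlen=max_points)
        self._length = 0  # total number of points ever written

    def append(self, point):
        self._points.append(point)  # a full deque drops its oldest item
        self._length += 1

    @property
    def first_available(self):
        """Index of the oldest point still held in memory."""
        return self._length - len(self._points)

    def __len__(self):
        return self._length

    def __getitem__(self, index):
        if not 0 <= index < self._length:
            raise IndexError(f"index {index} is out of range")
        if index < self.first_available:
            raise IndexError(f"index {index} has been trimmed")
        return self._points[index - self.first_available]


stream = TrimmedStream(max_points=3)
for i in range(5):
    stream.append(i * 10)

oldest_kept = stream.first_available  # indices 0 and 1 are gone
latest = stream[4]
```

Indices keep their original meaning after trimming, so a reader can tell the difference between "never existed" and "no longer in memory".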

Stream reading#

Before reading a stream, you need to get hold of one. Streams are found under the .streams attribute of a Scan, which is simply a dictionary:

# assuming you already have a scan object loaded
my_stream = scan.streams["my_stream_name"]
print(my_stream.name)
print(my_stream.info)

Once you have a stream, you can read it using either of the two available modes: stream mode and array mode.

Array mode#

Streams behave like arrays. You can access content based on index, at any moment. Slicing and negative indices are also supported:

stream[3] # point at index 3 (the fourth point, indices start at 0)
stream[-1] # last point

stream[5:10] # points 5 to 10 (not included)
stream[50:100:10] # points 50 to 100 (not included), by steps of 10
stream[3:-5] # index 3 up to the fifth from the end (not included)
stream[3:4] # single point list

# omitting values
stream[:50] # from the beginning
stream[50:] # until the end
stream[:] # everything
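To check what each expression selects, the same indexing can be tried on a plain Python list. This assumes streams follow standard Python sequence semantics, as the text suggests; the list `points` is only a stand-in for a complete stream of 200 points.

```python
points = list(range(200))  # stand-in for a stream holding 200 points

single = points[3]            # index 3, i.e. the fourth point
last = points[-1]             # last point
window = points[5:10]         # indices 5, 6, 7, 8, 9
strided = points[50:100:10]   # indices 50, 60, 70, 80, 90
trimmed_ends = points[3:-5]   # index 3 up to index 194
one_point_list = points[3:4]  # a list holding only index 3
```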

Limitation

It is not possible to wait for data to arrive in array mode. The only option is to keep polling a particular index, which is inefficient. In that case, stream mode is the way to go.

Index errors#

In conventional arrays, an IndexError only tells you that the index is invalid. An IndexError from a stream can convey more information: for example, the index could already be expired, or simply not yet published.

Because of this, IndexError comes in three more specific variants:

  • IndexNotYetThereError: Too early, this index is not yet acquired.
  • IndexWontBeThereError: Too bad, there will never be such an index, as the stream ended before reaching it.
  • IndexNoMoreThereError: Too late, it has been removed to free memory in Redis (Note: streams with a file fallback plugin never raise this error; they catch it and transparently switch to the file instead).

Still, you can catch IndexError to cover all three cases at once.
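The three cases can be sketched as follows. This is a toy model, not the library's code: the exception classes are redefined locally with the same names (assumed here to derive from IndexError, which the text implies), and `check_index` is a hypothetical helper that only handles non-negative indices.

```python
class IndexNotYetThereError(IndexError):
    """Too early: the index is not acquired yet."""


class IndexWontBeThereError(IndexError):
    """Too bad: the stream ended before reaching the index."""


class IndexNoMoreThereError(IndexError):
    """Too late: the index was removed to free memory."""


def check_index(index, first_available, length, sealed):
    """Return `index` if it is readable, else raise the matching error.

    `first_available` is the oldest index still in memory, `length` the
    number of points published so far, `sealed` whether the stream ended.
    """
    if index < first_available:
        raise IndexNoMoreThereError(index)
    if index >= length:
        if sealed:
            raise IndexWontBeThereError(index)
        raise IndexNotYetThereError(index)
    return index


def describe(index, **state):
    try:
        check_index(index, **state)
        return "ok"
    except IndexNotYetThereError:
        return "retry later"
    except IndexError:  # covers the two remaining cases at once
        return "give up"


running = dict(first_available=10, length=50, sealed=False)
finished = dict(first_available=10, length=50, sealed=True)

outcomes = [
    describe(20, **running),   # readable
    describe(60, **running),   # not published yet
    describe(60, **finished),  # never will be
    describe(5, **running),    # already trimmed
]
```

Catching the specific error first lets a caller retry only when waiting can actually help.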

Info

Negative indices are relative to the final length of the stream. Therefore, accessing such an index before the stream is complete always raises IndexNotYetThereError.

Stream mode#

Stream mode lets you read data from ongoing acquisitions without polling. For this, you need to create a Cursor (a reading head). You can have multiple Cursors on the same stream, each holding its own position.

The .read() method of the cursor returns when new data is available. It supports blocking and non-blocking reads, as well as a timeout.

from blissdata.redis_engine.exceptions import EndOfStream

my_cursor = stream.cursor()

try:
    while True:
        view = my_cursor.read() # blocking read
        # process view content
except EndOfStream:
    print("Cursor reached the end.")
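For readers curious about what a blocking read involves, here is a self-contained toy version built on `threading.Condition`. It is a sketch under assumptions, not how Blissdata is implemented: `ToyStream`, `ToyCursor`, and the `timeout` argument shown here are hypothetical, and `EndOfStream` is a local stand-in.

```python
import threading


class EndOfStream(Exception):
    """Local stand-in for the Blissdata exception."""


class ToyStream:
    def __init__(self):
        self._points = []
        self._sealed = False
        self._cond = threading.Condition()

    def append(self, point):
        with self._cond:
            self._points.append(point)
            self._cond.notify_all()  # wake up waiting readers

    def seal(self):
        with self._cond:
            self._sealed = True
            self._cond.notify_all()

    def wait_from(self, position, timeout=None):
        """Block until points exist at `position`, the seal, or the timeout."""
        with self._cond:
            self._cond.wait_for(
                lambda: len(self._points) > position or self._sealed, timeout
            )
            if len(self._points) > position:
                return self._points[position:]
            if self._sealed:
                raise EndOfStream
            return []  # timeout expired without new data


class ToyCursor:
    def __init__(self, stream):
        self._stream = stream
        self.position = 0

    def read(self, timeout=None):
        points = self._stream.wait_from(self.position, timeout)
        self.position += len(points)
        return points


def produce(stream, count):
    for i in range(count):
        stream.append(i)
    stream.seal()


stream = ToyStream()
cursor = ToyCursor(stream)
producer = threading.Thread(target=produce, args=(stream, 5))
producer.start()

received = []
try:
    while True:
        received.extend(cursor.read())  # blocks instead of polling
except EndOfStream:
    pass
producer.join()

idle = ToyCursor(ToyStream()).read(timeout=0)  # non-blocking read, no data
```

Unread points are always delivered before the end is signalled, so the reader never misses data written just before the seal.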

Views#

Reading from a cursor returns a View, which corresponds to a particular range in the stream. The data it covers is returned by .get_data().

view = my_cursor.read() # block until new data is available
data_points = view.get_data() # retrieve the corresponding data

print(f"received {len(view)} points starting at index {view.index}")

The reason for using views is to separate event notification from data retrieval. This way, plugin streams can provide custom data retrieval mechanisms on top of Redis, such as:

  • 2D detector image de-referencing from external storage
  • File fallback on stream expiration
  • Loading scans from their key to create streams of scans
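The separation between event and data can be sketched with a view that stores only a range and a loader function. Everything here is hypothetical and for illustration: `ToyView`, `load_images`, and the `external_storage` dictionary stand in for a real view and a real external image store.

```python
class ToyView:
    """Describes a range of a stream; data is fetched only on request."""

    def __init__(self, index, length, loader):
        self.index = index
        self._length = length
        self._loader = loader

    def __len__(self):
        return self._length

    def get_data(self):
        return self._loader(self.index, self._length)


# Stand-in for images kept outside Redis; the stream only holds references.
external_storage = {"img_0": [[0, 1], [2, 3]], "img_1": [[4, 5], [6, 7]]}
references = ["img_0", "img_1"]
load_calls = []


def load_images(index, length):
    load_calls.append((index, length))
    return [external_storage[ref] for ref in references[index:index + length]]


view = ToyView(index=0, length=2, loader=load_images)
calls_before = len(load_calls)  # the event arrived, nothing was loaded yet
images = view.get_data()        # de-referencing happens only here
```

A reader that only needs to know "two new images arrived at index 0" never pays the cost of loading them.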

Last only mode#

view = my_cursor.read(last_only=True)

The last_only mode lets you quickly skip over old data by returning only the most recent data point that has not been read yet. This is very useful for monitoring purposes.
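The effect on the cursor position can be illustrated with a small function over a plain list. This is a sketch only: `read_new` is a hypothetical helper, and unlike a real blocking read it returns immediately when nothing new is available.

```python
def read_new(points, position, last_only=False):
    """Return (new_points, first_index, new_position) for a toy cursor."""
    new_points = points[position:]
    first_index = position
    if last_only and new_points:
        # Skip everything but the most recent unread point.
        first_index = len(points) - 1
        new_points = new_points[-1:]
    return new_points, first_index, len(points)


points = list(range(10))

# A monitoring client joins late and only wants the freshest value.
latest, latest_index, position = read_new(points, 0, last_only=True)

# Later reads continue from the end, not from the skipped points.
points.extend([10, 11])
following, following_index, position = read_new(points, position)
```

The skipped points remain in the stream; only this cursor has moved past them.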

Parallel reading#

To read multiple streams at once, you would normally use multiple tasks, or a loop over different cursors using timeouts. Instead, Redis offers multiplexing on the server side. In other words, we just have to provide the list of streams we want to listen to.

To do this, you can replace your Cursors with a CursorGroup. It has the exact same .read() method, except that it reads many streams at once and returns as soon as data is available for one or more streams.

from blissdata.redis_engine.exceptions import EndOfStream
from blissdata.streams.base import CursorGroup

stream_list = [stream_a, stream_b, stream_c]
cursor_group = CursorGroup(stream_list)

try:
    while True:
        # returns as soon as at least one stream has new data
        view_dict = cursor_group.read()

        for stream, view in view_dict.items():
            data = view.get_data()
            # process data here ...

except EndOfStream:
    print("All streams are done.")

Remember, views are intended to separate event notification from data retrieval. The CursorGroup parallelizes the event part. It is then up to the user to retrieve data sequentially, by iterating over the views, or concurrently, by calling .get_data() in separate tasks or threads.
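One possible way to retrieve data concurrently is a thread pool, sketched below with stand-in objects. `SlowView` is a hypothetical class that mimics a view whose `.get_data()` is slow, and the dictionary keys are plain strings here rather than stream objects.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class SlowView:
    """Stand-in for a view whose data retrieval takes some time."""

    def __init__(self, data):
        self._data = data

    def get_data(self):
        time.sleep(0.05)  # simulate I/O, e.g. loading from external storage
        return self._data


# Shape of what a group read might hand back: one view per stream.
view_dict = {
    "stream_a": SlowView([1, 2]),
    "stream_b": SlowView([3]),
    "stream_c": SlowView([4, 5, 6]),
}

with ThreadPoolExecutor() as pool:
    # Submit every retrieval first so they overlap, then collect results.
    futures = {name: pool.submit(view.get_data) for name, view in view_dict.items()}
    data = {name: future.result() for name, future in futures.items()}
```

Threads help when retrieval is I/O-bound; for cheap in-memory data, a simple loop over the views is usually enough.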

Stream writing#

Before publishing any data into a stream, we need to create one. This can only be done through a Scan object: remember, orphan streams don't exist.

However, because plugins can bring a whole bestiary of different streams, there is no single constructor for them. This is why streams are made from recipes:

import numpy as np

from blissdata.streams.base import Stream
from my_unicorn_plugin import UnicornStream

# describe a stream of float64 scalars, then let the scan create it
recipe = Stream.recipe("scalars", dtype=np.float64)
scalar_stream = scan.create_stream(recipe)

# plugin streams define their own recipe parameters
recipe = UnicornStream.recipe(color="pink", glitter=True)
unicorn_stream = scan.create_stream(recipe)
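The general idea behind recipes, describing a stream first and letting the scan build it, can be illustrated with a generic toy pattern. This is not Blissdata's implementation: `ToyStream`, `ToyUnicornStream`, `ToyScan`, `STREAM_TYPES`, and the dictionary-based recipe format are all assumptions made for this sketch.

```python
class ToyStream:
    """Plain stream described by a name and a dtype."""

    def __init__(self, name, dtype):
        self.name = name
        self.dtype = dtype

    @classmethod
    def recipe(cls, name, **options):
        # A recipe only describes the stream; nothing is created yet.
        return {"type": cls.__name__, "name": name, "options": options}


class ToyUnicornStream(ToyStream):
    """Plugin-style stream with its own, different parameters."""

    def __init__(self, name, color, glitter=False):
        self.name = name
        self.color = color
        self.glitter = glitter


STREAM_TYPES = {cls.__name__: cls for cls in (ToyStream, ToyUnicornStream)}


class ToyScan:
    def __init__(self):
        self.streams = {}

    def create_stream(self, recipe):
        # The scan picks the right class from the recipe and owns the result.
        stream_class = STREAM_TYPES[recipe["type"]]
        stream = stream_class(recipe["name"], **recipe["options"])
        self.streams[stream.name] = stream
        return stream


scan = ToyScan()
scalars = scan.create_stream(ToyStream.recipe("scalars", dtype="float64"))
unicorn = scan.create_stream(
    ToyUnicornStream.recipe("unicorn", color="pink", glitter=True)
)
```

With this pattern the scan exposes a single creation method, while each stream type remains free to define its own parameters.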