Skip to content

Cheat Sheet#

Here are code snippets to compose a scripts based on your needs.

DataStore#

Connecting to a database:

from blissdata.redis_engine.store import DataStore
data_store = DataStore("redis://localhost:6379")

Scans#

Info

Following snippets now reuse data_store object.

Get last scan#

# may raise NoScanAvailable if none is found
_, scan_key = data_store.get_last_scan()

Wait for next scan#

_, scan_key = data_store.get_next_scan()

Wait for scans in contiguous sequence#

prev_timestamp = None
while True:
    prev_timestamp, scan_key = data_store.get_next_scan(since=prev_timestamp)
    print(f"New scan created: {scan_key}")

Search among existing scans#

_, scan_keys = data_store.search_existing_scans(dataset="efgh*")
print(f"Found {len(scan_keys)} scans matching the query")

Loading scan from a key#

Load the scan from the keys returned by previous snippets:

scan = data_store.load_scan(scan_key)
print(scan.name)
print(scan.info)

Streams#

Info

Following snippets assume a scan is already loaded.

Pick a stream in a scan#

from blissdata.redis_engine.scan import ScanState

# Scan definition (including streams) is complete once scan is PREPARED
while scan.state < ScanState.PREPARED:
    scan.update() # wait until a new state is available

# scan.streams is like a dict
stream = scan.streams["stream_name"]

Array mode#

stream = scan.streams["stream_name"]
stream[3] # single point
stream[-1] # last point
stream[100:200] # slice
stream[:] # whole stream

Stream mode#

from blissdata.redis_engine.exceptions import EndOfStream

stream = scan.streams["stream_name"]
cursor = stream.cursor()

while True:
    try:
        view = cursor.read() # return when new data is available
    except EndOfStream:
        pass
    print(f"Got {len(view)} new points, starting at index {view.index}")
    print(view.get_data())

Monitoring#

import time
from blissdata.redis_engine.exceptions import EndOfStream

stream = scan.streams["stream_name"]
cursor = stream.cursor()

while True:
    try:
        view = cursor.read(last_only=True) # return when data is available, but only the last value
    except EndOfStream:
        pass
    print(f"Got a new value at index {view.index}")
    print(view.get_data()) # single point list
    time.sleep(1) # slow down to desired monitoring rate

Parallel reading#

from blissdata.stream.base import CursorGroup

cursor_group = CursorGroup(scan.streams) # read all scan's streams

while True:
    try:
        views = cursor_group.read()
    except EndOfStream:
        pass
    for stream, view in views.items():
        print(f"Got {len(view)} new points in {stream.name}, starting at index {view.index}")
        print(view.get_data())