Cheat Sheet#
Here are code snippets to compose a scripts based on your needs.
DataStore#
Connecting to a database:
from blissdata.redis_engine.store import DataStore
data_store = DataStore("redis://localhost:6379")
Scans#
Info
Following snippets now reuse data_store
object.
Get last scan#
# may raise NoScanAvailable if none is found
_, scan_key = data_store.get_last_scan()
Wait for next scan#
_, scan_key = data_store.get_next_scan()
Wait for scans in contiguous sequence#
prev_timestamp = None
while True:
prev_timestamp, scan_key = data_store.get_next_scan(since=prev_timestamp)
print(f"New scan created: {scan_key}")
Search among existing scans#
_, scan_keys = data_store.search_existing_scans(dataset="efgh*")
print(f"Found {len(scan_keys)} scans matching the query")
Loading scan from a key#
Load the scan from the keys returned by previous snippets:
scan = data_store.load_scan(scan_key)
print(scan.name)
print(scan.info)
Streams#
Info
Following snippets assume a scan
is already loaded.
Pick a stream in a scan#
from blissdata.redis_engine.scan import ScanState
# Scan definition (including streams) is complete once scan is PREPARED
while scan.state < ScanState.PREPARED:
scan.update() # wait until a new state is available
# scan.streams is like a dict
stream = scan.streams["stream_name"]
Randomly access a stream#
stream = scan.streams["stream_name"]
stream[3] # single point
stream[-1] # last point
stream[100:200] # slice
stream[:] # whole stream
Sequentially access a stream#
from blissdata.redis_engine.exceptions import EndOfStream
stream = scan.streams["stream_name"]
cursor = stream.cursor()
while True:
try:
view = cursor.read() # return when new data is available
except EndOfStream:
pass
print(f"Got {len(view)} new points, starting at index {view.index}")
print(view.get_data())
Monitoring a stream#
import time
from blissdata.redis_engine.exceptions import EndOfStream
stream = scan.streams["stream_name"]
cursor = stream.cursor()
while True:
try:
view = cursor.read(last_only=True) # return when data is available, but only the last value
except EndOfStream:
pass
print(f"Got a new value at index {view.index}")
print(view.get_data()) # single point list
time.sleep(1) # slow down to desired monitoring rate
Sequentially access N streams#
from blissdata.stream.base import CursorGroup
cursor_group = CursorGroup(scan.streams) # read all scan's streams
while True:
try:
views = cursor_group.read()
except EndOfStream:
pass
for stream, view in views.items():
print(f"Got {len(view)} new points in {stream.name}, starting at index {view.index}")
print(view.get_data())