Skip to content

Low level API without Gevent

The blissdata low level API can be used without gevent. This is achieved by running gevent related code into a subprocess. Then the subprocess provides the data it is collecting through a multiprocessing.Queue (actually UNIX pipe). While it adds a layer of inter-process communication, it allows to use the existing code base without any modification.

Further work is planned to cut out blissdata dependency on gevent, consequently it would be possible to dispense with subprocess isolation.

Principle

In the low level API, node objects contain both data and logic based on gevent. To be able to transmit the data only, this is decoupled in two classes:

  • RemoteNodeWalker: It implements the same iterators as a node (walk, walk_from_last, walk_events, walk_on_new_events), but calling one of these functions actually runs a subprocess under the hood. The subprocess termination is handled automatically.
  • NodeContent: This represents the data contained by nodes, it as been separated from the node’s logic in order to ease serialization between processes. The db_name field can be used to create a new RemoteNodeWalker to iterate through it.

The basic usage consists to create a RemoteNodeWalker based on a node’s db_name, then use one of the walk* function to obtain NodeContent objects. These NodeContent objects can be used to create more RemoteNodeWalker. (and therefore more subprocesses).

Minimal example

#!/usr/bin/env python
from blissdata.data.remote_node import RemoteNodeWalker
from gevent.monkey import is_anything_patched

beacon_host = "localhost"
beacon_port = 25000
db_name = "test_session"
root_node = RemoteNodeWalker(beacon_host, beacon_port, db_name)

for event in root_node.walk_on_new_events():
    assert not is_anything_patched()
    print("type:", event.type)
    print("node:", event.node)
    print("data:", event.data)
    print()

Accessing Lima images

Because Lima images are only passed as reference in Redis, they still need to be requested from the lima server. Thanks to this, images don’t have to be sent over inter-process queue.

#!/usr/bin/env python
from blissdata.data.remote_node import RemoteNodeWalker
from gevent.monkey import is_anything_patched

# required to query lima images from references
from blissdata.data.nodes.lima import LimaImageChannelDataNode
from blissdata.data.events import EventType
from redis import Redis

beacon_host = "localhost"
beacon_port = 25000
db_name = "lima_test_session"
root_node = RemoteNodeWalker(beacon_host, beacon_port, db_name)

redis_connections = {}

for event in root_node.walk_on_new_events():
    assert not is_anything_patched()
    node = event.node
    data = event.data

    # only looking at lima data
    if event.type is EventType.NEW_DATA and node.type == "lima":
        redis_url = event.node.redis_url

        # create a redis connection if none already exists
        if redis_url not in redis_connections:
            redis_connections[redis_url] = Redis.from_url(redis_url)

        # download image(s) from the reference(s)
        source_node = LimaImageChannelDataNode(
            connection=redis_connections[redis_url], name=node.db_name
        )
        lima_data_view = source_node.get(
            data.first_index, data.description["last_image_ready"]
        )
        for index in range(data.first_index, lima_data_view.last_index):
            image = lima_data_view.get_image(index)
            print("IMAGE:", image.shape)  # PROCESS IMAGE...