Skip to content

Writing a hardware controller

This section will describe how to write a hardware controller class in BLISS to interface with a hardware device. It will not consider the management of counters, axes or any other controller’s subitems. For these topics see how to write a controller.

Screenshot

Good practice

As the hardware controller object won’t involve counters, axes or subitems, it is not necessary to inherit from another BLISS base class or to implement special methods to be compatible with BLISS internal mechanisms. However, there are still good practices to remember such as:

  • Lazy initialization of the communication object to avoid delay when BLISS session starts
  • Register communication object and hardware controller in the global map
  • Prefer the write_readlines method of the communication object (atomic/locked) to avoid troubles in case of concurrent requests
  • List known commands and check that command arguments values are correct
  • Use local cache with slow devices when accessing the hardware is not necessary
  • Fill doc-string as much as possible to describe hardware functionalities
  • Hide internal attributes by using names starting with _

Communication layer and get_comm helper

BLISS already implements various communication protocols.

To create the communication object, BLISS provides a unique helper function get_comm()

YAML example with a serial line device

class: FooDevice
name: foo
serial:
  url: /dev/ttyS0
from bliss import global_map
from bliss.comm.util import get_comm, SERIAL

class FooDevice:
    def __init__(self, config):
        self._config = config
        self._comm = None

    def _init_com(self):
        default_options = {'baudrate': 19200}
        self._comm = get_comm(self._config, ctype=SERIAL, **default_options)
        global_map.register(self, children_list=[self._comm])

    @property
    def comm(self):
        if self._comm is None:
            self._init_com()
        return self._comm

Lazy initialization of the communication object

Notice the usage of a property to protect the comm attribute and implement the lazy initialization. Initializing the FooDevice object does not trigger the initialization of the communication object. The communication object will be created only when the comm property will be accessed for the first time.

Global map registration

The command global_map.register(self, children_list=[self._comm]) registers the communication object under the hardware controller (self) and the hardware controller under the 'controller' global map node. Declaring the com as a child will ensure that when debugging is activated on this controller, it will also activate debugging on the com object. For more details see global map registration.

Sending commands

Below an example of a send and receive method, that checks the command validity and implements an atomic write-read communication with the hardware (using comm.write_readline). It also handles optional arguments and device channels.

Notice the usage of the self.comm property (instead of the self._comm attribute) to ensure the initialization of the communication object when sending the first command.

Communication messages are logged using the log_debug function (see debugon to activate debugging)

Send command method example

from bliss.common.logtools import log_debug

def send_cmd(self, cmd, arg=None, channel=None):
    """ Send a command to the hardware and read the answer """

    self._check_cmd(cmd, arg, channel)

    msg = f"{cmd}"
    if channel is not None:
        msg += f" {channel}"
    if arg is not None:
        msg += f" {arg}"
    msg += "\n"

    log_debug(self, f"send_cmd {msg}")
    ans = self.comm.write_readline(msg.encode()).decode()
    log_debug(self, f"receive {ans}")

    return ans
def _check_cmd(self, cmd, arg=None, channel=None):
    """ Check command and argument validity """

    _CMD2PARAM = {                    # a dict { cmd: (possible arg values, ...) }
        "ACQ": ("ON", "OFF"),        # a cmd taking 2 possible arg values of type str
        "BDR": (9600, 19200, 38400), # a cmd taking 3 possible arg values of type int
        "VER": None,                 # a cmd which does not take arg
        "ITM": [1e-3, 1.],           # a cmd taking an arg of type float in a given range
    }

    _CHANNEL_NAMES = (1, 2, 3, 4)

    if cmd not in _CMD2PARAM.keys():
        raise ValueError(
            f"Unknown command '{cmd}', should be in {list(_CMD2PARAM.keys())}"
        )

    if channel is not None:
        if channel not in _CHANNEL_NAMES:
            raise ValueError(
                    f"Unknown channel '{channel}', should be in {_CHANNEL_NAMES}"
                )

    if arg not in [None, '?']: # setter case
        rng = _CMD2PARAM[cmd]

        # cmd does not expect an arg
        if rng is None:
            raise ValueError(
                    f"command '{cmd}' does not expect an argument but receives '{arg}'"
                )

        # cmd expects known arg values
        if isinstance(rng, tuple):
            if arg not in rng:
                raise ValueError(
                    f"command '{cmd}' expects argument in {rng} but receives '{arg}'"
                )

        # cmd expects arg value in a given range
        if isinstance(rng, list):
            mini, maxi = rng
            if arg < mini or arg > maxi:
                 raise ValueError(
                    f"command '{cmd}' expects argument in range {rng} but receives '{arg}'"
                )

Note

Here the _check_cmd method is just an example. Such method is very specific to the communication protocol and the hardware device. Therefore, developers will probably have to adapt this example to their own situation.

Caching device parameters

When it is possible, try to store locally the parameters read from the hardware to avoid useless communication. For example, the version of the hardware is a static parameter. Therefore, it can be read only once and stored locally.

Caching a static device parameter

@property
def version(self):
    if self._version is None:
        self._version = self.send_cmd("VER", "?")
    return self._version

Also, if you expect to use the controller through a single client or behind a server layer, device parameters can be cached safely. For example, _integration_time can be read only once from the controller the first time and stored locally. Then, cached value is updated only when the user sets a new value on the device. This will avoid to communicate with the device each time you need the parameter value. This is particularly recommended for parameters that are frequently used/evaluated in other part of the code such as loops and computations.

Caching last value of device parameter

@property
def integration_time(self):
    if self._integration_time is None:
        self._integration_time = self.send_cmd("ITM", "?")
    return self._integration_time

@integration_time.setter
def integration_time(self, value):
    self.send_cmd("ITM", value)
    self._integration_time = value

Full example

Example of an hardware controller implementation

from bliss import global_map
from bliss.comm.util import get_comm, TCP
from bliss.common.logtools import log_debug

class FooDevice:

    """
    Low level controller class for device XXX model YYY

    manufacturer:
    website: 
    description:
    """

    _WEOL = "\r"      # write eol
    _REOL = "\r\n"    # read eol

    _CMD2PARAM = {                    # a dict { cmd: (possible arg values, ...) }
        "ACQ": ("ON", "OFF"),        # a cmd taking 2 possible arg values of type str
        "BDR": (9600, 19200, 38400), # a cmd taking 3 possible arg values of type int
        "VER": None,                 # a cmd which does not take arg
        "ITM": [1e-3, 1.],           # a cmd taking an arg of type float in a given range
    }

    _CHANNEL_NAMES = (1, 2, 3, 4)
    _DEFAULT_PORT = 10001

    def __init__(self, config):
        self._config = config
        self._comm = None

        # === attribute examples
        self._version = None
        self._integration_time = None

    def __del__(self):
        self.__close__()

    def __close__(self):
        """ BLISS will try to call this method when closing a session """
        self._close_comm()

    def _init_comm(self):
        """ Initialize communication object """
        default_options = {'eol': self._REOL, 'port': self._DEFAULT_PORT}
        self._comm = get_comm(self._config , ctype=TCP, **default_options)
        global_map.register(self, children_list=[self._comm])

    def _close_comm(self):
        """ Close communication """
        if self._comm is not None:
            self._comm.close()
            self._comm = None

    @property
    def comm(self):
        if self._comm is None:
            self._init_comm()
        return self._comm

    def send_cmd(self, cmd, arg=None, channel=None):
        """ Send a command to the hardware and read the answer """

        self._check_cmd(cmd, arg, channel)

        msg = f"{cmd}"
        if channel is not None:
            msg += f" {channel}"
        if arg is not None:
            msg += f" {arg}"
        msg += f"{self._WEOL}"

        log_debug(self, f"send_cmd {msg}")
        ans = self.comm.write_readline(msg.encode()).decode()
        log_debug(self, f"receive {ans}")

        return ans

    def _check_cmd(self, cmd, arg=None, channel=None):
        """ Check command and argument validity """

        if cmd not in self._CMD2PARAM.keys():
            raise ValueError(
                f"Unknown command '{cmd}', should be in {list(self._CMD2PARAM.keys())}"
            )

        if channel is not None:
            if channel not in self._CHANNEL_NAMES:
                raise ValueError(
                        f"Unknown channel '{channel}', should be in {self._CHANNEL_NAMES}"
                    )

        if arg not in [None, '?']: # setter case
            rng = self._CMD2PARAM[cmd]

            # cmd does not expect an arg
            if rng is None:
                raise ValueError(
                        f"command '{cmd}' does not expect an argument but receives '{arg}'"
                    )

            # cmd expects known arg values
            if isinstance(rng, tuple):
                if arg not in rng:
                    raise ValueError(
                        f"command '{cmd}' expects argument in {rng} but receives '{arg}'"
                    )

            # cmd expects arg value in a given range
            if isinstance(rng, list):
                mini, maxi = rng
                if arg < mini or arg > maxi:
                    raise ValueError(
                        f"command '{cmd}' expects argument in range {rng} but receives '{arg}'"
                    )


    #=== User API / property examples ======================================

    @property
    def version(self):
        if self._version is None:
            self._version = self.send_cmd("VER", "?")
        return self._version

    @property
    def integration_time(self):
        if self._integration_time is None:
            self._integration_time = self.send_cmd("ITM", "?")
        return self._integration_time

    @integration_time.setter
    def integration_time(self, value):
        self.send_cmd("ITM", value)
        self._integration_time = value