Writing a hardware controller¶
This section will describe how to write a hardware controller class in BLISS to interface with a hardware device. It will not consider the management of counters, axes or any other controller’s subitems. For these topics see how to write a controller.
Good practice¶
As the hardware controller object won’t involve counters, axes or subitems, it is not necessary to inherit from another BLISS base class or to implement special methods to be compatible with BLISS internal mechanisms. However, there are still good practices to remember such as:
- Lazy initialization of the communication object to avoid delay when BLISS session starts
- Register communication object and hardware controller in the global map
- Prefer the
write_readlines
method of the communication object (atomic/locked) to avoid troubles in case of concurrent requests - List known commands and check that command arguments values are correct
- Use local cache with slow devices when accessing the hardware is not necessary
- Fill doc-string as much as possible to describe hardware functionalities
- Hide internal attributes by using names starting with
_
Communication layer and get_comm
helper¶
BLISS already implements various communication protocols.
To create the communication object, BLISS provides a unique helper function get_comm()
YAML example with a serial line device
class: FooDevice
name: foo
serial:
url: /dev/ttyS0
from bliss import global_map
from bliss.comm.util import get_comm, SERIAL
class FooDevice:
def __init__(self, config):
self._config = config
self._comm = None
def _init_com(self):
default_options = {'baudrate': 19200}
self._comm = get_comm(self._config, ctype=SERIAL, **default_options)
global_map.register(self, children_list=[self._comm])
@property
def comm(self):
if self._comm is None:
self._init_com()
return self._comm
Lazy initialization of the communication object
Notice the usage of a property to protect the comm
attribute and implement the lazy initialization.
Initializing the FooDevice
object does not trigger the initialization of the communication object.
The communication object will be created only when the comm
property will be accessed for the first time.
Global map registration
The command global_map.register(self, children_list=[self._comm])
registers the communication
object under the hardware controller (self
) and the hardware controller under the 'controller'
global map node.
Declaring the com as a child will ensure that when debugging
is activated on this controller, it will also activate debugging on the com object.
For more details see global map registration.
Sending commands¶
Below an example of a send and receive method, that checks the command validity
and implements an atomic write-read communication with the hardware (using comm.write_readline
).
It also handles optional arguments and device channels.
Notice the usage of the self.comm
property (instead of the self._comm
attribute) to ensure
the initialization of the communication object when sending the first command.
Communication messages are logged using the log_debug
function
(see debugon to activate debugging)
Send command method example
from bliss.common.logtools import log_debug
def send_cmd(self, cmd, arg=None, channel=None):
""" Send a command to the hardware and read the answer """
self._check_cmd(cmd, arg, channel)
msg = f"{cmd}"
if channel is not None:
msg += f" {channel}"
if arg is not None:
msg += f" {arg}"
msg += "\n"
log_debug(self, f"send_cmd {msg}")
ans = self.comm.write_readline(msg.encode()).decode()
log_debug(self, f"receive {ans}")
return ans
def _check_cmd(self, cmd, arg=None, channel=None):
""" Check command and argument validity """
_CMD2PARAM = { # a dict { cmd: (possible arg values, ...) }
"ACQ": ("ON", "OFF"), # a cmd taking 2 possible arg values of type str
"BDR": (9600, 19200, 38400), # a cmd taking 3 possible arg values of type int
"VER": None, # a cmd which does not take arg
"ITM": [1e-3, 1.], # a cmd taking an arg of type float in a given range
}
_CHANNEL_NAMES = (1, 2, 3, 4)
if cmd not in _CMD2PARAM.keys():
raise ValueError(
f"Unknown command '{cmd}', should be in {list(_CMD2PARAM.keys())}"
)
if channel is not None:
if channel not in _CHANNEL_NAMES:
raise ValueError(
f"Unknown channel '{channel}', should be in {_CHANNEL_NAMES}"
)
if arg not in [None, '?']: # setter case
rng = _CMD2PARAM[cmd]
# cmd does not expect an arg
if rng is None:
raise ValueError(
f"command '{cmd}' does not expect an argument but receives '{arg}'"
)
# cmd expects known arg values
if isinstance(rng, tuple):
if arg not in rng:
raise ValueError(
f"command '{cmd}' expects argument in {rng} but receives '{arg}'"
)
# cmd expects arg value in a given range
if isinstance(rng, list):
mini, maxi = rng
if arg < mini or arg > maxi:
raise ValueError(
f"command '{cmd}' expects argument in range {rng} but receives '{arg}'"
)
Note
Here the _check_cmd
method is just an example. Such method is very specific to the communication protocol
and the hardware device. Therefore, developers will probably have to adapt this example to their own situation.
Caching device parameters¶
When it is possible, try to store locally the parameters read from the hardware to avoid useless communication. For example, the version of the hardware is a static parameter. Therefore, it can be read only once and stored locally.
Caching a static device parameter
@property
def version(self):
if self._version is None:
self._version = self.send_cmd("VER", "?")
return self._version
Also, if you expect to use the controller through a single client or behind a server layer, device parameters can be cached safely.
For example, _integration_time
can be read only once from the controller the first time and stored locally.
Then, cached value is updated only when the user sets a new value on the device. This will avoid to communicate with the device each time you
need the parameter value. This is particularly recommended for parameters that are frequently used/evaluated in other part of the code
such as loops and computations.
Caching last value of device parameter
@property
def integration_time(self):
if self._integration_time is None:
self._integration_time = self.send_cmd("ITM", "?")
return self._integration_time
@integration_time.setter
def integration_time(self, value):
self.send_cmd("ITM", value)
self._integration_time = value
Full example¶
Example of an hardware controller implementation
from bliss import global_map
from bliss.comm.util import get_comm, TCP
from bliss.common.logtools import log_debug
class FooDevice:
"""
Low level controller class for device XXX model YYY
manufacturer:
website:
description:
"""
_WEOL = "\r" # write eol
_REOL = "\r\n" # read eol
_CMD2PARAM = { # a dict { cmd: (possible arg values, ...) }
"ACQ": ("ON", "OFF"), # a cmd taking 2 possible arg values of type str
"BDR": (9600, 19200, 38400), # a cmd taking 3 possible arg values of type int
"VER": None, # a cmd which does not take arg
"ITM": [1e-3, 1.], # a cmd taking an arg of type float in a given range
}
_CHANNEL_NAMES = (1, 2, 3, 4)
_DEFAULT_PORT = 10001
def __init__(self, config):
self._config = config
self._comm = None
# === attribute examples
self._version = None
self._integration_time = None
def __del__(self):
self.__close__()
def __close__(self):
""" BLISS will try to call this method when closing a session """
self._close_comm()
def _init_comm(self):
""" Initialize communication object """
default_options = {'eol': self._REOL, 'port': self._DEFAULT_PORT}
self._comm = get_comm(self._config , ctype=TCP, **default_options)
global_map.register(self, children_list=[self._comm])
def _close_comm(self):
""" Close communication """
if self._comm is not None:
self._comm.close()
self._comm = None
@property
def comm(self):
if self._comm is None:
self._init_comm()
return self._comm
def send_cmd(self, cmd, arg=None, channel=None):
""" Send a command to the hardware and read the answer """
self._check_cmd(cmd, arg, channel)
msg = f"{cmd}"
if channel is not None:
msg += f" {channel}"
if arg is not None:
msg += f" {arg}"
msg += f"{self._WEOL}"
log_debug(self, f"send_cmd {msg}")
ans = self.comm.write_readline(msg.encode()).decode()
log_debug(self, f"receive {ans}")
return ans
def _check_cmd(self, cmd, arg=None, channel=None):
""" Check command and argument validity """
if cmd not in self._CMD2PARAM.keys():
raise ValueError(
f"Unknown command '{cmd}', should be in {list(self._CMD2PARAM.keys())}"
)
if channel is not None:
if channel not in self._CHANNEL_NAMES:
raise ValueError(
f"Unknown channel '{channel}', should be in {self._CHANNEL_NAMES}"
)
if arg not in [None, '?']: # setter case
rng = self._CMD2PARAM[cmd]
# cmd does not expect an arg
if rng is None:
raise ValueError(
f"command '{cmd}' does not expect an argument but receives '{arg}'"
)
# cmd expects known arg values
if isinstance(rng, tuple):
if arg not in rng:
raise ValueError(
f"command '{cmd}' expects argument in {rng} but receives '{arg}'"
)
# cmd expects arg value in a given range
if isinstance(rng, list):
mini, maxi = rng
if arg < mini or arg > maxi:
raise ValueError(
f"command '{cmd}' expects argument in range {rng} but receives '{arg}'"
)
#=== User API / property examples ======================================
@property
def version(self):
if self._version is None:
self._version = self.send_cmd("VER", "?")
return self._version
@property
def integration_time(self):
if self._integration_time is None:
self._integration_time = self.send_cmd("ITM", "?")
return self._integration_time
@integration_time.setter
def integration_time(self, value):
self.send_cmd("ITM", value)
self._integration_time = value