remotivelabs.broker

RemotiveLabs Broker API

This module handles communication with the RemotiveBroker.

The RemotiveBroker can run locally or in RemotiveCloud. Connecting to a RemotiveCloud instance requires additional authentication parameters—see the remotivelabs.broker.auth module for details.

Installation

pip install remotivelabs-broker

Logging

This library uses Python's standard logging module. By default, the library does not configure any logging handlers, allowing applications to fully control their logging setup.

To enable logs from this library in your application or tests, configure logging as follows:

import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("remotivelabs.broker").setLevel(logging.DEBUG)

For more advanced configurations, refer to the Python logging documentation.
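As a minimal sketch of a more targeted setup, the helper below attaches a dedicated handler to the remotivelabs.broker logger instead of configuring the root logger. The helper name attach_debug_handler and the format string are illustrative choices, not part of the library.

```python
import logging
import sys
from typing import TextIO


def attach_debug_handler(stream: TextIO = sys.stderr) -> logging.Handler:
    """Route DEBUG records from the remotivelabs.broker logger to `stream`."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logger = logging.getLogger("remotivelabs.broker")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    return handler  # keep a reference so the caller can remove it later
```

Returning the handler lets a test or application detach it again with removeHandler once the extra output is no longer needed.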

__version__ = '0.9.0b3'
class BrokerClient:

Represents an async client connection to a RemotiveBroker.

BrokerClient( url: str, client_id: str | None = None, auth: remotivelabs.broker.auth.AuthMethod = <remotivelabs.broker.auth.NoAuth object>)

Initializes a new BrokerClient instance.

Arguments:
  • url: The RemotiveBroker URL to connect to.
  • client_id: Optional client ID. If None, a random ID is generated.
  • auth: Authentication method to use. Defaults to NoAuth.
Note:

Start the instance using a context manager:

async with BrokerClient(...) as broker_client:
    ...

Or use the connect/disconnect methods directly:

broker_client = await BrokerClient(...).connect()
# ...
await broker_client.disconnect()

url: str

The URL the client connects to.

Used to interface restbuses

client_id: str

The client ID used by this client.

async def connect(self) -> typing_extensions.Self:

Connect to the broker and verify license validity. This operation is idempotent: calling it multiple times has no additional effect.

Returns:

The connected client instance.

Raises:
  • BrokerLicenseError: If the broker license is invalid.
  • BrokerConnectionError: If connection to the broker fails.
async def disconnect(self) -> None:

Disconnect the client from the broker and release all associated resources.

This method is idempotent: calls after the first have no effect. Once disconnected, the client can no longer be used.
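When a context manager is not practical, the connect/disconnect pair can be wrapped so that disconnect always runs. The following is a minimal sketch under that assumption; run_with_client and work are hypothetical names, and client is expected to be a BrokerClient (typed as Any so the sketch does not depend on the package being installed).

```python
from typing import Any, Awaitable, Callable


async def run_with_client(
    client: Any, work: Callable[[Any], Awaitable[Any]]
) -> Any:
    """Connect, run `work(client)`, and always disconnect afterwards."""
    # connect() is documented to return the connected client instance.
    connected = await client.connect()
    try:
        return await work(connected)
    finally:
        # disconnect() is documented as idempotent, so this stays safe even
        # if `work` already disconnected the client itself.
        await connected.disconnect()
```

Because a disconnected client can no longer be used, a new BrokerClient would be needed for each call to this helper.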

async def publish( self, *values: tuple[str, list[WriteSignal]]) -> None:

Publish a list of signals to the broker.

Arguments:
  • *values: One or more tuples, each containing a namespace and a list of signals to publish.
Raises:
  • ValueError: If the list of values is empty.
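The argument shape of publish can be easier to see in code. The sketch below builds the (namespace, list of WriteSignal) tuples from a nested dict and skips the call when there is nothing to send. The helper names and the dict layout are assumptions made for illustration; the fallback WriteSignal class only mirrors the documented fields so the sketch runs without the package installed.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple, Union

Value = Union[int, float, bytes, str]

try:
    from remotivelabs.broker import WriteSignal
except ImportError:
    # Stand-in with the documented fields, used only when the package is absent.
    @dataclass(frozen=True)
    class WriteSignal:  # type: ignore[no-redef]
        name: str
        value: Value


def build_publish_args(
    updates: Dict[str, Dict[str, Value]]
) -> List[Tuple[str, List[WriteSignal]]]:
    """Turn {namespace: {signal: value}} into the tuples publish() accepts."""
    return [
        (namespace, [WriteSignal(name=n, value=v) for n, v in signals.items()])
        for namespace, signals in updates.items()
        if signals  # drop namespaces that have nothing to write
    ]


async def publish_updates(client: Any, updates: Dict[str, Dict[str, Value]]) -> None:
    """Publish all updates in one call; `client` is a connected BrokerClient."""
    args = build_publish_args(updates)
    if not args:
        return  # publish() is documented to raise ValueError on empty input
    await client.publish(*args)
```

A call such as publish_updates(client, {"ns0": {"Speed": 42.0}}) would then send one signal, where "ns0" and "Speed" are placeholder names.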
async def subscribe( self, *signals: tuple[str, list[str]], on_change: bool = False, initial_empty: bool = False) -> AsyncIterator[list[Signal]]:

Subscribe to a list of signals.

Arguments:
  • *signals: One or more tuples, each containing namespace and list of signals to subscribe to.
  • on_change: Whether to receive updates only on change.
  • initial_empty: If True, wait until the broker has sent an initial message.
Returns:

An asynchronous iterator of lists of Signal objects.
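Going by the signature above, subscribe is awaited to obtain the iterator, which is then consumed with async for. The sketch below collects the first few batches as plain dicts. collect_updates and limit are hypothetical names, and client stands for a connected BrokerClient.

```python
from typing import Any, Dict, List


async def collect_updates(
    client: Any, namespace: str, names: List[str], limit: int
) -> List[Dict[str, Any]]:
    """Return the first `limit` batches of signals as {name: value} dicts."""
    # Awaiting subscribe() yields the async iterator (per the documented signature).
    stream = await client.subscribe((namespace, names), on_change=True)
    batches: List[Dict[str, Any]] = []
    async for signals in stream:
        # Each item is a list of Signal objects with name, namespace and value.
        batches.append({signal.name: signal.value for signal in signals})
        if len(batches) >= limit:
            break
    return batches
```

With on_change=True the stream is expected to stay quiet while values are unchanged, so this helper may wait for a long time on a static bus.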

async def subscribe_frames( self, *frames: tuple[str, list[FrameSubscription]], on_change: bool = False, initial_empty: bool = False, decode_named_values: bool = False) -> AsyncIterator[Frame]:

Subscribe to a list of frames.

Arguments:
  • *frames: One or more tuples, each containing namespace and list of frames to subscribe to.
  • on_change: Whether to receive updates only on change.
  • initial_empty: If True, wait until the broker has sent an initial message.
  • decode_named_values: If True, decode named values to str.
Returns:

An asynchronous iterator of Frame objects.
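A frame subscription can be consumed in the same way as a signal subscription. The following sketch wraps the stream in an async generator that converts the microsecond timestamp to seconds. frame_snapshots is a hypothetical helper, client stands for a connected BrokerClient, and subscriptions is expected to be a list of FrameSubscription objects.

```python
from typing import Any, AsyncIterator, Dict, List, Tuple


async def frame_snapshots(
    client: Any, namespace: str, subscriptions: List[Any]
) -> AsyncIterator[Tuple[float, str, Dict[str, Any]]]:
    """Yield (seconds, frame name, signals) for each received frame."""
    stream = await client.subscribe_frames(
        (namespace, subscriptions),
        decode_named_values=True,  # ask for named values decoded to str
    )
    async for frame in stream:
        # Frame.timestamp is documented as microseconds.
        yield frame.timestamp / 1_000_000, frame.name, dict(frame.signals)
```

Copying frame.signals into a new dict keeps callers from relying on the internals of the frozen Frame instance.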

async def check_license(self) -> bool:

Check if the broker license is valid.

Returns:

True if the broker license is valid, otherwise False.

async def set_secoc_property( self, namespace: str | NamespaceInfo, property: Union[SecocFreshnessValue, SecocTimeDiff, SecocKey, SecocCmac0]) -> None:

Set a SecOC property on the broker.

Arguments:
  • namespace: Target namespace
  • property: The SecOC property to set.
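Since set_secoc_property accepts one property per call, configuring a namespace usually means several calls in sequence. A minimal sketch, assuming client is a connected BrokerClient and properties holds SecocKey, SecocTimeDiff, SecocFreshnessValue or SecocCmac0 instances; apply_secoc_properties is a hypothetical helper name.

```python
from typing import Any, Iterable


async def apply_secoc_properties(
    client: Any, namespace: Any, properties: Iterable[Any]
) -> int:
    """Set each SecOC property in order and return how many were applied."""
    applied = 0
    for prop in properties:
        # One broker call per property, as the documented signature requires.
        await client.set_secoc_property(namespace, prop)
        applied += 1
    return applied
```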
async def list_namespaces(self) -> list[NamespaceInfo]:

List all namespaces configured in the broker.

Returns:

A list of namespaces.

async def get_namespace(self, name: str) -> NamespaceInfo | None:

Get a namespace by name.

Returns:

The namespace if found, otherwise None.

async def list_frame_infos( self, *namespaces: str | NamespaceInfo) -> list[FrameInfo]:

List all frame infos in one or more namespaces.

Arguments:
  • namespaces: One or more NamespaceName or NamespaceInfo objects.
Returns:

A list of FrameInfo objects.
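list_namespaces and list_frame_infos combine naturally into a small discovery step. The sketch below builds a namespace-to-frame-names index. frames_by_namespace is a hypothetical helper, client stands for a connected BrokerClient, and the guard against an empty namespace list is a precaution because the behaviour of list_frame_infos with no arguments is not stated here.

```python
from typing import Any, Dict, List


async def frames_by_namespace(client: Any) -> Dict[str, List[str]]:
    """Map every namespace name to the names of the frames it contains."""
    namespaces = await client.list_namespaces()
    index: Dict[str, List[str]] = {ns.name: [] for ns in namespaces}
    if namespaces:
        # NamespaceInfo objects are accepted directly as arguments.
        for info in await client.list_frame_infos(*namespaces):
            index.setdefault(info.namespace, []).append(info.name)
    return index
```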

async def get_frame_info( self, name: str, namespace: str | NamespaceInfo) -> FrameInfo | None:

Get a frame by name in a specific namespace.

Arguments:
  • name: The name of the frame.
  • namespace: The namespace name or NamespaceInfo object.
Returns:

The FrameInfo object if found, otherwise None.

async def list_signal_infos( self, *namespaces: str | NamespaceInfo) -> list[SignalInfo]:

List all signal infos in one or more namespaces.

Arguments:
  • namespaces: One or more NamespaceName or NamespaceInfo objects.
Returns:

A list of SignalInfo objects.

async def get_signal_info( self, name: str, namespace: str | NamespaceInfo) -> SignalInfo | None:

Get a signal info by name in a specific namespace.

Arguments:
  • name: The name of the signal.
  • namespace: The namespace name or NamespaceInfo object.
Returns:

The SignalInfo object if found, otherwise None.
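Because the get_* lookups return None rather than raising, callers have to handle the missing case themselves. One possible pattern, sketched here with a hypothetical require_signal_info helper and client standing for a connected BrokerClient, is to convert None into an exception at the call site.

```python
from typing import Any


async def require_signal_info(client: Any, name: str, namespace: Any) -> Any:
    """Return the SignalInfo for `name`, or raise LookupError if it is missing."""
    info = await client.get_signal_info(name, namespace)
    if info is None:
        raise LookupError(f"signal {name!r} not found in namespace {namespace!r}")
    return info
```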

def is_ready(self) -> bool:

Check if the broker client is ready.

Returns:

True if the broker client is ready, otherwise False.

def is_closed(self) -> bool:

Check if the broker client is closed.

Returns:

True if the broker client is closed, otherwise False.

@dataclass(frozen=True)
class Frame:

A concrete instance of a frame carrying signal values.

Attributes:
  • name: Name of the frame.
  • namespace: Namespace to which the frame belongs.
  • timestamp: Timestamp in microseconds, set when the frame is first seen.
  • signals: Dict mapping signal names to their current values.
  • value: Raw or composite value associated with the frame (e.g., serialized representation).
Frame( timestamp: int, name: str, namespace: str, signals: dict[str, typing.Union[int, float, bytes, str]], value: Union[int, float, bytes, str])
timestamp: int
name: str
namespace: str
signals: dict[str, typing.Union[int, float, bytes, str]]
value: Union[int, float, bytes, str]
@dataclass
class FrameInfo:

Metadata about a frame in a namespace.

Attributes:
  • name: Name of the frame.
  • namespace: Namespace to which the frame belongs.
  • signals: Dict of signal names and their corresponding SignalInfo.
  • sender: List of entities that send this frame.
  • receiver: List of entities that receive this frame.
  • cycle_time_millis: Frame cycle time in milliseconds.
FrameInfo( name: str, namespace: str, signals: dict[str, SignalInfo], sender: list[str], receiver: list[str], cycle_time_millis: float)
name: str
namespace: str
signals: dict[str, SignalInfo]
sender: list[str]
receiver: list[str]
cycle_time_millis: float
FrameName = <class 'str'>
@dataclass(frozen=True)
class FrameSubscription:

Used to subscribe to a frame and optionally its signals.

Attributes:
  • name: Name of the frame to subscribe to.
  • signals: List of signal names to subscribe to
    • None: subscribe to all signals in the frame.
    • []: subscribe to the frame without any signals.
FrameSubscription(name: str, signals: list[str] | None = None)
name: str
signals: list[str] | None = None
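The difference between None and an empty list is easy to miss. The sketch below models the documented selection rule with a plain function; it does not use the library, and resolve_signals and available are hypothetical names used only to illustrate the rule.

```python
from typing import List, Optional


def resolve_signals(requested: Optional[List[str]], available: List[str]) -> List[str]:
    """Model which signals a FrameSubscription selects from a frame."""
    if requested is None:
        return list(available)  # None: all signals in the frame
    # An explicit list selects only those names; [] therefore selects none.
    return [name for name in available if name in requested]
```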
@dataclass(frozen=True)
class Signal:

A signal with its name, namespace, and current value.

Attributes:
  • name: The name of the signal.
  • namespace: The namespace the signal belongs to.
  • value: The current value of the signal.
Signal(name: str, namespace: str, value: Union[int, float, bytes, str])
name: str
namespace: str
value: Union[int, float, bytes, str]
@dataclass(frozen=True)
class SignalInfo:

Metadata describing a signal.

Attributes:
  • name: Name of the signal.
  • namespace: Namespace the signal belongs to.
  • receiver: List of receivers for the signal.
  • sender: List of senders for the signal.
  • named_values: Mapping from raw numeric values to symbolic names (e.g., enums).
  • value_names: Reverse mapping from symbolic names to raw numeric values.
SignalInfo( name: str, namespace: str, receiver: list[str], sender: list[str], named_values: dict[int, str], value_names: dict[str, int])
name: str
namespace: str
receiver: list[str]
sender: list[str]
named_values: dict[int, str]
value_names: dict[str, int]
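The two mappings make it possible to translate between raw numbers and symbolic names. A minimal sketch that operates on the dicts directly, so it runs without the library; to_symbolic and to_raw are hypothetical helpers, and the gear names in the usage note are invented sample data.

```python
from typing import Dict, Union

Value = Union[int, float, bytes, str]


def to_symbolic(named_values: Dict[int, str], raw: Value) -> Value:
    """Return the symbolic name for `raw`, or `raw` itself if none is defined."""
    if isinstance(raw, int):
        return named_values.get(raw, raw)
    return raw  # floats, bytes and strings are passed through unchanged


def to_raw(value_names: Dict[str, int], name: str) -> int:
    """Return the raw number for a symbolic name; raises KeyError if unknown."""
    return value_names[name]
```

For a signal whose named_values is {0: "PARK", 1: "DRIVE"}, to_symbolic(info.named_values, 1) gives "DRIVE" and to_raw(info.value_names, "PARK") gives 0.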
SignalName = <class 'str'>
SignalValue = typing.Union[int, float, bytes, str]
@dataclass(frozen=True)
class WriteSignal:

A signal intended to be published with a specific value.

Attributes:
  • name: The name of the signal to write.
  • value: The value to assign to the signal.
WriteSignal(name: str, value: Union[int, float, bytes, str])
name: str
value: Union[int, float, bytes, str]
@dataclass
class RestbusFrameConfig:

Configuration for a frame in the Restbus.

Attributes:
  • name: The name of the frame to configure.
  • cycle_time: Optional cycle time override for the frame. If None, the default from the broker's database is used.
RestbusFrameConfig(name: str, cycle_time: float | None = None)
name: str
cycle_time: float | None = None
@dataclass
class RestbusSignalConfig:

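Defines how a specific signal behaves when emitted by Restbus. A signal has a sequence of values that is emitted in a loop and, optionally, an initial sequence that is emitted once before the loop starts.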

Attributes:
  • name: The name of the signal
  • loop: Values emitted in order after the initial sequence
  • initial: Optional values emitted once before the loop starts
RestbusSignalConfig( name: str, loop: Sequence[Union[int, float, bytes, str]], initial: Sequence[Union[int, float, bytes, str]] = <factory>)
name: str
loop: Sequence[Union[int, float, bytes, str]]
initial: Sequence[Union[int, float, bytes, str]]
@classmethod
def set( cls, name: str, value: Union[int, float, bytes, str]) -> RestbusSignalConfig:

Create a RestbusSignalConfig with a constant value.

Arguments:
  • name: Name of the signal
  • value: Value to set in Restbus
@classmethod
def set_update_bit( cls, name: str) -> RestbusSignalConfig:

Create a RestbusSignalConfig for an update-bit pattern (sends 1 once, then constant 0).

Arguments:
  • name: Name of the signal
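The attribute descriptions suggest a simple emission order: the initial values are sent once, then the loop values repeat. The sketch below models that order with plain iterators. It does not use the library, emitted_values is a hypothetical name, and the mapping of set and set_update_bit onto particular initial/loop values is an assumption inferred from their descriptions.

```python
from itertools import chain, cycle, islice
from typing import Iterator, Sequence, Union

Value = Union[int, float, bytes, str]


def emitted_values(loop: Sequence[Value], initial: Sequence[Value] = ()) -> Iterator[Value]:
    """Yield the initial values once, then repeat the loop values forever."""
    return chain(initial, cycle(loop))


# Assumed equivalents of the two class methods:
constant = emitted_values(loop=[5])                  # like set(name, 5)
update_bit = emitted_values(loop=[0], initial=[1])   # like set_update_bit(name)

print(list(islice(update_bit, 4)))  # [1, 0, 0, 0]
```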
@dataclass()
class NamespaceInfo:
NamespaceInfo(name: str, type: str = 'unknown')
name: str
type: str = 'unknown'
def is_virtual(self) -> bool:
NamespaceName = <class 'str'>
@dataclass(frozen=True)
class SecocCmac0:

Use CMAC0 for SecOC in the RemotiveBroker.

SecocCmac0(enabled: bool)
enabled: bool
@dataclass(frozen=True)
class SecocFreshnessValue:

Set the binary SecOC freshness value to be used by the freshness manager. The property applies only to SecOC on the given namespace.

SecocFreshnessValue(fv: bytes)
fv: bytes
@dataclass(frozen=True)
class SecocKey:

Set binary 128-bit key to be used for SecOC in the RemotiveBroker.

Multiple keys can be set; they are distinguished by their key IDs.

SecocKey(key_id: int, key: bytes)
key_id: int
key: bytes
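Since the key is described as 128 bits, it can help to validate the length before constructing a SecocKey. A minimal sketch, assuming the key is supplied as a hex string; parse_secoc_key is a hypothetical helper and not part of the library.

```python
def parse_secoc_key(hex_key: str) -> bytes:
    """Decode a hex string and check that it is exactly 128 bits long."""
    key = bytes.fromhex(hex_key)
    if len(key) != 16:
        raise ValueError(f"expected 16 bytes (128 bits), got {len(key)}")
    return key
```

The result can then be passed on, for example as SecocKey(key_id=1, key=parse_secoc_key(hex_key)), where key_id=1 is a placeholder.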
SecocProperty = typing.Union[SecocFreshnessValue, SecocTimeDiff, SecocKey, SecocCmac0]
@dataclass(frozen=True)
class SecocTimeDiff:

Set a time delta to apply to the real-time clock used for the SecOC freshness value. The time difference is given in floating-point seconds and applies only to the given namespace.

SecocTimeDiff(time_diff: float)
time_diff: float