remotivelabs.broker

RemotiveLabs Broker API

This module handles communication with the RemotiveBroker.

The RemotiveBroker can run locally or in RemotiveCloud. Connecting to a RemotiveCloud instance requires additional authentication parameters—see the remotivelabs.broker.auth module for details.

Installation

pip install remotivelabs-broker

Usage

Connecting to a RemotiveBroker is straightforward using the BrokerClient class:

import asyncio

from remotivelabs.broker import BrokerClient


async def main() -> None:
    # Create a BrokerClient instance and list all namespaces available on that broker.
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        ns = await broker_client.list_namespaces()
        print(f"Namespaces: {ns}")


if __name__ == "__main__":
    asyncio.run(main())

Listing frames and signals:

import asyncio

from remotivelabs.broker import BrokerClient, FrameInfo


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # list all available frames in the "DriverCan" namespace
        frame_infos: list[FrameInfo] = await broker_client.list_frame_infos("DriverCan")

        # simply print all frames and their signals
        for frame_info in frame_infos:
            print(f"{frame_info.name}")
            for signal_info in frame_info.signals.values():
                print(f"  {signal_info.name}")


if __name__ == "__main__":
    asyncio.run(main())

Subscribing to frames or signals on a specific namespace:

import asyncio
from typing import AsyncIterator

from remotivelabs.broker import BrokerClient, Frame


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # subscribe to a frame named "EngineData" (including all its signals) in the "DriverCan" namespace
        stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
            ("DriverCan", ["EngineData"]),
        )

        # will receive frames until stream is closed by broker or an error occurs
        async for frame in stream:
            print(f"Received frame: {frame}")


if __name__ == "__main__":
    asyncio.run(main())

Filters

It is possible to filter frames and signals using filter predicates. A filter predicate is any callable that takes a FrameInfo or SignalInfo object and returns a boolean indicating whether the object matches the filter criteria.
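
Because a predicate is just a callable, a plain function can play that role with Python's built-ins. The sketch below is illustrative only: FrameStub is a hypothetical stand-in for FrameInfo (so the snippet runs without a broker), and cycle_faster_than is a made-up helper that is not part of the library.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class FrameStub:
    """Hypothetical stand-in exposing two of the documented FrameInfo attributes."""

    name: str
    cycle_time_millis: float


def cycle_faster_than(limit_millis: float) -> Callable[[FrameStub], bool]:
    """Build a predicate that matches frames whose cycle time is below the limit."""

    def predicate(frame: FrameStub) -> bool:
        return frame.cycle_time_millis < limit_millis

    return predicate


# Hypothetical frames; real FrameInfo objects would come from list_frame_infos().
frames = [FrameStub("EngineData", 10.0), FrameStub("BodyStatus", 100.0)]

# The predicate plugs straight into the built-in filter().
fast_frames = list(filter(cycle_faster_than(50.0), frames))
print([frame.name for frame in fast_frames])
```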

All filters have an exclude flag to indicate whether matches should be excluded (True) or included (False, default). Filters are callable and can be passed to Python built-in functions that expect a predicate, such as filter(), any(), or all(). The snippet below shows examples using filter():

Filter Strategy:

The filter_recursive() function applies frame and signal filters with context-dependent behavior. Frame filters first determine whether a frame is included. Signal filters are then applied according to that outcome: an included frame starts with all of its signals, so only exclusion filters narrow it down, whereas an excluded frame keeps only the signals that are explicitly included. Exclusion filters always take priority over inclusion filters.

Examples:

from remotivelabs.broker import FrameInfo, SignalInfo
from remotivelabs.broker.filters import (
    AllFramesFilter,
    FrameFilter,
    ReceiverFilter,
    SenderFilter,
    SignalFilter,
    filter_recursive,
    is_frame_filter,
    is_signal_filter,
)

# assume frames and signals are populated elsewhere
frames: list[FrameInfo] = []
signals: list[SignalInfo] = []

# Example 1: Include all frames
all_frames_filter = AllFramesFilter()
filtered_frames = list(filter(all_frames_filter, frames))

# Example 2: Include all frames but exclude a specific frame
frame_exclude_filter = FrameFilter(frame_name="Frame1", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, frames))

# Example 3: Filter frames sent by a specific ECU
sender_filter = SenderFilter(ecu_name="ECU1")
filtered_frames = list(filter(sender_filter, frames))

# Example 4: Filter frames received by a specific ECU, excluding one signal
receiver_filter = ReceiverFilter(ecu_name="ECU2")
signal_exclude_filter = SignalFilter(signal_name="SignalA", exclude=True)
filtered_frames = list(filter(receiver_filter, frames))
filtered_signals = list(filter(signal_exclude_filter, signals))

# Example 5: Chaining filters
frame_include = FrameFilter(frame_name="FrameA")
frame_exclude = FrameFilter(frame_name="FrameB", exclude=True)
filtered_frames = list(filter(frame_exclude, filter(frame_include, frames)))

# Example 6: Combining inclusion and exclusion with AllFramesFilter and FrameFilter
all_frames_filter = AllFramesFilter()
frame_exclude_filter = FrameFilter(frame_name="FrameC", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, filter(all_frames_filter, frames)))

# Example 7: Recursive filtering of signals (SignalInfos) in frames (FrameInfos)
filtered_frames = [
    filtered_frame
    for frame in frames
    if (filtered_frame := filter_recursive(frame, filters=[all_frames_filter, signal_exclude_filter])) is not None
]

# Example 8: Type checking filters
assert is_frame_filter(all_frames_filter)  # works with frames
assert not is_signal_filter(all_frames_filter)  # doesn't work with signals

assert not is_frame_filter(signal_exclude_filter)  # doesn't work with frames
assert is_signal_filter(signal_exclude_filter)  # works with signals

assert is_frame_filter(sender_filter)  # works with frames
assert is_signal_filter(sender_filter)  # works with signals

Filtered frames can be used when subscribing:

import asyncio
from typing import AsyncIterator

from remotivelabs.broker import BrokerClient, Frame, FrameInfo
from remotivelabs.broker.filters import ReceiverFilter


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # list frames in the "DriverCan" namespace
        frame_infos: list[FrameInfo] = await broker_client.list_frame_infos("DriverCan")

        # filter according to some criteria
        filtered_frame_infos = list(filter(ReceiverFilter(ecu_name="ECU2"), frame_infos))

        # subscribe using the filtered frame infos
        stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
            ("DriverCan", filtered_frame_infos),
        )

        # will receive frames until stream is closed by broker or an error occurs
        async for frame in stream:
            print(f"Received frame: {frame}")


if __name__ == "__main__":
    asyncio.run(main())

Filters can also be used directly when subscribing:

import asyncio
from typing import AsyncIterator

from remotivelabs.broker import BrokerClient, Frame
from remotivelabs.broker.filters import ReceiverFilter


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # subscribe using a ReceiverFilter to receive only frames whose receiver is the ECU named "ECU2" in the "DriverCan" namespace
        stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
            ("DriverCan", [ReceiverFilter(ecu_name="ECU2")]),
        )

        # will receive frames until stream is closed by broker or an error occurs
        async for frame in stream:
            print(f"Received frame: {frame}")


if __name__ == "__main__":
    asyncio.run(main())

Restbus

The RemotiveBroker exposes a Restbus interface for simulating CAN bus traffic.

import asyncio

from remotivelabs.broker import BrokerClient
from remotivelabs.broker.restbus import RestbusFrameConfig, RestbusSignalConfig


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # Start the restbus on the "DriverCan" namespace with one frame configuration
        await broker_client.restbus.add(
            ("DriverCan", [RestbusFrameConfig(name="EngineData")]),
            start=True,
        )

        # now update some of its signals
        signal_configs: list[RestbusSignalConfig] = [
            RestbusSignalConfig.set(name="EngineData.EngineRpm", value=1500),
            RestbusSignalConfig.set(name="EngineData.EngineTemp", value=90),
        ]
        await broker_client.restbus.update_signals(
            ("DriverCan", signal_configs),
        )


if __name__ == "__main__":
    asyncio.run(main())

Recording session

There is a RecordingSessionClient for managing recording session playback on the broker.

import asyncio

from remotivelabs.broker.recording_session import RecordingSessionClient


async def main() -> None:
    # Create a recording session client
    async with RecordingSessionClient(url="http://127.0.0.1:50051") as rs_client:
        # List available recording files on the broker
        files = await rs_client.list_recording_files()
        print(f"Available recording files: {files}")

        # open a recording session with the first file
        rs_ref = rs_client.get_session(str(files[0]))
        async with rs_ref as session:
            # start playback
            status = await session.play()
            print(f"Playback started with status: {status}")

        # exiting the async with block will close the session, but playback will continue if other sessions are open


if __name__ == "__main__":
    asyncio.run(main())

SecOC

The RemotiveBroker supports multiple SecOC properties. These properties can be set via the RemotiveBroker API to configure SecOC behavior for testing purposes.

import asyncio

from remotivelabs.broker import BrokerClient
from remotivelabs.broker.secoc import SecocFreshnessValue


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        prop = SecocFreshnessValue(fv=b"\x00\x00\x12\x34")
        await broker_client.set_secoc_property(namespace="MyNamespace", property=prop)


if __name__ == "__main__":
    asyncio.run(main())

Logging

This library uses Python's standard logging module. By default, the library does not configure any logging handlers, allowing applications to fully control their logging setup.

To enable logs from this library in your application or tests, configure logging as follows:

import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("remotivelabs.broker").setLevel(logging.DEBUG)

For more advanced configurations, refer to the Python logging documentation.
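
One such configuration, sketched here with only the standard logging module, attaches a dedicated handler to the library's logger instead of configuring the root logger. The format string is an arbitrary choice; the logger name is the one used in the snippet above.

```python
import logging

# Logger name taken from the example above.
broker_logger = logging.getLogger("remotivelabs.broker")
broker_logger.setLevel(logging.DEBUG)

# Send the library's records to stderr with a timestamped format.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
broker_logger.addHandler(handler)

# Stop records from also reaching root handlers, which would duplicate lines.
broker_logger.propagate = False
```

Loggers below "remotivelabs.broker" in the dotted-name hierarchy inherit this level and handler, so the rest of the application can keep its own logging setup.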

__version__ = '0.12.0b1'
class BrokerClient(remotivelabs.broker.connection.BrokerClientConnection):

Client for signal-related operations.

BrokerClient( url: str, client_id: str | None = None, auth: remotivelabs.broker.auth.AuthMethod = <remotivelabs.broker.auth.NoAuth object>)

Initializes a new BrokerClient instance.

Arguments:
  • url: The RemotiveBroker URL to connect to.
  • client_id: Optional client ID. If None, a random ID is generated.
  • auth: Authentication method to use. Defaults to NoAuth.
Note:

Start the instance using a context manager:

async with BrokerClient(...) as client:
    ...

Or use the connect/disconnect methods directly:

client = await BrokerClient(...).connect()
# ...
await client.disconnect()
async def broker_version(self) -> str:

Returns the broker version

async def read_signals( self, *values: tuple[str, list[str]]) -> list[Signal]:

Read signals from signal cache

Arguments:
  • *values: One or more tuples, each containing a namespace and a list of signal names of the signals to read.
Raises:
  • ValueError: If the list of values is empty.
async def publish( self, *values: tuple[str, list[WriteSignal]]) -> None:

Publish a list of signals to the broker.

Arguments:
  • *values: One or more tuples, each containing a namespace and a list of signals to publish.
Raises:
  • ValueError: If the list of values is empty.
async def publish_header(self, *headers: tuple[str, list[str]]) -> None:

Publish a list of headers to the broker.

Arguments:
  • *headers: One or more tuples, each containing a namespace and a list of frame names of which headers to publish.
Raises:
  • ValueError: If the list of values is empty.
async def subscribe( self, *signals: tuple[str, typing.Sequence[str | SignalInfo | remotivelabs.broker.filters.SignalFilterPredicate]], on_change: bool = False, initial_empty: bool = False) -> AsyncIterator[list[Signal]]:

Subscribe to a list of signals.

Arguments:
  • *signals: One or more tuples, each containing a namespace and a sequence of signal metadata to subscribe to.
  • on_change: Whether to receive updates only on change.
  • initial_empty: If True, wait until the broker has sent an initial message.
Returns:

An asynchronous iterator of lists of Signal objects.

async def subscribe_header( self, *headers: tuple[str, typing.Sequence[str]], initial_empty: bool = False) -> AsyncIterator[Header]:

Subscribe to headers.

Arguments:
  • *headers: One or more tuples, each containing a namespace and a sequence of frame names whose headers to subscribe to.
  • initial_empty: If True, wait until the broker has sent an initial message.
Returns:

An asynchronous iterator of Header objects.

async def subscribe_frames( self, *frames: tuple[str, typing.Sequence[str | FrameSubscription | FrameInfo | remotivelabs.broker.filters.FrameFilterPredicate]], on_change: bool = False, initial_empty: bool = False, decode_named_values: bool = False) -> AsyncIterator[Frame]:

Subscribe to frames.

Arguments:
  • *frames: One or more tuples, each containing a namespace and a sequence of frame metadata to subscribe to.
  • on_change: Whether to receive updates only on change.
  • initial_empty: If True, wait until the broker has sent an initial message.
  • decode_named_values: If True, decode named values to str.
Returns:

An asynchronous iterator with Frames.

async def set_secoc_property( self, namespace: str | NamespaceInfo, property: Union[SecocFreshnessValue, SecocTimeDiff, SecocKey, SecocCmac0]) -> None:

Set a SecOC property on the broker.

Arguments:
  • namespace: Target namespace
  • property: The SecOC property to set.
async def list_namespaces(self) -> list[NamespaceInfo]:

List all namespaces configured in the broker.

Returns:

A list of namespaces.

async def get_namespace(self, name: str) -> NamespaceInfo | None:

Get a namespace by name.

Returns:

The namespace if found, otherwise None.

async def list_frame_infos( self, *namespaces: str | NamespaceInfo) -> list[FrameInfo]:

List all frame infos in one or more namespaces.

Arguments:
  • namespaces: One or more NamespaceName or NamespaceInfo objects.
Returns:

A list of FrameInfo objects.

async def get_frame_info( self, name: str, namespace: str | NamespaceInfo) -> FrameInfo | None:

Get a frame by name in a specific namespace.

Arguments:
  • name: The name of the frame.
  • namespace: The namespace name or NamespaceInfo object.
Returns:

The FrameInfo object if found, otherwise None.

async def list_signal_infos( self, *namespaces: str | NamespaceInfo) -> list[SignalInfo]:

List all signal infos in one or more namespaces.

Arguments:
  • namespaces: One or more NamespaceName or NamespaceInfo objects.
Returns:

A list of SignalInfo objects.

async def get_signal_info( self, name: str, namespace: str | NamespaceInfo) -> SignalInfo | None:

Get a signal info by name in a specific namespace.

Arguments:
  • name: The name of the signal.
  • namespace: The namespace name or NamespaceInfo object.
Returns:

The SignalInfo object if found, otherwise None.

def is_ready(self) -> bool:

Check if the broker client is ready.

Returns:

True if the broker client is ready, otherwise False.

def is_closed(self) -> bool:

Check if the broker client is closed.

Returns:

True if the broker client is closed, otherwise False.

@dataclass(frozen=True)
class Frame:

A concrete instance of a frame carrying signal values.

Attributes:
  • name: Name of the frame.
  • namespace: Namespace to which the frame belongs.
  • timestamp: Timestamp in microseconds, set when first seen.
  • signals: Dict with signal names with their current values.
  • value: Raw or composite value associated with the frame (e.g., serialized representation).
Frame( timestamp: int, name: str, namespace: str, signals: dict[str, typing.Union[int, float, bytes, str]], value: Union[int, float, bytes, str])
timestamp: int
name: str
namespace: str
signals: dict[str, typing.Union[int, float, bytes, str]]
value: Union[int, float, bytes, str]
@dataclass
class FrameInfo:

Metadata about a frame in a namespace.

Attributes:
  • name: Name of the frame.
  • namespace: Namespace to which the frame belongs.
  • signals: Dict of signal names and their corresponding SignalInfo.
  • sender: List of entities that send this frame.
  • receiver: List of entities that receive this frame.
  • cycle_time_millis: Frame cycle time in milliseconds.
FrameInfo( name: str, namespace: str, signals: dict[str, SignalInfo], sender: list[str], receiver: list[str], cycle_time_millis: float)
name: str
namespace: str
signals: dict[str, SignalInfo]
sender: list[str]
receiver: list[str]
cycle_time_millis: float
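
Frame.timestamp is given in microseconds while FrameInfo.cycle_time_millis is in milliseconds, so comparing the two needs a unit conversion. The sketch below shows one way to do that with plain integers; the timestamps, the expected cycle time, the tolerance, and the helper name observed_cycle_millis are all hypothetical.

```python
def observed_cycle_millis(timestamps_us: list[int]) -> float:
    """Average gap between consecutive timestamps, converted from microseconds to milliseconds."""
    if len(timestamps_us) < 2:
        raise ValueError("need at least two timestamps")
    gaps = [later - earlier for earlier, later in zip(timestamps_us, timestamps_us[1:])]
    return sum(gaps) / len(gaps) / 1000.0


# Hypothetical Frame.timestamp values (microseconds) of four consecutive frames.
timestamps = [1_000_000, 1_010_200, 1_019_900, 1_030_000]

# Hypothetical FrameInfo.cycle_time_millis for the same frame.
expected_millis = 10.0

measured = observed_cycle_millis(timestamps)
within_tolerance = abs(measured - expected_millis) <= 0.5
print(f"measured {measured:.2f} ms, within tolerance: {within_tolerance}")
```
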
FrameName = <class 'str'>
@dataclass(frozen=True)
class FrameSubscription:

Used to subscribe to a frame and optionally its signals.

Attributes:
  • name: Name of the frame to subscribe to.
  • signals: List of signal names to subscribe to
    • None: subscribe to all signals in the frame.
    • []: subscribe to the frame without any signals.
FrameSubscription(name: str, signals: list[str] | None = None)
name: str
signals: list[str] | None = None
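
The difference between None and [] for signals is easy to overlook. The helper below only illustrates the rule stated above; it is not the library's implementation. selected_signals is a hypothetical name, and the signal names are borrowed from the Restbus example.

```python
from typing import Optional


def selected_signals(available: list[str], requested: Optional[list[str]]) -> list[str]:
    """Resolve a FrameSubscription-style signals argument against a frame's signals."""
    if requested is None:
        # None selects every signal in the frame.
        return list(available)
    # A list selects exactly the named signals; an empty list selects none.
    return [name for name in available if name in requested]


frame_signals = ["EngineData.EngineRpm", "EngineData.EngineTemp"]

all_signals = selected_signals(frame_signals, None)
no_signals = selected_signals(frame_signals, [])
one_signal = selected_signals(frame_signals, ["EngineData.EngineRpm"])
print(all_signals, no_signals, one_signal)
```
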
@dataclass()
class NamespaceInfo:
NamespaceInfo(name: str, type: str = 'unknown')
name: str
type: str = 'unknown'
def is_virtual(self) -> bool:
NamespaceName = <class 'str'>
@dataclass
class RestbusFrameConfig:

Configuration for a frame in the Restbus.

Attributes:
  • name: The name of the frame to configure.
  • cycle_time: Optional cycle time override for the frame. If None, the default from the broker's database is used.
RestbusFrameConfig(name: str, cycle_time: float | None = None)
name: str
cycle_time: float | None = None
@dataclass
class RestbusSignalConfig:

This class defines how a specific signal should behave when emitted by Restbus.

Attributes:
  • name: The name of the signal
  • loop: Values emitted in order after the initial sequence
  • initial: Optional values emitted once before the loop starts
RestbusSignalConfig( name: str, loop: Sequence[Union[int, float, bytes, str]], initial: Sequence[Union[int, float, bytes, str]] = <factory>)
name: str
loop: Sequence[Union[int, float, bytes, str]]
initial: Sequence[Union[int, float, bytes, str]]
@classmethod
def set( cls, name: str, value: Union[int, float, bytes, str]) -> RestbusSignalConfig:

Create a RestbusSignalConfig with a constant value.

Arguments:
  • name: Name of the signal
  • value: Value to set in Restbus
@classmethod
def set_update_bit( cls, name: str) -> RestbusSignalConfig:

Create a RestbusSignalConfig for an update-bit pattern (sends 1 once, then constant 0).

Arguments:
  • name: Name of the signal
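
The interplay of initial and loop can be pictured as a value stream. The sketch below models that stream with itertools. It assumes that the loop values repeat indefinitely, which the attribute name suggests but the text above does not spell out. emitted_values is a hypothetical helper, not a library function.

```python
from itertools import chain, cycle, islice
from typing import Iterator, Sequence, Union

Value = Union[int, float, bytes, str]


def emitted_values(loop: Sequence[Value], initial: Sequence[Value] = ()) -> Iterator[Value]:
    """Yield the initial values once, then repeat the loop values (assumed to cycle forever)."""
    return chain(initial, cycle(loop))


# Constant value, comparable to RestbusSignalConfig.set(name=..., value=1500).
constant = list(islice(emitted_values(loop=[1500]), 3))

# Update-bit pattern as described for set_update_bit: 1 once, then constant 0.
update_bit = list(islice(emitted_values(loop=[0], initial=[1]), 4))

# A ramp that emits 0 once and then cycles through three values.
ramp = list(islice(emitted_values(loop=[10, 20, 30], initial=[0]), 6))

print(constant, update_bit, ramp)
```
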
@dataclass(frozen=True)
class SecocCmac0:

Use CMAC0 for SecOC in the RemotiveBroker.

SecocCmac0(enabled: bool)
enabled: bool
@dataclass(frozen=True)
class SecocFreshnessValue:

Set the binary SecOC freshness value to be used by the freshness manager. The property applies only to SecOC on the given namespace.

SecocFreshnessValue(fv: bytes)
fv: bytes
@dataclass(frozen=True)
class SecocKey:

Set binary 128-bit key to be used for SecOC in the RemotiveBroker.

Multiple keys can be set; they are distinguished by key ID.

SecocKey(key_id: int, key: bytes)
key_id: int
key: bytes
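
Since the key is described as 128 bits, the bytes value should be 16 bytes long. A minimal sketch of preparing such a value with the standard library follows; parse_key is a hypothetical helper, and the hex string is a placeholder rather than a real key.

```python
import secrets

KEY_BITS = 128  # key size stated above


def parse_key(hex_key: str) -> bytes:
    """Convert a hex string to key bytes, rejecting anything that is not 128 bits long."""
    key = bytes.fromhex(hex_key)
    if len(key) * 8 != KEY_BITS:
        raise ValueError(f"expected a {KEY_BITS}-bit key, got {len(key) * 8} bits")
    return key


# Throwaway random key, e.g. for a test setup.
random_key = secrets.token_bytes(KEY_BITS // 8)

# Placeholder key supplied as hex, e.g. from a test configuration.
fixed_key = parse_key("00112233445566778899aabbccddeeff")
print(len(random_key), len(fixed_key))
```

Either value could then be passed as the key argument of SecocKey together with a key_id.
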
SecocProperty = typing.Union[SecocFreshnessValue, SecocTimeDiff, SecocKey, SecocCmac0]
@dataclass(frozen=True)
class SecocTimeDiff:

Set a time delta to apply to the real-time clock used for the SecOC freshness value. The time difference is given in floating-point seconds and applies to a single namespace.

SecocTimeDiff(time_diff: float)
time_diff: float
@dataclass(frozen=True)
class Signal:

A signal with its name, namespace, and current value.

Attributes:
  • name: The name of the signal.
  • namespace: The namespace the signal belongs to.
  • value: The current value of the signal.
Signal(name: str, namespace: str, value: Union[int, float, bytes, str])
name: str
namespace: str
value: Union[int, float, bytes, str]
@dataclass(frozen=True)
class SignalInfo:

Metadata describing a signal

Attributes:
  • name: Name of the signal.
  • namespace: Namespace the signal belongs to.
  • receiver: List of receivers for the signal.
  • sender: List of senders for the signal.
  • named_values: Mapping from raw numeric values to symbolic names (e.g., enums).
  • value_names: Reverse mapping from symbolic names to raw numeric values.
  • min: Minimum allowed value.
  • max: Maximum allowed value.
  • factor: Multiplication factor used for encoding and decoding value in frame.
SignalInfo( name: str, namespace: str, receiver: list[str], sender: list[str], named_values: dict[int, str], value_names: dict[str, int], min: float, max: float, factor: float)
name: str
namespace: str
receiver: list[str]
sender: list[str]
named_values: dict[int, str]
value_names: dict[str, int]
min: float
max: float
factor: float
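
named_values and value_names are described as inverse mappings of each other. The sketch below shows that relationship with plain dictionaries; the gear labels are hypothetical and do not come from any real signal database.

```python
# Hypothetical raw-to-name mapping for an illustrative gear signal.
named_values: dict[int, str] = {0: "Park", 1: "Reverse", 2: "Neutral", 3: "Drive"}

# The reverse mapping, matching how value_names is described above.
value_names: dict[str, int] = {label: raw for raw, label in named_values.items()}

# Look up the raw value for a symbolic name, then map it back again.
raw = value_names["Drive"]
label = named_values[raw]
print(raw, label)
```
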
SignalName = <class 'str'>
SignalValue = typing.Union[int, float, bytes, str]
@dataclass(frozen=True)
class WriteSignal:

A signal intended to be published with a specific value.

Attributes:
  • name: The name of the signal to write.
  • value: The value to assign to the signal.
WriteSignal(name: str, value: Union[int, float, bytes, str])
name: str
value: Union[int, float, bytes, str]