remotivelabs.broker
RemotiveLabs Broker API
This module handles communication with the RemotiveBroker.
The RemotiveBroker can run locally or in RemotiveCloud. Connecting to a RemotiveCloud instance requires additional
authentication parameters—see the remotivelabs.broker.auth module for details.
Installation
pip install remotivelabs-broker
Project Links
Usage
Connecting to a RemotiveBroker is straightforward using the BrokerClient class:
import asyncio
from remotivelabs.broker import BrokerClient
async def main() -> None:
# Create a BrokerClient instance and list all namespaces available on that broker.
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
ns = await broker_client.list_namespaces()
print(f"Namespaces: {ns}")
if __name__ == "__main__":
asyncio.run(main())
Listing frames and signals:
import asyncio
from remotivelabs.broker import BrokerClient, FrameInfo
async def main() -> None:
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
# list all available frames in the "DriverCan" namespace
frame_infos: list[FrameInfo] = await broker_client.list_frame_infos("DriverCan")
# simply print all frames and their signals
for frame_info in frame_infos:
print(f"{frame_info.name}")
for signal_info in frame_info.signals.values():
print(f" {signal_info.name}")
if __name__ == "__main__":
asyncio.run(main())
Subscribing to frames or signals on a specific namespace:
import asyncio
from typing import AsyncIterator
from remotivelabs.broker import BrokerClient, Frame
async def main() -> None:
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
# subscribe to a frame named "EngineData" (including all its signals) in the "DriverCan" namespace
stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
("DriverCan", ["EngineData"]),
)
# will receive frames until stream is closed by broker or an error occurs
async for frame in stream:
print(f"Received frame: {frame}")
if __name__ == "__main__":
asyncio.run(main())
Filters
It is possible to filter frames and signals using filter predicates. A filter predicate is any callable that takes a
FrameInfo or SignalInfo object and returns a boolean indicating whether the object matches the filter criteria.
All filters have an exclude flag to indicate whether matches should be excluded (True) or
included (False, default). Filters are callable and can be passed to Python built-in functions that
expect a predicate, such as filter(), any(), or all(). The snippet below shows examples
using filter():
Filter Strategy:
The
filter_recursive()function applies frame and signal filters with context-dependent behavior. Frame filtering determines eligibility, then signal filtering applies differently based on frame inclusion: included frames start with all signals (exclude-only), excluded frames require explicit signal inclusion. Exclusion filters always take priority over inclusion filters.
Examples:
from remotivelabs.broker import FrameInfo, SignalInfo
from remotivelabs.broker.filters import (
AllFramesFilter,
FrameFilter,
ReceiverFilter,
SenderFilter,
SignalFilter,
filter_recursive,
is_frame_filter,
is_signal_filter,
)
# assume frames and signals are populated elsewhere
frames: list[FrameInfo] = []
signals: list[SignalInfo] = []
# Example 1: Include all frames
all_frames_filter = AllFramesFilter()
filtered_frames = list(filter(all_frames_filter, frames))
# Example 2: Include all frames but exclude a specific frame
frame_exclude_filter = FrameFilter(frame_name="Frame1", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, frames))
# Example 3: Filter frames sent by a specific ECU
sender_filter = SenderFilter(ecu_name="ECU1")
filtered_frames = list(filter(sender_filter, frames))
# Example 4: Filter frames received by a specific ECU, excluding one signal
receiver_filter = ReceiverFilter(ecu_name="ECU2")
signal_exclude_filter = SignalFilter(signal_name="SignalA", exclude=True)
filtered_frames = list(filter(receiver_filter, frames))
filtered_signals = list(filter(signal_exclude_filter, signals))
# Example 5: Chaining filters
frame_include = FrameFilter(frame_name="FrameA")
frame_exclude = FrameFilter(frame_name="FrameB", exclude=True)
filtered_frames = list(filter(frame_exclude, filter(frame_include, frames)))
# Example 6: Combining inclusion and exclusion with AllFramesFilter and FrameFilter
all_frames_filter = AllFramesFilter()
frame_exclude_filter = FrameFilter(frame_name="FrameC", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, filter(all_frames_filter, frames)))
# Example 7: Recursive filtering of signals (SignalInfos) in frames (FrameInfos)
filtered_frames = [
filtered_frame
for frame in frames
if (filtered_frame := filter_recursive(frame, filters=[all_frames_filter, signal_exclude_filter])) is not None
]
# Example 10: Type checking filters
assert is_frame_filter(all_frames_filter) # works with frames
assert not is_signal_filter(all_frames_filter) # doesn't work with signals
assert not is_frame_filter(signal_exclude_filter) # doesn't work with frames
assert is_signal_filter(signal_exclude_filter) # works with signals
assert is_frame_filter(sender_filter) # works with frames
assert is_signal_filter(sender_filter) # works with signals
Filtered frames can be used when subscribing:
import asyncio
from typing import AsyncIterator
from remotivelabs.broker import BrokerClient, Frame, FrameInfo
from remotivelabs.broker.filters import ReceiverFilter
async def main() -> None:
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
# list frames in the "DriverCan" namespace
frame_infos: list[FrameInfo] = await broker_client.list_frame_infos("DriverCan")
# filter according to some critera
filtered_frame_infos = list(filter(ReceiverFilter(ecu_name="ECU2"), frame_infos))
# subscribe using the filtered frame infos
stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
("DriverCan", filtered_frame_infos),
)
# will receive frames until stream is closed by broker or an error occurs
async for frame in stream:
print(f"Received frame: {frame}")
if __name__ == "__main__":
asyncio.run(main())
Filters can also be used directly when subscribing:
import asyncio
from typing import AsyncIterator
from remotivelabs.broker import BrokerClient, Frame
from remotivelabs.broker.filters import ReceiverFilter
async def main() -> None:
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
# subscribe using the ReceiverFilter to only receive frames from ECU named "ECU2" in the "DriverCan" namespace
stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
("DriverCan", [ReceiverFilter(ecu_name="ECU2")]),
)
# will receive frames until stream is closed by broker or an error occurs
async for frame in stream:
print(f"Received frame: {frame}")
if __name__ == "__main__":
asyncio.run(main())
Restbus
The RemotiveBroker exposes a Restbus interface for simulating CAN bus traffic.
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.broker.restbus import RestbusFrameConfig, RestbusSignalConfig
async def main() -> None:
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
# Start the restbus on the "DriverCan" namespace with one frame configuration
await broker_client.restbus.add(
("DriverCan", [RestbusFrameConfig(name="EngineData")]),
start=True,
)
# now update some of its signals
signal_configs: list[RestbusSignalConfig] = [
RestbusSignalConfig.set(name="EngineData.EngineRpm", value=1500),
RestbusSignalConfig.set(name="EngineData.EngineTemp", value=90),
]
await broker_client.restbus.update_signals(
("DriverCan", signal_configs),
)
if __name__ == "__main__":
asyncio.run(main())
Recording session
There is a RecordingSessionClient for managing recording session playback on the broker.
import asyncio
from remotivelabs.broker.recording_session import RecordingSessionClient
async def main() -> None:
# Create a recording session client
async with RecordingSessionClient(url="http://127.0.0.1:50051") as rs_client:
# List available recording files on the broker
files = await rs_client.list_recording_files()
print(f"Available recording files: {files}")
# open a recording session with the first file
rs_ref = rs_client.get_session(str(files[0]))
async with rs_ref as session:
# start playback
status = await session.play()
print(f"Playback started with status: {status}")
# exiting the async with block will close the session, but playback will continue if other sessions are open
if __name__ == "__main__":
asyncio.run(main())
SecOC
RemotiveLabsBroker supports multiple SecOC properties. These properties can be set via the RemotiveBroker API to configure SecOC behavior for testing purposes.
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.broker.secoc import SecocFreshnessValue
async def main() -> None:
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
prop = SecocFreshnessValue(fv=b"\x00\x00\x12\x34")
await broker_client.set_secoc_property(namespace="MyNamespace", property=prop)
if __name__ == "__main__":
asyncio.run(main())
Logging
This library uses Python's standard logging module. By default, the library does not configure any logging handlers,
allowing applications to fully control their logging setup.
To enable logs from this library in your application or tests, configure logging as follows:
import logging
logging.basicConfig(level=logging.INFO)
logging.getLogger("remotivelabs.broker").setLevel(logging.DEBUG)
For more advanced configurations, refer to the Python logging documentation.
Client for signal-related operations.
Initializes a new BrokerClient instance.
Arguments:
- url: The RemotiveBroker URL to connect to.
- client_id: Optional client ID. If None, a random ID is generated.
- auth: Authentication method to use. Defaults to NoAuth.
Note:
Start the instance using a context manager:
async with BrokerClient(...) as client: ...Or use the connect/disconnect methods directly:
client = await BrokerClient(...).connect() # ... await client.disconnect()
Read signals from signal cache
Arguments:
- *values: One or more tuples, each containing a namespace and a list of signal names of the signals to read.
Raises:
- ValueError: If the list of values is empty.
Publish a list of signals to the broker.
Arguments:
- *values: One or more tuples, each containing a namespace and a list of signals to publish.
Raises:
- ValueError: If the list of values is empty.
Publish a list of headers to the broker.
Arguments:
- *headers: One or more tuples, each containing a namespace and a list of frame names of which headers to publish.
Raises:
- ValueError: If the list of values is empty.
Subscribe to a list of signals.
Arguments:
- *signals: One or more tuples, each containing namespace and sequence of signal metadata to subscribe to.
- on_change: Whether to receive updates only on change.
- initial_empty: True will wait until the broker has sent an initial message
Returns:
An asynchronous iterator of lists of Signal objects.
Subscribe to headers.
Arguments:
- *headers: One or more tuples, each containing namespace and a sequence of frame names of which headers to subscribe to.
- initial_empty: True will wait until the broker has sent an initial message
Returns:
An asynchronous iterator of of Header objects.
Subscribe to a Frames.
Arguments:
- *frames: One or more tuples, each containing namespace and sequence of frame metadata to subscribe to.
- on_change: Whether to receive updates only on change.
- initial_empty: True will wait until the broker has sent an initial message
- decode_named_values: True will decode named values to str.
Returns:
An asynchronous iterator with Frames.
Set a SecOC property on the broker.
Arguments:
- namespace: Target namespace
- property: The SecOC property to set.
List all frame infos in one or more namespaces.
Arguments:
- namespaces: One or more NamespaceName or NamespaceInfo objects.
Returns:
A list of FrameInfo objects.
Get a frame by name in a specific namespace.
Arguments:
- name: The name of the frame.
- namespace: The namespace name or NamespaceInfo object.
Returns:
The FrameInfo object if found, otherwise None.
List all signal infos in one or more namespaces.
Arguments:
- namespaces: One or more NamespaceName or NamespaceInfo objects.
Returns:
A list of SignalInfo objects.
Get a signal info by name in a specific namespace.
Arguments:
- name: The name of the signal.
- namespace: The namespace name or NamespaceInfo object.
Returns:
The SignalInfo object if found, otherwise None.
A concrete instance of a frame carrying signal values.
Attributes:
- name: Name of the frame.
- namespace: Namespace to which the frame belongs.
- timestamp: in micro seconds, set when first seen.
- signals: Dict with signal names with their current values.
- value: Raw or composite value associated with the frame (e.g., serialized representation).
Metadata about a frame in a namespace.
Attributes:
- name: Name of the frame.
- namespace: Namespace to which the frame belongs.
- signals: Dict of signal names and their corresponding SignalInfo.
- sender: List of entities that send this frame.
- receiver: List of entities that receive this frame.
- cycle_time_millis: Frame cycle time in milliseconds.
Used to subscribe to a frame and optionally its signals.
Attributes:
- name: Name of the frame to subscribe to.
- signals: List of signal names to subscribe to
- None: subscribe to all signals in the frame.
- []: subscribe to the frame without any signals.
A header with its frame name and namespace, used as an arbitration identifier in a communication protocol where only one participant can transmit at a time, for example LIN.
Attributes:
- frame_name: The name of the frame of which header to publish.
- namespace: The namespace the header belongs to.
Configuration for a frame in the Restbus.
Attributes:
- name: The name of the frame to configure.
- cycle_time: Optional cycle time override for the frame. If None, the default from the broker's database is used.
This class defines how a specific signal should behave when emitted by Restbus. A signal can have:
Attributes:
- name: The name of the signal
- loop: Values emitted in order after the initial sequence
- initial: Optional values emitted once before the loop starts
Create a SignalConfig with a constant value.
Arguments:
- name: Name of the signal
- value: Value to set in Restbus
Use CMAC0 for SecOC in the RemotiveBroker.
Set SecOC binary freshness value to be used by freshness manager. Property is limited to SecOC on a given name space.
Set binary 128-bit key to be used for SecOC in the RemotiveBroker.
Multiple keys can be set and are separated by key ID's.
Set a time delta to use in real time clock for SecOC freshness value. Time difference is in floating point seconds and is limited to a name space.
A signal with its name, namespace, and current value.
Attributes:
- name: The name of the signal.
- namespace: The namespace the signal belongs to.
- value: The current value of the signal.
Metadata describing a signal
Attributes:
- name: Name of the signal.
- namespace: Namespace the signal belongs to.
- receiver: List of receivers for the signal.
- sender: List of senders for the signal.
- named_values: Mapping from raw numeric values to symbolic names (e.g., enums).
- value_names: Reverse mapping from symbolic names to raw numeric values.
- min: Minimum allowed value.
- max: Maximum allowed value.
- factor: Multiplication faction used for encoding and decoding value in frame.
A signal intended to be published with a specific value.
Attributes:
- name: The name of the signal to write.
- value: The value to assign to the signal.