remotivelabs.broker
RemotiveLabs Broker API
This module handles communication with the RemotiveBroker.
The RemotiveBroker can run locally or in RemotiveCloud. Connecting to a RemotiveCloud instance requires additional
authentication parameters—see the remotivelabs.broker.auth module for details.
Installation
pip install remotivelabs-broker
Usage
Connecting to a RemotiveBroker is straightforward using the BrokerClient class:
import asyncio

from remotivelabs.broker import BrokerClient

async def main() -> None:
    # Create a BrokerClient instance and list all namespaces available on that broker.
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        ns = await broker_client.list_namespaces()
        print(f"Namespaces: {ns}")

if __name__ == "__main__":
    asyncio.run(main())
Listing frames and signals:
import asyncio

from remotivelabs.broker import BrokerClient, FrameInfo

async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # list all available frames in the "DriverCan" namespace
        frame_infos: list[FrameInfo] = await broker_client.list_frame_infos("DriverCan")
        # simply print all frames and their signals
        for frame_info in frame_infos:
            print(f"{frame_info.name}")
            for signal_info in frame_info.signals.values():
                print(f" {signal_info.name}")

if __name__ == "__main__":
    asyncio.run(main())
Subscribing to frames or signals on a specific namespace:
import asyncio
from typing import AsyncIterator

from remotivelabs.broker import BrokerClient, Frame

async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # subscribe to a frame named "EngineData" (including all its signals) in the "DriverCan" namespace
        stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
            ("DriverCan", ["EngineData"]),
        )
        # will receive frames until stream is closed by broker or an error occurs
        async for frame in stream:
            print(f"Received frame: {frame}")

if __name__ == "__main__":
    asyncio.run(main())
Filters
It is possible to filter frames and signals using filter predicates. A filter predicate is any callable that takes a
remotivelabs.broker.frame.FrameInfo or remotivelabs.broker.signal.SignalInfo object and returns a boolean
indicating whether the object matches the filter criteria.
All filters have an exclude flag to indicate whether matches should be excluded (True) or
included (False, default). Filters are callable and can be passed to Python built-in functions that
expect a predicate, such as filter(), any(), or all(). The snippet below shows examples
using filter():
Filter Strategy:
The filter_recursive() function applies frame and signal filters with context-dependent behavior. Frame filtering determines eligibility first; signal filtering then applies differently depending on whether the frame was included. Included frames start with all signals, so signal filters can only exclude. Excluded frames require explicit signal inclusion. Exclusion filters always take priority over inclusion filters.
Examples:
from remotivelabs.broker import FrameInfo, SignalInfo
from remotivelabs.broker.filters import (
    AllFramesFilter,
    FrameFilter,
    ReceiverFilter,
    SenderFilter,
    SignalFilter,
    filter_recursive,
    is_frame_filter,
    is_signal_filter,
)

# assume frames and signals are populated elsewhere
frames: list[FrameInfo] = []
signals: list[SignalInfo] = []

# Example 1: Include all frames
all_frames_filter = AllFramesFilter()
filtered_frames = list(filter(all_frames_filter, frames))

# Example 2: Include all frames but exclude a specific frame
frame_exclude_filter = FrameFilter(frame_name="Frame1", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, frames))

# Example 3: Filter frames sent by a specific ECU
sender_filter = SenderFilter(ecu_name="ECU1")
filtered_frames = list(filter(sender_filter, frames))

# Example 4: Filter frames received by a specific ECU, excluding one signal
receiver_filter = ReceiverFilter(ecu_name="ECU2")
signal_exclude_filter = SignalFilter(signal_name="SignalA", exclude=True)
filtered_frames = list(filter(receiver_filter, frames))
filtered_signals = list(filter(signal_exclude_filter, signals))

# Example 5: Chaining filters
frame_include = FrameFilter(frame_name="FrameA")
frame_exclude = FrameFilter(frame_name="FrameB", exclude=True)
filtered_frames = list(filter(frame_exclude, filter(frame_include, frames)))

# Example 6: Combining inclusion and exclusion with AllFramesFilter and FrameFilter
all_frames_filter = AllFramesFilter()
frame_exclude_filter = FrameFilter(frame_name="FrameC", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, filter(all_frames_filter, frames)))

# Example 7: Recursive filtering of signals (SignalInfos) in frames (FrameInfos)
filtered_frames = [
    filtered_frame
    for frame in frames
    if (filtered_frame := filter_recursive(frame, filters=[all_frames_filter, signal_exclude_filter])) is not None
]

# Example 8: Type checking filters
assert is_frame_filter(all_frames_filter)  # works with frames
assert not is_signal_filter(all_frames_filter)  # doesn't work with signals
assert not is_frame_filter(signal_exclude_filter)  # doesn't work with frames
assert is_signal_filter(signal_exclude_filter)  # works with signals
assert is_frame_filter(sender_filter)  # works with frames
assert is_signal_filter(sender_filter)  # works with signals
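The filter strategy described above can be illustrated with a small standalone sketch. It does not use the library: MiniFrame, Rule and sketch_filter_recursive are hypothetical names, and the logic is only one reading of the documented behavior (exclusion wins, included frames keep all signals that are not excluded, other frames need explicitly included signals), not the actual filter_recursive() implementation.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class MiniFrame:
    """Hypothetical stand-in for FrameInfo: a frame name plus its signal names."""
    name: str
    signals: tuple[str, ...]


@dataclass(frozen=True)
class Rule:
    """Hypothetical stand-in for a filter that matches one frame or signal name."""
    kind: str  # "frame" or "signal"
    name: str
    exclude: bool = False


def sketch_filter_recursive(frame: MiniFrame, rules: Sequence[Rule]) -> Optional[MiniFrame]:
    frame_rules = [r for r in rules if r.kind == "frame" and r.name == frame.name]
    # Exclusion takes priority: an explicitly excluded frame is dropped.
    if any(r.exclude for r in frame_rules):
        return None
    frame_included = any(not r.exclude for r in frame_rules)

    excluded = {r.name for r in rules if r.kind == "signal" and r.exclude}
    included = {r.name for r in rules if r.kind == "signal" and not r.exclude}

    if frame_included:
        # Included frame: start with all signals, signal rules can only remove.
        kept = tuple(s for s in frame.signals if s not in excluded)
        return MiniFrame(frame.name, kept)

    # Frame not included: keep only explicitly included signals, if any.
    kept = tuple(s for s in frame.signals if s in included and s not in excluded)
    return MiniFrame(frame.name, kept) if kept else None


engine = MiniFrame("EngineData", ("EngineRpm", "EngineTemp"))
included_result = sketch_filter_recursive(
    engine, [Rule("frame", "EngineData"), Rule("signal", "EngineTemp", exclude=True)]
)
signal_only_result = sketch_filter_recursive(engine, [Rule("signal", "EngineRpm")])
unmatched_result = sketch_filter_recursive(engine, [Rule("frame", "OtherFrame")])
```

In this sketch a frame that matches no frame rule and has no explicitly included signal yields None, which mirrors how Example 7 skips frames for which filter_recursive() returns None.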
Filtered frames can be used when subscribing:
import asyncio
from typing import AsyncIterator

from remotivelabs.broker import BrokerClient, Frame, FrameInfo
from remotivelabs.broker.filters import ReceiverFilter

async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # list frames in the "DriverCan" namespace
        frame_infos: list[FrameInfo] = await broker_client.list_frame_infos("DriverCan")
        # filter according to some criteria
        filtered_frame_infos = list(filter(ReceiverFilter(ecu_name="ECU2"), frame_infos))
        # subscribe using the filtered frame infos
        stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
            ("DriverCan", filtered_frame_infos),
        )
        # will receive frames until stream is closed by broker or an error occurs
        async for frame in stream:
            print(f"Received frame: {frame}")

if __name__ == "__main__":
    asyncio.run(main())
Filters can also be used directly when subscribing:
import asyncio
from typing import AsyncIterator

from remotivelabs.broker import BrokerClient, Frame
from remotivelabs.broker.filters import ReceiverFilter

async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # subscribe using the ReceiverFilter to only get frames received by the ECU named "ECU2" in the "DriverCan" namespace
        stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
            ("DriverCan", [ReceiverFilter(ecu_name="ECU2")]),
        )
        # will receive frames until stream is closed by broker or an error occurs
        async for frame in stream:
            print(f"Received frame: {frame}")

if __name__ == "__main__":
    asyncio.run(main())
Restbus
The RemotiveBroker exposes a Restbus interface for simulating CAN bus traffic.
import asyncio

from remotivelabs.broker import BrokerClient
from remotivelabs.broker.restbus import RestbusFrameConfig, RestbusSignalConfig

async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # Start the restbus on the "DriverCan" namespace with one frame configuration
        await broker_client.restbus.add(
            ("DriverCan", [RestbusFrameConfig(name="EngineData")]),
            start=True,
        )
        # now update some of its signals
        signal_configs: list[RestbusSignalConfig] = [
            RestbusSignalConfig.set(name="EngineData.EngineRpm", value=1500),
            RestbusSignalConfig.set(name="EngineData.EngineTemp", value=90),
        ]
        await broker_client.restbus.update_signals(
            ("DriverCan", signal_configs),
        )

if __name__ == "__main__":
    asyncio.run(main())
Recording session
There is a RecordingSessionClient for managing recording session playback on the broker.
import asyncio

from remotivelabs.broker.recording_session import RecordingSessionClient

async def main() -> None:
    # Create a recording session client
    async with RecordingSessionClient(url="http://127.0.0.1:50051") as rs_client:
        # List available recording files on the broker
        files = await rs_client.list_recording_files()
        print(f"Available recording files: {files}")
        # open a recording session with the first file
        rs_ref = rs_client.get_session(str(files[0]))
        async with rs_ref as session:
            # start playback
            status = await session.play()
            print(f"Playback started with status: {status}")
        # exiting the async with block will close the session, but playback will continue if other sessions are open

if __name__ == "__main__":
    asyncio.run(main())
SecOC
The RemotiveBroker supports multiple SecOC properties. These properties can be set via the RemotiveBroker API to configure SecOC behavior for testing purposes.
import asyncio

from remotivelabs.broker import BrokerClient
from remotivelabs.broker.secoc import SecocFreshnessValue

async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        prop = SecocFreshnessValue(fv=b"\x00\x00\x12\x34")
        await broker_client.set_secoc_property(namespace="MyNamespace", property=prop)

if __name__ == "__main__":
    asyncio.run(main())
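The freshness value in the example above is a raw byte string. When the value starts out as an integer counter, it can be turned into bytes with the standard library. The helper freshness_bytes is hypothetical, and both the 4-byte width and the big-endian byte order are assumptions taken from the shape of the example value, not from the library.

```python
def freshness_bytes(counter: int, width: int = 4) -> bytes:
    """Encode a non-negative counter as fixed-width big-endian bytes.

    Width and byte order are assumptions for illustration; check what
    your SecOC configuration actually expects.
    """
    return counter.to_bytes(width, byteorder="big")


# Reproduces the byte string used in the example above.
fv = freshness_bytes(0x1234)
```

int.to_bytes raises OverflowError if the counter does not fit in the requested width, which is a reasonable early failure for a test setup.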
Logging
This library uses Python's standard logging module. By default, the library does not configure any logging handlers,
allowing applications to fully control their logging setup.
To enable logs from this library in your application or tests, configure logging as follows:
import logging
logging.basicConfig(level=logging.INFO)
logging.getLogger("remotivelabs.broker").setLevel(logging.DEBUG)
For more advanced configurations, refer to the Python logging documentation.
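As one possible more advanced setup, the sketch below attaches a dedicated handler to the library's logger instead of configuring the root logger. It uses only the standard logging module; the logger name "remotivelabs.broker" comes from the snippet above, while the format string is an arbitrary choice.

```python
import logging

# Logger name taken from the documentation above.
broker_logger = logging.getLogger("remotivelabs.broker")
broker_logger.setLevel(logging.DEBUG)

# Send library logs to stderr with their own format.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
broker_logger.addHandler(handler)

# Avoid duplicate output if the root logger also has handlers.
broker_logger.propagate = False
```

Loggers created under that name (for example by submodules) inherit the DEBUG level and the handler through the standard logger hierarchy.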
Client for signal-related operations.
Initializes a new BrokerClient instance.
Arguments:
- url: The RemotiveBroker URL to connect to.
- client_id: Optional client ID. If None, a random ID is generated.
- auth: Authentication method to use. Defaults to NoAuth.
Note:
Start the instance using a context manager:
async with BrokerClient(...) as client:
    ...
Or use the connect/disconnect methods directly:
client = await BrokerClient(...).connect()
# ...
await client.disconnect()
Read signals from the signal cache.
Arguments:
- *values: One or more tuples, each containing a namespace and a list of signal names of the signals to read.
Raises:
- ValueError: If the list of values is empty.
Publish a list of signals to the broker.
Arguments:
- *values: One or more tuples, each containing a namespace and a list of signals to publish.
Raises:
- ValueError: If the list of values is empty.
Publish a list of headers to the broker.
Arguments:
- *headers: One or more tuples, each containing a namespace and a list of frame names of which headers to publish.
Raises:
- ValueError: If the list of values is empty.
Subscribe to individual signals by name, metadata, or filter predicate.
Each positional argument is a (namespace, items) tuple. Items can be freely mixed:
- str: subscribe to the named signal.
- SignalInfo: subscribe using a metadata object returned by list_signal_infos().
- remotivelabs.broker.filters.SignalFilterPredicate: subscribe to all signals matching the predicate; resolved at call time via list_signal_infos().
For frame-level subscriptions (receiving signals grouped by frame with a timestamp),
use subscribe_frames() instead.
Arguments:
- *signals: One or more (namespace, items) tuples describing which signals to subscribe to.
- on_change: Whether to receive updates only on change.
- initial_empty: If True, the first broker message (an empty sync marker) is consumed internally before the iterator is returned, ensuring the broker is ready before the first real value arrives. Useful when you need to guarantee the subscription is live before publishing something.
Returns:
An async iterator yielding batches of Signal objects.
Raises:
- ValueError: If no signals are provided.
Subscribe to frame headers.
Headers carry only the frame identifier - no signal values. This is used in bus protocols where a master publishes a header and a slave responds with data, such as LIN. Subscribing to headers lets you observe which frames are being requested on the bus.
Arguments:
- *headers: One or more (namespace, frame_names) tuples. Each frame_names list contains the names of the frames whose headers to subscribe to.
- initial_empty: If True, the first broker message (an empty sync marker) is consumed internally before the iterator is returned, ensuring the broker is ready before the first real value arrives. Useful when you need to guarantee the subscription is live before publishing something.
Returns:
An async iterator yielding one Header per received frame header.
Raises:
- ValueError: If no headers are provided.
Subscribe to frames and their signals.
Each positional argument is a (namespace, items) tuple. The items sequence
accepts four types that can be freely mixed in the same list:
- str: subscribe to the named frame and all its signals.
- FrameSubscription: subscribe to a frame with explicit signal selection (signals=None subscribes to all signals; signals=[] subscribes to the frame identifier only).
- FrameInfo: subscribe using the signal list embedded in the metadata object, as returned by list_frame_infos().
- FrameFilterPredicate: subscribe to all frames that match the predicate; resolved at call time via list_frame_infos().
Arguments:
- *frames: One or more (namespace, items) tuples describing what to subscribe to.
- on_change: Delivery filtering. False (default) delivers every frame transmission. True or OnChangeMode.CHANGED delivers only frames with changed signals (the yielded Frame contains only changed signals). OnChangeMode.MERGED additionally maintains a client-side cache so the yielded Frame always contains all signals seen so far.
- initial_empty: If True, the first broker message (an empty sync marker) is consumed internally before the iterator is returned, ensuring the broker is ready before the first real value arrives. Useful when you need to guarantee the subscription is live before publishing something.
- decode_named_values: If True, integer signal values that have a named-value mapping in the database (e.g. {0: "OFF", 1: "ON"}) are decoded to their string names in the yielded Frame.signals.
Returns:
An async iterator yielding Frame objects, one per received frame transmission.
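The effect of decode_named_values can be pictured with a standalone sketch. It does not call the library: decode_named and the sample mappings are hypothetical, and the rule that only integers with a mapping entry are replaced follows the argument description above.

```python
from typing import Mapping, Union

SignalValue = Union[int, float, str, bytes]


def decode_named(value: SignalValue, named_values: Mapping[int, str]) -> SignalValue:
    """Replace an integer value by its symbolic name when a mapping exists."""
    if isinstance(value, int) and not isinstance(value, bool) and value in named_values:
        return named_values[value]
    return value


# Hypothetical per-signal named-value mappings, as a signal database might define them.
mappings: dict[str, dict[int, str]] = {"Switch": {0: "OFF", 1: "ON"}}
raw_signals: dict[str, SignalValue] = {"Switch": 1, "Level": 7}

decoded = {
    name: decode_named(value, mappings.get(name, {}))
    for name, value in raw_signals.items()
}
```

Signals without a mapping, or with a value outside the mapping, pass through unchanged in this sketch.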
Set a SecOC property on the broker.
Arguments:
- namespace: Target namespace
- property: The SecOC property to set.
List all frame infos in one or more namespaces.
Arguments:
- namespaces: One or more NamespaceName or NamespaceInfo objects.
Returns:
A list of FrameInfo objects.
Get a frame by name in a specific namespace.
Arguments:
- name: The name of the frame.
- namespace: The namespace name or NamespaceInfo object.
Returns:
The FrameInfo object if found, otherwise None.
List all signal infos in one or more namespaces.
Arguments:
- namespaces: One or more NamespaceName or NamespaceInfo objects.
Returns:
A list of SignalInfo objects.
Get a signal info by name in a specific namespace.
Arguments:
- name: The name of the signal.
- namespace: The namespace name or NamespaceInfo object.
Returns:
The SignalInfo object if found, otherwise None.
A live snapshot of a frame carrying actual signal values and a timestamp.
Yielded by BrokerClient.subscribe_frames(). Shares the same name and
namespace as FrameInfo, but signals contains decoded values.
Attributes:
- timestamp: Time of first reception in microseconds.
- name: Name of the frame.
- namespace: Namespace to which the frame belongs.
- signals: Dict mapping signal names to their current decoded values.
- value: Raw frame value (e.g. the serialized byte payload).
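Because timestamp is expressed in microseconds, the observed spacing between consecutive frames can be compared with a cycle time given in milliseconds. The sketch below works on plain integers; cycle_times_ms is a hypothetical helper and the sample timestamps are made up.

```python
def cycle_times_ms(timestamps_us: list[int]) -> list[float]:
    """Return the gaps between consecutive microsecond timestamps, in milliseconds."""
    return [(later - earlier) / 1000 for earlier, later in zip(timestamps_us, timestamps_us[1:])]


# Made-up timestamps for three consecutive receptions of the same frame.
gaps = cycle_times_ms([1_000_000, 1_010_000, 1_020_500])
```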
Static metadata about a frame as defined in the signal database.
Returned by BrokerClient.list_frame_infos() and BrokerClient.get_frame_info().
Use it to introspect the database, build filter predicates, or construct a
FrameSubscription. It does not carry live signal values - see Frame for that.
Attributes:
- id: Frame identifier.
- name: Name of the frame.
- namespace: Namespace to which the frame belongs.
- signals: Dict of signal names and their corresponding SignalInfo metadata.
- sender: List of entities that send this frame.
- receiver: List of entities that receive this frame.
- cycle_time_millis: Frame cycle time in milliseconds.
- description: Human-readable description of the frame.
- payload_size: Frame payload size in bytes.
- e2e: End-to-end protection configuration for the frame.
- groups: List of byte groups within the frame, each optionally E2E-protected.
Describes which frame and signals to subscribe to in BrokerClient.subscribe_frames().
Attributes:
- name: Name of the frame to subscribe to.
- signals: Controls which signals are included:
  - None (default): subscribe to all signals in the frame.
  - []: subscribe to the frame identifier only, with no signals (useful for receiving LIN headers without payload).
  - ["sig1", "sig2"]: subscribe to the listed signals only.
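The three cases for signals can be summarized in a short standalone sketch. resolve_signals is a hypothetical helper that mimics the documented selection rules on plain lists; how the library treats requested names that do not exist in the frame is not stated here, so the sketch simply ignores them.

```python
from typing import Optional, Sequence


def resolve_signals(all_signals: Sequence[str], selection: Optional[Sequence[str]]) -> list[str]:
    """Return the signal names a subscription would carry under the documented rules."""
    if selection is None:
        # None: every signal in the frame.
        return list(all_signals)
    # [] yields no signals; a non-empty list keeps only the listed names.
    return [name for name in all_signals if name in selection]


frame_signals = ["EngineRpm", "EngineTemp", "OilPressure"]
everything = resolve_signals(frame_signals, None)
identifier_only = resolve_signals(frame_signals, [])
subset = resolve_signals(frame_signals, ["EngineTemp"])
```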
A frame header (arbitration identifier) without payload signals.
Used in protocols where a master publishes an identifier and a slave responds
with data, such as LIN. Yielded by BrokerClient.subscribe_header() and
passed to BrokerClient.publish_header().
Attributes:
- frame_name: The name of the frame whose header to publish or that was received.
- namespace: The namespace the header belongs to.
Controls how signal changes are reported by a frame subscription.
- OFF: Every frame transmission is delivered regardless of whether any signal value changed. This is the default and mirrors raw bus behavior.
- CHANGED: The yielded Frame contains only the signals that changed in that delivery; unchanged signals are absent from Frame.signals.
- MERGED: Like CHANGED, the broker only sends frames with changed signals. In addition, this client maintains a per-frame signal cache. Before yielding, it merges the incoming changed signals into the cache and yields a Frame whose Frame.signals dict always contains all signals seen so far, not just the ones that changed in the latest delivery.
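The difference between changed-only and merged delivery can be sketched without the library. merge_changes below is a hypothetical generator that keeps a per-frame cache in the way the merged mode is described; frames are modelled as (name, signals) tuples rather than Frame objects.

```python
from typing import Iterable, Iterator

Delivery = tuple[str, dict[str, float]]


def merge_changes(deliveries: Iterable[Delivery]) -> Iterator[Delivery]:
    """Merge changed-only deliveries into a per-frame cache and yield full snapshots."""
    cache: dict[str, dict[str, float]] = {}
    for frame_name, changed in deliveries:
        merged = cache.setdefault(frame_name, {})
        merged.update(changed)
        # Yield a copy so later updates do not mutate earlier snapshots.
        yield frame_name, dict(merged)


changed_only: list[Delivery] = [
    ("EngineData", {"EngineRpm": 1500, "EngineTemp": 90}),
    ("EngineData", {"EngineRpm": 1600}),  # only the changed signal arrives
]
merged_frames = list(merge_changes(changed_only))
```

The second snapshot still contains EngineTemp even though only EngineRpm changed, which is the behavior the merged mode is described as providing.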
Configuration for a frame in the Restbus.
Attributes:
- name: The name of the frame to configure.
- cycle_time: Optional cycle time override for the frame in ms. If None, the default from the broker's database is used.
Defines how a specific signal should behave when emitted by the Restbus.
Signal values are driven by a two-phase sequence:
- initial - values emitted once, in order, before the loop starts. May be empty.
- loop - values emitted in order, then repeated from the beginning indefinitely.
On each Restbus tick the next value in the sequence is used. Once initial is
exhausted the Restbus cycles through loop forever. A single-element loop
(e.g., loop=[0]) produces a constant value after the initial sequence.
Example - update-bit pattern (sends 1 once, then constant 0):
RestbusSignalConfig(name="UpdateBit", initial=[1], loop=[0])
# equivalent to:
RestbusSignalConfig.set_update_bit("UpdateBit")
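The two-phase sequence can be modelled with itertools to see which value each tick would use. This is a standalone sketch of the described semantics, not the Restbus implementation; restbus_values is a hypothetical helper.

```python
from itertools import chain, cycle, islice
from typing import Iterator, Sequence


def restbus_values(initial: Sequence[float], loop: Sequence[float]) -> Iterator[float]:
    """Yield the initial values once, then repeat the loop values indefinitely."""
    return chain(initial, cycle(loop))


# Update-bit pattern from the example above: 1 once, then constant 0.
first_five = list(islice(restbus_values([1], [0]), 5))

# An empty initial phase goes straight into the loop.
looping = list(islice(restbus_values([], [1, 2]), 5))
```

Note that in this sketch an empty loop would end the sequence after the initial values; whether the library allows an empty loop is not stated here.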
Attributes:
Create a SignalConfig with a constant value.
Arguments:
- name: Name of the signal
- value: Value to set in Restbus
Use CMAC0 for SecOC in the RemotiveBroker.
Set the binary SecOC freshness value to be used by the freshness manager. The property is limited to SecOC on a given namespace.
Set a binary 128-bit key to be used for SecOC in the RemotiveBroker.
Multiple keys can be set and are distinguished by key ID.
Set a time delta to use in the real-time clock for the SecOC freshness value. The time difference is given in floating-point seconds and is limited to a namespace.
Marks a signal as a mux filter.
The signal is only active when the selector equals filter_value.
Attributes:
- filter_value: The selector value that activates this signal.
Marks a signal as the mux selector.
The value of this signal determines which filter signals are active. A selector carries no filter value.
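The selector and filter roles can be illustrated with a standalone sketch. MuxSignal and active_signals are hypothetical names; a filter_value of None stands for a signal that is not a mux filter (for example the selector itself or a plain signal) and is therefore always active here.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class MuxSignal:
    """Hypothetical signal description: a name and an optional mux filter value."""
    name: str
    filter_value: Optional[int] = None  # None: not a mux filter


def active_signals(selector_value: int, signals: Sequence[MuxSignal]) -> list[str]:
    """Return the names of signals that are active for the given selector value."""
    return [
        s.name
        for s in signals
        if s.filter_value is None or s.filter_value == selector_value
    ]


layout = [MuxSignal("Mode"), MuxSignal("TempA", filter_value=0), MuxSignal("TempB", filter_value=1)]
active_for_one = active_signals(1, layout)
```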
A live signal value received from the broker.
Yielded by BrokerClient.subscribe() and BrokerClient.read_signals().
For signals grouped by frame with a timestamp, use BrokerClient.subscribe_frames()
which yields remotivelabs.broker.frame.Frame objects instead.
Attributes:
- name: The name of the signal.
- namespace: The namespace the signal belongs to.
- value: The current decoded value of the signal.
Static metadata describing a signal as defined in the signal database.
Returned by BrokerClient.list_signal_infos() and BrokerClient.get_signal_info(),
and embedded in remotivelabs.broker.frame.FrameInfo.signals. Use it to introspect
signal encoding, build filter predicates, or pass directly to BrokerClient.subscribe().
It does not carry a live value - see Signal for that.
Attributes:
- name: Name of the signal.
- namespace: Namespace the signal belongs to.
- receiver: List of receivers for the signal.
- sender: List of senders for the signal.
- named_values: Mapping from raw numeric values to symbolic names (e.g., enums).
- value_names: Reverse mapping from symbolic names to raw numeric values.
- min: Minimum allowed value.
- max: Maximum allowed value.
- factor: Multiplication factor used for encoding and decoding the value in the frame.
- offset: Offset used for encoding and decoding the value in the frame.
- start_value: Initial value to use before publishing.
- size: Size of the signal value when packed in the frame (bits).
- start_bit: Index of the first bit where this signal's data begins inside the frame.
- unit: Unit of the signal value.
- description: Description of the signal, taken from the signal database.
- multiplex: Multiplexing role of the signal, if any.
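The factor and offset attributes are commonly used in a linear scaling of the form physical = raw * factor + offset. The sketch below assumes that convention, which is typical for CAN signal databases but is not confirmed by this page; raw_to_physical and physical_to_raw are hypothetical helpers and the sample values are made up.

```python
def raw_to_physical(raw: int, factor: float, offset: float) -> float:
    """Decode a raw bus value, assuming physical = raw * factor + offset."""
    return raw * factor + offset


def physical_to_raw(physical: float, factor: float, offset: float) -> int:
    """Encode a physical value back to the nearest raw integer under the same assumption."""
    return round((physical - offset) / factor)


# Made-up scaling: 0.25 rpm per bit, no offset.
rpm = raw_to_physical(6000, factor=0.25, offset=0.0)
raw_rpm = physical_to_raw(1500.0, factor=0.25, offset=0.0)

# Made-up scaling: 1 degree per bit with a -40 degree offset.
temperature = raw_to_physical(130, factor=1.0, offset=-40.0)
```

Under this assumption the min and max attributes would bound the physical value, and size limits how large the raw integer can be.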
A signal value to be published to the broker.
The write counterpart to Signal. Pass instances to BrokerClient.publish()
to set signal values on the bus.
Attributes:
- name: The name of the signal to write.
- value: The value to assign to the signal.