remotivelabs.broker

RemotiveLabs Broker API

This module handles communication with the RemotiveBroker.

The RemotiveBroker can run locally or in RemotiveCloud. Connecting to a RemotiveCloud instance requires additional authentication parameters—see the remotivelabs.broker.auth module for details.

Installation

pip install remotivelabs-broker

Usage

Connecting to a RemotiveBroker is straightforward using the BrokerClient class:

import asyncio

from remotivelabs.broker import BrokerClient


async def main() -> None:
    # Create a BrokerClient instance and list all namespaces available on that broker.
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        ns = await broker_client.list_namespaces()
        print(f"Namespaces: {ns}")


if __name__ == "__main__":
    asyncio.run(main())

Listing frames and signals:

import asyncio

from remotivelabs.broker import BrokerClient, FrameInfo


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # list all available frames in the "DriverCan" namespace
        frame_infos: list[FrameInfo] = await broker_client.list_frame_infos("DriverCan")

        # simply print all frames and their signals
        for frame_info in frame_infos:
            print(f"{frame_info.name}")
            for signal_info in frame_info.signals.values():
                print(f"  {signal_info.name}")


if __name__ == "__main__":
    asyncio.run(main())

Subscribing to frames or signals on a specific namespace:

import asyncio
from typing import AsyncIterator

from remotivelabs.broker import BrokerClient, Frame


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # subscribe to a frame named "EngineData" (including all its signals) in the "DriverCan" namespace
        stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
            ("DriverCan", ["EngineData"]),
        )

        # will receive frames until stream is closed by broker or an error occurs
        async for frame in stream:
            print(f"Received frame: {frame}")


if __name__ == "__main__":
    asyncio.run(main())

Filters

It is possible to filter frames and signals using filter predicates. A filter predicate is any callable that takes a remotivelabs.broker.frame.FrameInfo or remotivelabs.broker.signal.SignalInfo object and returns a boolean indicating whether the object matches the filter criteria.

All filters have an exclude flag to indicate whether matches should be excluded (True) or included (False, default). Filters are callable and can be passed to Python built-in functions that expect a predicate, such as filter(), any(), or all(). The snippet below shows examples using filter():

Filter Strategy:

The filter_recursive() function applies frame and signal filters with context-dependent behavior. Frame filtering determines eligibility, then signal filtering applies differently based on frame inclusion: included frames start with all signals (exclude-only), excluded frames require explicit signal inclusion. Exclusion filters always take priority over inclusion filters.

Examples:

from remotivelabs.broker import FrameInfo, SignalInfo
from remotivelabs.broker.filters import (
    AllFramesFilter,
    FrameFilter,
    ReceiverFilter,
    SenderFilter,
    SignalFilter,
    filter_recursive,
    is_frame_filter,
    is_signal_filter,
)

# assume frames and signals are populated elsewhere
frames: list[FrameInfo] = []
signals: list[SignalInfo] = []

# Example 1: Include all frames
all_frames_filter = AllFramesFilter()
filtered_frames = list(filter(all_frames_filter, frames))

# Example 2: Include all frames but exclude a specific frame
frame_exclude_filter = FrameFilter(frame_name="Frame1", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, frames))

# Example 3: Filter frames sent by a specific ECU
sender_filter = SenderFilter(ecu_name="ECU1")
filtered_frames = list(filter(sender_filter, frames))

# Example 4: Filter frames received by a specific ECU, excluding one signal
receiver_filter = ReceiverFilter(ecu_name="ECU2")
signal_exclude_filter = SignalFilter(signal_name="SignalA", exclude=True)
filtered_frames = list(filter(receiver_filter, frames))
filtered_signals = list(filter(signal_exclude_filter, signals))

# Example 5: Chaining filters
frame_include = FrameFilter(frame_name="FrameA")
frame_exclude = FrameFilter(frame_name="FrameB", exclude=True)
filtered_frames = list(filter(frame_exclude, filter(frame_include, frames)))

# Example 6: Combining inclusion and exclusion with AllFramesFilter and FrameFilter
all_frames_filter = AllFramesFilter()
frame_exclude_filter = FrameFilter(frame_name="FrameC", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, filter(all_frames_filter, frames)))

# Example 7: Recursive filtering of signals (SignalInfos) in frames (FrameInfos)
filtered_frames = [
    filtered_frame
    for frame in frames
    if (filtered_frame := filter_recursive(frame, filters=[all_frames_filter, signal_exclude_filter])) is not None
]

# Example 10: Type checking filters
assert is_frame_filter(all_frames_filter)  # works with frames
assert not is_signal_filter(all_frames_filter)  # doesn't work with signals

assert not is_frame_filter(signal_exclude_filter)  # doesn't work with frames
assert is_signal_filter(signal_exclude_filter)  # works with signals

assert is_frame_filter(sender_filter)  # works with frames
assert is_signal_filter(sender_filter)  # works with signals

Filtered frames can be used when subscribing:

import asyncio
from typing import AsyncIterator

from remotivelabs.broker import BrokerClient, Frame, FrameInfo
from remotivelabs.broker.filters import ReceiverFilter


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # list frames in the "DriverCan" namespace
        frame_infos: list[FrameInfo] = await broker_client.list_frame_infos("DriverCan")

        # filter according to some critera
        filtered_frame_infos = list(filter(ReceiverFilter(ecu_name="ECU2"), frame_infos))

        # subscribe using the filtered frame infos
        stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
            ("DriverCan", filtered_frame_infos),
        )

        # will receive frames until stream is closed by broker or an error occurs
        async for frame in stream:
            print(f"Received frame: {frame}")


if __name__ == "__main__":
    asyncio.run(main())

Filters can also be used directly when subscribing:

import asyncio
from typing import AsyncIterator

from remotivelabs.broker import BrokerClient, Frame
from remotivelabs.broker.filters import ReceiverFilter


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # subscribe using the ReceiverFilter to only receive frames from ECU named "ECU2" in the "DriverCan" namespace
        stream: AsyncIterator[Frame] = await broker_client.subscribe_frames(
            ("DriverCan", [ReceiverFilter(ecu_name="ECU2")]),
        )

        # will receive frames until stream is closed by broker or an error occurs
        async for frame in stream:
            print(f"Received frame: {frame}")


if __name__ == "__main__":
    asyncio.run(main())

Restbus

The RemotiveBroker exposes a Restbus interface for simulating CAN bus traffic.

import asyncio

from remotivelabs.broker import BrokerClient
from remotivelabs.broker.restbus import RestbusFrameConfig, RestbusSignalConfig


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        # Start the restbus on the "DriverCan" namespace with one frame configuration
        await broker_client.restbus.add(
            ("DriverCan", [RestbusFrameConfig(name="EngineData")]),
            start=True,
        )

        # now update some of its signals
        signal_configs: list[RestbusSignalConfig] = [
            RestbusSignalConfig.set(name="EngineData.EngineRpm", value=1500),
            RestbusSignalConfig.set(name="EngineData.EngineTemp", value=90),
        ]
        await broker_client.restbus.update_signals(
            ("DriverCan", signal_configs),
        )


if __name__ == "__main__":
    asyncio.run(main())

Recording session

There is a RecordingSessionClient for managing recording session playback on the broker.

import asyncio

from remotivelabs.broker.recording_session import RecordingSessionClient


async def main() -> None:
    # Create a recording session client
    async with RecordingSessionClient(url="http://127.0.0.1:50051") as rs_client:
        # List available recording files on the broker
        files = await rs_client.list_recording_files()
        print(f"Available recording files: {files}")

        # open a recording session with the first file
        rs_ref = rs_client.get_session(str(files[0]))
        async with rs_ref as session:
            # start playback
            status = await session.play()
            print(f"Playback started with status: {status}")

        # exiting the async with block will close the session, but playback will continue if other sessions are open


if __name__ == "__main__":
    asyncio.run(main())

SecOC

RemotiveLabsBroker supports multiple SecOC properties. These properties can be set via the RemotiveBroker API to configure SecOC behavior for testing purposes.

import asyncio

from remotivelabs.broker import BrokerClient
from remotivelabs.broker.secoc import SecocFreshnessValue


async def main() -> None:
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        prop = SecocFreshnessValue(fv=b"\x00\x00\x12\x34")
        await broker_client.set_secoc_property(namespace="MyNamespace", property=prop)


if __name__ == "__main__":
    asyncio.run(main())

Logging

This library uses Python's standard logging module. By default, the library does not configure any logging handlers, allowing applications to fully control their logging setup.

To enable logs from this library in your application or tests, configure logging as follows:

import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("remotivelabs.broker").setLevel(logging.DEBUG)

For more advanced configurations, refer to the Python logging documentation.

__version__ = '0.13.3'
class BrokerClient(remotivelabs.broker.connection.BrokerClientConnection):

Client for signal-related operations.

BrokerClient( url: str, client_id: str | None = None, auth: remotivelabs.broker.auth.AuthMethod = <remotivelabs.broker.auth.NoAuth object>)

Initializes a new BrokerClient instance.

Arguments:
  • url: The RemotiveBroker URL to connect to.
  • client_id: Optional client ID. If None, a random ID is generated.
  • auth: Authentication method to use. Defaults to NoAuth.
Note:

Start the instance using a context manager:

async with BrokerClient(...) as client:
    ...

Or use the connect/disconnect methods directly:

client = await BrokerClient(...).connect()
# ...
await client.disconnect()
async def broker_version(self) -> str:

Returns the broker version

async def read_signals( self, *values: tuple[str, list[str]]) -> list[Signal]:

Read signals from signal cache

Arguments:
  • *values: One or more tuples, each containing a namespace and a list of signal names of the signals to read.
Raises:
  • ValueError: If the list of values is empty.
async def publish( self, *values: tuple[str, list[WriteSignal]]) -> None:

Publish a list of signals to the broker.

Arguments:
  • *values: One or more tuples, each containing a namespace and a list of signals to publish.
Raises:
  • ValueError: If the list of values is empty.
async def publish_header(self, *headers: tuple[str, list[str]]) -> None:

Publish a list of headers to the broker.

Arguments:
  • *headers: One or more tuples, each containing a namespace and a list of frame names of which headers to publish.
Raises:
  • ValueError: If the list of values is empty.
async def subscribe( self, *signals: tuple[str, typing.Sequence[str | SignalInfo | remotivelabs.broker.filters.SignalFilterPredicate]], on_change: bool = False, initial_empty: bool = False) -> AsyncIterator[list[Signal]]:

Subscribe to individual signals by name, metadata, or filter predicate.

Each positional argument is a (namespace, items) tuple. Items can be freely mixed:

For frame-level subscriptions (receiving signals grouped by frame with a timestamp), use subscribe_frames() instead.

Arguments:
  • *signals: One or more (namespace, items) tuples describing which signals to subscribe to.
  • on_change: Whether to receive updates only on change.
  • initial_empty: If True, the first broker message (an empty sync marker) is consumed internally before the iterator is returned, ensuring the broker is ready before the first real value arrives. Useful when you need to guarantee the subscription is live before publishing something.
Returns:

An async iterator yielding batches of Signal objects.

Raises:
  • ValueError: If no signals are provided.
async def subscribe_header( self, *headers: tuple[str, typing.Sequence[str]], initial_empty: bool = False) -> AsyncIterator[Header]:

Subscribe to frame headers.

Headers carry only the frame identifier - no signal values. This is used in bus protocols where a master publishes a header and a slave responds with data, such as LIN. Subscribing to headers lets you observe which frames are being requested on the bus.

Arguments:
  • *headers: One or more (namespace, frame_names) tuples. Each frame_names list contains the names of the frames whose headers to subscribe to.
  • initial_empty: If True, the first broker message (an empty sync marker) is consumed internally before the iterator is returned, ensuring the broker is ready before the first real value arrives. Useful when you need to guarantee the subscription is live before publishing something.
Returns:

An async iterator yielding one Header per received frame header.

Raises:
  • ValueError: If no headers are provided.
async def subscribe_frames( self, *frames: tuple[str, typing.Sequence[str | FrameSubscription | FrameInfo | remotivelabs.broker.filters.FrameFilterPredicate]], on_change: bool | OnChangeMode = False, initial_empty: bool = False, decode_named_values: bool = False) -> AsyncIterator[Frame]:

Subscribe to frames and their signals.

Each positional argument is a (namespace, items) tuple. The items sequence accepts four types that can be freely mixed in the same list:

  • str - subscribe to the named frame and all its signals.
  • FrameSubscription - subscribe to a frame with explicit signal selection: signals=None subscribes to all signals; signals=[] subscribes to the frame identifier only.
  • FrameInfo - subscribe using the signal list embedded in the metadata object, as returned by list_frame_infos().
  • FrameFilterPredicate - subscribe to all frames that match the predicate; resolved at call time via list_frame_infos().
Arguments:
  • *frames: One or more (namespace, items) tuples describing what to subscribe to.
  • on_change: Delivery filtering. False (default) delivers every frame transmission. True or OnChangeMode.CHANGED delivers only frames with changed signals (yielded Frame contains only changed signals). OnChangeMode.MERGED additionally maintains a client-side cache so the yielded Frame always contains all signals seen so far.
  • initial_empty: If True, the first broker message (an empty sync marker) is consumed internally before the iterator is returned, ensuring the broker is ready before the first real value arrives. Useful when you need to guarantee the subscription is live before publishing something.
  • decode_named_values: If True, integer signal values that have a named-value mapping in the database (e.g. {0: "OFF", 1: "ON"}) are decoded to their string names in the yielded Frame.signals.
Returns:

An async iterator yielding Frame objects, one per received frame transmission.

async def set_secoc_property( self, namespace: str | NamespaceInfo, property: Union[SecocFreshnessValue, SecocTimeDiff, SecocKey, SecocCmac0]) -> None:

Set a SecOC property on the broker.

Arguments:
  • namespace: Target namespace
  • property: The SecOC property to set.
async def list_namespaces(self) -> list[NamespaceInfo]:

List all namespaces configured in the broker.

Returns:

A list of namespaces.

async def get_namespace(self, name: str) -> NamespaceInfo | None:

Get a namespace by name.

Returns:

The namespace if found, otherwise None.

async def list_frame_infos( self, *namespaces: str | NamespaceInfo) -> list[FrameInfo]:

List all frame infos in one or more namespaces.

Arguments:
  • namespaces: One or more NamespaceName or NamespaceInfo objects.
Returns:

A list of FrameInfo objects.

async def get_frame_info( self, name: str, namespace: str | NamespaceInfo) -> FrameInfo | None:

Get a frame by name in a specific namespace.

Arguments:
  • name: The name of the frame.
  • namespace: The namespace name or NamespaceInfo object.
Returns:

The FrameInfo object if found, otherwise None.

async def list_signal_infos( self, *namespaces: str | NamespaceInfo) -> list[SignalInfo]:

List all signal infos in one or more namespaces.

Arguments:
  • namespaces: One or more NamespaceName or NamespaceInfo objects.
Returns:

A list of SignalInfo objects.

async def get_signal_info( self, name: str, namespace: str | NamespaceInfo) -> SignalInfo | None:

Get a signal info by name in a specific namespace.

Arguments:
  • name: The name of the signal.
  • namespace: The namespace name or NamespaceInfo object.
Returns:

The SignalInfo object if found, otherwise None.

def is_ready(self) -> bool:

Check if the broker client is ready.

Returns:

True if the broker client is ready, otherwise False.

def is_closed(self) -> bool:

Check if the broker client is closed.

Returns:

True if the broker client is closed, otherwise False.

@dataclass(frozen=True)
class Frame:

A live snapshot of a frame carrying actual signal values and a timestamp.

Yielded by BrokerClient.subscribe_frames(). Shares the same name and namespace as FrameInfo, but signals contains decoded values.

Attributes:
  • timestamp: Time of first reception in microseconds.
  • name: Name of the frame.
  • namespace: Namespace to which the frame belongs.
  • signals: Dict mapping signal names to their current decoded values.
  • value: Raw frame value (e.g. the serialized byte payload).
Frame( timestamp: int, name: str, namespace: str, signals: dict[str, typing.Union[int, float, bytes, str, NoneType]], value: Union[int, float, bytes, str, NoneType])
timestamp: int
name: str
namespace: str
signals: dict[str, typing.Union[int, float, bytes, str, NoneType]]
value: Union[int, float, bytes, str, NoneType]
@dataclass
class FrameInfo:

Static metadata about a frame as defined in the signal database.

Returned by BrokerClient.list_frame_infos() and BrokerClient.get_frame_info(). Use it to introspect the database, build filter predicates, or construct a FrameSubscription. It does not carry live signal values - see Frame for that.

Attributes:
  • id: Frame identifier.
  • name: Name of the frame.
  • namespace: Namespace to which the frame belongs.
  • signals: Dict of signal names and their corresponding SignalInfo metadata.
  • sender: List of entities that send this frame.
  • receiver: List of entities that receive this frame.
  • cycle_time_millis: Frame cycle time in milliseconds.
  • description: Human-readable description of the frame.
  • payload_size: Frame payload size in bytes.
  • e2e: End-to-end protection configuration for the frame.
  • groups: List of byte groups within the frame, each optionally E2E-protected.
FrameInfo( id: int, name: str, namespace: str, signals: dict[str, SignalInfo], sender: list[str], receiver: list[str], cycle_time_millis: float, description: Optional[str] = None, payload_size: Optional[int] = None, e2e: Union[remotivelabs.broker.frame.E2eAutosar01, remotivelabs.broker.frame.E2eAutosar05, remotivelabs.broker.frame.E2eAutosar02, NoneType] = None, groups: Optional[list[remotivelabs.broker.frame.GroupInfo]] = None)
id: int
name: str
namespace: str
signals: dict[str, SignalInfo]
sender: list[str]
receiver: list[str]
cycle_time_millis: float
description: Optional[str] = None
payload_size: Optional[int] = None
e2e: Union[remotivelabs.broker.frame.E2eAutosar01, remotivelabs.broker.frame.E2eAutosar05, remotivelabs.broker.frame.E2eAutosar02, NoneType] = None
groups: Optional[list[remotivelabs.broker.frame.GroupInfo]] = None
FrameName = <class 'str'>
@dataclass(frozen=True)
class FrameSubscription:

Describes which frame and signals to subscribe to in BrokerClient.subscribe_frames().

Attributes:
  • name: Name of the frame to subscribe to.
  • signals: Controls which signals are included:

    • None (default) - subscribe to all signals in the frame.
    • [] - subscribe to the frame identifier only, with no signals (useful for receiving LIN headers without payload).
    • ["sig1", "sig2"] - subscribe to the listed signals only.
FrameSubscription(name: str, signals: list[str] | None = None)
name: str
signals: list[str] | None = None
@dataclass()
class NamespaceInfo:
NamespaceInfo(name: str, type: str = 'unknown')
name: str
type: str = 'unknown'
def is_virtual(self) -> bool:
NamespaceName = <class 'str'>
class OnChangeMode(enum.Enum):

Controls how signal changes are reported by a frame subscription.

  • OFF: Every frame transmission is delivered regardless of whether any signal value changed. This is the default and mirrors raw bus behavior.

  • CHANGED: The yielded Frame contains only the signals that changed in that delivery unchanged signals are absent from Frame.signals.

  • MERGED: Like CHANGED, the broker only sends frames with changed signals. In addition, this client maintains a per-frame signal cache. Before yielding, it merges the incoming changed signals into the cache and yields a Frame whose Frame.signals dict always contains all signals seen so far, not just the ones that changed in the latest delivery.

OFF = <OnChangeMode.OFF: 'off'>
CHANGED = <OnChangeMode.CHANGED: 'changed'>
MERGED = <OnChangeMode.MERGED: 'merged'>
@dataclass
class RestbusFrameConfig:

Configuration for a frame in the Restbus.

Attributes:
  • name: The name of the frame to configure.
  • cycle_time: Optional cycle time override for the frame in ms. If None, the default from the broker's database is used.
RestbusFrameConfig(name: str, cycle_time: float | None = None)
name: str
cycle_time: float | None = None
@dataclass
class RestbusSignalConfig:

Defines how a specific signal should behave when emitted by the Restbus.

Signal values are driven by a two-phase sequence:

  1. initial - values emitted once, in order, before the loop starts. May be empty.
  2. loop - values emitted in order, then repeated from the beginning indefinitely.

On each Restbus tick the next value in the sequence is used. Once initial is exhausted the Restbus cycles through loop forever. A single-element loop (e.g., loop=[0]) produces a constant value after the initial sequence.

Example - update-bit pattern (sends 1 once, then constant 0):

RestbusSignalConfig(name="UpdateBit", initial=[1], loop=[0])
# equivalent to:
RestbusSignalConfig.set_update_bit("UpdateBit")
Attributes:
  • name: The signal name as it appears in the signal database.
  • loop: Values emitted repeatedly after initial is exhausted. Must not be empty.
  • initial: Values emitted once before loop starts. Defaults to no initial values.
RestbusSignalConfig( name: str, loop: Sequence[Union[int, float, bytes, str, NoneType]], initial: Sequence[Union[int, float, bytes, str, NoneType]] = <factory>)
name: str
loop: Sequence[Union[int, float, bytes, str, NoneType]]
initial: Sequence[Union[int, float, bytes, str, NoneType]]
@classmethod
def set( cls, name: str, value: Union[int, float, bytes, str, NoneType]) -> RestbusSignalConfig:

Create a SignalConfig with a constant value.

Arguments:
  • name: Name of the signal
  • value: Value to set in Restbus
@classmethod
def set_update_bit( cls, name: str) -> RestbusSignalConfig:

Create a SignalConfig for an update-bit pattern (sends 1 once, then constant 0).

Arguments:
  • name: Name of the signal
@dataclass(frozen=True)
class SecocCmac0:

Use CMAC0 for SecOC in the RemotiveBroker.

SecocCmac0(enabled: bool)
enabled: bool
@dataclass(frozen=True)
class SecocFreshnessValue:

Set SecOC binary freshness value to be used by freshness manager. Property is limited to SecOC on a given name space.

SecocFreshnessValue(fv: bytes)
fv: bytes
@dataclass(frozen=True)
class SecocKey:

Set binary 128-bit key to be used for SecOC in the RemotiveBroker.

Multiple keys can be set and are separated by key ID's.

SecocKey(key_id: int, key: bytes)
key_id: int
key: bytes
SecocProperty = typing.Union[SecocFreshnessValue, SecocTimeDiff, SecocKey, SecocCmac0]
@dataclass(frozen=True)
class SecocTimeDiff:

Set a time delta to use in real time clock for SecOC freshness value. Time difference is in floating point seconds and is limited to a name space.

SecocTimeDiff(time_diff: float)
time_diff: float
@dataclass(frozen=True)
class MultiplexInfoFilter:

Marks a signal as a mux filter.

The signal is only active when the selector equals filter_value.

Attributes:
  • filter_value: The selector value that activates this signal.
MultiplexInfoFilter(filter_value: int)
filter_value: int
@dataclass(frozen=True)
class MultiplexInfoSelector:

Marks a signal as the mux selector.

The value of this signal determines which filter signals are active. A selector carries no filter value.

@dataclass(frozen=True)
class Signal:

A live signal value received from the broker.

Yielded by BrokerClient.subscribe() and BrokerClient.read_signals(). For signals grouped by frame with a timestamp, use BrokerClient.subscribe_frames() which yields remotivelabs.broker.frame.Frame objects instead.

Attributes:
  • name: The name of the signal.
  • namespace: The namespace the signal belongs to.
  • value: The current decoded value of the signal.
Signal( name: str, namespace: str, value: Union[int, float, bytes, str, NoneType])
name: str
namespace: str
value: Union[int, float, bytes, str, NoneType]
@dataclass(frozen=True)
class SignalInfo:

Static metadata describing a signal as defined in the signal database.

Returned by BrokerClient.list_signal_infos() and BrokerClient.get_signal_info(), and embedded in remotivelabs.broker.frame.FrameInfo.signals. Use it to introspect signal encoding, build filter predicates, or pass directly to BrokerClient.subscribe(). It does not carry a live value - see Signal for that.

Attributes:
  • name: Name of the signal.
  • namespace: Namespace the signal belongs to.
  • receiver: List of receivers for the signal.
  • sender: List of senders for the signal.
  • named_values: Mapping from raw numeric values to symbolic names (e.g., enums).
  • value_names: Reverse mapping from symbolic names to raw numeric values.
  • min: Minimum allowed value.
  • max: Maximum allowed value.
  • factor: Multiplication factor used for encoding and decoding the value in the frame.
  • offset: Offset used for encoding and decoding the value in the frame.
  • start_value: Initial value to use before publishing.
  • size: Size of the signal value when packed in the frame (bits).
  • start_bit: Index of the first bit where this signal's data begins inside the frame.
  • unit: Unit of the signal value.
  • description: Description of the signal, taken from the signal database.
  • multiplex: Multiplexing role of the signal, if any.
SignalInfo( name: str, namespace: str, receiver: list[str], sender: list[str], named_values: dict[int, str], value_names: dict[str, int], min: float, max: float, factor: float, offset: float, start_value: float, size: int, start_bit: int, unit: Optional[str] = None, description: Optional[str] = None, multiplex: Union[MultiplexInfoSelector, MultiplexInfoFilter, NoneType] = None)
name: str
namespace: str
receiver: list[str]
sender: list[str]
named_values: dict[int, str]
value_names: dict[str, int]
min: float
max: float
factor: float
offset: float
start_value: float
size: int
start_bit: int
unit: Optional[str] = None
description: Optional[str] = None
multiplex: Union[MultiplexInfoSelector, MultiplexInfoFilter, NoneType] = None
SignalName = <class 'str'>
SignalValue = typing.Union[int, float, bytes, str, NoneType]
@dataclass(frozen=True)
class WriteSignal:

A signal value to be published to the broker.

The write counterpart to Signal. Pass instances to BrokerClient.publish() to set signal values on the bus.

Attributes:
  • name: The name of the signal to write.
  • value: The value to assign to the signal.
WriteSignal(name: str, value: Union[int, float, bytes, str, NoneType])
name: str
value: Union[int, float, bytes, str, NoneType]