remotivelabs.topology.namespaces.generic

GenericNamespace provides access to frames and signals in a specific namespace using a broker client.

This class also manages restbus configurations which enable frame publishing based on filters. To load restbus configuration and activate the restbus, use this class as an async context manager.

GenericNamespace( name: str, broker_client: remotivelabs.broker.BrokerClient, restbus_configs: list[RestbusConfig] | None = None, decode_named_values: bool = False)

Initialize the generic namespace client.

Arguments:
  • name: The namespace name to operate in.
  • broker_client: The client used to communicate with the broker.
  • restbus_configs: Optional list of configurations with filters for the restbus. Applied in order; if multiple filters match the same frame, the last matching configuration takes precedence.
  • decode_named_values: True will decode named values to str.
Note:

Use together with a BehavioralModel or start the instance using a context manager:

async with GenericNamespace(...) as broker_client:
    ...
restbus: Restbus

Property to interact with the restbus. The restbus manages periodic publishing of frames and signals.

async def open(self) -> typing_extensions.Self:

Activate the restbus and load frame configurations using filters. This is an idempotent operation - calling it multiple times has no additional effect.

Returns:

The namespace

async def close(self) -> None:

Will close the restbus, all frames are removed from the restbus in the namespace. This is an idempotent operation - calling it multiple times has no additional effect.

Set a SecOC property in the namespace.

async def list_frame_infos(self) -> list[remotivelabs.broker.FrameInfo]:

Return all available frame infos in the namespace.

async def get_frame_info(self, name: str) -> remotivelabs.broker.FrameInfo | None:

Return information about a specific frame by name.

async def list_signal_infos(self) -> list[remotivelabs.broker.SignalInfo]:

Return all available signal infos in the namespace.

async def get_signal_info(self, name: str) -> remotivelabs.broker.SignalInfo | None:

Return information about a specific signal by name.

async def publish(self, *signals: remotivelabs.broker.WriteSignal) -> None:

Publish one or more signals to the broker.

Arguments:
  • *signals: One or more WriteSignal instances to publish.
async def subscribe( self, *name: str, on_change: bool = False) -> AsyncIterator[list[remotivelabs.broker.Signal]]:

Subscribe to a list of signals.

Arguments:
  • *signals: One or more signals to subscribe to.
  • on_change: Whether to receive updates only on change.

Note: does not support decoding enums

Returns:

An asynchronous iterator of lists of Signal objects.

async def subscribe_frames( self, *frames: remotivelabs.broker.FrameSubscription, on_change: bool = False, initial_empty: bool = False, decode_named_values: bool = False) -> AsyncIterator[remotivelabs.broker.Frame]:

Subscribe to a Frames.

Arguments:
  • *frames: One or more frames to subscribe to.
  • on_change: Whether to receive updates only on change.
  • initial_empty: True will wait until the broker has sent an initial message
  • decode_named_values: True will decode named values to str.
Returns:

An asynchronous iterator with Frames.

Creates a list of input handlers for the given namespace to be used with a BehavioralModel.

Each handler defines a filter and a corresponding async callback for processing matching frames.

Arguments:
  • - filters: A sequence of filter objects to select relevant frames or signals.
  • - callback: An async callback function that receives and processes a Frame.
@dataclass
class RestbusConfig:

Configuration for the Restbus

Attributes:
  • restbus_filters: List of filters used to select frames; matching frames will be configured and managed by the restbus.
  • delay_multiplier: Multiplier for applying artificial delays; default is 1 (no delay).
  • cycle_time_millis: Optional fixed cycle time in milliseconds for publishing. By default the cycle time from the signal database is used.
delay_multiplier: float = 1
cycle_time_millis: float | None = None
class Restbus:

Restbus provides a way to publish frames according to their cycle time. Sending recurring frames is common with for example CAN or LIN. Notice that each Namespace has its own Restbus. It allows configuring frame publishing behavior, applying filters, and controlling the restbus lifecycle.

Also notice that new frames can be added or removed in order to emulate how different frames are sent depending on the state of the model.

Usage:

Should be used with through a Namespace implementation such as GenericNamespace, but can be used standalone.

async with Restbus(namespace, broker_client, restbus_configs) as restbus:
    await restbus.start()
Arguments:
  • namespace: The namespace to operate in.
  • broker_client: The broker client to communicate with.
  • restbus_configs: Optional list of configurations with filters for the restbus. Applied in order; if multiple filters match the same frame, the last matching configuration takes precedence.
Restbus( namespace: str, broker_client: remotivelabs.broker.BrokerClient, restbus_configs: list[RestbusConfig] | None = None)
async def open(self) -> typing_extensions.Self:

Open the restbus and initialize it with configured frames. This is an idempotent operation - calling it multiple times has no additional effect.

async def close(self) -> None:

Close the restbus for this namespace, this will remove all configuration and put the restbus in idle state. This is an idempotent operation - calling it multiple times has no additional effect.

async def add( self, *frame_configs: remotivelabs.broker.RestbusFrameConfig, start: bool = False):

Add frames by configuration to the restbus.

Arguments:
  • *frame_configs: One or more frame configurations
  • start: Whether to start the restbus publishing immediately. Note: this flag controls a global state for the entire restbus, not individual frames.
async def start(self):

Start restbus publishing for this namespace.

async def stop(self):

Stop restbus publishing for this namespace.

async def remove(self, *frames: str):

Remove frames from the restbus.

Arguments:
  • *frames: One or more frame names to remove
async def update_signals( self, *signal_configs: remotivelabs.broker.RestbusSignalConfig) -> None:

Update signal configurations for the restbus.

Arguments:
  • *signal_configs: Signal configurations to update.
async def reset_signals(self, *signals: str) -> None:

Reset specified signals in the restbus to their original values.

Arguments:
  • *signals: Signal names to reset.
async def reset(self) -> None:

Reset all restbus data for this namespace.

@asynccontextmanager
async def transaction( self) -> AsyncGenerator[TransactionContext, NoneType]:

Context manager for batching signal updates and resets.

Usage:
async with restbus.transaction() as tx:
    tx.updates.append(...)
    tx.resets.append(...)
@dataclass
class TransactionContext:
TransactionContext( updates: list[remotivelabs.broker.RestbusSignalConfig] = <factory>, resets: list[str] = <factory>)
resets: list[str]
def update( self, *signal_config: remotivelabs.broker.RestbusSignalConfig) -> None:
def set_update_bit(self, *signals: str) -> None:
def reset_signal(self, *signals: str) -> None: