remotivelabs.topology.namespaces.can

Used to represent a CAN (Controller Area Network) namespace in a simulation or testing environment.

It inherits all behavior from GenericNamespace without modification, but serves as a semantic identifier for CAN-specific configurations or extensions.

async def open(self) -> typing_extensions.Self:

Opens the CAN namespace and validates that the namespace is of the correct type. This is an idempotent operation - calling it multiple times has no additional effect.

Returns:

The namespace

Raises:
  • ValueError: If the namespace is not of type 'can', 'canfd', or 'udp'.
class Restbus:

Restbus provides a way to publish frames according to their cycle time. Sending recurring frames is common with for example CAN or LIN. Notice that each Namespace has its own Restbus. It allows configuring frame publishing behavior, applying filters, and controlling the restbus lifecycle.

Also notice that new frames can be added or removed in order to emulate how different frames are sent depending on the state of the model.

Usage:

Should be used with through a Namespace implementation such as GenericNamespace, but can be used standalone.

async with Restbus(namespace, broker_client, restbus_configs) as restbus:
    await restbus.start()
Arguments:
  • namespace: The namespace to operate in.
  • broker_client: The broker client to communicate with.
  • restbus_configs: Optional list of configurations with filters for the restbus. Applied in order; if multiple filters match the same frame, the last matching configuration takes precedence.
Restbus( namespace: str, broker_client: remotivelabs.broker.BrokerClient, restbus_configs: list[RestbusConfig] | None = None)
async def open(self) -> typing_extensions.Self:

Open the restbus and initialize it with configured frames. This is an idempotent operation - calling it multiple times has no additional effect.

async def close(self) -> None:

Close the restbus for this namespace, this will remove all configuration and put the restbus in idle state. This is an idempotent operation - calling it multiple times has no additional effect.

async def add( self, *frame_configs: remotivelabs.broker.RestbusFrameConfig, start: bool = False):

Add frames by configuration to the restbus.

Arguments:
  • *frame_configs: One or more frame configurations
  • start: Whether to start the restbus publishing immediately. Note: this flag controls a global state for the entire restbus, not individual frames.
async def start(self):

Start restbus publishing for this namespace.

async def stop(self):

Stop restbus publishing for this namespace.

async def remove(self, *frames: str):

Remove frames from the restbus.

Arguments:
  • *frames: One or more frame names to remove
async def update_signals( self, *signal_configs: remotivelabs.broker.RestbusSignalConfig) -> None:

Update signal configurations for the restbus.

Arguments:
  • *signal_configs: Signal configurations to update.
async def reset_signals(self, *signals: str) -> None:

Reset specified signals in the restbus to their original values.

Arguments:
  • *signals: Signal names to reset.
async def reset(self) -> None:

Reset all restbus data for this namespace.

@asynccontextmanager
async def transaction( self) -> AsyncGenerator[remotivelabs.topology.namespaces.generic.TransactionContext, NoneType]:

Context manager for batching signal updates and resets.

Usage:
async with restbus.transaction() as tx:
    tx.updates.append(...)
    tx.resets.append(...)
@dataclass
class RestbusConfig:

Configuration for the Restbus

Attributes:
  • restbus_filters: List of filters used to select frames; matching frames will be configured and managed by the restbus.
  • delay_multiplier: Multiplier for applying artificial delays; default is 1 (no delay).
  • cycle_time_millis: Optional fixed cycle time in milliseconds for publishing. By default the cycle time from the signal database is used.
delay_multiplier: float = 1
cycle_time_millis: float | None = None