remotivelabs.broker.restbus

The RemotiveBroker exposes a Restbus interface for simulating CAN bus traffic.

import asyncio

from remotivelabs.broker import BrokerClient
from remotivelabs.broker.restbus import RestbusFrameConfig, RestbusSignalConfig


async def main() -> None:
    """Add the EngineData frame to the restbus on "DriverCan", then override two of its signals."""
    async with BrokerClient(url="http://127.0.0.1:50051") as client:
        # Register one frame configuration on the "DriverCan" namespace and start the restbus
        engine_frames = [RestbusFrameConfig(name="EngineData")]
        await client.restbus.add(("DriverCan", engine_frames), start=True)

        # Build a constant-value config for each signal we want to override
        overrides: list[RestbusSignalConfig] = [
            RestbusSignalConfig.set(name=signal_name, value=signal_value)
            for signal_name, signal_value in (
                ("EngineData.EngineRpm", 1500),
                ("EngineData.EngineTemp", 90),
            )
        ]

        # Apply the overrides to the running restbus
        await client.restbus.update_signals(("DriverCan", overrides))


# Run the example on a fresh asyncio event loop when executed as a script
if __name__ == "__main__":
    asyncio.run(main())
class Restbus:

Restbus client for managing Restbus operations on the RemotiveLabs Broker.

Restbus( channel: grpc.aio._base_channel.Channel, update_call: Optional[grpc.aio._base_call.StreamStreamCall[restbus_api_pb2.UpdateRequest, common_pb2.StreamError]], client_name: str)
async def close(self, *namespace: str) -> None:

Removes all configured signals and stops the Restbus on the specified namespaces.

Arguments:
  • *namespace: One or more namespaces to close
async def add( self, *frames: tuple[str, list[RestbusFrameConfig]], start: bool = False):

Adds one or more frames to the Restbus with optional start flag.

Arguments:
  • *frames: One or more tuples, each containing namespace and list of frame configurations which should be added to the restbus
  • start: If True, starts the frames after adding. Defaults to False.

Note: The start flag affects all frames running on the namespace, even those that were added earlier.

async def start(self, *namespace: str):

Starts Restbus signal publishing for the specified namespaces.

Arguments:
  • *namespace: One or more namespaces to start
async def stop(self, *namespace: str):

Stops Restbus signal publishing for the specified namespaces.

Arguments:
  • *namespace: One or more namespaces to stop. A stopped restbus can be started again.
async def remove(self, *frames: tuple[str, list[str]]):

Removes specific frames from the Restbus.

Arguments:
  • *frames: One or more tuples, each containing namespace and list of frame names to remove from the restbus
async def update_signals( self, *signals: tuple[str, list[RestbusSignalConfig]]) -> None:

Updates the configured signals on the Restbus with new values.

Arguments:
  • *signals: One or more tuples, each containing namespace and list of signal configurations to apply to the restbus
async def reset_signals(self, *signals: tuple[str, list[str]]) -> None:

Resets specified signals to their default configured values.

Arguments:
  • *signals: One or more tuples, each containing namespace and list of signal names to reset
async def reset_namespaces(self, *namespace: str) -> None:

Resets the specified namespaces and their associated signals to default values.

Arguments:
  • *namespace: One or more namespaces to reset
@dataclass
class RestbusSignalConfig:

Defines how a specific signal should behave when emitted by the Restbus.

Signal values are driven by a two-phase sequence:

  1. initial - values emitted once, in order, before the loop starts. May be empty.
  2. loop - values emitted in order, then repeated from the beginning indefinitely.

On each Restbus tick the next value in the sequence is used. Once initial is exhausted the Restbus cycles through loop forever. A single-element loop (e.g., loop=[0]) produces a constant value after the initial sequence.

Example - update-bit pattern (sends 1 once, then constant 0):

RestbusSignalConfig(name="UpdateBit", initial=[1], loop=[0])
# equivalent to:
RestbusSignalConfig.set_update_bit("UpdateBit")
Attributes:
  • name: The signal name as it appears in the signal database.
  • loop: Values emitted repeatedly after initial is exhausted. Must not be empty.
  • initial: Values emitted once before loop starts. Defaults to no initial values.
RestbusSignalConfig( name: str, loop: Sequence[Union[int, float, bytes, str, NoneType]], initial: Sequence[Union[int, float, bytes, str, NoneType]] = <factory>)
name: str
loop: Sequence[Union[int, float, bytes, str, NoneType]]
initial: Sequence[Union[int, float, bytes, str, NoneType]]
@classmethod
def set( cls, name: str, value: Union[int, float, bytes, str, NoneType]) -> RestbusSignalConfig:

Create a RestbusSignalConfig with a constant value.

Arguments:
  • name: Name of the signal
  • value: Value to set in Restbus
@classmethod
def set_update_bit( cls, name: str) -> RestbusSignalConfig:

Create a RestbusSignalConfig for an update-bit pattern (sends 1 once, then constant 0).

Arguments:
  • name: Name of the signal
@dataclass
class RestbusFrameConfig:

Configuration for a frame in the Restbus.

Attributes:
  • name: The name of the frame to configure.
  • cycle_time: Optional cycle time override for the frame in ms. If None, the default from the broker's database is used.
RestbusFrameConfig(name: str, cycle_time: float | None = None)
name: str
cycle_time: float | None = None