remotivelabs.topology.behavioral_model

Examples

remotivelabs.topology.behavioral_model.BehavioralModel instances in RemotiveTopology run on top of RemotiveBroker, which lets them communicate over real network protocols such as CAN or SOME/IP.

The example below shows the simplest possible behavioral model - it connects to a broker and handles built-in control messages (ping, reboot), but performs no other work:

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.behavioral_model import BehavioralModel


async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        async with BehavioralModel(
            "BodyCanModule",
            broker_client=broker_client,
        ) as bm:
            await bm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())

Adding namespaces

To interact with a network, provide one or more namespaces (see remotivelabs.topology.namespaces). The example below adds a remotivelabs.topology.namespaces.can.CanNamespace configured with a remotivelabs.topology.namespaces.generic.RestbusConfig that periodically sends every CAN frame for which the ECU is listed as the sender:

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig


async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        driver_can_0 = CanNamespace(
            "HazardLightControlUnit-DriverCan0",
            broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
        )
        async with BehavioralModel(
            "HazardLightControlUnit",
            namespaces=[driver_can_0],
            broker_client=broker_client,
        ) as bm:
            await bm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())

Reacting to incoming frames

The restbus example above only sends frames. To react to incoming frames, attach an input handler. A namespace builds one with its create_input_handler() method: given a set of filters (see remotivelabs.topology.namespaces.filters), the handler subscribes to every matching frame and invokes the provided async callback each time one arrives. Passing the handler to BehavioralModel(input_handlers=[...]) wires it into the lifecycle - it starts with the model and stops with it.

By default, BehavioralModel delivers frames to input handlers only when at least one of their signals has changed (on_change=True). The delivered frame still contains all signals, unchanged ones included. Pass on_change=False if a handler needs to fire on every transmission.
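
The on-change rule can be pictured with a small, library-independent sketch. It does not use or reproduce the BehavioralModel implementation: OnChangeGate and the dict-based frames are hypothetical stand-ins that only illustrate the semantics described above. It also assumes that the first transmission of a frame counts as a change, which the text above does not state.

```python
from typing import Optional


class OnChangeGate:
    """Hypothetical stand-in that mimics on-change delivery for one frame.

    Frames are modelled as plain dicts of signal name -> value. This only
    illustrates the semantics; it is not RemotiveTopology code.
    """

    def __init__(self, on_change: bool = True) -> None:
        self._on_change = on_change
        self._last: Optional[dict] = None

    def should_deliver(self, signals: dict) -> bool:
        # Assumption: the very first transmission is treated as a change.
        changed = self._last is None or signals != self._last
        self._last = dict(signals)
        # With on_change=False every transmission is delivered.
        return changed or not self._on_change


def delivered(frames: list, on_change: bool) -> list:
    """Return the frames a handler would see, each with all of its signals."""
    gate = OnChangeGate(on_change=on_change)
    return [frame for frame in frames if gate.should_deliver(frame)]


transmissions = [
    {"HazardLightButton": 0, "Brightness": 5},
    {"HazardLightButton": 0, "Brightness": 5},  # identical: suppressed when on_change=True
    {"HazardLightButton": 1, "Brightness": 5},  # one signal changed: delivered in full
]

on_change_frames = delivered(transmissions, on_change=True)
all_frames = delivered(transmissions, on_change=False)
```

In this sketch the second transmission is skipped under on_change=True, while the third is delivered with both signals even though only one of them changed.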

The example below reacts to a single CAN frame:

import asyncio

from remotivelabs.broker import BrokerClient, Frame
from remotivelabs.broker.filters import FrameFilter

from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces.can import CanNamespace


async def _on_hazard_button_pressed(frame: Frame) -> None:
    print(f"Hazard light frame received: {frame}")


async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        driver_can_0 = CanNamespace("BodyCanModule-DriverCan0", broker_client)

        async with BehavioralModel(
            "BodyCanModule",
            namespaces=[driver_can_0],
            broker_client=broker_client,
            input_handlers=[
                driver_can_0.create_input_handler(
                    [FrameFilter("HazardLightButton")],
                    _on_hazard_button_pressed,
                )
            ],
        ) as bm:
            await bm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())

Excluding noisy signals

Some frames carry signals that change on every transmission but aren't particularly interesting to user code - E2E counters and CRCs are a common example. With on-change delivery, those signals can be enough on their own to wake an input handler on every cycle, even when the rest of the payload is unchanged.

One way to avoid that is to attach a filter to BehavioralModel(input_filters=[...]), which is applied to every input handler on the model. For E2E, passing remotivelabs.broker.filters.E2eSignalsFilter with exclude=True drops the CRC and counter entirely - they no longer take part in change detection, and they don't appear on the frames delivered to your callbacks either.

Per-handler filters remain available on create_input_handler(...) when a single handler needs different treatment.

import asyncio

from remotivelabs.broker import BrokerClient, Frame
from remotivelabs.broker.filters import FrameFilter

from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces.can import CanNamespace
from remotivelabs.topology.namespaces.filters import E2eSignalsFilter


async def _on_brake_pedal(frame: Frame) -> None:
    # Callback fires only when payload signals change, not when the E2E
    # counter ticks or the CRC is recomputed on an otherwise identical frame.
    print(f"Brake pedal frame received: {frame}")


async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        driver_can_0 = CanNamespace("BodyCanModule-DriverCan0", broker_client)

        async with BehavioralModel(
            "BodyCanModule",
            namespaces=[driver_can_0],
            broker_client=broker_client,
            input_filters=[E2eSignalsFilter(exclude=True)],
            input_handlers=[
                driver_can_0.create_input_handler(
                    [FrameFilter("BrakePedalPositionSensor")],
                    _on_brake_pedal,
                )
            ],
        ) as bm:
            await bm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())
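
To see why excluding E2E signals matters for on-change delivery, the following library-free sketch counts how often a handler would fire with and without the exclusion. The signal names (E2eCounter, E2eCrc, BrakePedalPosition) and both helper functions are hypothetical; in a real setup the E2E signals are identified by the filter, not by a hard-coded set.

```python
# Hypothetical signal names, chosen for illustration only.
E2E_SIGNALS = {"E2eCounter", "E2eCrc"}


def strip_e2e(signals: dict) -> dict:
    """Drop E2E signals so they neither trigger changes nor reach the callback."""
    return {name: value for name, value in signals.items() if name not in E2E_SIGNALS}


def count_deliveries(frames: list, exclude_e2e: bool) -> int:
    """Count on-change deliveries for a sequence of frames (dicts of signals)."""
    last = None
    deliveries = 0
    for signals in frames:
        view = strip_e2e(signals) if exclude_e2e else signals
        if view != last:  # the first frame always differs from None
            deliveries += 1
        last = view
    return deliveries


frames = [
    {"BrakePedalPosition": 0, "E2eCounter": 1, "E2eCrc": 0xA1},
    {"BrakePedalPosition": 0, "E2eCounter": 2, "E2eCrc": 0x3F},
    {"BrakePedalPosition": 0, "E2eCounter": 3, "E2eCrc": 0x7C},
    {"BrakePedalPosition": 40, "E2eCounter": 4, "E2eCrc": 0x12},
]

without_filter = count_deliveries(frames, exclude_e2e=False)  # counter ticks wake the handler
with_filter = count_deliveries(frames, exclude_e2e=True)  # only payload changes count
```

Without the exclusion every transmission looks like a change because the counter and CRC differ each cycle; with it, only the initial frame and the pedal movement are counted.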

Subclassing for custom control handlers

For a more advanced pattern, subclass remotivelabs.topology.behavioral_model.BehavioralModel to encapsulate namespaces and custom control handlers. Note that adding custom control messages introduces behavior that cannot be replicated on real hardware:

import asyncio
from dataclasses import dataclass
from typing import Final

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.control.request import ControlRequest
from remotivelabs.topology.control.response import ControlResponse
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig


@dataclass
class ResetRestbus(ControlRequest):
    """Custom control request to reset the restbus to signal database defaults."""

    type: str = "reset_restbus"


class HazardLightControlUnit(BehavioralModel):
    """Behavioral model for the HazardLightControlUnit ECU."""

    ecu_name: Final[str] = "HazardLightControlUnit"

    def __init__(self, broker_client: BrokerClient) -> None:
        self._can_ns = CanNamespace(
            name=f"{self.ecu_name}-DriverCan0",
            broker_client=broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name=self.ecu_name)])],
        )
        super().__init__(
            name=self.ecu_name,
            broker_client=broker_client,
            namespaces=[self._can_ns],
            control_handlers=[
                (ResetRestbus.type, self.on_reset_restbus),
            ],
        )

    async def on_reset_restbus(self, request: ControlRequest) -> ControlResponse:  # noqa: ARG002
        await self._can_ns.restbus.reset()
        return ControlResponse(status="ok")


async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        async with HazardLightControlUnit(broker_client=broker_client) as bm:
            await bm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())
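
The (command, handler) pairs passed as control_handlers can be thought of as a routing table keyed by the request's type string. The sketch below is a simplified, library-free illustration of that idea. Request, Response, and route are hypothetical stand-ins for ControlRequest, ControlResponse, and the model's internal routing, which may work differently; the "unknown_command" status in particular is an assumption made for this example.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List, Optional, Tuple


@dataclass
class Request:
    """Hypothetical stand-in for a control request."""

    type: str
    argument: Optional[Any] = None


@dataclass
class Response:
    """Hypothetical stand-in for a control response."""

    status: str


Handler = Callable[[Request], Awaitable[Response]]


async def on_reset_restbus(request: Request) -> Response:
    # A real handler would reset the restbus here.
    return Response(status="ok")


async def route(handlers: List[Tuple[str, Handler]], request: Request) -> Response:
    """Dispatch a request to the handler registered for its type."""
    table = dict(handlers)
    handler = table.get(request.type)
    if handler is None:
        # Assumed fallback for this sketch only.
        return Response(status="unknown_command")
    return await handler(request)


handlers = [("reset_restbus", on_reset_restbus)]
ok = asyncio.run(route(handlers, Request(type="reset_restbus")))
missing = asyncio.run(route(handlers, Request(type="does_not_exist")))
```

Keying the table on the type string is what allows a dataclass default such as ResetRestbus.type to double as the registration key in the subclass example.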

class BehavioralModel:

A BehavioralModel emulates the behavior of an ECU, standing in for real hardware.

It manages lifecycle operations for namespaces (e.g., CanNamespace, SomeIPNamespace), handles inputs, routes control requests, and provides a unified interface for testing setups.

BehavioralModel( name: str, broker_client: remotivelabs.broker.BrokerClient, namespaces: list[remotivelabs.topology.namespaces.namespace.Namespace] | None = None, input_handlers: list[tuple[str, remotivelabs.topology.namespaces.input_handlers.InputHandler]] | None = None, control_handlers: list[tuple[str, remotivelabs.topology.control.Handler]] | None = None, input_filters: Optional[Sequence[Union[remotivelabs.broker.filters.FrameFilterPredicate, remotivelabs.broker.filters.SignalFilterPredicate, remotivelabs.broker.filters.FrameSignalFilter]]] = None, on_change: bool = True)

Initialize the BehavioralModel instance.

Prometheus metrics are opt-in: set the METRICS_PORT environment variable to enable a process-wide exporter that serves /metrics on that port.

Arguments:
  • name: Identifier for the ECU stub instance; this is the name under which it receives control messages.
  • broker_client: The client used for communication with the broker.
  • namespaces: list of Namespace instances (CanNamespace, SomeIPNamespace, etc.).
  • input_handlers: Optional list of (namespace, handler) pairs to receive callbacks on inputs. Create these using the namespace's create_input_handler method.
  • control_handlers: Optional list of (command, handler) pairs for routing control messages.
  • input_filters: Optional filters applied to every input handler across all namespaces. Useful for cross-cutting concerns like excluding E2E signals: input_filters=[E2eSignalsFilter(exclude=True)].
  • on_change: When True (default), a frame is only delivered when at least one of its signals has changed. The delivered frame always contains all signals, including unchanged ones. When False, every frame is delivered regardless of whether its signals have changed.
Note:

Start the instance using a context manager:

async with BehavioralModel(...) as bm:
    ...
    await bm.run_forever()

Or use the start/stop methods directly:

bm = BehavioralModel(...)
await bm.start()
# ...
await bm.stop()

def is_running(self) -> bool:

Return whether the BehavioralModel has been started.

async def start(self) -> None:

Start the behavioral model, open all namespaces, and initialize input handlers. This is an idempotent operation - calling it multiple times has no additional effect.

async def stop(self) -> None:

Stop the behavioral model, close all namespaces, and clean up resources. This is an idempotent operation - calling it multiple times has no additional effect.
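
When start() and stop() are called directly rather than through the context manager, a try/finally block keeps cleanup reliable, and idempotency means an extra call is harmless. The sketch below uses a hypothetical FakeModel stand-in with the same method names so that it runs without a broker; it does not reflect how BehavioralModel is implemented.

```python
import asyncio


class FakeModel:
    """Hypothetical stand-in exposing start/stop/is_running like BehavioralModel."""

    def __init__(self) -> None:
        self._running = False
        self.transitions = []  # records each real state change

    def is_running(self) -> bool:
        return self._running

    async def start(self) -> None:
        if self._running:  # idempotent: a second call is a no-op
            return
        self._running = True
        self.transitions.append("started")

    async def stop(self) -> None:
        if not self._running:  # idempotent: safe to call when already stopped
            return
        self._running = False
        self.transitions.append("stopped")


async def main() -> FakeModel:
    bm = FakeModel()
    await bm.start()
    await bm.start()  # no additional effect
    try:
        assert bm.is_running()
    finally:
        await bm.stop()  # runs even if the body raises
        await bm.stop()  # no additional effect
    return bm


model = asyncio.run(main())
```

Only one "started" and one "stopped" transition are recorded despite the repeated calls, which is the property that makes the try/finally pattern safe to combine with other cleanup paths.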

async def run_forever(self) -> None:

Run the BehavioralModel indefinitely, processing inputs and control requests.

async def reset_restbuses(self) -> None:

Reset all restbus data for all namespaces.

@dataclass
class PingRequest(remotivelabs.topology.control.request.ControlRequest):

Control request to check if the BehavioralModel is alive and responsive.

Use remotivelabs.topology.control.ControlClient to send control requests.

PingRequest(type: str = 'ping_v1', argument: 'Any | None' = None)
type: str = 'ping_v1'

@dataclass
class RebootRequest(remotivelabs.topology.control.request.ControlRequest):

Control request to reset the restbus of every namespace to its default values.

Use remotivelabs.topology.control.ControlClient to send control requests.

RebootRequest(type: str = 'reboot_v1', argument: 'Any | None' = None)
type: str = 'reboot_v1'