remotivelabs.topology.behavioral_model
Examples
remotivelabs.topology.behavioral_model.BehavioralModel instances in RemotiveTopology run on top of RemotiveBroker, allowing them to use
real network protocols such as CAN buses or SOME/IP networks.
The example below shows the simplest possible behavioral model - it connects to a broker and handles built-in control messages (ping, reboot), but performs no other work:
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.behavioral_model import BehavioralModel
async def main():
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
async with BehavioralModel(
"BodyCanModule",
broker_client=broker_client,
) as bm:
await bm.run_forever()
if __name__ == "__main__":
asyncio.run(main())
Adding namespaces
To interact with a network, provide one or more namespaces (see remotivelabs.topology.namespaces).
The example below adds a remotivelabs.topology.namespaces.can.CanNamespace configured with a
remotivelabs.topology.namespaces.generic.RestbusConfig that periodically sends all CAN frames the
ECU is listed as sender of:
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig
async def main():
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
driver_can_0 = CanNamespace(
"HazardLightControlUnit-DriverCan0",
broker_client,
restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
)
async with BehavioralModel(
"HazardLightControlUnit",
namespaces=[driver_can_0],
broker_client=broker_client,
) as bm:
await bm.run_forever()
if __name__ == "__main__":
asyncio.run(main())
Reacting to incoming frames
The examples so far only send frames. To react to incoming frames, attach an input handler. A
namespace builds one with its create_input_handler() method: given a set of filters (see
remotivelabs.topology.namespaces.filters), the handler subscribes to every matching frame and
invokes the provided async callback each time one arrives. Passing the handler to
BehavioralModel(input_handlers=[...]) wires it into the lifecycle - it starts with the model and
stops with it.
By default, BehavioralModel delivers frames to input handlers only when at least one of their
signals has changed (on_change=True). The delivered frame still contains all signals, unchanged
ones included. Pass on_change=False if a handler needs to fire on every transmission.
The example below reacts to a single CAN frame:
import asyncio
from remotivelabs.broker import BrokerClient, Frame
from remotivelabs.broker.filters import FrameFilter
from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces.can import CanNamespace
async def _on_hazard_button_pressed(frame: Frame) -> None:
print(f"Hazard light frame received: {frame}")
async def main():
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
driver_can_0 = CanNamespace("BodyCanModule-DriverCan0", broker_client)
async with BehavioralModel(
"BodyCanModule",
namespaces=[driver_can_0],
broker_client=broker_client,
input_handlers=[
driver_can_0.create_input_handler(
[FrameFilter("HazardLightButton")],
_on_hazard_button_pressed,
)
],
) as bm:
await bm.run_forever()
if __name__ == "__main__":
asyncio.run(main())
Excluding noisy signals
Some frames carry signals that change on every transmission but aren't particularly interesting to user code - E2E counters and CRCs are a common example. With on-change delivery, those signals can be enough on their own to wake an input handler on every cycle, even when the rest of the payload is unchanged.
One way to avoid that is to attach a filter to BehavioralModel(input_filters=[...]), which is
applied to every input handler on the model. For E2E, passing
remotivelabs.broker.filters.E2eSignalsFilter with exclude=True drops the CRC and
counter entirely - they no longer take part in change detection, and they don't appear on the
frames delivered to your callbacks either.
Per-handler filters remain available on
create_input_handler(...) when a single handler needs different treatment.
import asyncio
from remotivelabs.broker import BrokerClient, Frame
from remotivelabs.broker.filters import FrameFilter
from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces.can import CanNamespace
from remotivelabs.topology.namespaces.filters import E2eSignalsFilter
async def _on_brake_pedal(frame: Frame) -> None:
# Callback fires only when payload signals change, not when the E2E
# counter ticks or the CRC is recomputed on an otherwise identical frame.
print(f"Brake pedal frame received: {frame}")
async def main():
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
driver_can_0 = CanNamespace("BodyCanModule-DriverCan0", broker_client)
async with BehavioralModel(
"BodyCanModule",
namespaces=[driver_can_0],
broker_client=broker_client,
input_filters=[E2eSignalsFilter(exclude=True)],
input_handlers=[
driver_can_0.create_input_handler(
[FrameFilter("BrakePedalPositionSensor")],
_on_brake_pedal,
)
],
) as bm:
await bm.run_forever()
if __name__ == "__main__":
asyncio.run(main())
Subclassing for custom control handlers
For a more advanced pattern, subclass remotivelabs.topology.behavioral_model.BehavioralModel to encapsulate namespaces and custom control
handlers. Note that adding custom control messages introduces behaviour that cannot be replicated on
real hardware:
import asyncio
from dataclasses import dataclass
from typing import Final
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.control.request import ControlRequest
from remotivelabs.topology.control.response import ControlResponse
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig
@dataclass
class ResetRestbus(ControlRequest):
"""Custom control request to reset the restbus to signal database defaults."""
type: str = "reset_restbus"
class HazardLightControlUnit(BehavioralModel):
"""Behavioral model for the HazardLightControlUnit ECU."""
ecu_name: Final[str] = "HazardLightControlUnit"
def __init__(self, broker_client: BrokerClient) -> None:
self._can_ns = CanNamespace(
name=f"{self.ecu_name}-DriverCan0",
broker_client=broker_client,
restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name=self.ecu_name)])],
)
super().__init__(
name=self.ecu_name,
broker_client=broker_client,
namespaces=[self._can_ns],
control_handlers=[
(ResetRestbus.type, self.on_reset_restbus),
],
)
async def on_reset_restbus(self, request: ControlRequest) -> ControlResponse: # noqa: ARG002
await self._can_ns.restbus.reset()
return ControlResponse(status="ok")
async def main():
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
async with HazardLightControlUnit(broker_client=broker_client) as bm:
await bm.run_forever()
if __name__ == "__main__":
asyncio.run(main())
A BehavioralModel is used to emulate some behavior instead of a real ECU.
It manages lifecycle operations for namespaces (e.g., CanNamespace, SomeIPNamespace), handles inputs,
routes control requests, and provides a unified interface for testing setups.
Initialize the BehavioralModel instance.
Prometheus metrics are opt-in: set the METRICS_PORT environment variable to
enable a process-wide exporter that serves /metrics on that port.
Arguments:
- name: Identifier for the ECU stub instance, then name which receives control messages.
- broker_client: The client used for communication with the broker.
- namespaces: list of Namespace instances (
CanNamespace,SomeIPNamespace, etc.). - input_handlers: Optional list of (namespace, handler) pairs to receive callbacks
on inputs. Create these using the namespace's
create_input_handlermethod. - control_handlers: Optional list of (command, handler) pairs for routing control messages.
- input_filters: Optional filters applied to every input handler across all namespaces.
Useful for cross-cutting concerns like excluding E2E signals:
input_filters=[E2eSignalsFilter(exclude=True)]. - on_change: When
True(default), a frame is only delivered when at least one of its signals has changed. The delivered frame always contains all signals, including unchanged ones. WhenFalse, every frame is delivered regardless of whether its signals have changed.
Note:
Start the instance using a context manager:
async with BehavioralModel(...) as bm: ... await bm.run_forever()Or use the start/stop methods directly:
bm = BehavioralModel(...) await bm.start() # ... await bm.stop()
Start the behavioral model, open all namespaces, and initialize input handlers. This is an idempotent operation - calling it multiple times has no additional effect.
Stop the behavioral model, close all namespaces, and clean up resources. This is an idempotent operation - calling it multiple times has no additional effect.
Control request to check if the BehavioralModel is alive and responsive.
Use remotivelabs.topology.control.ControlClient to send control requests.
Control request to reset all namespace restbus to default values.
Use remotivelabs.topology.control.ControlClient to send control requests.