remotivelabs.topology
RemotiveTopology framework
The RemotiveTopology framework is a Python library for modelling automotive ECUs and testing automotive network communication over real protocols (CAN, SOME/IP) via RemotiveBroker. It is built on top of remotivelabs-broker, which provides the low-level gRPC client, signal types, and protocol primitives.
Installation
pip install remotivelabs-topology
# with optional hamcrest matchers for testing (quoted so shells such as zsh do not expand the brackets)
pip install "remotivelabs-topology[hamcrest]"
# with optional FMU support
pip install "remotivelabs-topology[fmu]"
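To confirm which packages ended up in the environment after installation, a quick check along the following lines can help. This is a minimal sketch using only the standard library: `is_installed` is a hypothetical helper, and the assumption that the hamcrest extra provides an importable `hamcrest` module (as PyHamcrest does) is not confirmed by this document.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if the module can be located (parent packages get imported)."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # find_spec raises ModuleNotFoundError when a parent package is missing
        return False


if __name__ == "__main__":
    # "hamcrest" is an assumed module name for the optional extra
    for name in ("remotivelabs.topology", "hamcrest"):
        print(f"{name}: {'found' if is_installed(name) else 'missing'}")
```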
Usage
The framework has two primary use cases:
- Testing: Write test cases that interact with a running topology to verify correct behavior.
- Behavioral Models: Create ECU stubs that model real ECU behavior on a running RemotiveTopology instance, sending and receiving network messages over real protocols.
Testing
Capturing frames
Use remotivelabs.topology.testing.frames.capture_frames to subscribe to CAN frames from a
namespace and assert on the received frames and signal values:
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.testing.frames import capture_frames

async def test_hazard_light_sequence(broker_client: BrokerClient) -> None:
    # Subscribe to frames published by the HazardLightControlUnit behavioral model
    async with capture_frames(
        (broker_client, "HazardLightControlUnit-DriverCan0"),
        frames=["HazardLightButton"],
    ) as cap:
        # Assert that the signal transitions 0 → 1 → 0 (button press and release)
        await cap.wait_for_signal_values(
            "HazardLightButton",
            "HazardLightButton.HazardLightButton",
            values=[0, 1, 0],
            timeout=5.0,
        )
Capturing SOME/IP events
Use remotivelabs.topology.testing.some_ip.capture_events to subscribe to SOME/IP events:
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.testing.some_ip import capture_events

MY_SERVICE = "MyTestService"
SPEED_EVENT = (MY_SERVICE, "SpeedEvent")

async def test_speed_event_sequence(broker_client: BrokerClient) -> None:
    async with capture_events(
        (broker_client, "consuming_service", 99),
        events=[SPEED_EVENT],
    ) as cap:
        # Wait for the speed to reach 50, then drop back to 0
        await cap.wait_for_event_parameter_values(SPEED_EVENT, "speed", values=[50, 0], timeout=5.0)
See remotivelabs.topology.testing for full documentation.
Behavioral Models
remotivelabs.topology.behavioral_model.BehavioralModel instances in RemotiveTopology run on top of RemotiveBroker, allowing them to use
real network protocols such as CAN buses or SOME/IP networks.
The example below shows the simplest possible behavioral model. It connects to a broker and handles the built-in control messages (ping, reboot), but performs no other work:
import asyncio

from remotivelabs.broker import BrokerClient
from remotivelabs.topology.behavioral_model import BehavioralModel

async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        async with BehavioralModel(
            "BodyCanModule",
            broker_client=broker_client,
        ) as bm:
            await bm.run_forever()

if __name__ == "__main__":
    asyncio.run(main())
See remotivelabs.topology.behavioral_model for full documentation and more examples.
Namespaces
Each namespace object corresponds to a single namespace in a RemotiveBroker topology and provides
protocol-specific access to its frames and signals. See remotivelabs.broker for more details.
Available namespace types:
- remotivelabs.topology.namespaces.can: CAN bus. Sends and receives frames; drives the restbus.
- remotivelabs.topology.namespaces.some_ip: SOME/IP. Sends requests and subscribes to events.
- remotivelabs.topology.namespaces.scripted: Enables subscribing to frames transformed by scripts.
The Restbus - Sending Periodic Network Messages
CAN traffic is often periodic: many frames are sent several times a second.
remotivelabs.topology.namespaces.generic.Restbus handles this by publishing configured frames at
their database cycle time. Configure it by passing a list of remotivelabs.topology.namespaces.generic.RestbusConfig objects with filters
that select which frames to send:
import asyncio

from remotivelabs.broker import BrokerClient
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig

async def main():
    async with (
        BrokerClient(url="http://127.0.0.1:50051") as broker_client,
        CanNamespace(
            "HazardLightControlUnit-DriverCan0",
            broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
        ) as hlcu_can,
    ):
        # start the restbus with signal database defaults and wait until cancelled
        await hlcu_can.restbus.start()
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())
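Conceptually, a restbus behaves like a set of periodic senders, one per frame, each ticking at its own cycle time. The sketch below illustrates that idea with plain asyncio. It is not the library's implementation: `publish_periodically`, the `send` callback and the frame name `OtherFrame` are hypothetical, and the fixed repetition count exists only to keep the demo finite.

```python
import asyncio
from typing import Callable, List


async def publish_periodically(
    send: Callable[[str], None],
    frame_name: str,
    cycle_time_s: float,
    repetitions: int,
) -> None:
    """Call send(frame_name) `repetitions` times, sleeping one cycle after each send."""
    for _ in range(repetitions):
        send(frame_name)
        await asyncio.sleep(cycle_time_s)


async def demo() -> List[str]:
    sent: List[str] = []
    # Two frames with different cycle times are published concurrently
    await asyncio.gather(
        publish_periodically(sent.append, "HazardLightButton", 0.01, 3),
        publish_periodically(sent.append, "OtherFrame", 0.02, 2),
    )
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the real Restbus the cycle times come from the signal database rather than from arguments, and publishing continues until the restbus is stopped or the task is cancelled.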
Signal values can be updated at any time via remotivelabs.topology.namespaces.generic.Restbus.update_signals:
import asyncio

from remotivelabs.broker import BrokerClient
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig

async def main():
    async with (
        BrokerClient(url="http://127.0.0.1:50051") as broker_client,
        CanNamespace(
            "HazardLightControlUnit-DriverCan0",
            broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
        ) as hlcu_can,
    ):
        # update signals in restbus before starting it
        await hlcu_can.restbus.update_signals(
            ("HazardLightButton.HazardLightButton", 1),
        )
        # start the restbus and loop until cancelled
        await hlcu_can.restbus.start()
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())
See remotivelabs.topology.namespaces.generic.RestbusConfig for timing options (cycle_time_millis,
delay_multiplier) and remotivelabs.topology.namespaces.generic.Restbus for the full API.
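The restbus examples end with `await asyncio.Future()`, which waits on a future that is never resolved, so the coroutine stays alive until its task is cancelled (for instance when `asyncio.run` is interrupted with Ctrl+C). The sketch below shows that pattern in isolation using only the standard library. `FakeNamespace` is a hypothetical stand-in for an async context manager such as `CanNamespace`; it is there to show that cleanup in `__aexit__` still runs when the wait is cancelled.

```python
import asyncio


class FakeNamespace:
    """Hypothetical stand-in for an async context manager such as a namespace."""

    def __init__(self) -> None:
        self.closed = False

    async def __aenter__(self) -> "FakeNamespace":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs even when the body is interrupted by cancellation
        self.closed = True


async def serve(ns: FakeNamespace) -> None:
    async with ns:
        # Never completes on its own; only cancellation ends the wait
        await asyncio.Future()


async def demo() -> bool:
    ns = FakeNamespace()
    task = asyncio.create_task(serve(ns))
    await asyncio.sleep(0)  # let the task start and reach the await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ns.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```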
Logging
This library uses Python's standard logging module. By default, the library does not configure any
logging handlers, allowing applications to fully control their logging setup.
import logging
logging.basicConfig(level=logging.INFO)
logging.getLogger("remotivelabs.topology").setLevel(logging.DEBUG)
For more advanced configurations, refer to the Python logging documentation.
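As one example of a more targeted setup, the sketch below attaches a dedicated handler to the library's logger so that its debug output can be routed separately from the application's own logs. It uses only the standard logging module. The logger name `remotivelabs.topology` comes from the example above; the child logger name `remotivelabs.topology.demo`, the format string and the in-memory stream are illustrative assumptions.

```python
import io
import logging

# Capture library logs in memory here; a FileHandler would work the same way
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))

lib_logger = logging.getLogger("remotivelabs.topology")
lib_logger.setLevel(logging.DEBUG)
lib_logger.addHandler(handler)
lib_logger.propagate = False  # keep library records out of the root handlers

# Records from child loggers propagate up to the handler attached above
logging.getLogger("remotivelabs.topology.demo").debug("restbus started")
captured = stream.getvalue()
```

Setting `propagate = False` is a design choice: it prevents the same record from also being emitted by handlers configured on the root logger, at the cost of hiding library logs from any application-wide handler.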