remotivelabs.topology

RemotiveTopology framework

RemotiveTopology framework is a Python library for modelling automotive ECUs and testing automotive network communication using real protocols (CAN, SOME/IP) via RemotiveBroker. It is built on top of remotivelabs-broker, which provides the low-level gRPC client, signal types, and protocol primitives.

Installation

pip install remotivelabs-topology

# with optional hamcrest matchers for testing
pip install remotivelabs-topology[hamcrest]

# with optional FMU support
pip install remotivelabs-topology[fmu]

Usage

The framework has two primary use-cases:

  1. Testing: Write test cases that interact with a running topology to verify correct behavior.
  2. Behavioral Models: Create ECU stubs that model real ECU behavior on a running RemotiveTopology instance, sending and receiving network messages over real protocols.

Testing

Capturing frames

Use remotivelabs.topology.testing.frames.capture_frames to subscribe to CAN frames from a namespace and assert on the received frames and signal values:

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.testing.frames import capture_frames


async def test_hazard_light_sequence(broker_client: BrokerClient) -> None:
    # Subscribe to frames published by the HazardLightControlUnit behavioral model
    async with capture_frames(
        (broker_client, "HazardLightControlUnit-DriverCan0"),
        frames=["HazardLightButton"],
    ) as cap:
        # Assert that the signal transitions 0 → 1 → 0 (button press and release)
        await cap.wait_for_signal_values(
            "HazardLightButton",
            "HazardLightButton.HazardLightButton",
            values=[0, 1, 0],
            timeout=5.0,
        )

Capturing SOME/IP events

Use remotivelabs.topology.testing.some_ip.capture_events to subscribe to SOME/IP events:

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.testing.some_ip import capture_events

MY_SERVICE = "MyTestService"
SPEED_EVENT = (MY_SERVICE, "SpeedEvent")


async def test_speed_event_sequence(broker_client: BrokerClient) -> None:
    async with capture_events(
        (broker_client, "consuming_service", 99),
        events=[SPEED_EVENT],
    ) as cap:
        # Wait for the speed to reach 50, then drop back to 0
        await cap.wait_for_event_parameter_values(SPEED_EVENT, "speed", values=[50, 0], timeout=5.0)

See remotivelabs.topology.testing for full documentation.

Behavioral Models

remotivelabs.topology.behavioral_model.BehavioralModel instances in RemotiveTopology run on top of RemotiveBroker, allowing them to use real network protocols such as CAN buses or SOME/IP networks.

The example below shows the simplest possible behavioral model - it connects to a broker and handles built-in control messages (ping, reboot), but performs no other work:

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.behavioral_model import BehavioralModel


async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        async with BehavioralModel(
            "BodyCanModule",
            broker_client=broker_client,
        ) as bm:
            await bm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())

See remotivelabs.topology.behavioral_model for full documentation and more examples.

Namespaces

A namespace maps to a single namespace in a RemotiveBroker topology, providing protocol-specific access to frames and signals. See remotivelabs.broker for more details.

Available namespace types:

The Restbus - Sending Periodic Network Messages

The communication on CAN buses is often sent periodically, several times a second. remotivelabs.topology.namespaces.generic.Restbus handles this by publishing configured frames at their database cycle time. Configure it by passing a list of remotivelabs.topology.namespaces.generic.RestbusConfig objects with filters that select which frames to send:

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig


async def main():
    async with (
        BrokerClient(url="http://127.0.0.1:50051") as broker_client,
        CanNamespace(
            "HazardLightControlUnit-DriverCan0",
            broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
        ) as hlcu_can,
    ):
        # start the restbus with signal database defaults and wait until cancelled
        await hlcu_can.restbus.start()
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

Signal values can be updated at any time via remotivelabs.topology.namespaces.generic.Restbus.update_signals:

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig


async def main():
    async with (
        BrokerClient(url="http://127.0.0.1:50051") as broker_client,
        CanNamespace(
            "HazardLightControlUnit-DriverCan0",
            broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
        ) as hlcu_can,
    ):
        # update signals in restbus before starting it
        await hlcu_can.restbus.update_signals(
            ("HazardLightButton.HazardLightButton", 1),
        )

        # start the restbus and loop until cancelled
        await hlcu_can.restbus.start()
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

See remotivelabs.topology.namespaces.generic.RestbusConfig for timing options (cycle_time_millis, delay_multiplier) and remotivelabs.topology.namespaces.generic.Restbus for the full API.

Logging

This library uses Python's standard logging module. By default, the library does not configure any logging handlers, allowing applications to fully control their logging setup.

import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("remotivelabs.topology").setLevel(logging.DEBUG)

For more advanced configurations, refer to the Python logging documentation.