remotivelabs.topology

RemotiveTopology framework

RemotiveTopology framework is a python library that allows you to easily interact with RemotiveTopology. Using the framework you can create Behavioral Models or write test cases probing communication channels.

Installation

pip install remotivelabs-topology

Relation to remotivelabs-broker

RemotiveTopology is built on top of remotivelabs-broker, which provides the low-level gRPC client, signal types, and protocol primitives. If you need direct broker access beyond what this library exposes, use remotivelabs-broker directly.

Usage

The RemotiveTopology Framework enables simulation of automotive ECUs and networks using Python. It provides tools for modeling ECU behavior, sending and receiving real network traffic via RemotiveBroker, and configuring communication over protocols like CAN and SOME/IP. The following sections describe how to define behavioral models, configure namespaces, use the Restbus, handle inputs, and implement ECUs with flexible logic.

Behavioral Models

A behavioral model in the RemotiveTopology Framework is a component that mimics the behavior of an ECU. Like a real ECU, it is typically triggered by network messages, performs logic, and then sends new network messages to other components. Behavioral models in RemotiveTopology will run on top of the RemotiveBroker, making them use real network protocols in their communication.

As with all nodes in a RemotiveTopology, a behavioral model communicates using real network traffic over protocols such as CAN buses or SOME/IP networks. For a full list of supported network types, refer to the RemotiveBroker documentation.

For more documentation, see remotivelabs.topology.behavioral_model.BehavioralModel. Below is the simplest way to use a behavioral model, although this one does nothing, except connecting to a RemotiveBroker.

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.behavioral_model import BehavioralModel


async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        async with BehavioralModel(
            "BodyCanModule",
            broker_client=broker_client,
        ) as bm:
            await bm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())

Namespace

Refer to the RemotiveBroker documentation for full configuration documentation.

Namespace access module for RemotiveBroker.

Provides an interface to a namespace configured in a RemotiveBroker.

Supported types include:
  • someip: Enables sending requests and subscribing to events.
  • generic: Enables Restbus access, signal subscriptions, and more.
  • can: Same as generic
  • scripted: Enables subscribing to frames transformed by scripts

Namespaces can be used standalone or injected into a remotivelabs.topology.behavioral_model.BehavioralModel for simulation or testing. See individual module documentation for protocol-specific details.

The Restbus - Sending Periodic Network Messages

The communication on CAN buses, in particular, is often sent periodically, several times a second. The remotivelabs.topology.namespaces.generic.Restbus can be used to accomplish this. You initiate a namespace, for example a remotivelabs.topology.namespaces.can.CanNamespace with a list of messages that you would like to send. The cycle time and the default value is normally taken from the signal database, but can be changed later on.

The restbus is configured by passing in a list of filters that will be applied to the signal databases. They will produce a list of frames that should be sent by the restbus. In the example below, we use a filter that will match all frames that can be sent on the HazardLightControlUnit-DriverCan0 namespace by the HazardLightControlUnit. There are several filters to choose from. See more in the section about filters below.

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig


async def main():
    async with (
        BrokerClient(url="http://127.0.0.1:50051") as broker_client,
        CanNamespace(
            "HazardLightControlUnit-DriverCan0",
            broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
        ) as hlcu_can,
    ):
        # start the restbus with signal database defaults and wait until cancelled
        await hlcu_can.restbus.start()
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

We can modify the values using the restbus component at any time. In this example, we update the HazardLightButton signal within the HazardLightButton frame to 1.

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig


async def main():
    async with (
        BrokerClient(url="http://127.0.0.1:50051") as broker_client,
        CanNamespace(
            "HazardLightControlUnit-DriverCan0",
            broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
        ) as hlcu_can,
    ):
        # update signals in restbus before starting it
        await hlcu_can.restbus.update_signals(
            ("HazardLightButton.HazardLightButton", 1),
        )

        # start the restbus and loop until cancelled
        await hlcu_can.restbus.start()
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

The restbus also supports assigning an array of values to a signal. When configured with multiple values, the restbus will cycle through them sequentially, sending one value per tick, and repeat the sequence until reconfigured with new values.

By default, the restbus uses cycle times from the signal database. However, you can explicitly configure timing parameters using either remotivelabs.topology.namespaces.generic.RestbusConfig.cycle_time_millis or remotivelabs.topology.namespaces.generic.RestbusConfig.delay_multiplier:

restbus_configs = RestbusConfig(
    restbus_filters=[filters.SenderFilter(ecu_name="HazardLightControlUnit")],
    cycle_time_millis=20,  # Fixed cycle time of 0.02 seconds (50Hz)
    # and/or
    delay_multiplier=2,  # Scale database cycle times by factor 2
)

These timing parameters serve different purposes:

  • cycle_time_millis: Sets a fixed cycle time for all signals matched by the filters, including non-cyclic ones. This allows you to send non-cyclic signals at a fixed rate.
  • delay_multiplier: Scales the cycle times of the signals by the specified factor. Use this to slow down the restbus (saving CPU) or speed up testing. Note that it is the cycle time that is scaled, so a larger value (> 1) will result slow things down and a small value (< 1) will speed things up.

You can, of course, use the restbus together with a behavioral model, as shown in the example below. The main difference here is that the restbus will actually be started before the signals are updated, in contrast to the previous example.

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig


async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        driver_can_0 = CanNamespace(
            "HazardLightControlUnit-DriverCan0",
            broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
        )

        async with BehavioralModel(
            "HazardLightControlUnit",
            namespaces=[driver_can_0],
            broker_client=broker_client,
        ) as bm:
            # Simulate pressing the hazard light button
            await driver_can_0.restbus.update_signals(("HazardLightButton.HazardLightButton", 1))
            await bm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())

Restbus interaction is done through the restbus property of each namespace as the example above shows. However, sometimes you may want to reset all restbus signals to their default values as defined in the signal database:

await bm.reset_restbuses()

Input handlers

When starting the BehavioralModel, you can add handlers for incoming messages and react to them. In the previous section, the Hazard Light Control Unit sent frames indicating whether the hazard light button was pressed.

Below is an example of another ECU taking that frame as input and printing a message whenever it is received:

import asyncio

from remotivelabs.broker import BrokerClient, Frame

from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces.can import CanNamespace
from remotivelabs.topology.namespaces.filters import FrameFilter


async def _on_hazard_button_pressed(frame: Frame) -> None:
    print(f"Hazard light frame received: {frame}")


async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        driver_can_0 = CanNamespace("BodyCanModule-DriverCan0", broker_client)

        async with BehavioralModel(
            "BodyCanModule",
            namespaces=[driver_can_0],
            broker_client=broker_client,
            input_handlers=[
                driver_can_0.create_input_handler(
                    [FrameFilter("HazardLightButton")],
                    _on_hazard_button_pressed,
                )
            ],
        ) as bm:
            await bm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())

As you can see, it's possible to set up multiple handlers in the array, but in the example we only add one single handler. Each subscription will use filters to specify which messages that are of interest. More under filters below.

Filters

Filters for RemotiveTopology.

Note:

Filters currently exist in both this module and remotivelabs.broker.filters. Work is in progress to migrate all filters to the broker package.

This module provides filters to select subsets of frames (remotivelabs.broker.FrameInfo) and signals (remotivelabs.broker.SignalInfo) based on various criteria.

All filters have an exclude flag to indicate whether matches should be excluded (True) or included (False, default). Filters are callable and can be passed to Python built-in functions that expect a predicate, such as filter(), any(), or all(). The snippet below shows examples using filter():

Filter Strategy:

The filter_recursive() function applies frame and signal filters with context-dependent behavior. Frame filtering determines eligibility, then signal filtering applies differently based on frame inclusion: included frames start with all signals (exclude-only), excluded frames require explicit signal inclusion. Exclusion filters always take priority over inclusion filters.

Examples:

from remotivelabs.broker import FrameInfo, SignalInfo

from remotivelabs.topology.namespaces.filters import (
    AllFramesFilter,
    FrameFilter,
    ReceiverFilter,
    SenderFilter,
    SignalFilter,
    SomeIPEventFilter,
    SomeIPRequestFilter,
    filter_recursive,
    is_frame_filter,
    is_signal_filter,
)

# assume frames and signals are populated elsewhere
frames: list[FrameInfo] = []
signals: list[SignalInfo] = []

# Example 1: Include all frames
all_frames_filter = AllFramesFilter()
filtered_frames = list(filter(all_frames_filter, frames))

# Example 2: Include all frames but exclude a specific frame
frame_exclude_filter = FrameFilter(frame_name="Frame1", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, frames))

# Example 3: Filter frames sent by a specific ECU
sender_filter = SenderFilter(ecu_name="ECU1")
filtered_frames = list(filter(sender_filter, frames))

# Example 4: Filter frames received by a specific ECU, excluding one signal
receiver_filter = ReceiverFilter(ecu_name="ECU2")
signal_exclude_filter = SignalFilter(signal_name="SignalA", exclude=True)
filtered_frames = list(filter(receiver_filter, frames))
filtered_signals = list(filter(signal_exclude_filter, signals))

# Example 5: SOME/IP request filter
someip_request_filter = SomeIPRequestFilter(service_instance_name="ServiceA", method_name="RequestX")
filtered_frames = list(filter(someip_request_filter, frames))
filtered_signals = list(filter(someip_request_filter, signals))

# Example 6: SOME/IP event filter
someip_event_filter = SomeIPEventFilter(service_instance_name="ServiceB", event_name="EventY")
filtered_frames = list(filter(someip_event_filter, frames))
filtered_signals = list(filter(someip_event_filter, signals))

# Example 7: Chaining filters
frame_include = FrameFilter(frame_name="FrameA")
frame_exclude = FrameFilter(frame_name="FrameB", exclude=True)
filtered_frames = list(filter(frame_exclude, filter(frame_include, frames)))

# Example 8: Combining inclusion and exclusion with AllFramesFilter and FrameFilter
all_frames_filter = AllFramesFilter()
frame_exclude_filter = FrameFilter(frame_name="FrameC", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, filter(all_frames_filter, frames)))

# Example 9: Recursive filtering of signals (SignalInfos) in frames (FrameInfos)
filtered_frames = [
    filtered_frame
    for frame in frames
    if (filtered_frame := filter_recursive(frame, filters=[all_frames_filter, signal_exclude_filter])) is not None
]

# Example 10: Type checking filters
assert is_frame_filter(all_frames_filter)  # works with frames
assert not is_signal_filter(all_frames_filter)  # doesn't work with signals

assert not is_frame_filter(signal_exclude_filter)  # doesn't work with frames
assert is_signal_filter(signal_exclude_filter)  # works with signals

assert is_frame_filter(sender_filter)  # works with frames
assert is_signal_filter(sender_filter)  # works with signals

Logging

This library uses Python's standard logging module. By default, the library does not configure any logging handlers, allowing applications to fully control their logging setup.

To enable logs from this library in your application or tests, configure logging as follows:

import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("remotivelabs.topology").setLevel(logging.DEBUG)

For more advanced configurations, refer to the Python logging documentation.