remotivelabs.topology
RemotiveTopology framework
RemotiveTopology framework is a Python library for interacting with RemotiveTopology. With it, you can create behavioral models or write test cases that probe communication channels.
Installation
pip install remotivelabs-topology
Usage
The RemotiveTopology Framework enables simulation of automotive ECUs and networks using Python. It provides tools for modeling ECU behavior, sending and receiving real network traffic via RemotiveBroker, and configuring communication over protocols like CAN and SOME/IP. The following sections describe how to define behavioral models, configure namespaces, use the Restbus, handle inputs, and implement ECUs with flexible logic.
Behavioral Models
A behavioral model in the RemotiveTopology Framework is a component that mimics the behavior of an ECU. Like a real ECU, it is typically triggered by network messages, performs some logic, and then sends new network messages to other components. Behavioral models run on top of RemotiveBroker, so they communicate over real network protocols.
As with all nodes in a RemotiveTopology, a behavioral model communicates using real network traffic over protocols such as CAN buses or SOME/IP networks. For a full list of supported network types, refer to the RemotiveBroker documentation.
For more documentation, see BehavioralModel.
Below is the simplest possible behavioral model. It does nothing except connect to a RemotiveBroker.
    import asyncio

    from remotivelabs.broker import BrokerClient
    from remotivelabs.topology.behavioral_model import BehavioralModel

    async def main():
        async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
            async with BehavioralModel(
                "BodyCanModule",
                broker_client=broker_client,
            ) as bm:
                await bm.run_forever()

    if __name__ == "__main__":
        asyncio.run(main())
Namespace
A namespace object provides an interface to a namespace configured in a RemotiveBroker. Refer to the RemotiveBroker documentation for full configuration details.
Supported types include:
someip: Enables sending requests and subscribing to events.
generic: Enables Restbus access, signal subscriptions, and more.
can: Same as generic.
Namespaces can be used standalone or injected into a BehavioralModel for simulation or testing.
    import asyncio

    from remotivelabs.broker import BrokerClient
    from remotivelabs.topology.namespaces import filters
    from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig

    async def main():
        async with (
            BrokerClient(url="http://127.0.0.1:50051") as broker_client,
            CanNamespace(
                "HazardLightControlUnit-DriverCan0",
                broker_client,
                restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
            ),
        ):
            # Keep the namespace (and its restbus) alive until the process is stopped
            await asyncio.Event().wait()

    if __name__ == "__main__":
        asyncio.run(main())
The Restbus - Sending Periodic Network Messages
Communication on CAN buses, in particular, is often periodic: frames are sent several times a second. The Restbus can be used to accomplish this. You initiate a namespace, for example a CanNamespace, with a list of messages that you would like to send. The cycle times and default values are normally taken from the signal database, but can be changed later on.
The restbus is configured by passing in a list of filters that are applied to the signal databases. The filters produce the list of frames that the restbus should send. In the example below, we use a filter that matches all frames that the HazardLightControlUnit can send on the HazardLightControlUnit-DriverCan0 namespace. There are several filters to choose from. See the Filters section below.
    import asyncio

    from remotivelabs.broker import BrokerClient, RestbusSignalConfig
    from remotivelabs.topology.behavioral_model import BehavioralModel
    from remotivelabs.topology.namespaces import filters
    from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig

    async def main():
        async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
            driver_can_0 = CanNamespace(
                "HazardLightControlUnit-DriverCan0",
                broker_client,
                restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
            )
            async with BehavioralModel(
                "HazardLightControlUnit",
                namespaces=[driver_can_0],
                broker_client=broker_client,
            ) as bm:
                # Simulate pressing the hazard light button
                await driver_can_0.restbus.update_signals(
                    RestbusSignalConfig.set(name="HazardLightButton.HazardLightButton", value=1)
                )
                await bm.run_forever()

    if __name__ == "__main__":
        asyncio.run(main())
We can modify the values using the restbus context at any time. In this example, we update the HazardLightButton signal within the HazardLightButton frame to 1.
The restbus also supports assigning an array of values to a signal. When configured with multiple values, the restbus will cycle through them sequentially, sending one value per tick, and repeat the sequence until reconfigured with new values.
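The cycling behavior described above can be illustrated without a broker. The sketch below is plain Python and does not use the library; values_sent is a hypothetical helper that only models which value would go out on each tick, assuming the sequence simply wraps around.

```python
from itertools import cycle, islice

def values_sent(configured_values, ticks):
    """Hypothetical helper: the value sent on each of the first `ticks` ticks."""
    # cycle() repeats the configured sequence indefinitely; islice() takes one value per tick
    return list(islice(cycle(configured_values), ticks))

# Three configured values observed over seven ticks
print(values_sent([0, 1, 2], 7))  # [0, 1, 2, 0, 1, 2, 0]
```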
By default, the restbus uses cycle times from the signal database. However, you can explicitly configure timing parameters using cycle_time_millis and/or delay_multiplier:
    restbus_configs = RestbusConfig(
        restbus_filters=[filters.SenderFilter(ecu_name="HazardLightControlUnit")],
        cycle_time_millis=20,  # Fixed cycle time of 0.02 seconds (50Hz)
        # and/or
        delay_multiplier=2,  # Scale database cycle times by factor 2
    )
These timing parameters serve different purposes:
cycle_time_millis: Sets a fixed cycle time for all signals matched by the filters, including non-cyclic ones. This allows you to send non-cyclic signals at a fixed rate.
delay_multiplier: Scales the cycle times of the signals by the specified factor. Use this to slow down the restbus (saving CPU) or to speed up testing. Note that it is the cycle time that is scaled, so a larger value (> 1) slows things down and a smaller value (< 1) speeds things up.
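To make the direction of delay_multiplier concrete, here is a small arithmetic sketch. Both helpers are hypothetical and not part of the library; they only mirror the description above, assuming a frame whose database cycle time is 100 ms.

```python
def scaled_cycle_time_ms(database_cycle_time_ms: float, delay_multiplier: float) -> float:
    """Hypothetical helper: the cycle time after scaling by delay_multiplier."""
    return database_cycle_time_ms * delay_multiplier

def frequency_hz(cycle_time_ms: float) -> float:
    """Send rate that corresponds to a given cycle time."""
    return 1000.0 / cycle_time_ms

# A multiplier above 1 lengthens the cycle time, so frames are sent less often
slow = scaled_cycle_time_ms(100, 2)    # 200 ms
# A multiplier below 1 shortens the cycle time, so frames are sent more often
fast = scaled_cycle_time_ms(100, 0.5)  # 50 ms

print(slow, frequency_hz(slow))  # 200 5.0
print(fast, frequency_hz(fast))  # 50.0 20.0
```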
Input handlers
When starting the BehavioralModel, you can add handlers for incoming messages and react to them. In the previous section, the Hazard Light Control Unit sent frames indicating whether the hazard light button was pressed. Below is an example of another ECU taking that frame as input and printing a message whenever it is received:
    import asyncio

    from remotivelabs.broker import BrokerClient, Frame
    from remotivelabs.topology.behavioral_model import BehavioralModel
    from remotivelabs.topology.namespaces import filters
    from remotivelabs.topology.namespaces.can import CanNamespace

    async def _on_hazard_button_pressed(frame: Frame) -> None:
        # Called for every received frame that matches the handler's filters
        print(f"Hazard light frame {frame}")

    async def main():
        async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
            driver_can_0 = CanNamespace("BodyCanModule-DriverCan0", broker_client)
            async with BehavioralModel(
                "BodyCanModule",
                namespaces=[driver_can_0],
                broker_client=broker_client,
                input_handlers=[
                    driver_can_0.create_input_handler(
                        [filters.FrameFilter("HazardLightButton")],
                        _on_hazard_button_pressed,
                    )
                ],
            ) as bm:
                await bm.run_forever()

    if __name__ == "__main__":
        asyncio.run(main())
As you can see, input_handlers takes a list, so you can set up multiple handlers; this example adds only one. Each handler uses filters to specify which messages are of interest. See the Filters section below.
Filters
In the remotivelabs.topology.namespaces.filters package you can find several filters that let you select a subset of all signals associated with an ECU.
All filters have a boolean flag called include that specifies whether the filter is used to include or exclude messages. It defaults to True.
In most cases, filters are passed in a list, meaning that you can fine-tune the selection by combining include and exclude filters. The order of the filters does not matter, but exclusion filters have higher priority than inclusion filters. Here are two examples; both match all frames except "Frame1".
    from remotivelabs.topology.namespaces.filters import AllFramesFilter, FrameFilter

    filters1 = [
        AllFramesFilter(),
        FrameFilter(frame_name="Frame1", include=False),
    ]
    filters2 = [
        FrameFilter(frame_name="Frame1", include=False),
        AllFramesFilter(),
    ]
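The rule that exclusion wins regardless of order can be modeled in a few lines of plain Python. This is only a sketch of the semantics described above, not the library's implementation: NameRule and select are hypothetical stand-ins for the real filter classes, and a rule with no name is assumed to match every frame.

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass(frozen=True)
class NameRule:
    """Hypothetical stand-in for a filter: matches one frame name, or all frames if name is None."""
    name: Optional[str] = None
    include: bool = True

    def matches(self, frame_name: str) -> bool:
        return self.name is None or self.name == frame_name

def select(frame_names: List[str], rules: List[NameRule]) -> List[str]:
    """Keep frames matched by at least one include rule and by no exclude rule."""
    selected = []
    for frame in frame_names:
        included = any(r.include and r.matches(frame) for r in rules)
        excluded = any(not r.include and r.matches(frame) for r in rules)
        if included and not excluded:
            selected.append(frame)
    return selected

frames = ["Frame1", "Frame2", "Frame3"]
rules_a = [NameRule(), NameRule("Frame1", include=False)]
rules_b = [NameRule("Frame1", include=False), NameRule()]

# Both orderings give the same result because exclusion is checked independently of position
print(select(frames, rules_a))  # ['Frame2', 'Frame3']
print(select(frames, rules_b))  # ['Frame2', 'Frame3']
```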
Logging
This library uses Python's standard logging module. By default, the library does not configure any logging handlers, allowing applications to fully control their logging setup.
To enable logs from this library in your application or tests, configure logging as follows:
import logging
logging.basicConfig(level=logging.INFO)
logging.getLogger("remotivelabs.topology").setLevel(logging.DEBUG)
For more advanced configurations, refer to the Python logging documentation.
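As one example of a more targeted setup, the sketch below attaches a dedicated handler to the "remotivelabs.topology" logger instead of configuring the root logger. It uses only the standard logging module; the format string and the choice to disable propagation are illustrative assumptions, not recommendations from the library.

```python
import logging
import sys

# Handler that writes the library's log records to stderr with a custom format
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))

topology_logger = logging.getLogger("remotivelabs.topology")
topology_logger.setLevel(logging.DEBUG)
topology_logger.addHandler(handler)
# Avoid duplicate lines if the root logger also has handlers configured
topology_logger.propagate = False
```

Loggers created under this name inherit the DEBUG level unless they set their own, so the rest of the application can stay at a quieter level.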