remotivelabs.topology
RemotiveTopology framework
RemotiveTopology framework is a python library that allows you to easily interact with RemotiveTopology. Using the framework you can create Behavioral Models or write test cases probing communication channels.
Installation
pip install remotivelabs-topology
Project Links
Usage
The RemotiveTopology Framework enables simulation of automotive ECUs and networks using Python. It provides tools for modeling ECU behavior, sending and receiving real network traffic via RemotiveBroker, and configuring communication over protocols like CAN and SOME/IP. The following sections describe how to define behavioral models, configure namespaces, use the Restbus, handle inputs, and implement ECUs with flexible logic.
Behavioral Models
A behavioral model in the RemotiveTopology Framework is a component that mimics the behavior of an ECU. Like a real ECU, it is typically triggered by network messages, performs logic, and then sends new network messages to other components. Behavioral models in RemotiveTopology will run on top of the RemotiveBroker, making them use real network protocols in their communication.
As with all nodes in a RemotiveTopology, a behavioral model communicates using real network traffic over protocols such as CAN buses or SOME/IP networks. For a full list of supported network types, refer to the RemotiveBroker documentation.
For more documentation, see BehavioralModel.
Below is the simplest way to use a behavioral model, although this one does nothing, except connecting to a RemotiveBroker.
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.behavioral_model import BehavioralModel
async def main():
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
async with BehavioralModel(
"BodyCanModule",
broker_client=broker_client,
) as bm:
await bm.run_forever()
if __name__ == "__main__":
asyncio.run(main())
Namespace
Refer to the RemotiveBroker documentation for full configuration documentation.
Namespace access module for RemotiveBroker.
Provides an interface to a namespace configured in a RemotiveBroker.
Supported types include:
someip: Enables sending requests and subscribing to events.generic: Enables Restbus access, signal subscriptions, and more.can: Same as genericscripted: Enables subscribing to frames transformed by scripts
Namespaces can be used standalone or injected into a BehavioralModel for simulation or testing.
See individual module documentation for protocol-specific details.
This example creates and configures a CAN namespace, but does not start the restbus.
It is not really useful as is; in a real application you would likely either use it together with a BehavioralModel
or start the restbus to send periodic messages.
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig
async def main():
async with (
BrokerClient(url="http://127.0.0.1:50051") as broker_client,
CanNamespace(
"HazardLightControlUnit-DriverCan0",
broker_client,
restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
),
):
# wait until cancelled
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
It is common to use namespaces together with a behavioral model, as shown in the example below.
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig
async def main():
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
driver_can_0 = CanNamespace(
"HazardLightControlUnit-DriverCan0",
broker_client,
restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
)
async with BehavioralModel(
"HazardLightControlUnit",
namespaces=[driver_can_0],
broker_client=broker_client,
) as bm:
# The behavioral model is now running, with an active namespace restbus, until cancelled
await bm.run_forever()
if __name__ == "__main__":
asyncio.run(main())
The Restbus - Sending Periodic Network Messages
The communication on CAN buses, in particular, is often sent periodically, several times a second.
The Restbus can be used to accomplish this. You initiate a namespace, for example a CanNamespace with a list of messages that you would
like to send. The cycle time and the default value is normally taken from the signal database, but can be changed later on.
The restbus is configured by passing in a list of filters that will be applied to the signal databases.
They will produce a list of frames that should be sent by the restbus. In the example below, we use a filter that will match all frames
that can be sent on the HazardLightControlUnit-DriverCan0 namespace by the HazardLightControlUnit.
There are several filters to choose from. See more in the section about filters below.
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig
async def main():
async with (
BrokerClient(url="http://127.0.0.1:50051") as broker_client,
CanNamespace(
"HazardLightControlUnit-DriverCan0",
broker_client,
restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
) as hlcu_can,
):
# start the restbus with signal database defaults and wait until cancelled
await hlcu_can.restbus.start()
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
We can modify the values using the restbus component at any time. In this example, we update the HazardLightButton
signal within the HazardLightButton frame to 1.
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig
async def main():
async with (
BrokerClient(url="http://127.0.0.1:50051") as broker_client,
CanNamespace(
"HazardLightControlUnit-DriverCan0",
broker_client,
restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
) as hlcu_can,
):
# update signals in restbus before starting it
await hlcu_can.restbus.update_signals(
("HazardLightButton.HazardLightButton", 1),
)
# start the restbus and loop until cancelled
await hlcu_can.restbus.start()
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
The restbus also supports assigning an array of values to a signal. When configured with multiple values, the restbus will cycle through them sequentially, sending one value per tick, and repeat the sequence until reconfigured with new values.
By default, the restbus uses cycle times from the signal database. However, you can explicitly configure timing parameters using either
cycle_time or delay_multiplier:
restbus_configs = RestbusConfig(
restbus_filters=[filters.SenderFilter(ecu_name="HazardLightControlUnit")],
cycle_time_millis=20, # Fixed cycle time of 0.02 seconds (50Hz)
# and/or
delay_multiplier=2, # Scale database cycle times by factor 2
)
These timing parameters serve different purposes:
cycle_time_millis: Sets a fixed cycle time for all signals matched by the filters, including non-cyclic ones. This allows you to send non-cyclic signals at a fixed rate.delay_multiplier: Scales the cycle times of the signals by the specified factor. Use this to slow down the restbus (saving CPU) or speed up testing. Note that it is the cycle time that is scaled, so a larger value (> 1) will result slow things down and a small value (< 1) will speed things up.
You can, of course, use the restbus together with a behavioral model, as shown in the example below. The main difference here is that the restbus will actually be started before the signals are updated, in contrast to the previous example.
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig
async def main():
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
driver_can_0 = CanNamespace(
"HazardLightControlUnit-DriverCan0",
broker_client,
restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
)
async with BehavioralModel(
"HazardLightControlUnit",
namespaces=[driver_can_0],
broker_client=broker_client,
) as bm:
# Simulate pressing the hazard light button
await driver_can_0.restbus.update_signals(("HazardLightButton.HazardLightButton", 1))
await bm.run_forever()
if __name__ == "__main__":
asyncio.run(main())
Restbus interaction is done through the restbus property of each namespace as the example above shows. However,
sometimes you may want to reset all restbus signals to their default values as defined in the signal database:
await bm.reset_restbuses()
Input handlers
When starting the BehavioralModel, you can add handlers for incoming messages and react to them. In the previous section,
the Hazard Light Control Unit sent frames indicating whether the hazard light button was pressed.
Below is an example of another ECU taking that frame as input and printing a message whenever it is received:
import asyncio
from remotivelabs.broker import BrokerClient, Frame
from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces.can import CanNamespace
from remotivelabs.topology.namespaces.filters import FrameFilter
async def _on_hazard_button_pressed(frame: Frame) -> None:
print(f"Hazard light frame received: {frame}")
async def main():
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
driver_can_0 = CanNamespace("BodyCanModule-DriverCan0", broker_client)
async with BehavioralModel(
"BodyCanModule",
namespaces=[driver_can_0],
broker_client=broker_client,
input_handlers=[
driver_can_0.create_input_handler(
[FrameFilter("HazardLightButton")],
_on_hazard_button_pressed,
)
],
) as bm:
await bm.run_forever()
if __name__ == "__main__":
asyncio.run(main())
As you can see, it's possible to set up multiple handlers in the array, but in the example we only add one single handler. Each subscription will use filters to specify which messages that are of interest. More under filters below.
Filters
Filters for RemotiveTopology.
This module provides filters to select subsets of frames (FrameInfo) and signals (SignalInfo)
based on various criteria.
All filters have an exclude flag to indicate whether matches should be excluded (True) or
included (False, default). Filters are callable and can be passed to Python built-in functions that
expect a predicate, such as filter(), any(), or all(). The snippet below shows examples
using filter():
Filter Strategy:
The
filter_recursive()function applies frame and signal filters with context-dependent behavior. Frame filtering determines eligibility, then signal filtering applies differently based on frame inclusion: included frames start with all signals (exclude-only), excluded frames require explicit signal inclusion. Exclusion filters always take priority over inclusion filters.
Examples:
from remotivelabs.broker import FrameInfo, SignalInfo
from remotivelabs.topology.namespaces.filters import (
AllFramesFilter,
FrameFilter,
ReceiverFilter,
SenderFilter,
SignalFilter,
SomeIPEventFilter,
SomeIPRequestFilter,
filter_recursive,
is_frame_filter,
is_signal_filter,
)
# assume frames and signals are populated elsewhere
frames: list[FrameInfo] = []
signals: list[SignalInfo] = []
# Example 1: Include all frames
all_frames_filter = AllFramesFilter()
filtered_frames = list(filter(all_frames_filter, frames))
# Example 2: Include all frames but exclude a specific frame
frame_exclude_filter = FrameFilter(frame_name="Frame1", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, frames))
# Example 3: Filter frames sent by a specific ECU
sender_filter = SenderFilter(ecu_name="ECU1")
filtered_frames = list(filter(sender_filter, frames))
# Example 4: Filter frames received by a specific ECU, excluding one signal
receiver_filter = ReceiverFilter(ecu_name="ECU2")
signal_exclude_filter = SignalFilter(signal_name="SignalA", exclude=True)
filtered_frames = list(filter(receiver_filter, frames))
filtered_signals = list(filter(signal_exclude_filter, signals))
# Example 5: SOME/IP request filter
someip_request_filter = SomeIPRequestFilter(service_instance_name="ServiceA", method_name="RequestX")
filtered_frames = list(filter(someip_request_filter, frames))
filtered_signals = list(filter(someip_request_filter, signals))
# Example 6: SOME/IP event filter
someip_event_filter = SomeIPEventFilter(service_instance_name="ServiceB", event_name="EventY")
filtered_frames = list(filter(someip_event_filter, frames))
filtered_signals = list(filter(someip_event_filter, signals))
# Example 7: Chaining filters
frame_include = FrameFilter(frame_name="FrameA")
frame_exclude = FrameFilter(frame_name="FrameB", exclude=True)
filtered_frames = list(filter(frame_exclude, filter(frame_include, frames)))
# Example 8: Combining inclusion and exclusion with AllFramesFilter and FrameFilter
all_frames_filter = AllFramesFilter()
frame_exclude_filter = FrameFilter(frame_name="FrameC", exclude=True)
filtered_frames = list(filter(frame_exclude_filter, filter(all_frames_filter, frames)))
# Example 9: Recursive filtering of signals (SignalInfos) in frames (FrameInfos)
filtered_frames = [
filtered_frame
for frame in frames
if (filtered_frame := filter_recursive(frame, filters=[all_frames_filter, signal_exclude_filter])) is not None
]
# Example 10: Type checking filters
assert is_frame_filter(all_frames_filter) # works with frames
assert not is_signal_filter(all_frames_filter) # doesn't work with signals
assert not is_frame_filter(signal_exclude_filter) # doesn't work with frames
assert is_signal_filter(signal_exclude_filter) # works with signals
assert is_frame_filter(sender_filter) # works with frames
assert is_signal_filter(sender_filter) # works with signals
"""
from __future__ import annotations
from dataclasses import dataclass from typing import Literal, Protocol, Sequence, TypeGuard, Union, overload, runtime_checkable
from remotivelabs.broker import FrameInfo, SignalInfo
@runtime_checkable class FrameFilterPredicate(Protocol): @property def exclude(self) -> bool: ...
@property
def type(self) -> Literal["frame", "any"]: ...
def __call__(self, info: FrameInfo) -> bool: ...
@runtime_checkable class SignalFilterPredicate(Protocol): @property def exclude(self) -> bool: ...
@property
def type(self) -> Literal["signal", "any"]: ...
def __call__(self, info: SignalInfo) -> bool: ...
Public filter types
FilterLike = Union[FrameFilterPredicate, SignalFilterPredicate]
def is_frame_filter(filter: FilterLike) -> TypeGuard[FrameFilterPredicate]: # A filter is a frame filter if it has type "frame" or "any", # and it can be called with FrameInfo return filter.type in ("frame", "any")
def is_signal_filter(filter: FilterLike) -> TypeGuard[SignalFilterPredicate]: # A filter is a signal filter if it has type "signal" or "any", # and it can be called with SignalInfo return filter.type in ("signal", "any")
def is_filter(obj: object) -> TypeGuard[FilterLike]: return hasattr(obj, "type") and getattr(obj, "type", None) in ("frame", "signal", "any")
@dataclass(frozen=True) class AllFramesFilter: exclude: bool = False type: Literal["frame"] = "frame"
def __call__(self, info: FrameInfo) -> bool: # noqa: ARG002
return not self.exclude
@dataclass(frozen=True) class FrameFilter: frame_name: str exclude: bool = False type: Literal["frame"] = "frame"
def __call__(self, info: FrameInfo) -> bool:
matches = info.name == self.frame_name
return not matches if self.exclude else matches
@dataclass(frozen=True) class SignalFilter: signal_name: str exclude: bool = False type: Literal["signal"] = "signal"
def __call__(self, info: SignalInfo) -> bool:
matches = info.name == self.signal_name
return not matches if self.exclude else matches
@dataclass(frozen=True) class ReceiverFilter: ecu_name: str exclude: bool = False type: Literal["any"] = "any"
@overload
def __call__(self, info: FrameInfo) -> bool: ...
@overload
def __call__(self, info: SignalInfo) -> bool: ...
def __call__(self, info: FrameInfo | SignalInfo) -> bool:
if isinstance(info, FrameInfo):
receivers = set(info.receiver)
receivers.update(r for sig in info.signals.values() for r in sig.receiver)
matches = self.ecu_name in receivers
elif isinstance(info, SignalInfo):
matches = self.ecu_name in info.receiver
else:
matches = False
return not matches if self.exclude else matches
@dataclass(frozen=True) class SenderFilter: ecu_name: str exclude: bool = False type: Literal["any"] = "any"
@overload
def __call__(self, info: FrameInfo) -> bool: ...
@overload
def __call__(self, info: SignalInfo) -> bool: ...
def __call__(self, info: FrameInfo | SignalInfo) -> bool:
if isinstance(info, FrameInfo):
senders = set(info.sender)
senders.update(s for sig in info.signals.values() for s in sig.sender)
matches = self.ecu_name in senders
elif isinstance(info, SignalInfo):
matches = self.ecu_name in info.sender
else:
matches = False
return not matches if self.exclude else matches
def _someip_match(sep: str, service: str | None, name: str | None, is_signal: bool, full_name: str) -> bool: if sep not in full_name: return False service_part, rest = full_name.split(sep, 1) if not rest or (service and service_part != service): return False if is_signal and name: return rest.startswith(f"{name}.") return name == rest if name else True
@dataclass(frozen=True) class SomeIPRequestFilter: service_instance_name: str | None = None method_name: str | None = None exclude: bool = False type: Literal["any"] = "any"
@overload
def __call__(self, info: FrameInfo) -> bool: ...
@overload
def __call__(self, info: SignalInfo) -> bool: ...
def __call__(self, info: FrameInfo | SignalInfo) -> bool:
name = getattr(info, "name", None)
if not isinstance(name, str):
return False
matches = _someip_match(
".Request.",
self.service_instance_name,
self.method_name,
isinstance(info, SignalInfo),
name,
)
return not matches if self.exclude else matches
@dataclass(frozen=True) class SomeIPEventFilter: service_instance_name: str | None = None event_name: str | None = None exclude: bool = False type: Literal["any"] = "any"
@overload
def __call__(self, info: FrameInfo) -> bool: ...
@overload
def __call__(self, info: SignalInfo) -> bool: ...
def __call__(self, info: FrameInfo | SignalInfo) -> bool:
name = getattr(info, "name", None)
if not isinstance(name, str):
return False
matches = _someip_match(
".Event.",
self.service_instance_name,
self.event_name,
isinstance(info, SignalInfo),
name,
)
return not matches if self.exclude else matches
def _apply_frame_filters(frame_info: FrameInfo, frame_filters: Sequence[FrameFilterPredicate]) -> FrameInfo | None: include_filters = [f for f in frame_filters if not f.exclude] exclude_filters = [f for f in frame_filters if f.exclude]
# Check exclusions first (they have priority)
frame_excluded = any(not f(frame_info) for f in exclude_filters)
if frame_excluded:
return None
# And then inclusions
if any(f(frame_info) for f in include_filters):
return frame_info
# No match, no inclusion
return None
def _apply_signal_filters( signal_infos: Sequence[SignalInfo], signal_filters: Sequence[SignalFilterPredicate], include_by_default: bool ) -> list[SignalInfo]: include_filters = [f for f in signal_filters if not f.exclude] exclude_filters = [f for f in signal_filters if f.exclude]
included_signals = []
for signal_info in signal_infos:
# Check exclusion filters first (they have priority)
is_excluded = any(not f(signal_info) for f in exclude_filters)
if is_excluded:
continue
# Handle inclusion logic
if include_by_default:
# Include unless explicitly excluded (already handled above)
included_signals.append(signal_info)
else:
# Must be explicitly included by an inclusion filter
is_included = any(f(signal_info) for f in include_filters)
if is_included:
included_signals.append(signal_info)
return included_signals
def filter_recursive(frame_info: FrameInfo, filters: Sequence[FilterLike]) -> FrameInfo | None: """ Apply filters to a FrameInfo and return filtered result or None.
Filter Strategy:
Frame filtering determines if the frame should be considered at all.
Signal filtering then determines which signals to include, with behavior
depending on frame inclusion:
- Frame included: Include all signals except those explicitly excluded
- Frame excluded: Only include signals that are explicitly included
- No filters: Returns None (nothing to subscribe to)
Exclusion Priority:
Exclusion filters (exclude=True) always take priority over inclusion filters
for both frames and signals.
Returns: Filtered FrameInfo with matching signals, or None if no matches
"""
frame_filters = [f for f in filters if is_frame_filter(f)]
signal_filters = [f for f in filters if is_signal_filter(f)]
# If we have no filters, there is nothing to subscribe to
if not frame_filters and not signal_filters:
return None
# Check if frame passes frame filters
# If frame filters exist, frame must pass at least one include filter AND not be excluded
frame_included = bool(_apply_frame_filters(frame_info, frame_filters))
# If frame is excluded and we have no signal filters, exclude entirely
if not frame_included and not signal_filters:
return None
# If frame is included and no signal filters, include all signals
if frame_included and not signal_filters:
return frame_info
# Apply signal filters
# If frame is included, start with all signals and apply exclusions (include_by_default=True)
# If frame is excluded, only include signals that are explicitly included (include_by_default=False)
included_signals = _apply_signal_filters(list(frame_info.signals.values()), signal_filters, include_by_default=frame_included)
if not included_signals:
# No signals match, exclude the whole frame
return None
# Return frame with filtered signals
return FrameInfo(
name=frame_info.name,
namespace=frame_info.namespace,
sender=frame_info.sender,
receiver=frame_info.receiver,
signals={sig.name: sig for sig in included_signals},
cycle_time_millis=frame_info.cycle_time_millis,
Logging
This library uses Python's standard logging module. By default, the library does not configure any logging handlers,
allowing applications to fully control their logging setup.
To enable logs from this library in your application or tests, configure logging as follows:
import logging
logging.basicConfig(level=logging.INFO)
logging.getLogger("remotivelabs.topology").setLevel(logging.DEBUG)
For more advanced configurations, refer to the Python logging documentation.