remotivelabs.topology.namespaces
Namespace access module for RemotiveBroker.
Provides an interface to a namespace configured in a RemotiveBroker.
Supported types include:
someip: Enables sending requests and subscribing to events.generic: Enables Restbus access, signal subscriptions, and more.can: Same as genericscripted: Enables subscribing to frames transformed by scripts
Namespaces can be used standalone or injected into a BehavioralModel for simulation or testing.
See individual module documentation for protocol-specific details.
This example creates and configures a CAN namespace, but does not start the restbus.
It is not really useful as is; in a real application you would likely either use it together with a BehavioralModel
or start the restbus to send periodic messages.
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig
async def main():
async with (
BrokerClient(url="http://127.0.0.1:50051") as broker_client,
CanNamespace(
"HazardLightControlUnit-DriverCan0",
broker_client,
restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
),
):
# wait until cancelled
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
It is common to use namespaces together with a behavioral model, as shown in the example below.
import asyncio
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig
async def main():
async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
driver_can_0 = CanNamespace(
"HazardLightControlUnit-DriverCan0",
broker_client,
restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
)
async with BehavioralModel(
"HazardLightControlUnit",
namespaces=[driver_can_0],
broker_client=broker_client,
) as bm:
# The behavioral model is now running, with an active namespace restbus, until cancelled
await bm.run_forever()
if __name__ == "__main__":
asyncio.run(main())