remotivelabs.topology.namespaces

Namespace access module for RemotiveBroker.

Provides an interface to a namespace configured in a RemotiveBroker.

Supported types include:
  • someip: Enables sending requests and subscribing to events.
  • generic: Enables Restbus access, signal subscriptions, and more.
  • can: Same capabilities as generic.
  • scripted: Enables subscribing to frames transformed by scripts.

Namespaces can be used standalone or injected into a BehavioralModel for simulation or testing. See individual module documentation for protocol-specific details.

This example creates and configures a CAN namespace but does not start the restbus. On its own it does little; in a real application you would typically either use it together with a BehavioralModel or start the restbus to send periodic messages.

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig


async def main():
    async with (
        BrokerClient(url="http://127.0.0.1:50051") as broker_client,
        CanNamespace(
            "HazardLightControlUnit-DriverCan0",
            broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
        ),
    ):
        # Keep the namespace open until the task is cancelled
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())
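
The `await asyncio.Future()` idiom used above keeps the `async with` block open until the surrounding task is cancelled, at which point the context managers are exited in the usual way. The sketch below illustrates only that behaviour, using the standard library. `fake_namespace`, `hold_open` and `run_then_cancel` are hypothetical stand-ins introduced for illustration; they are not part of remotivelabs.topology and say nothing about what a real namespace does on entry or exit.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def fake_namespace(name):
    # Hypothetical stand-in for a namespace: it only records enter and exit.
    events.append(f"open {name}")
    try:
        yield name
    finally:
        # Runs when the block is left, including on cancellation.
        events.append(f"close {name}")


async def hold_open():
    async with fake_namespace("HazardLightControlUnit-DriverCan0"):
        # A Future that never completes: wait here until cancelled.
        await asyncio.Future()


async def run_then_cancel():
    task = asyncio.create_task(hold_open())
    await asyncio.sleep(0)  # give the task a chance to enter the context
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("cancelled")
    return events


result = asyncio.run(run_then_cancel())
print(result)
```

In this sketch the exit handler runs before the cancellation reaches the caller, so cleanup placed in a context manager's exit path is not skipped when the program is stopped.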

It is common to use namespaces together with a behavioral model, as shown in the example below.

import asyncio

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.behavioral_model import BehavioralModel
from remotivelabs.topology.namespaces import filters
from remotivelabs.topology.namespaces.can import CanNamespace, RestbusConfig


async def main():
    async with BrokerClient(url="http://127.0.0.1:50051") as broker_client:
        driver_can_0 = CanNamespace(
            "HazardLightControlUnit-DriverCan0",
            broker_client,
            restbus_configs=[RestbusConfig([filters.SenderFilter(ecu_name="HazardLightControlUnit")])],
        )
        async with BehavioralModel(
            "HazardLightControlUnit",
            namespaces=[driver_can_0],
            broker_client=broker_client,
        ) as bm:
            # The behavioral model is now running, with an active namespace restbus, until cancelled
            await bm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())