remotivelabs.topology.testing

Utilities for validating network communication.

Capturing frames

Use remotivelabs.topology.testing.frames.capture_frames to subscribe to CAN frames from a namespace and assert on the received frames and signal values:

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.testing.frames import capture_frames


async def test_hazard_light_sequence(broker_client: BrokerClient) -> None:
    # Subscribe to frames published by the HazardLightControlUnit behavioral model
    async with capture_frames(
        (broker_client, "HazardLightControlUnit-DriverCan0"),
        frames=["HazardLightButton"],
    ) as cap:
        # Assert that the signal transitions 0 → 1 → 0 (button press and release)
        await cap.wait_for_signal_values(
            "HazardLightButton",
            "HazardLightButton.HazardLightButton",
            values=[0, 1, 0],
            timeout=5.0,
        )

Capturing SOME/IP events

Use remotivelabs.topology.testing.some_ip.capture_events to subscribe to SOME/IP events:

from remotivelabs.broker import BrokerClient

from remotivelabs.topology.testing.some_ip import capture_events

MY_SERVICE = "MyTestService"
SPEED_EVENT = (MY_SERVICE, "SpeedEvent")


async def test_speed_event_sequence(broker_client: BrokerClient) -> None:
    async with capture_events(
        (broker_client, "consuming_service", 99),
        events=[SPEED_EVENT],
    ) as cap:
        # Wait for the speed to reach 50, then drop back to 0
        await cap.wait_for_event_parameter_values(SPEED_EVENT, "speed", values=[50, 0], timeout=5.0)

Polling assertions

For lower-level polling assertions, install the optional PyHamcrest extra (quoted so that shells such as zsh do not interpret the brackets):

pip install "remotivelabs-topology[hamcrest]"

remotivelabs.topology.testing.hamcrest.await_at_most provides a fluent retry API with signal value matchers:

from remotivelabs.broker import BrokerClient, FrameSubscription, SignalValue

from remotivelabs.topology.namespaces.can import CanNamespace
from remotivelabs.topology.testing.hamcrest import await_at_most, contains_signal_value_sequence


async def test_signal_sequence(broker_client: BrokerClient) -> None:
    async with CanNamespace("HazardLightControlUnit-DriverCan0", broker_client) as ns:
        stream = await ns.subscribe_frames(FrameSubscription(name="HazardLightButton"))
        values: list[SignalValue] = []

        async def consume_next() -> list[SignalValue]:
            frame = await anext(stream)
            values.append(frame.signals["HazardLightButton.HazardLightButton"])
            return values

        # Assert the button transitions 0 → 1 → 0 within 5 seconds
        await await_at_most(5).until(consume_next, contains_signal_value_sequence(0, 1, 0))

See remotivelabs.topology.testing.hamcrest for details.
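
For orientation, the retry idea behind a polling assertion can be sketched in plain asyncio. This is not the library's implementation: poll_until, its parameters, and the demo data are hypothetical names chosen for this illustration. It only shows a supplier being called repeatedly until a condition on its result holds or a deadline passes.

```python
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def poll_until(
    supplier: Callable[[], Awaitable[T]],
    condition: Callable[[T], bool],
    timeout: float,
    interval: float = 0.01,
) -> T:
    """Hypothetical helper: call supplier until condition(result) is true."""
    deadline = time.monotonic() + timeout
    while True:
        result = await supplier()
        if condition(result):
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} s")
        await asyncio.sleep(interval)


async def demo() -> list[int]:
    seen: list[int] = []
    source = iter([0, 0, 1, 1, 0])  # made-up signal values

    async def consume_next() -> list[int]:
        # Accumulate one more value per poll and return everything seen so far
        seen.append(next(source))
        return seen

    def one_then_zero(values: list[int]) -> bool:
        # True once a 1 has been followed by a 0
        return 1 in values and 0 in values[values.index(1):]

    return await poll_until(consume_next, one_then_zero, timeout=1.0)
```

Running asyncio.run(demo()) polls five times before the condition holds. A supplier that can block indefinitely would additionally need its own timeout, which this sketch leaves out.
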

class Capture(typing.Generic[~T]):

Utility for capturing items from an async iterator in a background task.

The Capture instance is stateful: each successful call to a wait or retrieval method advances the internal position, so subsequent calls operate on new items. This enables sequential, windowed assertions and waits within a single capture context.

Capture(iterator: AsyncIterator[~T], name: str | None = None, subscribed_frames: list[str] | None = None)

results: list[~T]

Return a list of all captured items.

async def next(self, timeout: float = 1.0) -> ~T:

Wait for and return the next captured item.

Resumes from the last read position.
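
The position-advancing behaviour can be pictured with a small stand-in. CursorBuffer is a hypothetical model written for this illustration, not the library's Capture class, and it assumes that items skipped during a successful search count as consumed. It only shows how a read position makes each call operate on items that earlier calls have not used.

```python
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class CursorBuffer(Generic[T]):
    """Hypothetical stand-in for a stateful capture: a list plus a read position."""

    def __init__(self, items: list[T]) -> None:
        self.results = list(items)  # everything captured so far
        self._pos = 0               # index of the next unread item

    def next(self) -> T:
        item = self.results[self._pos]
        self._pos += 1
        return item

    def take_first(self, predicate: Callable[[T], bool]) -> T:
        # Scan forward from the read position; the position only moves on success
        for index in range(self._pos, len(self.results)):
            if predicate(self.results[index]):
                self._pos = index + 1
                return self.results[index]
        raise LookupError("no unread item matches")


buf = CursorBuffer([0, 1, 0, 1])
first_one = buf.take_first(lambda v: v == 1)   # consumes indices 0 and 1
following = buf.next()                          # resumes at index 2
second_one = buf.take_first(lambda v: v == 1)  # finds the later 1, not the first again
```

The full list stays available through results, while the read position decides where the next call starts.
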

async def wait_for_item(self, predicate: Callable[[~T], bool], timeout: float = 1.0, waiting_for: str = 'item matching predicate') -> ~T:

Wait for an item matching a predicate.

Arguments:
  • predicate: Predicate to match items.
  • timeout: Maximum time to wait in seconds.
  • waiting_for: Description of what is being waited for (used in timeout error).
Returns:

The matching item.

Raises:
  • CaptureTimeoutError: If no matching item is received in time.
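
A predicate is an ordinary callable that takes one captured item and returns a bool. The sketch below builds one with a closure. FakeFrame is a hypothetical stand-in with assumed name and signals fields, not the broker's Frame class, and signal_above is a made-up helper.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class FakeFrame:
    """Hypothetical stand-in for a captured item; not the broker's Frame class."""

    name: str
    signals: dict[str, Any] = field(default_factory=dict)


def signal_above(signal: str, threshold: float) -> Callable[[FakeFrame], bool]:
    """Build a predicate that is true when the signal exists and exceeds threshold."""

    def predicate(frame: FakeFrame) -> bool:
        value = frame.signals.get(signal)
        return value is not None and value > threshold

    return predicate


is_fast = signal_above("SpeedControl.Speed", 100)
slow = FakeFrame("SpeedControl", {"SpeedControl.Speed": 40})
fast = FakeFrame("SpeedControl", {"SpeedControl.Speed": 120})
other = FakeFrame("TurnLightControl", {"TurnLightControl.LeftTurnLightRequest": 1})
```

Assuming the captured items expose a signals mapping, such a predicate would be passed as wait_for_item(is_fast, timeout=2.0, waiting_for="speed above 100"), where the waiting_for text is what appears in the timeout error.
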
async def wait_for_items(self, predicates: Sequence[Callable[[~T], bool]], timeout: float = 1.0) -> list[~T]:

Wait for a sequence of items matching the predicates in order.

Items that do not match the current predicate are skipped. This is intentional for interleaved sequences (e.g. waiting for frames A, B, A across a mixed stream), but means unexpected values are silently ignored rather than causing immediate failure.

Note:

A future strict mode could fail immediately on mismatch by raising a private exception derived from BaseException rather than Exception, so that it escapes the broad except Exception in the retry logic.

Arguments:
  • predicates: A sequence of predicate functions to match in order.
  • timeout: Maximum time to wait for the entire sequence.
Returns:

A list of items matching each predicate in order.

Raises:
  • CaptureTimeoutError: If the sequence is not received in time.
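
The skip-on-mismatch matching and the single timeout for the whole sequence can be sketched with plain asyncio. match_in_order, expect, and mixed_stream are hypothetical names for this illustration, and the sketch raises asyncio's timeout error rather than CaptureTimeoutError. It is a model of the described behaviour, not the library's code.

```python
import asyncio
from typing import AsyncIterator, Callable, Sequence, TypeVar

T = TypeVar("T")


async def match_in_order(
    stream: AsyncIterator[T],
    predicates: Sequence[Callable[[T], bool]],
    timeout: float,
) -> list[T]:
    """Hypothetical sketch: collect one item per predicate, skipping non-matches."""

    async def collect() -> list[T]:
        matched: list[T] = []
        if not predicates:
            return matched
        async for item in stream:
            # Only the predicate for the current position is consulted
            if predicates[len(matched)](item):
                matched.append(item)
                if len(matched) == len(predicates):
                    break
        return matched

    # One deadline covers the whole sequence, not each predicate separately
    matched = await asyncio.wait_for(collect(), timeout)
    if len(matched) < len(predicates):
        raise LookupError("stream ended before the sequence completed")
    return matched


def expect(name: str) -> Callable[[str], bool]:
    return lambda item: item == name


async def mixed_stream() -> AsyncIterator[str]:
    for name in ["A", "C", "B", "B", "A"]:  # made-up frame names
        yield name
        await asyncio.sleep(0)
```

With this stream, matching A, B, A skips the C and the second B without failing, which is the behaviour to keep in mind when an unexpected value should have been treated as an error.
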
class FrameCapture:

Frame-specific capture with typed wait methods.

Wraps a Capture[Frame] and provides methods for waiting on specific frames and signal values. Obtain an instance via capture_frames.

FrameCapture(cap: Capture[remotivelabs.broker.Frame])

async def next(self, timeout: float = 1.0) -> remotivelabs.broker.Frame:

Wait for and return the next captured frame.

Resumes from the last read position.

async def wait_for_frame(self, frame: str, signals: dict[str, typing.Any], timeout: float = 1.0) -> remotivelabs.broker.Frame:

Wait for a frame matching the given name and signal values.

Arguments:
  • frame: The frame name to match.
  • signals: A mapping of signal name to expected value.
  • timeout: Maximum time to wait in seconds.
Returns:

The first matching frame.

Raises:
  • CaptureTimeoutError: If no matching frame is received in time.

async def wait_for_frames(self, sequence: Sequence[tuple[str, dict[str, Any]]], timeout: float = 1.0) -> list[remotivelabs.broker.Frame]:

Wait for an ordered sequence of frames matching (frame_name, signals) pairs.

Unlike wait_for_signal_values, this supports sequences that span several different frames, interleaved in any order. Frames that do not match the currently expected pair are skipped.

Arguments:
  • sequence: An ordered list of (frame_name, signals) pairs to match in sequence.
  • timeout: Maximum time to wait for the entire sequence.
Returns:

A list of frames, one per pair, in order.

Raises:
  • CaptureTimeoutError: If the full sequence is not received in time.

Example:
await cap.wait_for_frames([
    ("TurnLightControl", {"TurnLightControl.LeftTurnLightRequest": 1}),
    ("SpeedControl", {"SpeedControl.Speed": 0}),
    ("TurnLightControl", {"TurnLightControl.LeftTurnLightRequest": 0}),
])

async def wait_for_signal_values(self, frame: str, signal: str, values: Sequence[Any], timeout: float = 1.0) -> list[remotivelabs.broker.Frame]:

Wait for a sequence of frames where a signal takes each value in order.

Each value in values must appear in a frame, in order. Frames that do not match the currently expected value are skipped, so unexpected values are not treated as failures.

Arguments:
  • frame: The frame name to match.
  • signal: The signal name (full path, e.g. "FrameName.SignalName").
  • values: A sequence of expected signal values to observe in order.
  • timeout: Maximum time to wait for the entire sequence.
Returns:

A list of frames, one per value, in order.

Raises:
  • CaptureTimeoutError: If the full sequence is not received in time.
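
Conceptually, a value sequence for one signal corresponds to a list of (frame_name, signals) pairs of the kind wait_for_frames accepts. The helper below, whose name is made up for this illustration, spells out that correspondence. Whether the library implements wait_for_signal_values this way is an assumption.

```python
from typing import Any, Sequence


def as_frame_sequence(
    frame: str, signal: str, values: Sequence[Any]
) -> list[tuple[str, dict[str, Any]]]:
    """Hypothetical helper: one (frame_name, signals) pair per expected value."""
    return [(frame, {signal: value}) for value in values]


pairs = as_frame_sequence(
    "HazardLightButton",
    "HazardLightButton.HazardLightButton",
    [0, 1, 0],
)
```

Writing the pairs out like this is mainly useful when the value sequence has to be combined with expectations on other frames, since the resulting list can be extended before it is handed to wait_for_frames.
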
@asynccontextmanager
async def capture_frames(source: remotivelabs.topology.namespaces.generic.GenericNamespace | tuple[remotivelabs.broker.BrokerClient, str], frames: Optional[Sequence[remotivelabs.broker.Frame | remotivelabs.broker.FrameInfo | str]] = None, name: str | None = None) -> AsyncIterator[FrameCapture]:

Async context manager to capture frames from a namespace.

The returned FrameCapture object is stateful: each successful call to a wait or retrieval method advances the internal position, so subsequent calls resume from where the last left off.

Arguments:
  • source: A GenericNamespace, or a (broker_client, namespace_name) tuple.
  • frames: Frames to subscribe to. If None, subscribes to all frames in the namespace.
  • name: Name for logging. If None, generates a unique name.
Yields:

A FrameCapture instance.

Example:
async with capture_frames((broker_client, "ecu_b"), ["FrameA"]) as cap:
    frame = await cap.wait_for_frame("FrameA", {"FrameA.Signal": 1})

class SomeIPCapture:

SOME/IP-specific capture with typed wait methods.

Wraps a Capture[SomeIPEvent] and provides methods for waiting on specific events and parameter values. Obtain an instance via capture_events.

async def next(self, timeout: float = 1.0) -> remotivelabs.topology.namespaces.some_ip.SomeIPEvent:

Wait for and return the next captured event.

Resumes from the last read position.

async def wait_for_event(self, event: tuple[str, str], parameters: dict[str, typing.Any] | None = None, timeout: float = 1.0) -> remotivelabs.topology.namespaces.some_ip.SomeIPEvent:

Wait for a SOME/IP event matching the given event spec and optional parameters.

Arguments:
  • event: A (service_name, event_name) tuple identifying the event to match.
  • parameters: A mapping of parameter name to expected value. If None, only the event identity is checked.
  • timeout: Maximum time to wait in seconds.
Returns:

The first matching SomeIPEvent.

Raises:
  • CaptureTimeoutError: If no matching event is received in time.

async def wait_for_events(self, sequence: Sequence[tuple[tuple[str, str], dict[str, Any] | None]], timeout: float = 1.0) -> list[remotivelabs.topology.namespaces.some_ip.SomeIPEvent]:

Wait for an ordered sequence of events matching (event_spec, parameters) pairs.

Unlike wait_for_event_parameter_values, this supports sequences that span several different events, interleaved in any order. Events that do not match the currently expected pair are skipped.

Arguments:
  • sequence: An ordered list of (event_spec, parameters) pairs to match in sequence. parameters may be None to match any event of that type.
  • timeout: Maximum time to wait for the entire sequence.
Returns:

A list of events, one per pair, in order.

Raises:
  • CaptureTimeoutError: If the full sequence is not received in time.

Example:
await cap.wait_for_events([
    (("MyService", "SpeedEvent"), {"speed": 0}),
    (("MyService", "GearEvent"), {"gear": 1}),
    (("MyService", "SpeedEvent"), {"speed": 50}),
])
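
The role of a None parameters entry can be illustrated with a small matcher. event_matches is a hypothetical function for this illustration and not part of the library. It assumes that parameters absent from the expected mapping are ignored, which the description above does not state explicitly.

```python
from typing import Any, Mapping, Optional

EventSpec = tuple[str, str]  # (service_name, event_name)


def event_matches(
    spec: EventSpec,
    parameters: Mapping[str, Any],
    expected_spec: EventSpec,
    expected_parameters: Optional[Mapping[str, Any]],
) -> bool:
    """Hypothetical matcher for one (event_spec, parameters) pair."""
    if spec != expected_spec:
        return False
    if expected_parameters is None:
        return True  # identity-only match: any event of that type
    missing = object()  # sentinel so an expected value of None is still compared
    return all(
        parameters.get(key, missing) == value
        for key, value in expected_parameters.items()
    )


speed = ("MyService", "SpeedEvent")
gear = ("MyService", "GearEvent")
any_speed = event_matches(speed, {"speed": 50}, speed, None)
exact_speed = event_matches(speed, {"speed": 50, "unit": "kph"}, speed, {"speed": 50})
wrong_value = event_matches(speed, {"speed": 0}, speed, {"speed": 50})
wrong_event = event_matches(gear, {"gear": 1}, speed, None)
```

Here any_speed and exact_speed are matches, while wrong_value and wrong_event are not.
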
async def wait_for_event_parameter_values(self, event: tuple[str, str], parameter: str, values: Sequence[Union[int, float, bytes, str, NoneType]], timeout: float = 1.0) -> list[remotivelabs.topology.namespaces.some_ip.SomeIPEvent]:

Wait for a sequence of SOME/IP events where a parameter takes each value in order.

Each value in values must appear in an event, in order. Events that do not match the currently expected value are skipped, so unexpected values are not treated as failures.

Arguments:
  • event: A (service_name, event_name) tuple identifying the event to match.
  • parameter: The parameter name to check.
  • values: A sequence of expected parameter values to observe in order.
  • timeout: Maximum time to wait for the entire sequence.
Returns:

A list of events, one per value, in order.

Raises:
  • CaptureTimeoutError: If the full sequence is not received in time.

@asynccontextmanager
async def capture_events(source: remotivelabs.topology.namespaces.some_ip.SomeIPNamespace | tuple[remotivelabs.broker.BrokerClient, str, int], events: Sequence[tuple[str, str]], name: str | None = None) -> AsyncIterator[SomeIPCapture]:

Async context manager to capture SOME/IP events from a namespace.

Arguments:
  • source: A SomeIPNamespace, or a (broker_client, namespace_name, client_id) tuple that will be used to create a transient SomeIPNamespace.
  • events: A sequence of (service_name, event_name) tuples to subscribe to.
  • name: Name for logging. If None, generates a unique name.
Yields:

A SomeIPCapture instance.

Example:
async with capture_events(
    some_ip_eth,
    [("MyTestService", "ByteEncodedMessages")],
) as cap:
    event = await cap.wait_for_event(("MyTestService", "ByteEncodedMessages"))