remotivelabs.topology.testing
Utilities for validating network communication.
Capturing frames
Use remotivelabs.topology.testing.frames.capture_frames to subscribe to CAN frames from a
namespace and assert on the received frames and signal values:
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.testing.frames import capture_frames
async def test_hazard_light_sequence(broker_client: BrokerClient) -> None:
# Subscribe to frames published by the HazardLightControlUnit behavioral model
async with capture_frames(
(broker_client, "HazardLightControlUnit-DriverCan0"),
frames=["HazardLightButton"],
) as cap:
# Assert that the signal transitions 0 → 1 → 0 (button press and release)
await cap.wait_for_signal_values(
"HazardLightButton",
"HazardLightButton.HazardLightButton",
values=[0, 1, 0],
timeout=5.0,
)
Capturing SOME/IP events
Use remotivelabs.topology.testing.some_ip.capture_events to subscribe to SOME/IP events:
from remotivelabs.broker import BrokerClient
from remotivelabs.topology.testing.some_ip import capture_events
MY_SERVICE = "MyTestService"
SPEED_EVENT = (MY_SERVICE, "SpeedEvent")
async def test_speed_event_sequence(broker_client: BrokerClient) -> None:
async with capture_events(
(broker_client, "consuming_service", 99),
events=[SPEED_EVENT],
) as cap:
# Wait for the speed to reach 50, then drop back to 0
await cap.wait_for_event_parameter_values(SPEED_EVENT, "speed", values=[50, 0], timeout=5.0)
Polling assertions
For lower-level polling assertions, install the optional PyHamcrest extras:
pip install remotivelabs-topology[hamcrest]
remotivelabs.topology.testing.hamcrest.await_at_most provides a fluent retry API with signal
value matchers:
from remotivelabs.broker import BrokerClient, FrameSubscription, SignalValue
from remotivelabs.topology.namespaces.can import CanNamespace
from remotivelabs.topology.testing.hamcrest import await_at_most, contains_signal_value_sequence
async def test_signal_sequence(broker_client: BrokerClient) -> None:
async with CanNamespace("HazardLightControlUnit-DriverCan0", broker_client) as ns:
stream = await ns.subscribe_frames(FrameSubscription(name="HazardLightButton"))
values: list[SignalValue] = []
async def consume_next() -> list[SignalValue]:
frame = await anext(stream)
values.append(frame.signals["HazardLightButton.HazardLightButton"])
return values
# Assert the button transitions 0 → 1 → 0 within 5 seconds
await await_at_most(5).until(consume_next, contains_signal_value_sequence(0, 1, 0))
See remotivelabs.topology.testing.hamcrest for details.
Utility for capturing items from an async iterator in a background task.
The Capture instance is stateful: each successful call to a wait or retrieval method advances the internal position, so subsequent calls operate on new items. This enables sequential, windowed assertions and waits within a single capture context.
Wait for and return the next captured item.
Resumes from the last read position.
Wait for an item matching a predicate.
Arguments:
- predicate: Predicate to match items.
- timeout: Maximum time to wait in seconds.
- waiting_for: Description of what is being waited for (used in timeout error).
Returns:
The matching item.
Raises:
- CaptureTimeoutError: If no matching item is received in time.
Wait for a sequence of items matching the predicates in order.
Items that do not match the current predicate are skipped. This is intentional for interleaved sequences (e.g. waiting for frames A, B, A across a mixed stream), but means unexpected values are silently ignored rather than causing immediate failure.
Note:
A future
strictmode could fail immediately on mismatch using a private exception that does not inherit fromException(so it escapes the broadexcept Exceptioninretry).
Arguments:
- predicates: A sequence of predicate functions to match in order.
- timeout: Maximum time to wait for the entire sequence.
Returns:
A list of items matching each predicate in order.
Raises:
- CaptureTimeoutError: If the sequence is not received in time.
Frame-specific capture with typed wait methods.
Wraps a Capture[Frame] and provides methods for waiting on specific frames
and signal values. Obtain an instance via capture_frames.
Wait for a frame matching the given name and signal values.
Arguments:
- frame: The frame name to match.
- signals: A mapping of signal name to expected value.
- timeout: Maximum time to wait in seconds.
Returns:
The first matching frame.
Raises:
- CaptureTimeoutError: If no matching frame is received in time.
Wait for an ordered sequence of frames matching (frame_name, signals) pairs.
Unlike wait_for_signal_values, this supports sequences that span multiple different
frames interleaved in any order. Frames not matching the current expected pair are skipped.
Arguments:
- sequence: An ordered list of
(frame_name, signals)pairs to match in sequence. - timeout: Maximum time to wait for the entire sequence.
Returns:
A list of frames, one per pair, in order.
Raises:
- CaptureTimeoutError: If the full sequence is not received in time.
Example:
await cap.wait_for_frames([ ("TurnLightControl", {"TurnLightControl.LeftTurnLightRequest": 1}), ("SpeedControl", {"SpeedControl.Speed": 0}), ("TurnLightControl", {"TurnLightControl.LeftTurnLightRequest": 0}), ])
Wait for a sequence of frames where a signal takes each value in order.
Each value in values must appear in a frame (in order). Frames not matching
the current expected value are skipped — unexpected values are not treated as failures.
Arguments:
- frame: The frame name to match.
- signal: The signal name (full path, e.g.
"FrameName.SignalName"). - values: A sequence of expected signal values to observe in order.
- timeout: Maximum time to wait for the entire sequence.
Returns:
A list of frames, one per value, in order.
Raises:
- CaptureTimeoutError: If the full sequence is not received in time.
Async context manager to capture frames from a namespace.
The returned FrameCapture object is stateful: each successful call to a wait or retrieval
method advances the internal position, so subsequent calls resume from where the last
left off.
Arguments:
- source: A
GenericNamespace, or a(broker_client, namespace_name)tuple. - frames: Frames to subscribe to. If
None, subscribes to all frames in the namespace. - name: Name for logging. If
None, generates a unique name.
Yields:
A
FrameCaptureinstance.
Example:
async with capture_frames((broker_client, "ecu_b"), ["FrameA"]) as cap: frame = await cap.wait_for_frame("FrameA", {"FrameA.Signal": 1})
SOME/IP-specific capture with typed wait methods.
Wraps a Capture[SomeIPEvent] and provides methods for waiting on specific events
and parameter values. Obtain an instance via capture_events.
Wait for and return the next captured event.
Resumes from the last read position.
Wait for a SOME/IP event matching the given event spec and optional parameters.
Arguments:
- event: A
(service_name, event_name)tuple identifying the event to match. - parameters: A mapping of parameter name to expected value. If
None, only the event identity is checked. - timeout: Maximum time to wait in seconds.
Returns:
The first matching
SomeIPEvent.
Raises:
- CaptureTimeoutError: If no matching event is received in time.
Wait for an ordered sequence of events matching (event_spec, parameters) pairs.
Unlike wait_for_event_parameter_values, this supports sequences that span multiple
different events interleaved in any order. Events not matching the current expected
pair are skipped.
Arguments:
- sequence: An ordered list of
(event_spec, parameters)pairs to match in sequence.parametersmay beNoneto match any event of that type. - timeout: Maximum time to wait for the entire sequence.
Returns:
A list of events, one per pair, in order.
Raises:
- CaptureTimeoutError: If the full sequence is not received in time.
Example:
await cap.wait_for_events([ (("MyService", "SpeedEvent"), {"speed": 0}), (("MyService", "GearEvent"), {"gear": 1}), (("MyService", "SpeedEvent"), {"speed": 50}), ])
Wait for a sequence of SOME/IP events where a parameter takes each value in order.
Each value in values must appear in an event (in order). Events not matching
the current expected value are skipped — unexpected values are not treated as failures.
Arguments:
- event: A
(service_name, event_name)tuple identifying the event to match. - parameter: The parameter name to check.
- values: A sequence of expected parameter values to observe in order.
- timeout: Maximum time to wait for the entire sequence.
Returns:
A list of events, one per value, in order.
Raises:
- CaptureTimeoutError: If the full sequence is not received in time.
Async context manager to capture SOME/IP events from a namespace.
Arguments:
- source: A
SomeIPNamespace, or a(broker_client, namespace_name, client_id)tuple that will be used to create a transientSomeIPNamespace. - events: A sequence of
(service_name, event_name)tuples to subscribe to. - name: Name for logging. If
None, generates a unique name.
Yields:
A
SomeIPCaptureinstance.
Example:
async with capture_events( some_ip_eth, [("MyTestService", "ByteEncodedMessages")], ) as cap: event = await cap.wait_for_event(("MyTestService", "ByteEncodedMessages"))