remotivelabs.topology.testing.retry

async def assert_until(func: Callable, matcher: hamcrest.core.matcher.Matcher, timeout: float = 10) -> None:

Repeatedly evaluates the given function until its result matches the provided matcher or the timeout expires.

Parameters
  • func: A callable that returns the result to be tested.
  • matcher: A matcher from hamcrest to evaluate the result against.
  • timeout: The maximum duration to wait before giving up, in seconds.
Raises
  • TypeError: If func is not a callable.
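
The retry behavior described above can be illustrated with a minimal, standard-library-only sketch. This is not the library's implementation: poll_until, its interval parameter, the plain predicate used in place of a hamcrest matcher, and the AssertionError raised on timeout are all assumptions made for illustration. Only the TypeError for a non-callable func comes from the documentation above.

```python
import asyncio
import time
from typing import Any, Callable


async def poll_until(
    func: Callable[[], Any],
    predicate: Callable[[Any], bool],
    timeout: float = 10,
    interval: float = 0.01,
) -> None:
    """Hypothetical stand-in for assert_until (not the library's code)."""
    if not callable(func):
        raise TypeError("func must be callable")
    deadline = time.monotonic() + timeout
    while True:
        result = func()
        if predicate(result):
            return
        if time.monotonic() >= deadline:
            # Assumption: the documented function may report a timeout differently.
            raise AssertionError(f"no match within {timeout}s; last result: {result!r}")
        await asyncio.sleep(interval)


def make_counter() -> Callable[[], int]:
    """Return a callable whose result increases by one on every call."""
    state = {"n": 0}

    def next_value() -> int:
        state["n"] += 1
        return state["n"]

    return next_value


def run_demo() -> int:
    counter = make_counter()
    # The predicate first holds on the third call, so polling stops there.
    asyncio.run(poll_until(counter, lambda value: value >= 3, timeout=1))
    return counter()  # one further call after polling has finished


if __name__ == "__main__":
    print(run_demo())
```

With the real function, the predicate would be replaced by a hamcrest matcher such as equal_to(3), and the call would be awaited from inside an async test.
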

def await_at_most(seconds: float) -> remotivelabs.topology.testing.retry._RetryUntil:

Factory function that returns a retry helper preconfigured with the specified timeout.

Usage: await await_at_most(5).until(func, matcher)

Parameters
  • seconds: The maximum duration to wait before giving up, in seconds.
Returns

An object with an until method configured with the specified timeout.
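
The factory pattern behind the usage line above can be sketched with the standard library alone. The names RetryUntil and at_most, the interval parameter, and the AssertionError on timeout are hypothetical stand-ins chosen for this illustration, and a plain predicate replaces the hamcrest matcher. The real _RetryUntil class may behave differently.

```python
import asyncio
import time
from typing import Any, Callable, List


class RetryUntil:
    """Hypothetical stand-in for the object returned by await_at_most."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout

    async def until(
        self,
        func: Callable[[], Any],
        predicate: Callable[[Any], bool],
        interval: float = 0.01,
    ) -> None:
        deadline = time.monotonic() + self.timeout
        while not predicate(func()):
            if time.monotonic() >= deadline:
                # Assumption: the real helper may signal a timeout differently.
                raise AssertionError(f"no match within {self.timeout}s")
            await asyncio.sleep(interval)


def at_most(seconds: float) -> RetryUntil:
    """Bind the timeout once so call sites read as a single fluent expression."""
    return RetryUntil(seconds)


async def demo() -> List[int]:
    received: List[int] = []

    async def producer() -> None:
        # Simulates values arriving from a background task.
        for i in range(3):
            await asyncio.sleep(0.01)
            received.append(i)

    task = asyncio.create_task(producer())
    # Same shape as the documented usage: await await_at_most(5).until(func, matcher)
    await at_most(1).until(lambda: len(received), lambda n: n == 3)
    await task
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Binding the timeout in the factory keeps it out of the argument list of each check, which is convenient when many assertions in a test share the same time budget.
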