remotivelabs.topology.testing.retry
async def
assert_until( func: Callable, matcher: hamcrest.core.matcher.Matcher, timeout: float = 10) -> None:
Waits until the result of the given function matches the provided matcher, retrying until timeout.
Parameters
- func: A callable that returns the result to be tested.
- matcher: A matcher from hamcrest to evaluate the result against.
- timeout: The maximum duration to wait before giving up, in seconds.
Raises
- TypeError: If func is not a callable.
def
await_at_most(seconds: float) -> remotivelabs.topology.testing.retry._RetryUntil:
Factory function to create a partial function with a specified timeout.
Usage: await await_at_most(5).until(func, matcher)
Parameters
- seconds: The maximum duration to wait before giving up, in seconds.
Returns
An object with an until method configured with the specified timeout.