remotivelabs.topology.testing.retry

class RetryAssert:

Retries a condition asynchronously until it holds or a configurable timeout expires. Use await_at_most() to create an instance of this class.

RetryAssert(timeout=10)

Initializes the RetryAssert instance.

Parameters
  • timeout: The maximum duration to wait before giving up, in seconds.

timeout

async def until(self, func: Callable, matcher: hamcrest.core.matcher.Matcher) -> None:

Waits until the result of the given function matches the provided matcher, calling the function again on each retry until the timeout expires.

Parameters
  • func: A callable that returns the result to be tested.
  • matcher: A matcher from hamcrest to evaluate the result against.
Raises
  • TypeError: If func is not callable.
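
The retry behavior described for until() can be illustrated with a minimal, self-contained sketch. This is not the library's implementation: until_sketch, the EqualTo stand-in matcher, the interval parameter, and the AssertionError raised on timeout are all assumptions made for illustration. The documentation above only specifies the TypeError for a non-callable func.

```python
import asyncio
import time
from typing import Any, Callable


class EqualTo:
    """Hypothetical stand-in for a hamcrest matcher; only matches() is modeled."""

    def __init__(self, expected: Any) -> None:
        self.expected = expected

    def matches(self, item: Any) -> bool:
        return item == self.expected


async def until_sketch(func: Callable, matcher: EqualTo,
                       timeout: float = 10, interval: float = 0.01) -> None:
    # Reject non-callables up front, mirroring the documented Raises entry.
    if not callable(func):
        raise TypeError("func must be callable")
    deadline = time.monotonic() + timeout
    while True:
        result = func()
        if matcher.matches(result):
            return
        if time.monotonic() >= deadline:
            # Assumption: the source does not state what happens on timeout.
            raise AssertionError(
                f"no match within {timeout}s; last result: {result!r}"
            )
        # Yield to the event loop between attempts (interval is an assumption).
        await asyncio.sleep(interval)


async def demo() -> int:
    calls = {"count": 0}

    def poll() -> int:
        # Returns 1, 2, 3, ... on successive calls.
        calls["count"] += 1
        return calls["count"]

    await until_sketch(poll, EqualTo(3), timeout=1.0)
    return calls["count"]


polls_needed = asyncio.run(demo())
```

In this sketch the condition is checked at least once even with a very short timeout, and the loop sleeps between attempts so other tasks can make progress while waiting.
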
def await_at_most(seconds: float) -> RetryAssert:

Factory function to create a RetryAssert instance with a specified timeout.

Parameters
  • seconds: The maximum duration to wait before giving up, in seconds.
Returns

A configured instance of RetryAssert.
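
The call shape that the factory enables, await_at_most(...).until(...), might look like the following sketch. To stay runnable without the library installed, it uses hypothetical stand-ins: RetryAssertSketch, await_at_most_sketch, and the IsTrue matcher are illustrative names, and the 0.01 second poll interval and the AssertionError on timeout are assumptions rather than documented behavior.

```python
import asyncio
import time
from typing import Any, Callable


class IsTrue:
    """Hypothetical stand-in matcher; a real test would pass a hamcrest matcher."""

    def matches(self, item: Any) -> bool:
        return item is True


class RetryAssertSketch:
    """Illustrative stand-in for RetryAssert, not the library class."""

    def __init__(self, timeout: float = 10) -> None:
        self.timeout = timeout

    async def until(self, func: Callable, matcher: IsTrue) -> None:
        deadline = time.monotonic() + self.timeout
        while not matcher.matches(func()):
            if time.monotonic() >= deadline:
                # Assumption: timeout behavior is not documented in the source.
                raise AssertionError("condition not met before timeout")
            await asyncio.sleep(0.01)


def await_at_most_sketch(seconds: float) -> RetryAssertSketch:
    # The factory only forwards the timeout to the constructor.
    return RetryAssertSketch(timeout=seconds)


async def demo() -> bool:
    state = {"ready": False}

    async def become_ready() -> None:
        # Simulates a system under test that becomes ready after a delay.
        await asyncio.sleep(0.05)
        state["ready"] = True

    task = asyncio.create_task(become_ready())
    # Fluent form: build the retrier with the factory, then wait on a condition.
    await await_at_most_sketch(2.0).until(lambda: state["ready"], IsTrue())
    await task
    return state["ready"]


ready = asyncio.run(demo())
```

With the library itself, the equivalent call would presumably import await_at_most from remotivelabs.topology.testing.retry and pass a hamcrest matcher in place of the stand-in.
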