remotivelabs.broker.recording_session

@dataclass
class File:
File( path: str, type: remotivelabs.broker.recording_session.file.FileType, created_time: datetime.datetime, modified_time: datetime.datetime, size: int)
path: str
type: remotivelabs.broker.recording_session.file.FileType
created_time: datetime.datetime
modified_time: datetime.datetime
size: int
@dataclass
class PlaybackOffset:

Current offset in micro seconds.

PlaybackOffset(offset: int = 0)
offset: int = 0
@dataclass
class PlaybackRepeat:

Playback repeat settings.

PlaybackRepeat(start_offset: int = 0, end_offset: int = 0)
start_offset: int = 0

Current cycle start in micro seconds.

end_offset: int = 0

Current cycle end in micro seconds.

class PlaybackMode(enum.Enum):

An enumeration.

PLAYBACK_PLAYING = <PlaybackMode.PLAYBACK_PLAYING: 0>

Playing a file.

PLAYBACK_PAUSED = <PlaybackMode.PLAYBACK_PAUSED: 1>

Playback is paused.

PLAYBACK_CLOSED = <PlaybackMode.PLAYBACK_CLOSED: 2>

Playback is closed.

class RecordingSessionClient(remotivelabs.broker.client.BrokerClient):

TODO: We probably dont want to inherit from BrokerClient, but rather use composition to hide functionality not relevant for recording session operations. However, this will do for now.

async def list_recording_files( self, path: str | None = None) -> list[File]:

List recording files in a directory.

Arguments:
  • path: Optional path to the subdirectory containing the recording files.
def playback_status( self) -> AsyncIterator[list[RecordingSessionPlaybackStatus]]:

Get continuous status of all open recording sessions.

def get_session( self, path: str, force_reopen: bool = False) -> remotivelabs.broker.recording_session.client.RecordingSession:

Return a RecordingSession for the given path.

Arguments:
  • path: The path to the recording session file.
  • force_reopen: Whether to force close any existing session before opening.
class RecordingSessionPlaybackError(builtins.Exception):

Common base class for all non-exit exceptions.

@dataclass
class RecordingSessionPlaybackStatus:
RecordingSessionPlaybackStatus( path: str, mode: PlaybackMode, offset: PlaybackOffset, repeat: PlaybackRepeat | None = None)
path: str
mode: PlaybackMode
offset: PlaybackOffset
repeat: PlaybackRepeat | None = None