remotivelabs.topology.namespaces.input_handlers

Handlers for filtered processing of inputs in RemotiveTopology.

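This module defines handlers for processing inputs, such as frames, that match specific filters. Handlers include FrameHandler (general frames, using frame/signal filters), SomeIPRequestHandler (SOME/IP requests), and SomeIPEventHandler (SOME/IP events).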

class InputHandler(typing.Protocol):
InputHandler(*args, **kwargs)
def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:
def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
async def handle(self, frame: remotivelabs.broker.Frame) -> None | tuple[str, list[remotivelabs.broker.WriteSignal]]:
class FilterEngine:
def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:

Add frame infos to the filter engine, building subscriptions.

A frame is included in the subscriptions if it matches any of the frame filters (if present) or has signals that match the signal filters.

def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
def filter_frame(self, frame: remotivelabs.broker.Frame) -> remotivelabs.broker.Frame | None:
class FrameHandler(InputHandler):

Handler for general frames using frame/signal filters.

FrameHandler(filters: Sequence[Union[remotivelabs.topology.namespaces.filters.FrameFilterPredicate, remotivelabs.topology.namespaces.filters.SignalFilterPredicate]], cb: Optional[Callable[[remotivelabs.broker.Frame], Awaitable[NoneType]]] = None, decode_named_values: bool = False)
def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:
def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
async def handle(self, frame: remotivelabs.broker.Frame) -> None:
class SomeIPRequestHandler(InputHandler):

Handler for SOME/IP requests.

def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:
def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
async def handle(self, frame: remotivelabs.broker.Frame) -> None | tuple[str, list[remotivelabs.broker.WriteSignal]]:
class SomeIPEventHandler(InputHandler):

Handler for SOME/IP events.

SomeIPEventHandler(filters: Sequence[remotivelabs.topology.namespaces.filters.SomeIPEventFilter], cb: Optional[Callable[[remotivelabs.topology.namespaces.some_ip.SomeIPEvent], Awaitable[NoneType]]] = None, decode_named_values: bool = False)
def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:
def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
async def handle(self, frame: remotivelabs.broker.Frame) -> None: