remotivelabs.topology.namespaces.input_handlers

Handlers for filtered processing of inputs in RemotiveTopology.

This module defines handlers for processing inputs, such as frames, that match specific filters. Handlers include:

class InputHandler(typing.Protocol):

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
InputHandler(*args, **kwargs)
def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:
def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
async def handle( self, frame: remotivelabs.broker.Frame) -> None | tuple[str, list[remotivelabs.broker.WriteSignal]]:
class FilterEngine:
def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:

Add frame infos to the filter engine, building subscriptions.

A frame is included in the subscriptions if it matches any frame filters (if present) or has signals that match signal filters.

def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
def filter_frame( self, frame: remotivelabs.broker.Frame) -> remotivelabs.broker.Frame | None:
class FrameHandler(InputHandler):

Handler for general frames using frame/signal filters.

FrameHandler( filters: Sequence[Union[remotivelabs.topology.namespaces.filters.FrameFilterPredicate, remotivelabs.topology.namespaces.filters.SignalFilterPredicate]], cb: Optional[Callable[[remotivelabs.broker.Frame], Awaitable[NoneType]]] = None, decode_named_values: bool = False)
def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:
def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
async def handle(self, frame: remotivelabs.broker.Frame) -> None:
class SomeIPRequestHandler(InputHandler):

Handler for SOME/IP requests.

def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:
def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
async def handle( self, frame: remotivelabs.broker.Frame) -> None | tuple[str, list[remotivelabs.broker.WriteSignal]]:
class SomeIPEventHandler(InputHandler):

Handler for SOME/IP events.

SomeIPEventHandler( filters: Sequence[remotivelabs.topology.namespaces.filters.SomeIPEventFilter], cb: Optional[Callable[[remotivelabs.topology.namespaces.some_ip.SomeIPEvent], Awaitable[NoneType]]] = None, decode_named_values: bool = False)
def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:
def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
async def handle(self, frame: remotivelabs.broker.Frame) -> None: