remotivelabs.topology.namespaces.input_handlers
Handlers for filtered processing of inputs in RemotiveTopology.
This module defines handlers for processing inputs, such as frames, that match specific filters. Handlers include:
FrameHandler: For general frame handling using frame or signal filters.SomeIPRequestHandler: For handling SOME/IP request frames and responding.SomeIPEventHandler: For handling SOME/IP event frames.
class
InputHandler(typing.Protocol):
async def
handle( self, frame: remotivelabs.broker.Frame) -> None | tuple[str, list[remotivelabs.broker.WriteSignal]]:
class
FilterEngine:
FilterEngine( filters: Sequence[Union[remotivelabs.topology.namespaces.filters.FrameFilterPredicate, remotivelabs.topology.namespaces.filters.SignalFilterPredicate]], decode_named_values: bool = False)
Add frame infos to the filter engine, building subscriptions.
A frame is included in the subscriptions if it matches any frame filters (if present) or has signals that match signal filters.
Handler for general frames using frame/signal filters.
FrameHandler( filters: Sequence[Union[remotivelabs.topology.namespaces.filters.FrameFilterPredicate, remotivelabs.topology.namespaces.filters.SignalFilterPredicate]], cb: Optional[Callable[[remotivelabs.broker.Frame], Awaitable[NoneType]]] = None, decode_named_values: bool = False)
Handler for SOME/IP requests.
SomeIPRequestHandler( filters: Sequence[remotivelabs.topology.namespaces.filters.SomeIPRequestFilter], cb: Optional[Callable[[remotivelabs.topology.namespaces.some_ip.SomeIPRequest], Awaitable[remotivelabs.topology.namespaces.some_ip.SomeIPResponse | remotivelabs.topology.namespaces.some_ip.SomeIPError | None]]] = None, decode_named_values: bool = False)
async def
handle( self, frame: remotivelabs.broker.Frame) -> None | tuple[str, list[remotivelabs.broker.WriteSignal]]:
Handler for SOME/IP events.
SomeIPEventHandler( filters: Sequence[remotivelabs.topology.namespaces.filters.SomeIPEventFilter], cb: Optional[Callable[[remotivelabs.topology.namespaces.some_ip.SomeIPEvent], Awaitable[NoneType]]] = None, decode_named_values: bool = False)