remotivelabs.topology.namespaces.input_handlers

Handlers for filtered processing of inputs in RemotiveTopology.

This module defines handlers for processing inputs, such as frames, that match specific filters. Handlers include:

class InputHandler(typing.Protocol):
InputHandler(*args, **kwargs)
def add(self, *frame_infos: remotivelabs.broker.FrameInfo) -> None:
def subscriptions(self) -> list[remotivelabs.broker.FrameSubscription]:
async def handle( self, frame: remotivelabs.broker.Frame) -> tuple[str, list[remotivelabs.broker.WriteSignal]] | None:
class FrameHandler(_BaseHandler):

Handler for Frames.

This class wraps a callback that and passes objects matching provided filters. It should be passed to an BehavioralModel for automatic subscription and dispatch.

Arguments:
  • filters: A sequence of filters used to match relevant frames.
  • cb: Async callback invoked with a Frame when matched.
  • decode_named_values: True will decode named values to str.
Inherited Members
_BaseHandler
add
subscriptions
handle
class SomeIPRequestHandler(_BaseHandler):

Handler for SOME/IP requests.

This class wraps a callback that and passes objects matching provided filters. It should be passed to an BehavioralModel for automatic subscription and dispatch.

Arguments:
  • filters: A sequence of SomeIPRequestFilter used to match incoming request frames.
  • cb: Async callback that receives a SomeIPRequest and returns a response or None.
  • decode_named_values: True will decode named values to str.
Inherited Members
_BaseHandler
add
subscriptions
handle
class SomeIPEventHandler(_BaseHandler):

Handler for SOME/IP events.

This class wraps a callback that and passes objects matching provided filters. It should be passed to an BehavioralModel for automatic subscription and dispatch.

Arguments:
  • filters: A sequence of SomeIPEventFilter used to match event frames.
  • cb: Async callback invoked with a SomeIPEvent.
  • decode_named_values: True will decode named values to str.
SomeIPEventHandler( filters: Sequence[remotivelabs.topology.namespaces.filters.SomeIPEventFilter], cb: Optional[Callable[[remotivelabs.topology.namespaces.some_ip.SomeIPEvent], Awaitable[NoneType]]] = None, decode_named_values: bool = False)
Inherited Members
_BaseHandler
add
subscriptions
handle