remotivelabs.broker.sync

Synchronous connection to the remotiveBroker API.

Create a connection with the method create_channel.

This API uses protobuffer and gRPC stubs directly. Which are availble in the submodules:

  • remotivelabs.broker.sync.common_pb2.
  • remotivelabs.broker.sync.common_pb2_grpc.
  • remotivelabs.broker.sync.diagnostics_api_pb2.
  • remotivelabs.broker.sync.diagnostics_api_pb2_grpc.
  • remotivelabs.broker.sync.functional_api_pb2.
  • remotivelabs.broker.sync.functional_api_pb2_grpc.
  • remotivelabs.broker.sync.network_api_pb2.
  • remotivelabs.broker.sync.network_api_pb2_grpc.
  • remotivelabs.broker.sync.system_api_pb2.
  • remotivelabs.broker.sync.system_api_pb2_grpc.
  • remotivelabs.broker.sync.traffic_api_pb2.
  • remotivelabs.broker.sync.traffic_api_pb2_grpc.

For an example on how to use these we recommend looking at the samples for this library. Which is available at the repository remotiveLabs samples:

Link: https://github.com/remotivelabs/remotivelabs-samples/tree/main/python.

class SignalsInFrame(typing.Iterable):

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

SignalsInFrame(signals: List[SignalValue])
signals
index
class Client:
Client(client_id: str = 'broker_client')
client_id
url: Optional[str]
api_key: Optional[str]
on_connect: Optional[Callable[[Client], NoneType]]
on_signals: Optional[Callable[[SignalsInFrame], NoneType]]
def connect(self, url: str, api_key: Optional[str] = None):
def subscribe( self, signals_to_subscribe_to: List[SignalIdentifier], on_signals: Optional[Callable[[SignalsInFrame], NoneType]] = None, changed_values_only: bool = True):
def list_signal_names(self) -> List[SignalIdentifier]:
class SignalIdentifier:
SignalIdentifier(name: str, namespace: str)
name
namespace
@staticmethod
def parse(signal_id: str) -> SignalIdentifier:
class SignalValue:

Wrapper around protobuf generated class to make it a bit simpler to use to make us learn how we want the next version of the API to look like.

Use the signal.is_{type}() functions to validate type before you get value. Use signal.value() to get the actual value without any validation. Use signal.get_raw() to get the raw bytes Use signal.{type}_value() to get a validated typed value or error if something is wrong

SignalValue(signal: network_api_pb2.Signal)
signal: network_api_pb2.Signal
def to_json(self) -> str:
def is_integer(self) -> bool:
def is_double(self) -> bool:
def is_arbitration(self) -> bool:
def is_raw(self) -> bool:
def get_raw(self) -> Optional[bytes]:
def timestamp_us(self):
def name(self):
def namespace(self):
def value(self):
def float_value(self):
def int_value(self):
def bool_value(self):
def bytes_value(self):
def as_dict(self):
class SignalCreator:

Class for prepearing and writing signals via gRPC.

SignalCreator( system_stub: remotivelabs.broker.generated.sync.system_api_pb2_grpc.SystemServiceStub, namespaces: Optional[List[str]] = None)
def get_meta( self, name: str, namespace_name: str) -> remotivelabs.broker.sync.signalcreator.MetaGetter:

Get meta fields for signal or frame

Parameters
  • name: Name of signal or frame
  • namespace_name: Namespace for given signal or frame
def signal(self, name: str, namespace_name: str) -> common_pb2.SignalId:

Create object for signal.

Parameters
  • name: Name of signal
  • namespace_name: Namespace for signal
def frames(self, namespace_name: str) -> Sequence[common_pb2.SignalId]:

Get all frames in given namespace

def frame_by_signal(self, name: str, namespace_name: str) -> common_pb2.SignalId:

Get frame for the given signal.

Parameters
  • name: Name of signal
  • namespace_name: Name of namespace
Returns

Protobuffer type for Signal

def signals_in_frame(self, name: str, namespace_name: str) -> Sequence[common_pb2.SignalId]:

Get all signals residing in the frame.

Parameters
  • name: Name of frame
  • namespace_name: Namespace for frame
def signal_with_payload( self, name: str, namespace_name: str, value_pair, allow_malformed: bool = False) -> network_api_pb2.Signal:

Create value with signal for writing.

Parameters
  • name: Name of frame
  • namespace_name: Namespace for frame
Returns

Signal with value

class BrokerException(builtins.Exception):

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
def create_channel( url: str, x_api_key: Optional[str] = None, authorization_token: Optional[str] = None) -> grpc.Channel:

Create communication channels for gRPC calls.

Parameters
  • url: URL to broker
  • x_api_key: API key used with RemotiveBroker running in cloud (deprecated).
  • authorization_token: Access token replacing api-keys moving forward.
Returns

gRPC channel

def publish_signals(client_id, stub, signals_with_payload, frequency: int = 0) -> None:

Publish array of values for signals

Parameters
  • ClientId client_id: Named source for publisher
  • NetworkServiceStub stub: Channel stub to send request
  • Signal signals_with_payload: Array of signals with data
def printer(signals: Sequence[common_pb2.SignalId]) -> None:

Debug printing of received array of signal with values.

Parameters
  • signals: Array of signals with values
def get_sha256(path: str) -> str:

Calculate SHA256 for a file.

Parameters
  • path: Path to file
def generate_data( file, dest_path, chunk_size, sha256) -> Generator[system_api_pb2.FileUploadRequest, NoneType, NoneType]:
def upload_file( system_stub: remotivelabs.broker.generated.sync.system_api_pb2_grpc.SystemServiceStub, path: str, dest_path: str) -> None:

Upload single file to internal storage on broker.

Parameters
  • system_stub: System gRPC channel stub
  • path: Path to file in local file system
  • dest_path: Path to file in broker remote storage
def download_file( system_stub: remotivelabs.broker.generated.sync.system_api_pb2_grpc.SystemServiceStub, path: str, dest_path: str) -> None:

Download file from Broker remote storage.

Parameters
  • system_stub: System gRPC channel stub
  • path: Path to file in broker remote storage
  • dest_path: Path to file in local file system
def upload_folder( system_stub: remotivelabs.broker.generated.sync.system_api_pb2_grpc.SystemServiceStub, folder: str) -> None:

Upload directory and its content to Broker remote storage.

Parameters
  • system_stub: System gRPC channel stub
  • folder: Path to directory in local file storage
def reload_configuration( system_stub: remotivelabs.broker.generated.sync.system_api_pb2_grpc.SystemServiceStub) -> None:

Trigger reload of configuration on Broker.

Parameters
  • system_stub: System gRPC channel stub
def check_license( system_stub: remotivelabs.broker.generated.sync.system_api_pb2_grpc.SystemServiceStub) -> None:

Check license to Broker. Throws exception if failure.

Parameters
  • system_stub: System gRPC channel stub
def act_on_signal( client_id: common_pb2.ClientId, network_stub: remotivelabs.broker.generated.sync.network_api_pb2_grpc.NetworkServiceStub, sub_signals: Sequence[common_pb2.SignalId], on_change: bool, fun: Callable[[Sequence[network_api_pb2.Signal]], NoneType], on_subscribed: Optional[Callable[..., NoneType]] = None) -> None:

Bind callback to be triggered when receiving any of the specified signals.

Parameters
  • ClientId client_id: Named source for listener
  • network_stub: Network gRPC channel stub
  • sub_signals: List of signals to listen for
  • on_change: Callback to be triggered
  • fun: Callback for receiving signals update
  • on_subscribed: Callback for successful subscription
def act_on_scripted_signal( client_id: common_pb2.ClientId, network_stub: remotivelabs.broker.generated.sync.network_api_pb2_grpc.NetworkServiceStub, script: bytes, on_change: bool, fun: Callable[[Sequence[network_api_pb2.Signal]], NoneType], on_subscribed: Optional[Callable[..., NoneType]] = None) -> None:

Bind callback to be triggered when receiving any of the specified signals.

Parameters
  • ClientId client_id: Named source for listener
  • network_stub: Network gRPC channel stub
  • script: Custom Lua mapping code
  • on_change: Callback to be triggered
  • fun: Callback for receiving signals update
  • on_subscribed: Callback for successful subscription