from typing import Union
Library to enable easy communication of basic commands or binary payloads. Intended for embedded systems.
Written in python
It is supposed to be very light and easy to understand. It is not intended for high performance applications.
- (in development) A Half-Duplex wired connection using the RPi.GPIO library.
- (in development) A RF24 connection using the RF24 library.
- (planned) A websocket based connection
- (soon) Your own connection type. Find out more below.
Here is how an example setup would look like in python. RemoteController uses asyncio.
# Initialize a connection
import asyncio
from RemoteController.connections import WiredConnection
def handle_command(command: int, throttle: float):
print(f"Recieved command: {command} with throttle: {throttle}")
def handle_binary_data(data: bytes):
print(f"Recieved binary data: {data}")
async def main():
connection = WiredConnection(
0.001, # Time to wait between checking for new data
handle_command, # Function to call when a command is recieved
handle_binary_data, # Function to call when binary data is recieved
12, # Pin that I will send data on
16, # Pin that I will recieve data on
b'\x01\x02\x03\x04' # Acknowledgement payload (example)
)
# Do something else, the connection will keep itself alive
for _ in range(10):
connection.send_command(1, 0.5)
# Send a command
await asyncio.sleep(1)
# When finished
connection.stop()
main()
# The connection will automatically start listening for data- Create a connection object and define what should happen on recieving connection data.
- Send commands / data when needed
- Close the connection when finished
Is easy and requires minimal setup from your side.
from RemoteController.base_connection import Connection, PacketSizeTooBigException, ConnectionDataFetchException
from typing import Union
class CustomConnection(Connection):
def _send_data(self, data: bytes) -> bool:
# Implement your own way of sending data here
# Raise PacketSizeTooBigException if the packet is too big or split the packet into smaller packets
pass
def _check(self) -> Union[bytes, None]:
# Implement your own way of checking for new data here
# Return the data as a bytes object or None if no data is available
# Raise ConnectionDataFetchException if an error occured
passCurrently Protocol version 1.0 is implemented.