Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ venv
.pytest_cache
__pycache__/
.vscode
examples
examples
/dist
52 changes: 52 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
name: Publish to PyPI

on:
push:
tags:
- 'v*'

jobs:
build-and-publish:
runs-on: ubuntu-latest
permissions:
contents: read

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.x'

- name: Install build dependencies
run: |
python -m pip install --upgrade pip
pip install build twine

- name: Extract version from tag
run: |
# Strip 'refs/tags/v' from the ref
VERSION=${GITHUB_REF#refs/tags/v}
echo "VERSION=$VERSION" >> $GITHUB_ENV
echo "Releasing version: $VERSION"

- name: Update version in pyproject.toml
run: |
# Update version = "x.x.x" in pyproject.toml
# We use a temp file to ensure compatibility with different sed versions if run locally,
# effectively overwriting the original.
sed -i "s/^version = \".*\"/version = \"$VERSION\"/" pyproject.toml

# Verify replacement
grep "version =" pyproject.toml

- name: Build package
run: python -m build

- name: Publish to PyPI
env:
TWINE_USERNAME: __token__
TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }}
run: twine upload dist/*
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ __pycache__/
.pytest_cache/
cbor_rpc.egg-info/
.coverage
/dist
74 changes: 74 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# CBOR-RPC Project Documentation for LLMs

## Architecture Overview
The stack consists of: Transport (Pipe) -> Serialization (Transformer) -> Protocol (RPC).

**IMPORTANT**: All public components MUST be imported directly from `cbor_rpc`. DO NOT import from submodules (e.g., use `from cbor_rpc import TcpPipe`, NOT `from cbor_rpc.tcp.tcp import TcpPipe`).

## 1. Pipes (Transport)
All pipes implement `Pipe` (async base) or `EventPipe` (event-based).
Common methods: `write(chunk)`, `read(timeout)`, `terminate()`.

- **TcpPipe**: `await TcpPipe.create_connection(host, port)` (Client) or `await TcpPipe.create_server(host, port)` (Server).
- **StdioPipe**: `await StdioPipe.open()` (Use stdin/stdout to communicate) or `await StdioPipe.start_process(*args)` (Subprocess).
- **SshPipe**: Works over `asyncssh` channels.
- **EventPipe**: High-level wrapper emitting "data", "close", "error" events.

## 2. Transformers (Serialization)
Used to convert raw bytes from pipes into Python objects.
- **Implementations**: `CborStreamTransformer` (default), `JsonStreamTransformer`.
- **Usage**: `transformed_pipe = transformer.apply_transformer(raw_pipe)`.
- **Note**: Always use `*StreamTransformer` for TCP/Stdio to handle packet fragmentation.

## 3. RPC Layer
Provides high-level method calling over pipes.

- **`call_method(name, *args)`**: Async call. Wait for return value. Throws on timeout or remote error.
- **`fire_method(name, *args)`**: Async "fire and forget". No return value, no waiting.
- **`wait_next_event(topic)`**: Wait for a specific pulse/event from the other side.
- **`set_timeout(ms)`**: Set default timeout for all calls (default 30000ms).

### Client Construction
- **`RpcV1.read_only_client(pipe)`**: Use when you only need to call methods on the remote side.
```python
from cbor_rpc import TcpPipe, CborStreamTransformer, RpcV1

pipe = await TcpPipe.create_connection("localhost", 8080)
t_pipe = CborStreamTransformer().apply_transformer(pipe)
rpc = RpcV1.read_only_client(t_pipe)
result = await rpc.call_method("method_name", arg1, arg2)
```

### Server Usage
To run a server, subclass `RpcV1Server` for logic and `TcpServer` (or other) for transport. Override `accept` to apply the transformer and register the connection.

```python
from cbor_rpc import RpcV1Server, TcpServer, CborStreamTransformer

# 1. Implementation
class MyRpcApp(RpcV1Server):
async def handle_method_call(self, conn_id, context, method, args):
if method == "ping": return "pong"

# 2. Setup transport
class MyServer(TcpServer):
def set_app(self, app): self.app = app

async def accept(self, pipe):
# Apply transformer to the raw connection pipe
rpc_pipe = CborStreamTransformer().apply_transformer(pipe)
# Register with the RPC app
await self.app.add_connection(f"{pipe.get_peer_info()}", rpc_pipe)
return True

# 3. Execution
app = MyRpcApp()
server = await MyServer.create(port=8080)
server.set_app(app)
```

## Tips
- **Timeouts**: Default 30s. Set via `rpc.set_timeout(ms)`.
- **Context**: `RpcCallContext` in handlers provides metadata/logging.
- **Logging**: Controlled via `cbor_rpc.RpcLogger`.
m
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There seems to be a stray 'm' character at the end of the file. Please remove it.

242 changes: 239 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,248 @@ cbor-rpc
========
[![codecov](https://codecov.io/github/mesudip/cbor-rpc-py/graph/badge.svg)](https://codecov.io/github/mesudip/cbor-rpc-py)

A lightweight, event-based RPC framework for Python using CBOR (optionally JSON) over various transport layers.

## Table of Contents
- [RPC System](#rpc-system)
- [Capabilities](#capabilities)
- [Creating a Server](#creating-a-server)
- [Creating a Client](#creating-a-client)
- [Pipes and Event Pipes](#pipes-and-event-pipes)
- [Transformers](#transformers)
- [High-level Pipes](#high-level-pipes)

---

## RPC System

The RPC system is built on top of `EventPipe` and `Transformers`.

### Capabilities
- **Bidirectional**: Both sides call rpc methods, both side can emit events.
- **Logs & Progress**: Real-time streaming of log messages and progress updates during a rpc call.
- **Events**: Broadcast and listen to topics.
- **Async/Await**: Native support for Python's `asyncio`.
- **Method Cancellation**: Long-running calls can be cancelled by the caller.

### Creating a Server
Extend `RpcV1Server` and implement `handle_method_call`.

```python
import asyncio
from cbor_rpc import RpcV1Server, TcpServer, CborStreamTransformer,RpcCallContext

class MyService(RpcV1Server):
def __init__(self, tcp_server: TcpServer):
super().__init__()
# Configure server to handle new connections
tcp_server.on("connection", self.on_connection)

async def on_connection(self, tcp_pipe):
print(f"New connection from {tcp_pipe.get_peer_info()}")
rpc_pipe = CborStreamTransformer().apply_transformer(tcp_pipe)

# Add connection to RPC system
conn_id = str(tcp_pipe.get_peer_info())
await self.add_connection(conn_id, rpc_pipe)

async def handle_method_call(self, connection_id, context: RpcCallContext, method, args):
if method == "add":
return args[0] + args[1]
raise Exception("Unknown method")

async def run_server():
server = await TcpServer.create("0.0.0.0", 9000)
service = MyService(server)
print("Server running on 9000...")
await asyncio.Future() # block forever

# asyncio.run(run_server())
```

### Creating a Client
Use the `RpcV1` class to wrap an object-oriented pipe.

```python
import asyncio
from typing import Any, List
from cbor_rpc import RpcV1, RpcCallContext, TcpPipe, CborStreamTransformer

# 1. Define Client with Methods (Bidirectional)
class MyClient(RpcV1):
def get_id(self) -> str:
return "client-node"

async def handle_method_call(self, context: RpcCallContext, method: str, args: List[Any]) -> Any:
# Handle calls FROM the server
if method == "ping":
return "pong"
raise Exception(f"Unknown method {method}")

async def on_event(self, topic: str, message: Any) -> None:
print(f"Event: {topic} -> {message}")

async def run_client():
# 2. Connect via TCP
tcp_pipe = await TcpPipe.create_connection("localhost", 9000)

# 3. Apply CBOR Transformer
cbor_pipe = CborStreamTransformer().apply_transformer(tcp_pipe)

# 4. Instantiate Custom Client
client = MyClient(cbor_pipe)

# 5. Make calls (Client -> Server)
result = await client.call_method("add", 5, 10)
print(f"5 + 10 = {result}")

# Call with logs and progress
handle = client.create_call("long_task")
handle.on_log(lambda level, msg: print(f"LOG: {msg}"))
handle.on_progress(lambda val, meta: print(f"Progress: {val}%"))

# await handle.call()

# asyncio.run(run_client())
```

---

## Pipes and Event Pipes

The core of `cbor-rpc` is the **Pipe** abstraction. Unlike traditional unidirectional pipes, a **Pipe** in this framework represents a **duplex connection**. It allows you to both:
- **Write** messages to the remote side.
- **Read** replies or incoming messages from the remote side.

This abstraction provides a consistent interface for bidirectional communication across different transport layers (TCP streams, SSH channels, Stdio, etc.).

### Basic Usage (`EventPipe`)
Most "real-world" pipes (TCP, SSH, Stdio) are `EventPipe`s. They are event-driven, meaning you register listeners for incoming data instead of polling.

#### Consuming Data
There are two ways to listen for data:
1. **`pipeline("data", handler)`**: Used for serial processing. Handlers are awaited in order. If a pipeline handler throws an error, it stops the chain and emits an `"error"`.
2. **`on("data", handler)`**: Simple pub/sub. The handler is called whenever data arrives. If it's a coroutine, it's run in the background.

```python
# Simple listener
pipe.on("data", lambda chunk: print(f"Received {len(chunk)} bytes"))

# Serial processing (e.g., for transformers)
async def process_data(chunk):
# This is awaited before the next chunk is processed
await do_something(chunk)

pipe.pipeline("data", process_data)
```

#### Sending Data
Use the `write()` method to send data through the pipe.

```python
await pipe.write(b"Request data")
```

### Converting a `Pipe` to an `EventPipe`
If you have a raw `Pipe` (which uses `read()`/`write()`), you can convert it to an `EventPipe` using `make_event_based()`:

```python
event_pipe = raw_pipe.make_event_based()
event_pipe.on("data", handle_incoming)
```

## Transformers

Transformers allow you to convert raw data (typically `bytes`) into high-level Python objects and vice-versa.

### Available Transformers
The library comes with two built-in transformer types:
- **`JsonTransformer`** / **`JsonStreamTransformer`**: Encodes/decodes JSON data.
- **`CborTransformer`** / **`CborStreamTransformer`**: Encodes/decodes CBOR data (ideal for binary efficiency).

### Stream Support
It is important to note that **not all transformers support streams**.
- A standard **`Transformer`** (like `JsonTransformer`) expects a complete message in each chunk. It maps 1 input -> 1 output.
- A **Stream Transformer** (like `JsonStreamTransformer`) is designed to handle fragmented data (e.g., from TCP). It buffers incoming bytes until a complete message can be decoded.

**When using TCP or SSH pipes, you almost always want to use a `...StreamTransformer`.**

### Using Transformers
You can wrap a byte-based pipe with a transformer to create an object-based pipe.

```python
from cbor_rpc.transformer.json_transformer import JsonStreamTransformer

# Wrap a raw pipe
object_pipe = JsonStreamTransformer().apply_transformer(raw_pipe)

# Now 'data' events emit Python objects
object_pipe.on("data", lambda obj: print(f"Received object: {obj}"))
await object_pipe.write({"method": "hello", "params": []})
```

### Making a Custom Transformer
To create your own transformer, subclass `Transformer` (for single packets) or `AsyncTransformer` (for streams):

```python
from cbor_rpc.transformer.base import Transformer

class MyUpperTransformer(Transformer[str, str]):
def encode(self, data: str) -> str:
return data.upper()

def decode(self, data: str) -> str:
return data.lower()
```
---

## High-level Pipes

`cbor-rpc` provides several ready-to-use pipe implementations for different transport layers.

### TCP Pipe (`TcpPipe`)
Used for network communication over TCP.

```python
from cbor_rpc.tcp import TcpPipe

# Client
pipe = await TcpPipe.create_connection("localhost", 8000)

# Server
from cbor_rpc.tcp import TcpServer
class MyServer(TcpServer):
async def accept(self, pipe: TcpPipe) -> bool:
print("New connection!")
return True

server = await MyServer.create("0.0.0.0", 8000)
```

### SSH Pipe (`SshPipe`)
Tunneling through SSH using `asyncssh`.

```python
from cbor_rpc.ssh import SshPipe
# Used typically to run a command on a remote host and communicate with it
```

### Stdio Pipe (`StdioPipe`)
Communicate with subprocesses via stdin/stdout.

```python
from cbor_rpc.stdio import StdioPipe

# Start a subprocess
pipe = await StdioPipe.start_process("python3", "worker.py")
```

---

Development
-----------
Enable local git hooks to auto-format on commit:

```
pre-commit install
```

This runs `black` on staged Python files and fails the commit if `black` is not installed.
```
Loading
Loading