Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
venv
.venv
.coverage
.pytest_cache
__pycache__/
.vscode
examples
8 changes: 8 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
repos:
- repo: local
hooks:
- id: black
name: black
entry: black
language: system
types: [python]
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
cbor-rpc
========
[![codecov](https://codecov.io/github/mesudip/cbor-rpc-py/graph/badge.svg)](https://codecov.io/github/mesudip/cbor-rpc-py)
[![codecov](https://codecov.io/github/mesudip/cbor-rpc-py/graph/badge.svg)](https://codecov.io/github/mesudip/cbor-rpc-py)

Development
-----------
Enable local git hooks to auto-format on commit:

```
pre-commit install
```

This runs `black` on staged Python files and fails the commit if `black` is not installed.
2 changes: 1 addition & 1 deletion cbor_rpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"RpcV1Server",
# Rpc high level
"RpcCallContext",
"RpcLogger", # TCP classes
"RpcLogger", # TCP classes
"TcpPipe",
"TcpServer",
# Transformers
Expand Down
4 changes: 1 addition & 3 deletions cbor_rpc/event/emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ def _emit(self, event_type: str, *args: Any) -> None:
warnings.warn(f"Synchronous error in handler: {e}", RuntimeWarning)

async def _notify(self, event_type: str, *args: Any) -> None:
"""

"""
""" """
for pipeline in self._pipelines.get(event_type, []):
try:
if inspect.iscoroutinefunction(pipeline):
Expand Down
13 changes: 13 additions & 0 deletions cbor_rpc/pipe/aio_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ async def _setup_connection(self) -> None:
Raises:
RuntimeError: If reader or writer is not initialized.
"""
if self._connected and not self._closed:
return
if not self._reader or not self._writer:
raise RuntimeError("Reader or writer not initialized")

Expand Down Expand Up @@ -175,6 +177,17 @@ async def write(self, chunk: T1) -> bool:
self._emit("error", e) # Synchronoous _emit
return False

async def write_eof(self) -> None:
"""
Write EOF to the writer if supported.
"""
if self._writer and self._writer.can_write_eof():
self._writer.write_eof()
try:
await self._writer.drain()
except Exception as e:
print(f"AioPipe: Failed to drain writer on EOF: {e}")

async def terminate(self, *args: Any) -> None:
"""
Terminate the connection.
Expand Down
1 change: 1 addition & 0 deletions cbor_rpc/pipe/event_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def connect_to(self, other: "ConnectedPipe"):
async def write(self, chunk: Any) -> bool:
if self._closed or not self.connected_pipe or self.connected_pipe._closed:
return False

async def _deliver() -> None:
try:
await self.connected_pipe._notify("data", chunk)
Expand Down
25 changes: 14 additions & 11 deletions cbor_rpc/rpc/rpc_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

class RpcCore(RpcClient):
protocol_id = 1

def __init__(self, pipe: EventPipe[Any, Any]):
self.pipe = pipe
self._counter = 0
Expand All @@ -40,11 +41,11 @@ async def on_data(data: List[Any]) -> None:

protocol_id = data[0]
if protocol_id == 1:
await self.handle_proto_1(data)
await self._handle_proto_1(data)
elif protocol_id == 2:
await self.handle_proto_2(data)
await self._handle_proto_2(data)
elif protocol_id == 3:
await self.handle_proto_3(data)
await self._handle_proto_3(data)
else:
logger.warning(f"RpcCore: Unsupported protocol: {data}")

Expand All @@ -53,7 +54,7 @@ async def on_data(data: List[Any]) -> None:

self.pipe.on("data", on_data)

async def handle_proto_1(self, data: List[Any]) -> None:
async def _handle_proto_1(self, data: List[Any]) -> None:
"""Handle Protocol 1 (RPC) messages."""
if len(data) < 3:
logger.warning(f"RpcCore [Proto 1]: Invalid format: {data}")
Expand All @@ -62,7 +63,7 @@ async def handle_proto_1(self, data: List[Any]) -> None:
sub_proto_id = data[1]

# Responses: [1, 0, id, result] (Success) or [1, 1, id, error] (Error)
if sub_proto_id <2:
if sub_proto_id < 2:
if len(data) < 4:
logger.warning(f"RpcCore [Proto 1]: Invalid response format: {data}")
return
Expand All @@ -77,7 +78,9 @@ async def handle_proto_1(self, data: List[Any]) -> None:
else: # Error
await promise.reject(payload)
else:
logger.warning(f"Received rpc reply for expired request id: {id_}, success={sub_proto_id==0}, data={payload}")
logger.warning(
f"Received rpc reply for expired request id: {id_}, success={sub_proto_id==0}, data={payload}"
)

# Method Call (2) or Fire (3): [1, 2/3, id, method, params]
elif sub_proto_id == 2 or sub_proto_id == 3:
Expand All @@ -95,6 +98,7 @@ async def handle_proto_1(self, data: List[Any]) -> None:
result = self.handle_method_call(context, method, params)

if sub_proto_id == 2: # Expect Response

async def handle_response() -> None:
try:
resolved_result = await self._resolve_result(result)
Expand All @@ -112,6 +116,7 @@ async def handle_fire() -> None:
await self._resolve_result(result)
except Exception as e:
logger.error(f"Fired method error: {method}, params={params}, error={e}")

asyncio.create_task(handle_fire())

except Exception as e:
Expand All @@ -122,8 +127,7 @@ async def handle_fire() -> None:
else:
logger.warning(f"RpcCore [Proto 1]: Unknown sub-protocol: {sub_proto_id}")


async def handle_proto_2(self, data: List[Any]) -> None:
async def _handle_proto_2(self, data: List[Any]) -> None:
"""Handle Protocol 2 (Logging) messages."""
# Format: [2, log_level, ref_proto, ref_id, content]
if len(data) >= 3 and data[1] == 0:
Expand All @@ -145,10 +149,9 @@ async def handle_proto_2(self, data: List[Any]) -> None:

logger.info(f"[RemoteLog:{level_str}] p{ref_proto}:{ref_id} {content}")

async def handle_proto_3(self, data: List[Any]) -> None:
async def _handle_proto_3(self, data: List[Any]) -> None:
logger.warning(f"RpcCore [Proto 3]: Unsupported event message: {data}")


async def call_method(self, method: str, *args: Any) -> Any:
counter = self._counter
self._counter += 1
Expand Down Expand Up @@ -229,7 +232,7 @@ def __init__(self, pipe: EventPipe[Any, Any]):
self._last_event_topic: Optional[str] = None
self.event_logger = RpcLogger(self._send_log, 3, self._get_last_event_topic)

async def handle_proto_3(self, data: List[Any]) -> None:
async def _handle_proto_3(self, data: List[Any]) -> None:
if len(data) < 4:
logger.warning(f"RpcV1 [Proto 3]: Invalid event format: {data}")
return
Expand Down
2 changes: 1 addition & 1 deletion cbor_rpc/ssh/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .ssh_pipe import SshPipe
from .ssh_pipe import SshPipe, SshServer
Loading
Loading