From 3db087556ed14b0ef9f0e758e384bce48bb6681c Mon Sep 17 00:00:00 2001 From: Dennis Moschina <45356478+DennisMoschina@users.noreply.github.com> Date: Thu, 26 Feb 2026 14:51:39 +0100 Subject: [PATCH 1/5] Add IPC Control API and related error handling for WebSocket communication - Introduced `OpenEarableIPCClient` for managing WebSocket connections. - Added `StreamSubscription` for handling live data streams. - Implemented error classes for IPC communication: `IPCError`, `IPCRemoteError`, `IPCProtocolError`, `IPCClosedError`, and `IPCStreamError`. - Created `StreamEvent` model for data events emitted by subscriptions. - Updated README with usage examples for the IPC Control API. - Added `websockets` dependency to `pyproject.toml`. --- pyproject.toml | 1 + src/open_wearables/__init__.py | 18 ++ src/open_wearables/ipc/__init__.py | 20 ++ src/open_wearables/ipc/client.py | 425 +++++++++++++++++++++++++++++ src/open_wearables/ipc/errors.py | 39 +++ src/open_wearables/ipc/models.py | 15 + 6 files changed, 518 insertions(+) create mode 100644 src/open_wearables/ipc/__init__.py create mode 100644 src/open_wearables/ipc/client.py create mode 100644 src/open_wearables/ipc/errors.py create mode 100644 src/open_wearables/ipc/models.py diff --git a/pyproject.toml b/pyproject.toml index b270ff5..07bd4aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "pandas", "ipython", "scipy", + "websockets", ] [project.urls] diff --git a/src/open_wearables/__init__.py b/src/open_wearables/__init__.py index 0bad4d9..d29b260 100644 --- a/src/open_wearables/__init__.py +++ b/src/open_wearables/__init__.py @@ -2,8 +2,26 @@ SensorDataset, load_recordings, ) +from .ipc import ( + IPCClosedError, + IPCError, + IPCProtocolError, + IPCRemoteError, + IPCStreamError, + OpenEarableIPCClient, + StreamEvent, + StreamSubscription, +) __all__ = [ + "IPCClosedError", + "IPCError", + "IPCProtocolError", + "IPCRemoteError", + "IPCStreamError", + "OpenEarableIPCClient", "SensorDataset", + "StreamEvent", + "StreamSubscription", "load_recordings", ] diff --git a/src/open_wearables/ipc/__init__.py b/src/open_wearables/ipc/__init__.py new file mode 100644 index 0000000..ec1b983 --- /dev/null +++ b/src/open_wearables/ipc/__init__.py @@ -0,0 +1,20 @@ +from .client import OpenEarableIPCClient, StreamSubscription +from .errors import ( + IPCClosedError, + IPCError, + IPCProtocolError, + IPCRemoteError, + IPCStreamError, +) +from .models import StreamEvent + +__all__ = [ + "IPCClosedError", + "IPCError", + "IPCProtocolError", + "IPCRemoteError", + "IPCStreamError", + "OpenEarableIPCClient", + "StreamEvent", + "StreamSubscription", +] diff --git a/src/open_wearables/ipc/client.py b/src/open_wearables/ipc/client.py new file mode 100644 index 0000000..2fd8473 --- /dev/null +++ b/src/open_wearables/ipc/client.py @@ -0,0 +1,425 @@ +from __future__ import annotations + +import asyncio +import inspect +import itertools +import json +from collections import defaultdict +from typing import Any, Awaitable, Callable, Dict, Optional, Set + +from websockets import ConnectionClosed +from websockets.client import WebSocketClientProtocol, connect + +from .errors import ( + IPCClosedError, + IPCProtocolError, + IPCRemoteError, + IPCStreamError, +) +from .models import StreamEvent + +EventCallback = Callable[[dict[str, Any]], Optional[Awaitable[None]]] + + +class _StreamEnd: + pass + + +_STREAM_END = _StreamEnd() + + +class StreamSubscription: + """Represents a live subscription and supports async iteration.""" + + def __init__( + self, + client: "OpenEarableIPCClient", + subscription_id: int, + stream: str, + device_id: str, + queue: "asyncio.Queue[Any]", + ) -> None: + self._client = client + self.subscription_id = subscription_id + self.stream = stream + self.device_id = device_id + self._queue = queue + self._closed = False + + def __aiter__(self) -> "StreamSubscription": + return self + + async def __anext__(self) -> StreamEvent: + item = await self._queue.get() + if item is _STREAM_END: + self._closed = True + raise StopAsyncIteration + if isinstance(item, Exception): + self._closed = True + raise item + return item + + async def close(self) -> dict[str, Any]: + """Unsubscribe and close the local iterator.""" + if self._closed: + return {"subscription_id": self.subscription_id, "cancelled": True} + + self._closed = True + return await self._client.unsubscribe(self.subscription_id) + + +class OpenEarableIPCClient: + """Async client for OpenEarable WebSocket IPC daemon.""" + + def __init__( + self, + uri: str = "ws://127.0.0.1:8765/ws", + request_timeout: float = 10.0, + subscription_queue_size: int = 0, + ) -> None: + self.uri = uri + self.request_timeout = request_timeout + self.subscription_queue_size = subscription_queue_size + + self._ws: Optional[WebSocketClientProtocol] = None + self._receiver_task: Optional[asyncio.Task[None]] = None + self._next_request_id = itertools.count(1) + self._pending: Dict[int, "asyncio.Future[Any]"] = {} + self._callbacks: Dict[str, Set[EventCallback]] = defaultdict(set) + self._subscriptions: Dict[int, "asyncio.Queue[Any]"] = {} + self._waiters: Set["asyncio.Future[dict[str, Any]]"] = set() + + async def __aenter__(self) -> "OpenEarableIPCClient": + await self.connect() + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.close() + + @property + def is_connected(self) -> bool: + return self._ws is not None + + async def connect(self, wait_for_ready: bool = True) -> Optional[dict[str, Any]]: + """Open WebSocket connection and optionally wait for `ready` event.""" + if self._ws is not None: + return None + + self._ws = await connect(self.uri) + self._receiver_task = asyncio.create_task(self._receiver_loop()) + + if wait_for_ready: + return await self.wait_for_event("ready", timeout=self.request_timeout) + return None + + async def close(self) -> None: + """Close connection and tear down pending requests/subscriptions.""" + ws = self._ws + self._ws = None + + if ws is not None: + await ws.close() + + if self._receiver_task is not None: + try: + await self._receiver_task + finally: + self._receiver_task = None + + for future in list(self._pending.values()): + if not future.done(): + future.set_exception(IPCClosedError("IPC client closed")) + self._pending.clear() + + for queue in self._subscriptions.values(): + await queue.put(_STREAM_END) + self._subscriptions.clear() + + for waiter in list(self._waiters): + if not waiter.done(): + waiter.set_exception(IPCClosedError("IPC client closed")) + self._waiters.clear() + + def on_event(self, event: str, callback: EventCallback) -> None: + """Register callback for a server event. + + Use `event='*'` to receive all events. + """ + self._callbacks[event].add(callback) + + def remove_event_listener(self, event: str, callback: EventCallback) -> None: + """Remove a callback for an event name.""" + self._callbacks[event].discard(callback) + + async def wait_for_event( + self, + event: str, + timeout: Optional[float] = None, + ) -> dict[str, Any]: + """Wait for the next event with the given name.""" + loop = asyncio.get_running_loop() + waiter: "asyncio.Future[dict[str, Any]]" = loop.create_future() + waiter._open_earable_event_name = event # type: ignore[attr-defined] + self._waiters.add(waiter) + + try: + if timeout is None: + return await waiter + return await asyncio.wait_for(waiter, timeout) + finally: + self._waiters.discard(waiter) + + async def call(self, method: str, params: Optional[dict[str, Any]] = None) -> Any: + """Invoke a method on the IPC server and return its `result`.""" + if self._ws is None: + raise IPCClosedError("Call attempted before connect()") + + request_id = next(self._next_request_id) + payload = { + "id": request_id, + "method": method, + "params": params or {}, + } + + loop = asyncio.get_running_loop() + future: "asyncio.Future[Any]" = loop.create_future() + self._pending[request_id] = future + + await self._ws.send(json.dumps(payload)) + + try: + return await asyncio.wait_for(future, timeout=self.request_timeout) + finally: + self._pending.pop(request_id, None) + + async def ping(self) -> dict[str, Any]: + return await self.call("ping") + + async def methods(self) -> list[str]: + return await self.call("methods") + + async def has_permissions(self) -> bool: + return await self.call("has_permissions") + + async def check_and_request_permissions(self) -> bool: + return await self.call("check_and_request_permissions") + + async def start_scan(self, check_and_request_permissions: bool = True) -> dict[str, Any]: + return await self.call( + "start_scan", + {"check_and_request_permissions": check_and_request_permissions}, + ) + + async def get_discovered_devices(self) -> list[dict[str, Any]]: + return await self.call("get_discovered_devices") + + async def connect_device( + self, + device_id: str, + connected_via_system: bool = False, + ) -> dict[str, Any]: + return await self.call( + "connect", + { + "device_id": device_id, + "connected_via_system": connected_via_system, + }, + ) + + async def connect_system_devices( + self, + ignored_device_ids: Optional[list[str]] = None, + ) -> list[dict[str, Any]]: + params: dict[str, Any] = {} + if ignored_device_ids is not None: + params["ignored_device_ids"] = ignored_device_ids + return await self.call("connect_system_devices", params) + + async def list_connected(self) -> list[dict[str, Any]]: + return await self.call("list_connected") + + async def disconnect(self, device_id: str) -> dict[str, Any]: + return await self.call("disconnect", {"device_id": device_id}) + + async def set_auto_connect(self, device_ids: list[str]) -> dict[str, Any]: + return await self.call("set_auto_connect", {"device_ids": device_ids}) + + async def get_wearable(self, device_id: str) -> dict[str, Any]: + return await self.call("get_wearable", {"device_id": device_id}) + + async def get_actions(self, device_id: str) -> list[str]: + return await self.call("get_actions", {"device_id": device_id}) + + async def invoke_action( + self, + device_id: str, + action: str, + args: Optional[dict[str, Any]] = None, + ) -> Any: + params: dict[str, Any] = { + "device_id": device_id, + "action": action, + } + if args is not None: + params["args"] = args + return await self.call("invoke_action", params) + + async def subscribe( + self, + device_id: str, + stream: str, + args: Optional[dict[str, Any]] = None, + ) -> StreamSubscription: + params: dict[str, Any] = { + "device_id": device_id, + "stream": stream, + } + if args is not None: + params["args"] = args + + result = await self.call("subscribe", params) + subscription_id = int(result["subscription_id"]) + queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=self.subscription_queue_size) + self._subscriptions[subscription_id] = queue + + return StreamSubscription( + client=self, + subscription_id=subscription_id, + stream=str(result.get("stream", stream)), + device_id=str(result.get("device_id", device_id)), + queue=queue, + ) + + async def unsubscribe(self, subscription_id: int) -> dict[str, Any]: + result = await self.call("unsubscribe", {"subscription_id": subscription_id}) + queue = self._subscriptions.pop(subscription_id, None) + if queue is not None: + await queue.put(_STREAM_END) + return result + + async def _receiver_loop(self) -> None: + ws = self._ws + if ws is None: + return + + try: + async for raw in ws: + message = self._decode_message(raw) + if "id" in message and ("result" in message or "error" in message): + self._handle_response(message) + elif "event" in message: + await self._handle_event(message) + else: + raise IPCProtocolError(f"Invalid IPC message: {message}") + except ConnectionClosed: + pass + finally: + for future in list(self._pending.values()): + if not future.done(): + future.set_exception(IPCClosedError("WebSocket connection closed")) + + for queue in self._subscriptions.values(): + await queue.put(_STREAM_END) + self._subscriptions.clear() + + for waiter in list(self._waiters): + if not waiter.done(): + waiter.set_exception(IPCClosedError("WebSocket connection closed")) + self._waiters.clear() + + def _decode_message(self, raw: Any) -> dict[str, Any]: + if isinstance(raw, bytes): + raw = raw.decode("utf-8") + + if not isinstance(raw, str): + raise IPCProtocolError(f"Unexpected websocket payload type: {type(raw)!r}") + + message = json.loads(raw) + if not isinstance(message, dict): + raise IPCProtocolError(f"Expected JSON object message, got: {type(message)!r}") + return message + + def _handle_response(self, message: dict[str, Any]) -> None: + request_id = message.get("id") + if not isinstance(request_id, int): + raise IPCProtocolError(f"Response has non-int id: {message}") + + future = self._pending.get(request_id) + if future is None: + return + + if "error" in message and message["error"] is not None: + error_payload = message["error"] + if not isinstance(error_payload, dict): + future.set_exception(IPCProtocolError(f"Invalid error payload: {error_payload!r}")) + return + + future.set_exception( + IPCRemoteError( + message=str(error_payload.get("message", "Unknown IPC error")), + error_type=error_payload.get("type"), + stack=error_payload.get("stack"), + payload=error_payload, + ) + ) + return + + future.set_result(message.get("result")) + + async def _handle_event(self, message: dict[str, Any]) -> None: + event_name = message.get("event") + if not isinstance(event_name, str): + raise IPCProtocolError(f"Event has invalid name: {message}") + + for waiter in list(self._waiters): + target = getattr(waiter, "_open_earable_event_name", None) + if target == event_name and not waiter.done(): + waiter.set_result(message) + + for callback in list(self._callbacks.get(event_name, set())) + list( + self._callbacks.get("*", set()) + ): + try: + maybe_awaitable = callback(message) + if inspect.isawaitable(maybe_awaitable): + asyncio.create_task(maybe_awaitable) + except Exception: + # Event listener failures should not stop the receiver loop. + continue + + if event_name == "stream": + subscription_id = message.get("subscription_id") + if isinstance(subscription_id, int): + queue = self._subscriptions.get(subscription_id) + if queue is not None: + stream_event = StreamEvent( + subscription_id=subscription_id, + stream=str(message.get("stream", "")), + device_id=str(message.get("device_id", "")), + data=message.get("data"), + raw=message, + ) + await queue.put(stream_event) + return + + if event_name == "stream_error": + subscription_id = message.get("subscription_id") + if isinstance(subscription_id, int): + queue = self._subscriptions.pop(subscription_id, None) + if queue is not None: + error_message = "Stream failed" + maybe_error = message.get("error") + if isinstance(maybe_error, dict): + error_message = str(maybe_error.get("message", error_message)) + elif isinstance(maybe_error, str): + error_message = maybe_error + await queue.put(IPCStreamError(error_message, event_payload=message)) + await queue.put(_STREAM_END) + return + + if event_name == "stream_done": + subscription_id = message.get("subscription_id") + if isinstance(subscription_id, int): + queue = self._subscriptions.pop(subscription_id, None) + if queue is not None: + await queue.put(_STREAM_END) diff --git a/src/open_wearables/ipc/errors.py b/src/open_wearables/ipc/errors.py new file mode 100644 index 0000000..f860c9a --- /dev/null +++ b/src/open_wearables/ipc/errors.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Optional + + +class IPCError(Exception): + """Base exception for IPC client failures.""" + + +@dataclass +class IPCRemoteError(IPCError): + """Error returned by the IPC server.""" + + message: str + error_type: Optional[str] = None + stack: Optional[str] = None + payload: Optional[dict[str, Any]] = None + + def __str__(self) -> str: + if self.error_type: + return f"{self.error_type}: {self.message}" + return self.message + + +class IPCProtocolError(IPCError): + """Raised when messages violate the IPC protocol.""" + + +class IPCClosedError(IPCError): + """Raised when using a client that is not connected.""" + + +class IPCStreamError(IPCError): + """Raised when a stream subscription fails.""" + + def __init__(self, message: str, event_payload: Optional[dict[str, Any]] = None): + super().__init__(message) + self.event_payload = event_payload diff --git a/src/open_wearables/ipc/models.py b/src/open_wearables/ipc/models.py new file mode 100644 index 0000000..25a7598 --- /dev/null +++ b/src/open_wearables/ipc/models.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True) +class StreamEvent: + """A data event emitted by a subscription stream.""" + + subscription_id: int + stream: str + device_id: str + data: Any + raw: dict[str, Any] From fa8c3afe41371e34d7653e870d26c7d8f3d2e680 Mon Sep 17 00:00:00 2001 From: Dennis Moschina <45356478+DennisMoschina@users.noreply.github.com> Date: Thu, 26 Feb 2026 14:54:29 +0100 Subject: [PATCH 2/5] refactor: introduce layered package architecture --- README.md | 11 + docs/api-reference.md | 13 + src/open_wearables/__init__.py | 9 +- src/open_wearables/data/__init__.py | 14 + src/open_wearables/data/accessors.py | 57 ++++ src/open_wearables/data/constants.py | 50 ++++ src/open_wearables/data/sensor_dataset.py | 218 +++++++++++++++ src/open_wearables/ipc/__init__.py | 4 +- src/open_wearables/ipc/client.py | 6 +- src/open_wearables/parsing/__init__.py | 19 ++ src/open_wearables/parsing/audio.py | 48 ++++ src/open_wearables/parsing/base.py | 33 +++ src/open_wearables/parsing/payload_parsers.py | 127 +++++++++ src/open_wearables/parsing/stream.py | 255 ++++++++++++++++++ src/open_wearables/schema/__init__.py | 15 ++ src/open_wearables/schema/defaults.py | 86 ++++++ src/open_wearables/schema/types.py | 56 ++++ src/open_wearables/scheme.py | 141 ++-------- 18 files changed, 1030 insertions(+), 132 deletions(-) create mode 100644 src/open_wearables/data/__init__.py create mode 100644 src/open_wearables/data/accessors.py create mode 100644 src/open_wearables/data/constants.py create mode 100644 src/open_wearables/data/sensor_dataset.py create mode 100644 src/open_wearables/parsing/__init__.py create mode 100644 src/open_wearables/parsing/audio.py create mode 100644 src/open_wearables/parsing/base.py create mode 100644 src/open_wearables/parsing/payload_parsers.py create mode 100644 src/open_wearables/parsing/stream.py create mode 100644 src/open_wearables/schema/__init__.py create mode 100644 src/open_wearables/schema/defaults.py create mode 100644 src/open_wearables/schema/types.py diff --git a/README.md b/README.md index 6d97636..80f4fb1 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,17 @@ audio_df = dataset.get_audio_dataframe() - [Data model and sensor channels](docs/data-model.md) - [API reference](docs/api-reference.md) +## Package Architecture + +The library is organized into focused layers: + +- `open_wearable.schema`: sensor schema types and default schema builders. +- `open_wearable.parsing`: stream parsing, payload parsers, and microphone helpers. +- `open_wearable.data`: high-level dataset API (`SensorDataset`) and sensor accessors. +- `open_wearable.ipc`: asynchronous WebSocket IPC client and protocol models. + +Legacy flat modules (`open_wearable.scheme`, `open_wearable.parser`, `open_wearable.dataset`) remain available as compatibility facades. + ## License MIT. See `LICENSE`. diff --git a/docs/api-reference.md b/docs/api-reference.md index b526972..3b5431e 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -6,6 +6,13 @@ from open_wearables import SensorDataset, load_recordings ``` +Primary internal package layers: + +- `open_wearable.schema` +- `open_wearable.parsing` +- `open_wearable.data` +- `open_wearable.ipc` + ## `SensorDataset` High-level API for loading and analyzing a single `.oe` recording. @@ -114,6 +121,9 @@ Core classes and helpers for decoding binary packets: - `interleaved_mic_to_stereo(samples)`: converts interleaved samples to stereo. - `mic_packet_to_stereo_frames(packet, sampling_rate)`: timestamp + stereo frame conversion. +Note: `open_wearables.parser` is a compatibility facade. New code should prefer +`open_wearables.parsing`. + ## Scheme Module (`open_wearables.scheme`) Defines sensor schema primitives: @@ -123,3 +133,6 @@ Defines sensor schema primitives: - `SensorComponentGroupScheme` - `SensorScheme` - `build_default_sensor_schemes(sensor_sid)` + +Note: `open_wearables.scheme` is a compatibility facade. New code should prefer +`open_wearables.schema`. diff --git a/src/open_wearables/__init__.py b/src/open_wearables/__init__.py index d29b260..7f28a80 100644 --- a/src/open_wearables/__init__.py +++ b/src/open_wearables/__init__.py @@ -1,14 +1,11 @@ -from .dataset import ( - SensorDataset, - load_recordings, -) +from .data import SensorDataset, load_recordings from .ipc import ( IPCClosedError, IPCError, IPCProtocolError, IPCRemoteError, IPCStreamError, - OpenEarableIPCClient, + OpenWearableIPCClient, StreamEvent, StreamSubscription, ) @@ -19,7 +16,7 @@ "IPCProtocolError", "IPCRemoteError", "IPCStreamError", - "OpenEarableIPCClient", + "OpenWearableIPCClient", "SensorDataset", "StreamEvent", "StreamSubscription", diff --git a/src/open_wearables/data/__init__.py b/src/open_wearables/data/__init__.py new file mode 100644 index 0000000..9b040f6 --- /dev/null +++ b/src/open_wearables/data/__init__.py @@ -0,0 +1,14 @@ +from .accessors import SensorAccessor +from .constants import COLORS, LABELS, SENSOR_FORMATS, SENSOR_SID, SID_NAMES +from .sensor_dataset import SensorDataset, load_recordings + +__all__ = [ + "COLORS", + "LABELS", + "SENSOR_FORMATS", + "SENSOR_SID", + "SID_NAMES", + "SensorAccessor", + "SensorDataset", + "load_recordings", +] diff --git a/src/open_wearables/data/accessors.py b/src/open_wearables/data/accessors.py new file mode 100644 index 0000000..149b5e6 --- /dev/null +++ b/src/open_wearables/data/accessors.py @@ -0,0 +1,57 @@ +from collections import defaultdict +from typing import Dict, List, Sequence + +import pandas as pd + + +class SensorAccessor: + """Convenience wrapper around a DataFrame for grouped sensor-channel access.""" + + def __init__(self, df: pd.DataFrame, labels: Sequence[str]): + self._data: Dict[str, pd.DataFrame] = {} + + groups: Dict[str, List[str]] = defaultdict(list) + for label in labels: + parts = label.split(".") + if len(parts) == 2: + group, _field = parts + if label in df: + groups[group].append(label) + elif label in df: + self._data[label] = df[label] + + for group, columns in groups.items(): + short_names = [label.split(".")[1] for label in columns] + subdf = df[columns].copy() + subdf.columns = short_names + self._data[group] = subdf + + self._full_df = df.copy() + + @property + def df(self) -> pd.DataFrame: + return self._full_df + + def to_dataframe(self) -> pd.DataFrame: + return self._full_df + + def __getitem__(self, key): + if key in self._data: + return self._data[key] + + if key in self._full_df.columns: + return self._full_df[key] + + raise KeyError(f"{key!r} not found in available sensor groups or channels") + + def __getattr__(self, name): + if name in self._data: + return self._data[name] + + if hasattr(self._full_df, name): + return getattr(self._full_df, name) + + raise AttributeError(f"'SensorAccessor' object has no attribute '{name}'") + + def __repr__(self) -> str: + return repr(self._full_df) diff --git a/src/open_wearables/data/constants.py b/src/open_wearables/data/constants.py new file mode 100644 index 0000000..e26b0ec --- /dev/null +++ b/src/open_wearables/data/constants.py @@ -0,0 +1,50 @@ +from typing import Dict, List + +SENSOR_SID: Dict[str, int] = { + "imu": 0, + "barometer": 1, + "microphone": 2, + "ppg": 4, + "optical_temp": 6, + "bone_acc": 7, +} + +SID_NAMES: Dict[int, str] = { + 0: "imu", + 1: "barometer", + 2: "microphone", + 4: "ppg", + 6: "optical_temp", + 7: "bone_acc", +} + +SENSOR_FORMATS: Dict[int, str] = { + SENSOR_SID["imu"]: "<9f", + SENSOR_SID["barometer"]: "<2f", + SENSOR_SID["ppg"]: "<4I", + SENSOR_SID["optical_temp"]: " Parser: + sensor_schemes = build_default_sensor_schemes(cls.SENSOR_SID) + dataset_parser = Parser.from_sensor_schemes( + sensor_schemes=sensor_schemes, + verbose=verbose, + ) + dataset_parser.parsers[cls.SENSOR_SID["microphone"]] = MicPayloadParser( + sample_count=48000, + verbose=verbose, + ) + return dataset_parser + + def parse(self) -> None: + with open(self.filename, "rb") as stream: + self.parse_result = self.parser.parse(stream) + + def _build_accessors(self) -> None: + self.audio_stereo = self.parse_result.audio_stereo + self.audio_df = pd.DataFrame() + self._audio_df_sampling_rate = None + self.sensor_dfs = {} + + data_dict = self.parse_result.sensor_dfs + for name, sid in self.SENSOR_SID.items(): + labels = LABELS.get(name, []) + if name == "microphone": + df = self.get_audio_dataframe() + elif sid in data_dict and isinstance(data_dict[sid], pd.DataFrame): + df = data_dict[sid] + df = df[~df.index.duplicated(keep="first")] + else: + df = pd.DataFrame(columns=labels) + + self.sensor_dfs[sid] = df + setattr(self, name, SensorAccessor(df, labels)) + + self.df = pd.DataFrame() + + def list_sensors(self) -> List[str]: + available_sensors = [] + for name in self.SENSOR_SID: + accessor = getattr(self, name, None) + if isinstance(accessor, SensorAccessor) and not accessor.df.empty: + available_sensors.append(name) + return available_sensors + + def get_sensor_dataframe(self, name: str) -> pd.DataFrame: + if name not in self.SENSOR_SID: + raise KeyError( + f"Unknown sensor name: {name!r}. " + f"Known sensors: {sorted(self.SENSOR_SID.keys())}" + ) + + accessor = getattr(self, name, None) + if isinstance(accessor, SensorAccessor): + return accessor.to_dataframe() + + return pd.DataFrame() + + def get_dataframe(self) -> pd.DataFrame: + if not self.df.empty: + return self.df + + if not getattr(self, "sensor_dfs", None): + return self.df + + dfs = [df for df in self.sensor_dfs.values() if not df.empty] + if not dfs: + return self.df + + common_index = pd.Index([]) + for df in dfs: + common_index = common_index.union(df.index) + common_index = common_index.sort_values() + + reindexed_dfs = [df.reindex(common_index) for df in dfs] + self.df = pd.concat(reindexed_dfs, axis=1) + + return self.df + + def get_audio_dataframe(self, sampling_rate: int = 48000) -> pd.DataFrame: + if sampling_rate <= 0: + raise ValueError(f"sampling_rate must be > 0, got {sampling_rate}") + + if self._audio_df_sampling_rate == sampling_rate: + return self.audio_df + + mic_packets = getattr(self.parse_result, "mic_packets", []) + if not mic_packets: + self.audio_df = pd.DataFrame(columns=["mic.inner", "mic.outer"]) + self.audio_df.index.name = "timestamp" + self._audio_df_sampling_rate = sampling_rate + return self.audio_df + + timestamps: List[np.ndarray] = [] + stereo_frames: List[np.ndarray] = [] + + for packet in mic_packets: + ts, stereo = mic_packet_to_stereo_frames( + packet=packet, + sampling_rate=sampling_rate, + ) + if stereo.size == 0: + continue + timestamps.append(ts) + stereo_frames.append(stereo) + + if not timestamps: + self.audio_df = pd.DataFrame(columns=["mic.inner", "mic.outer"]) + self.audio_df.index.name = "timestamp" + self._audio_df_sampling_rate = sampling_rate + return self.audio_df + + all_ts = np.concatenate(timestamps) + all_stereo = np.vstack(stereo_frames) + + self.audio_df = pd.DataFrame( + { + "mic.inner": all_stereo[:, 0], + "mic.outer": all_stereo[:, 1], + }, + index=all_ts, + ) + self.audio_df.index.name = "timestamp" + self.audio_df = self.audio_df[~self.audio_df.index.duplicated(keep="first")] + self._audio_df_sampling_rate = sampling_rate + + if sampling_rate == 48000: + self.sensor_dfs[self.SENSOR_SID["microphone"]] = self.audio_df + + return self.audio_df + + def export_csv(self) -> None: + base_filename, _ = os.path.splitext(self.filename) + self.save_csv(base_filename + ".csv") + + def save_csv(self, path: str) -> None: + if not self.df.empty: + self.df.to_csv(path) + + def play_audio(self, sampling_rate: int = 48000) -> None: + if self.audio_stereo is None: + print("❌ No microphone data available.") + return + + with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp: + write(tmp.name, sampling_rate, self.audio_stereo) + display(Audio(tmp.name)) + + def save_audio(self, path: str, sampling_rate: int = 48000) -> None: + if self.audio_stereo is None: + print("❌ No microphone data available to save.") + return + try: + write(path, sampling_rate, self.audio_stereo) + print(f"✅ Audio saved successfully to {path}") + except Exception as exc: + print(f"❌ Error saving audio to {path}: {exc}") + + +def load_recordings(file_paths: Sequence[str]) -> List[SensorDataset]: + return [SensorDataset(path) for path in file_paths if os.path.isfile(path)] + + +__all__ = [ + "COLORS", + "LABELS", + "SensorAccessor", + "SensorDataset", + "load_recordings", +] diff --git a/src/open_wearables/ipc/__init__.py b/src/open_wearables/ipc/__init__.py index ec1b983..1063337 100644 --- a/src/open_wearables/ipc/__init__.py +++ b/src/open_wearables/ipc/__init__.py @@ -1,4 +1,4 @@ -from .client import OpenEarableIPCClient, StreamSubscription +from .client import OpenWearableIPCClient, StreamSubscription from .errors import ( IPCClosedError, IPCError, @@ -14,7 +14,7 @@ "IPCProtocolError", "IPCRemoteError", "IPCStreamError", - "OpenEarableIPCClient", + "OpenWearableIPCClient", "StreamEvent", "StreamSubscription", ] diff --git a/src/open_wearables/ipc/client.py b/src/open_wearables/ipc/client.py index 2fd8473..6c225a0 100644 --- a/src/open_wearables/ipc/client.py +++ b/src/open_wearables/ipc/client.py @@ -68,8 +68,8 @@ async def close(self) -> dict[str, Any]: return await self._client.unsubscribe(self.subscription_id) -class OpenEarableIPCClient: - """Async client for OpenEarable WebSocket IPC daemon.""" +class OpenWearableIPCClient: + """Async client for OpenWearable WebSocket IPC daemon.""" def __init__( self, @@ -89,7 +89,7 @@ def __init__( self._subscriptions: Dict[int, "asyncio.Queue[Any]"] = {} self._waiters: Set["asyncio.Future[dict[str, Any]]"] = set() - async def __aenter__(self) -> "OpenEarableIPCClient": + async def __aenter__(self) -> "OpenWearableIPCClient": await self.connect() return self diff --git a/src/open_wearables/parsing/__init__.py b/src/open_wearables/parsing/__init__.py new file mode 100644 index 0000000..ae81d0c --- /dev/null +++ b/src/open_wearables/parsing/__init__.py @@ -0,0 +1,19 @@ +from .audio import ( + MicPacket, + interleaved_mic_to_stereo, + mic_packet_to_stereo_frames, +) +from .base import ParseResult, PayloadParser +from .payload_parsers import MicPayloadParser, SchemePayloadParser +from .stream import Parser + +__all__ = [ + "MicPacket", + "MicPayloadParser", + "ParseResult", + "Parser", + "PayloadParser", + "SchemePayloadParser", + "interleaved_mic_to_stereo", + "mic_packet_to_stereo_frames", +] diff --git a/src/open_wearables/parsing/audio.py b/src/open_wearables/parsing/audio.py new file mode 100644 index 0000000..f9faf4e --- /dev/null +++ b/src/open_wearables/parsing/audio.py @@ -0,0 +1,48 @@ +from typing import List, Optional, Tuple, TypedDict, Union + +import numpy as np + + +class MicPacket(TypedDict): + timestamp: float + samples: tuple[int, ...] + + +def interleaved_mic_to_stereo( + samples: Union[np.ndarray, List[int], tuple[int, ...]], +) -> np.ndarray: + """Convert interleaved [outer, inner, ...] int16 samples to [inner, outer] frames.""" + interleaved = np.asarray(samples, dtype=np.int16) + if interleaved.size < 2: + return np.empty((0, 2), dtype=np.int16) + + frame_count = interleaved.size // 2 + interleaved = interleaved[: frame_count * 2] + return np.column_stack((interleaved[1::2], interleaved[0::2])) + + +def mic_packet_to_stereo_frames( + packet: MicPacket, + sampling_rate: int, +) -> Tuple[np.ndarray, np.ndarray]: + """Return timestamps and stereo frames for a parsed microphone packet.""" + if sampling_rate <= 0: + raise ValueError(f"sampling_rate must be > 0, got {sampling_rate}") + + stereo = interleaved_mic_to_stereo(packet["samples"]) + if stereo.size == 0: + return np.empty((0,), dtype=np.float64), stereo + + timestamps = float(packet["timestamp"]) + ( + np.arange(stereo.shape[0], dtype=np.float64) / sampling_rate + ) + return timestamps, stereo + + +def mic_samples_to_stereo(mic_samples: List[int]) -> Optional[np.ndarray]: + if not mic_samples: + return None + stereo = interleaved_mic_to_stereo(mic_samples) + if stereo.size == 0: + return None + return stereo diff --git a/src/open_wearables/parsing/base.py b/src/open_wearables/parsing/base.py new file mode 100644 index 0000000..7dcd1cd --- /dev/null +++ b/src/open_wearables/parsing/base.py @@ -0,0 +1,33 @@ +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +import numpy as np +import pandas as pd + +from .audio import MicPacket, mic_samples_to_stereo + + +class PayloadParser: + """Abstract base class for payload parsers.""" + + expected_size: int + + def parse(self, data: bytes, **kwargs) -> List[dict]: + raise NotImplementedError + + def should_build_df(self) -> bool: + return True + + +@dataclass +class ParseResult: + """Result of parsing a stream.""" + + sensor_dfs: Dict[int, pd.DataFrame] + mic_samples: List[int] + mic_packets: List[MicPacket] = field(default_factory=list) + audio_stereo: Optional[np.ndarray] = None + + @staticmethod + def mic_samples_to_stereo(mic_samples: List[int]) -> Optional[np.ndarray]: + return mic_samples_to_stereo(mic_samples) diff --git a/src/open_wearables/parsing/payload_parsers.py b/src/open_wearables/parsing/payload_parsers.py new file mode 100644 index 0000000..fca0742 --- /dev/null +++ b/src/open_wearables/parsing/payload_parsers.py @@ -0,0 +1,127 @@ +import struct +from typing import List + +from open_wearables.schema import ParseType, SensorScheme + +from .base import PayloadParser + + +class MicPayloadParser(PayloadParser): + """Payload parser for microphone packets (int16 PCM samples).""" + + def __init__(self, sample_count: int, verbose: bool = False): + self.sample_count = sample_count + self.expected_size = sample_count * 2 + self.verbose = verbose + + def parse(self, data: bytes, **kwargs) -> List[dict]: + if len(data) != self.expected_size and self.verbose: + print( + f"Mic payload size {len(data)} bytes does not match expected " + f"{self.expected_size} bytes (sample_count={self.sample_count})." + ) + + if len(data) % 2 != 0 and self.verbose: + print(f"Mic payload has odd size {len(data)}; last byte will be ignored.") + + n_samples = len(data) // 2 + format_str = f"<{n_samples}h" + samples = struct.unpack_from(format_str, data, 0) + return [{"samples": samples}] + + def should_build_df(self) -> bool: + return False + + +class SchemePayloadParser(PayloadParser): + def __init__(self, sensor_scheme: SensorScheme): + self.sensor_scheme = sensor_scheme + + size = 0 + for group in self.sensor_scheme.groups: + for component in group.components: + if component.data_type in (ParseType.UINT8, ParseType.INT8): + size += 1 + elif component.data_type in (ParseType.UINT16, ParseType.INT16): + size += 2 + elif component.data_type in ( + ParseType.UINT32, + ParseType.INT32, + ParseType.FLOAT, + ): + size += 4 + elif component.data_type == ParseType.DOUBLE: + size += 8 + else: + raise ValueError( + f"Unsupported data type in scheme: {component.data_type}" + ) + self.expected_size = size + + def check_size(self, data: bytes) -> None: + size = len(data) + if size != self.expected_size and not ( + size > self.expected_size and (size - 2) % self.expected_size == 0 + ): + raise ValueError( + f"Payload size {size} bytes does not match expected size " + f"{self.expected_size} bytes for sensor '{self.sensor_scheme.name}'" + ) + + def is_buffered(self, data: bytes) -> bool: + size = len(data) + return size > self.expected_size and (size - 2) % self.expected_size == 0 + + def parse(self, data: bytes, **kwargs) -> List[dict]: + self.check_size(data) + if self.is_buffered(data): + results = [] + t_delta = struct.unpack_from(" dict: + parsed_data = {} + offset = 0 + + for group in self.sensor_scheme.groups: + group_data = {} + for component in group.components: + if component.data_type == ParseType.UINT8: + value = struct.unpack_from(" "Parser": + parsers: dict[int, PayloadParser] = { + sid: SchemePayloadParser(scheme) for sid, scheme in sensor_schemes.items() + } + return cls(parsers=parsers, verbose=verbose) + + def parse( + self, + data_stream: BinaryIO, + *, + chunk_size: int = 4096, + max_resync_scan_bytes: int = 256, + ) -> ParseResult: + rows_by_sid: dict[int, list[dict]] = {} + + header_size = 10 + buffer = bytearray() + packet_idx = 0 + mic_samples: List[int] = [] + mic_packets: List[MicPacket] = [] + + def flush_to_dataframes() -> Dict[int, pd.DataFrame]: + result: Dict[int, pd.DataFrame] = {} + for sid, rows in rows_by_sid.items(): + df = pd.DataFrame(rows) + if not df.empty and "timestamp" in df.columns: + df.set_index("timestamp", inplace=True) + result[sid] = df + return result + + while True: + if len(buffer) < header_size: + chunk = data_stream.read(chunk_size) + if not chunk: + if self.verbose and buffer: + print( + f"End of stream with {len(buffer)} leftover bytes (incomplete header/payload)." + ) + break + buffer.extend(chunk) + continue + + header = bytes(buffer[:header_size]) + sid, size, time = self._parse_header(header) + timestamp_s = time / 1e6 + + if self.verbose: + print( + f"Packet #{packet_idx}: SID={sid}, size={size}, time={timestamp_s:.6f}s " + f"(buffer_len={len(buffer)})" + ) + + if sid not in self.parsers: + if self.verbose: + print(f"Warning: No parser registered for SID={sid}. Attempting resync...") + new_offset = self._attempt_resync( + bytes(buffer), + 0, + packet_idx, + max_scan_bytes=max_resync_scan_bytes, + ) + if new_offset is None: + del buffer[:1] + else: + del buffer[:new_offset] + continue + + if size <= 0: + if self.verbose: + print(f"Invalid size={size} for SID={sid}. Attempting resync...") + new_offset = self._attempt_resync( + bytes(buffer), + 0, + packet_idx, + max_scan_bytes=max_resync_scan_bytes, + ) + if new_offset is None: + del buffer[:1] + else: + del buffer[:new_offset] + continue + + parser = self.parsers[sid] + needed = header_size + size + if len(buffer) < needed: + chunk = data_stream.read(chunk_size) + if not chunk: + if self.verbose: + print( + f"Truncated payload at packet #{packet_idx}: need {needed} bytes, " + f"have {len(buffer)} bytes and stream ended." + ) + break + buffer.extend(chunk) + continue + + payload = bytes(buffer[header_size:needed]) + try: + values_list = parser.parse(payload) + if isinstance(parser, MicPayloadParser): + for item in values_list: + samples = item.get("samples") + if samples is None: + continue + mic_samples.extend(list(samples)) + mic_packets.append( + { + "timestamp": timestamp_s, + "samples": samples, + } + ) + if self.verbose: + if isinstance(parser, MicPayloadParser): + print( + f"Parsed mic packet #{packet_idx} (SID={sid}) successfully: " + f"{len(values_list[0].get('samples', [])) if values_list else 0} samples" + ) + else: + print( + f"Parsed packet #{packet_idx} (SID={sid}) successfully: {values_list}" + ) + except Exception as exc: + if self.verbose: + print( + f"struct.error while parsing payload at packet #{packet_idx} " + f"(SID={sid}, size={size}): {exc}. Attempting resync..." + ) + new_offset = self._attempt_resync( + bytes(buffer), + 0, + packet_idx, + max_scan_bytes=max_resync_scan_bytes, + ) + if new_offset is None: + del buffer[:1] + else: + del buffer[:new_offset] + continue + + if parser.should_build_df(): + for values in values_list: + flat_values: dict[str, object] = {} + for key, val in values.items(): + if key == "t_delta": + timestamp_s += val / 1e6 + continue + if isinstance(val, dict): + for sub_key, sub_val in val.items(): + flat_values[f"{key}.{sub_key}"] = sub_val + else: + flat_values[key] = val + + row = { + "timestamp": timestamp_s, + **flat_values, + } + rows_by_sid.setdefault(sid, []).append(row) + + del buffer[:needed] + packet_idx += 1 + + sensor_dfs = flush_to_dataframes() + audio_stereo = mic_samples_to_stereo(mic_samples) + return ParseResult( + sensor_dfs=sensor_dfs, + mic_samples=mic_samples, + mic_packets=mic_packets, + audio_stereo=audio_stereo, + ) + + def _parse_header(self, header: bytes) -> tuple[int, int, int]: + sid, size, time = struct.unpack(" bool: + if sid not in self.parsers: + return False + if size <= 0 or size > remaining: + return False + + parser = self.parsers[sid] + if hasattr(parser, "expected_size") and parser.expected_size is not None: + if size != parser.expected_size: + return False + + return True + + def _attempt_resync( + self, + data: bytes, + packet_start: int, + packet_idx: int, + max_scan_bytes: int = 64, + ) -> Optional[int]: + total_len = len(data) + header_size = 10 + + if self.verbose: + print( + f"Attempting resync after packet #{packet_idx} from offset {packet_start} " + f"(scan up to {max_scan_bytes} bytes ahead)..." + ) + + for delta in range(1, max_scan_bytes + 1): + candidate = packet_start + delta + if candidate + header_size > total_len: + break + + header = data[candidate : candidate + header_size] + try: + sid, size, time = self._parse_header(header) + except struct.error: + continue + + remaining = total_len - (candidate + header_size) + if not self._is_plausible_header(sid, size, remaining): + continue + + if self.verbose: + timestamp_s = time / 1e6 + print( + f"Resynced at offset {candidate} (skipped {delta} bytes): " + f"SID={sid}, size={size}, time={timestamp_s:.6f}s" + ) + + return candidate + + if self.verbose: + print( + f"Resync failed within {max_scan_bytes} bytes after packet #{packet_idx}; " + "giving up on this buffer." + ) + return None diff --git a/src/open_wearables/schema/__init__.py b/src/open_wearables/schema/__init__.py new file mode 100644 index 0000000..f97ec7a --- /dev/null +++ b/src/open_wearables/schema/__init__.py @@ -0,0 +1,15 @@ +from .defaults import build_default_sensor_schemes +from .types import ( + ParseType, + SensorComponentGroupScheme, + SensorComponentScheme, + SensorScheme, +) + +__all__ = [ + "ParseType", + "SensorComponentGroupScheme", + "SensorComponentScheme", + "SensorScheme", + "build_default_sensor_schemes", +] diff --git a/src/open_wearables/schema/defaults.py b/src/open_wearables/schema/defaults.py new file mode 100644 index 0000000..91b27a1 --- /dev/null +++ b/src/open_wearables/schema/defaults.py @@ -0,0 +1,86 @@ +from typing import Dict, Mapping + +from .types import ParseType, SensorScheme, group + + +def build_default_sensor_schemes(sensor_sid: Mapping[str, int]) -> Dict[int, SensorScheme]: + """Build default non-microphone sensor schemes keyed by SID.""" + return { + sensor_sid["imu"]: SensorScheme( + name="imu", + sid=sensor_sid["imu"], + groups=[ + group( + "acc", + [ + ("x", ParseType.FLOAT), + ("y", ParseType.FLOAT), + ("z", ParseType.FLOAT), + ], + ), + group( + "gyro", + [ + ("x", ParseType.FLOAT), + ("y", ParseType.FLOAT), + ("z", ParseType.FLOAT), + ], + ), + group( + "mag", + [ + ("x", ParseType.FLOAT), + ("y", ParseType.FLOAT), + ("z", ParseType.FLOAT), + ], + ), + ], + ), + sensor_sid["barometer"]: SensorScheme( + name="barometer", + sid=sensor_sid["barometer"], + groups=[ + group( + "barometer", + [ + ("temperature", ParseType.FLOAT), + ("pressure", ParseType.FLOAT), + ], + ) + ], + ), + sensor_sid["ppg"]: SensorScheme( + name="ppg", + sid=sensor_sid["ppg"], + groups=[ + group( + "ppg", + [ + ("red", ParseType.UINT32), + ("ir", ParseType.UINT32), + ("green", ParseType.UINT32), + ("ambient", ParseType.UINT32), + ], + ) + ], + ), + sensor_sid["optical_temp"]: SensorScheme( + name="optical_temp", + sid=sensor_sid["optical_temp"], + groups=[group("optical_temp", [("optical_temp", ParseType.FLOAT)])], + ), + sensor_sid["bone_acc"]: SensorScheme( + name="bone_acc", + sid=sensor_sid["bone_acc"], + groups=[ + group( + "bone_acc", + [ + ("x", ParseType.INT16), + ("y", ParseType.INT16), + ("z", ParseType.INT16), + ], + ) + ], + ), + } diff --git a/src/open_wearables/schema/types.py b/src/open_wearables/schema/types.py new file mode 100644 index 0000000..1f9763e --- /dev/null +++ b/src/open_wearables/schema/types.py @@ -0,0 +1,56 @@ +import enum +from typing import Sequence + + +class ParseType(enum.Enum): + UINT8 = "uint8" + UINT16 = "uint16" + UINT32 = "uint32" + INT8 = "int8" + INT16 = "int16" + INT32 = "int32" + FLOAT = "float" + DOUBLE = "double" + + +class SensorComponentScheme: + def __init__(self, name: str, data_type: ParseType): + self.name = name + self.data_type = data_type + + def __repr__(self) -> str: + return f"SensorComponentScheme(name={self.name}, data_type={self.data_type})" + + +class SensorComponentGroupScheme: + def __init__(self, name: str, components: list[SensorComponentScheme]): + self.name = name + self.components = components + + def __repr__(self) -> str: + return f"SensorComponentGroupScheme(name={self.name}, components={self.components})" + + +class SensorScheme: + """Schema definition for one OpenEarable sensor stream.""" + + def __init__(self, name: str, sid: int, groups: list[SensorComponentGroupScheme]): + self.name = name + self.sid = sid + self.groups = groups + + def __repr__(self) -> str: + return f"SensorScheme(name={self.name}, sid={self.sid}, groups={self.groups})" + + +def group( + name: str, + components: Sequence[tuple[str, ParseType]], +) -> SensorComponentGroupScheme: + return SensorComponentGroupScheme( + name=name, + components=[ + SensorComponentScheme(component_name, parse_type) + for component_name, parse_type in components + ], + ) diff --git a/src/open_wearables/scheme.py b/src/open_wearables/scheme.py index 1e47992..06995b3 100644 --- a/src/open_wearables/scheme.py +++ b/src/open_wearables/scheme.py @@ -1,121 +1,20 @@ -import enum -from typing import Dict, Mapping, Sequence - -class ParseType(enum.Enum): - UINT8 = "uint8" - UINT16 = "uint16" - UINT32 = "uint32" - INT8 = "int8" - INT16 = "int16" - INT32 = "int32" - FLOAT = "float" - DOUBLE = "double" - -class SensorComponentScheme: - def __init__(self, name: str, data_type: ParseType): - self.name = name - self.data_type = data_type - - def __repr__(self): - return f"SensorComponentScheme(name={self.name}, data_type={self.data_type})" - -class SensorComponentGroupScheme: - def __init__(self, name: str, components: list[SensorComponentScheme]): - self.name = name - self.components = components - - def __repr__(self): - return f"SensorComponentGroupScheme(name={self.name}, components={self.components})" - -class SensorScheme: - """ - A class representing the schema for sensor data in an earable device. - """ - - def __init__(self, name: str, sid: int, groups: list[SensorComponentGroupScheme]): - self.name = name - self.sid = sid - self.groups = groups - - def __repr__(self): - return f"SensorScheme(name={self.name}, sid={self.sid}, groups={self.groups})" - - -def _group( - name: str, - components: Sequence[tuple[str, ParseType]], -) -> SensorComponentGroupScheme: - return SensorComponentGroupScheme( - name=name, - components=[ - SensorComponentScheme(component_name, parse_type) - for component_name, parse_type in components - ], - ) - - -def build_default_sensor_schemes(sensor_sid: Mapping[str, int]) -> Dict[int, SensorScheme]: - """Build default non-microphone sensor schemes keyed by SID.""" - return { - sensor_sid["imu"]: SensorScheme( - name="imu", - sid=sensor_sid["imu"], - groups=[ - _group( - "acc", - [("x", ParseType.FLOAT), ("y", ParseType.FLOAT), ("z", ParseType.FLOAT)], - ), - _group( - "gyro", - [("x", ParseType.FLOAT), ("y", ParseType.FLOAT), ("z", ParseType.FLOAT)], - ), - _group( - "mag", - [("x", ParseType.FLOAT), ("y", ParseType.FLOAT), ("z", ParseType.FLOAT)], - ), - ], - ), - sensor_sid["barometer"]: SensorScheme( - name="barometer", - sid=sensor_sid["barometer"], - groups=[ - _group( - "barometer", - [ - ("temperature", ParseType.FLOAT), - ("pressure", ParseType.FLOAT), - ], - ) - ], - ), - sensor_sid["ppg"]: SensorScheme( - name="ppg", - sid=sensor_sid["ppg"], - groups=[ - _group( - "ppg", - [ - ("red", ParseType.UINT32), - ("ir", ParseType.UINT32), - ("green", ParseType.UINT32), - ("ambient", ParseType.UINT32), - ], - ) - ], - ), - sensor_sid["optical_temp"]: SensorScheme( - name="optical_temp", - sid=sensor_sid["optical_temp"], - groups=[_group("optical_temp", [("optical_temp", ParseType.FLOAT)])], - ), - sensor_sid["bone_acc"]: SensorScheme( - name="bone_acc", - sid=sensor_sid["bone_acc"], - groups=[ - _group( - "bone_acc", - [("x", ParseType.INT16), ("y", ParseType.INT16), ("z", ParseType.INT16)], - ) - ], - ), - } +"""Backward-compatible schema module. + +Prefer imports from ``open_wearable.schema`` for the layered architecture. +""" + +from .schema import ( + ParseType, + SensorComponentGroupScheme, + SensorComponentScheme, + SensorScheme, + build_default_sensor_schemes, +) + +__all__ = [ + "ParseType", + "SensorComponentGroupScheme", + "SensorComponentScheme", + "SensorScheme", + "build_default_sensor_schemes", +] From 08f355a5de3b216872cc19cd2274dd3498527f34 Mon Sep 17 00:00:00 2001 From: Dennis Moschina <45356478+DennisMoschina@users.noreply.github.com> Date: Thu, 26 Feb 2026 14:54:50 +0100 Subject: [PATCH 3/5] fix: restore OpenEarableIPCClient import compatibility --- src/open_wearables/__init__.py | 2 ++ src/open_wearables/ipc/__init__.py | 3 ++- src/open_wearables/ipc/client.py | 4 ++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/open_wearables/__init__.py b/src/open_wearables/__init__.py index 7f28a80..945b2bc 100644 --- a/src/open_wearables/__init__.py +++ b/src/open_wearables/__init__.py @@ -5,6 +5,7 @@ IPCProtocolError, IPCRemoteError, IPCStreamError, + OpenEarableIPCClient, OpenWearableIPCClient, StreamEvent, StreamSubscription, @@ -16,6 +17,7 @@ "IPCProtocolError", "IPCRemoteError", "IPCStreamError", + "OpenEarableIPCClient", "OpenWearableIPCClient", "SensorDataset", "StreamEvent", diff --git a/src/open_wearables/ipc/__init__.py b/src/open_wearables/ipc/__init__.py index 1063337..985fad2 100644 --- a/src/open_wearables/ipc/__init__.py +++ b/src/open_wearables/ipc/__init__.py @@ -1,4 +1,4 @@ -from .client import OpenWearableIPCClient, StreamSubscription +from .client import OpenEarableIPCClient, OpenWearableIPCClient, StreamSubscription from .errors import ( IPCClosedError, IPCError, @@ -14,6 +14,7 @@ "IPCProtocolError", "IPCRemoteError", "IPCStreamError", + "OpenEarableIPCClient", "OpenWearableIPCClient", "StreamEvent", "StreamSubscription", diff --git a/src/open_wearables/ipc/client.py b/src/open_wearables/ipc/client.py index 6c225a0..b2fce1b 100644 --- a/src/open_wearables/ipc/client.py +++ b/src/open_wearables/ipc/client.py @@ -423,3 +423,7 @@ async def _handle_event(self, message: dict[str, Any]) -> None: queue = self._subscriptions.pop(subscription_id, None) if queue is not None: await queue.put(_STREAM_END) + + +# Backward-compatible alias for older public API users. +OpenEarableIPCClient = OpenWearableIPCClient From 59030805fc31d208b767e8e7b101f26964d3b408 Mon Sep 17 00:00:00 2001 From: Dennis Moschina <45356478+DennisMoschina@users.noreply.github.com> Date: Thu, 26 Feb 2026 14:55:06 +0100 Subject: [PATCH 4/5] chore: remove OpenEarableIPCClient compatibility alias --- src/open_wearables/__init__.py | 2 -- src/open_wearables/ipc/__init__.py | 3 +-- src/open_wearables/ipc/client.py | 6 +----- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/open_wearables/__init__.py b/src/open_wearables/__init__.py index 945b2bc..7f28a80 100644 --- a/src/open_wearables/__init__.py +++ b/src/open_wearables/__init__.py @@ -5,7 +5,6 @@ IPCProtocolError, IPCRemoteError, IPCStreamError, - OpenEarableIPCClient, OpenWearableIPCClient, StreamEvent, StreamSubscription, @@ -17,7 +16,6 @@ "IPCProtocolError", "IPCRemoteError", "IPCStreamError", - "OpenEarableIPCClient", "OpenWearableIPCClient", "SensorDataset", "StreamEvent", diff --git a/src/open_wearables/ipc/__init__.py b/src/open_wearables/ipc/__init__.py index 985fad2..1063337 100644 --- a/src/open_wearables/ipc/__init__.py +++ b/src/open_wearables/ipc/__init__.py @@ -1,4 +1,4 @@ -from .client import OpenEarableIPCClient, OpenWearableIPCClient, StreamSubscription +from .client import OpenWearableIPCClient, StreamSubscription from .errors import ( IPCClosedError, IPCError, @@ -14,7 +14,6 @@ "IPCProtocolError", "IPCRemoteError", "IPCStreamError", - "OpenEarableIPCClient", "OpenWearableIPCClient", "StreamEvent", "StreamSubscription", diff --git a/src/open_wearables/ipc/client.py b/src/open_wearables/ipc/client.py index b2fce1b..9bc19de 100644 --- a/src/open_wearables/ipc/client.py +++ b/src/open_wearables/ipc/client.py @@ -33,7 +33,7 @@ class StreamSubscription: def __init__( self, - client: "OpenEarableIPCClient", + client: "OpenWearableIPCClient", subscription_id: int, stream: str, device_id: str, @@ -423,7 +423,3 @@ async def _handle_event(self, message: dict[str, Any]) -> None: queue = self._subscriptions.pop(subscription_id, None) if queue is not None: await queue.put(_STREAM_END) - - -# Backward-compatible alias for older public API users. -OpenEarableIPCClient = OpenWearableIPCClient From ef9f666987fa5bf70e0beab1c2022819325bc45a Mon Sep 17 00:00:00 2001 From: Dennis Moschina <45356478+DennisMoschina@users.noreply.github.com> Date: Thu, 26 Feb 2026 14:56:05 +0100 Subject: [PATCH 5/5] feat: add IPC WebSocket example and enhance IPC client with device discovery and actions --- README.md | 27 +++ docs/api-reference.md | 65 ++++++- src/open_wearables/__init__.py | 16 ++ src/open_wearables/ipc/__init__.py | 25 ++- src/open_wearables/ipc/client.py | 267 ++++++++++++++++++++++++++--- src/open_wearables/ipc/models.py | 129 +++++++++++++- 6 files changed, 501 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 80f4fb1..8e4ab9b 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,33 @@ ppg_red = dataset.ppg["ppg.red"] audio_df = dataset.get_audio_dataframe() ``` +## IPC WebSocket Example + +```python +import asyncio +from open_wearable import OpenWearableIPCClient + + +async def main() -> None: + async with OpenWearableIPCClient() as client: + await client.start_scan() + devices = await client.get_discovered_devices() + wearable = client.wearable(devices[0].id) + + await wearable.connect() + await wearable.actions.synchronize_time() + + sensors = await wearable.actions.list_sensors() + stream = await wearable.streams.sensor_values(sensor_id=sensors[0].sensor_id) + async for event in stream: + print(event.data) + break + await stream.close() + + +asyncio.run(main()) +``` + ## Documentation - [Documentation index](docs/README.md) diff --git a/docs/api-reference.md b/docs/api-reference.md index 3b5431e..8dabc10 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -8,10 +8,67 @@ from open_wearables import SensorDataset, load_recordings Primary internal package layers: -- `open_wearable.schema` -- `open_wearable.parsing` -- `open_wearable.data` -- `open_wearable.ipc` +- `open_wearables.schema` +- `open_wearables.parsing` +- `open_wearables.data` +- `open_wearables.ipc` + +## IPC Client (`open_wearables.ipc`) + +`OpenWearableIPCClient` is an async JSON-RPC style client for +`ws://127.0.0.1:8765/ws` by default. + +### Connection Lifecycle + +```python +async with OpenWearableIPCClient() as client: + await client.ping() +``` + +### Discovery and Connection + +- `start_scan(check_and_request_permissions=True)` +- `start_scan_async(check_and_request_permissions=True) -> dict` +- `start_scan_stream(check_and_request_permissions=True) -> StreamSubscription` +- `get_discovered_devices() -> list[DiscoveredDevice]` +- `connect_device(device_id, connected_via_system=False) -> WearableSummary` +- `connect_system_devices(ignored_device_ids=None) -> list[WearableSummary]` +- `list_connected() -> list[WearableSummary]` +- `disconnect(device_id)` + +### Action Sugar + +- `client.synchronize_time(device_id)` +- `client.list_sensors(device_id) -> list[SensorInfo]` +- `client.list_sensor_configurations(device_id) -> list[SensorConfiguration]` +- `client.set_sensor_configuration(device_id, configuration_name=..., value_key=...)` + +Per-device handle: + +```python +wearable = client.wearable(device_id) +await wearable.connect() +await wearable.actions.synchronize_time() +``` + +### Stream Sugar + +Use the typed stream helpers: + +```python +stream = await wearable.streams.sensor_values(sensor_id="accelerometer_0") +async for event in stream: + print(event.data) +``` + +Other helpers: + +- `wearable.streams.sensor_configuration()` +- `wearable.streams.button_events()` +- `wearable.streams.battery_percentage()` +- `wearable.streams.battery_power_status()` +- `wearable.streams.battery_health_status()` +- `wearable.streams.battery_energy_status()` ## `SensorDataset` diff --git a/src/open_wearables/__init__.py b/src/open_wearables/__init__.py index 7f28a80..8dd2c60 100644 --- a/src/open_wearables/__init__.py +++ b/src/open_wearables/__init__.py @@ -1,16 +1,25 @@ from .data import SensorDataset, load_recordings from .ipc import ( + DiscoveredDevice, IPCClosedError, IPCError, IPCProtocolError, IPCRemoteError, IPCStreamError, OpenWearableIPCClient, + SensorConfiguration, + SensorConfigurationValue, + SensorInfo, StreamEvent, StreamSubscription, + Wearable, + WearableActions, + WearableStreams, + WearableSummary, ) __all__ = [ + "DiscoveredDevice", "IPCClosedError", "IPCError", "IPCProtocolError", @@ -18,7 +27,14 @@ "IPCStreamError", "OpenWearableIPCClient", "SensorDataset", + "SensorConfiguration", + "SensorConfigurationValue", + "SensorInfo", "StreamEvent", "StreamSubscription", + "Wearable", + "WearableActions", + "WearableStreams", + "WearableSummary", "load_recordings", ] diff --git a/src/open_wearables/ipc/__init__.py b/src/open_wearables/ipc/__init__.py index 1063337..a19e4ec 100644 --- a/src/open_wearables/ipc/__init__.py +++ b/src/open_wearables/ipc/__init__.py @@ -1,4 +1,10 @@ -from .client import OpenWearableIPCClient, StreamSubscription +from .client import ( + OpenWearableIPCClient, + StreamSubscription, + Wearable, + WearableActions, + WearableStreams, +) from .errors import ( IPCClosedError, IPCError, @@ -6,15 +12,30 @@ IPCRemoteError, IPCStreamError, ) -from .models import StreamEvent +from .models import ( + DiscoveredDevice, + SensorConfiguration, + SensorConfigurationValue, + SensorInfo, + StreamEvent, + WearableSummary, +) __all__ = [ + "DiscoveredDevice", "IPCClosedError", "IPCError", "IPCProtocolError", "IPCRemoteError", "IPCStreamError", "OpenWearableIPCClient", + "SensorConfiguration", + "SensorConfigurationValue", + "SensorInfo", "StreamEvent", "StreamSubscription", + "Wearable", + "WearableActions", + "WearableStreams", + "WearableSummary", ] diff --git a/src/open_wearables/ipc/client.py b/src/open_wearables/ipc/client.py index 9bc19de..9891b8d 100644 --- a/src/open_wearables/ipc/client.py +++ b/src/open_wearables/ipc/client.py @@ -5,7 +5,7 @@ import itertools import json from collections import defaultdict -from typing import Any, Awaitable, Callable, Dict, Optional, Set +from typing import Any, Awaitable, Callable, Dict, Literal, Optional, Set, Union from websockets import ConnectionClosed from websockets.client import WebSocketClientProtocol, connect @@ -16,9 +16,33 @@ IPCRemoteError, IPCStreamError, ) -from .models import StreamEvent +from .models import ( + DiscoveredDevice, + SensorConfiguration, + SensorInfo, + StreamEvent, + WearableSummary, +) EventCallback = Callable[[dict[str, Any]], Optional[Awaitable[None]]] +ActionName = Literal[ + "disconnect", + "synchronize_time", + "list_sensors", + "list_sensor_configurations", + "set_sensor_configuration", +] +AnyActionName = Union[ActionName, str] +StreamName = Literal[ + "sensor_values", + "sensor_configuration", + "button_events", + "battery_percentage", + "battery_power_status", + "battery_health_status", + "battery_energy_status", +] +AnyStreamName = Union[StreamName, str] class _StreamEnd: @@ -68,6 +92,131 @@ async def close(self) -> dict[str, Any]: return await self._client.unsubscribe(self.subscription_id) +class WearableActions: + """Typed ergonomic action wrappers for one wearable device.""" + + def __init__(self, client: "OpenWearableIPCClient", device_id: str) -> None: + self._client = client + self._device_id = device_id + + async def disconnect(self) -> Any: + return await self._client.invoke_action(self._device_id, "disconnect") + + async def synchronize_time(self) -> Any: + return await self._client.invoke_action(self._device_id, "synchronize_time") + + async def list_sensors(self) -> list[SensorInfo]: + return await self._client.list_sensors(self._device_id) + + async def list_sensor_configurations(self) -> list[SensorConfiguration]: + return await self._client.list_sensor_configurations(self._device_id) + + async def set_sensor_configuration( + self, + configuration_name: str, + value_key: str, + ) -> Any: + return await self._client.set_sensor_configuration( + self._device_id, + configuration_name=configuration_name, + value_key=value_key, + ) + + +class WearableStreams: + """Typed stream subscription wrappers for one wearable device.""" + + def __init__(self, client: "OpenWearableIPCClient", device_id: str) -> None: + self._client = client + self._device_id = device_id + + async def sensor_values( + self, + *, + sensor_id: Optional[str] = None, + sensor_index: Optional[int] = None, + sensor_name: Optional[str] = None, + ) -> StreamSubscription: + stream_selectors = [ + sensor_id is not None, + sensor_index is not None, + sensor_name is not None, + ] + if sum(stream_selectors) != 1: + raise ValueError( + "Provide exactly one of sensor_id, sensor_index, or sensor_name." + ) + + args: dict[str, Any] = {} + if sensor_id is not None: + args["sensor_id"] = sensor_id + if sensor_index is not None: + args["sensor_index"] = sensor_index + if sensor_name is not None: + args["sensor_name"] = sensor_name + + return await self._client.subscribe( + device_id=self._device_id, + stream="sensor_values", + args=args, + ) + + async def sensor_configuration(self) -> StreamSubscription: + return await self._client.subscribe( + device_id=self._device_id, + stream="sensor_configuration", + ) + + async def button_events(self) -> StreamSubscription: + return await self._client.subscribe( + device_id=self._device_id, + stream="button_events", + ) + + async def battery_percentage(self) -> StreamSubscription: + return await self._client.subscribe( + device_id=self._device_id, + stream="battery_percentage", + ) + + async def battery_power_status(self) -> StreamSubscription: + return await self._client.subscribe( + device_id=self._device_id, + stream="battery_power_status", + ) + + async def battery_health_status(self) -> StreamSubscription: + return await self._client.subscribe( + device_id=self._device_id, + stream="battery_health_status", + ) + + async def battery_energy_status(self) -> StreamSubscription: + return await self._client.subscribe( + device_id=self._device_id, + stream="battery_energy_status", + ) + + +class Wearable: + """High-level handle for a specific wearable device ID.""" + + def __init__(self, client: "OpenWearableIPCClient", device_id: str) -> None: + self.client = client + self.device_id = device_id + self.actions = WearableActions(client=client, device_id=device_id) + self.streams = WearableStreams(client=client, device_id=device_id) + + async def disconnect(self) -> dict[str, Any]: + return await self.client.disconnect(self.device_id) + + async def connect(self, connected_via_system: bool = False) -> WearableSummary: + return await self.client.connect_device( + device_id=self.device_id, + connected_via_system=connected_via_system, + ) + + class OpenWearableIPCClient: """Async client for OpenWearable WebSocket IPC daemon.""" @@ -210,33 +359,59 @@ async def start_scan(self, check_and_request_permissions: bool = True) -> dict[s {"check_and_request_permissions": check_and_request_permissions}, ) - async def get_discovered_devices(self) -> list[dict[str, Any]]: - return await self.call("get_discovered_devices") + async def start_scan_async( + self, + check_and_request_permissions: bool = True, + ) -> dict[str, Any]: + return await self.call( + "start_scan_async", + {"check_and_request_permissions": check_and_request_permissions}, + ) + + async def start_scan_stream( + self, + check_and_request_permissions: bool = True, + ) -> StreamSubscription: + result = await self.start_scan_async( + check_and_request_permissions=check_and_request_permissions + ) + return self._build_stream_subscription( + result=result, + fallback_stream="scan", + fallback_device_id="scanner", + ) + + async def get_discovered_devices(self) -> list[DiscoveredDevice]: + payload = await self.call("get_discovered_devices") + return [DiscoveredDevice.from_payload(item) for item in payload] async def connect_device( self, device_id: str, connected_via_system: bool = False, - ) -> dict[str, Any]: - return await self.call( + ) -> WearableSummary: + payload = await self.call( "connect", { "device_id": device_id, "connected_via_system": connected_via_system, }, ) + return WearableSummary.from_payload(payload) async def connect_system_devices( self, ignored_device_ids: Optional[list[str]] = None, - ) -> list[dict[str, Any]]: + ) -> list[WearableSummary]: params: dict[str, Any] = {} if ignored_device_ids is not None: params["ignored_device_ids"] = ignored_device_ids - return await self.call("connect_system_devices", params) + payload = await self.call("connect_system_devices", params) + return [WearableSummary.from_payload(item) for item in payload] - async def list_connected(self) -> list[dict[str, Any]]: - return await self.call("list_connected") + async def list_connected(self) -> list[WearableSummary]: + payload = await self.call("list_connected") + return [WearableSummary.from_payload(item) for item in payload] async def disconnect(self, device_id: str) -> dict[str, Any]: return await self.call("disconnect", {"device_id": device_id}) @@ -253,7 +428,7 @@ async def get_actions(self, device_id: str) -> list[str]: async def invoke_action( self, device_id: str, - action: str, + action: AnyActionName, args: Optional[dict[str, Any]] = None, ) -> Any: params: dict[str, Any] = { @@ -264,10 +439,47 @@ async def invoke_action( params["args"] = args return await self.call("invoke_action", params) + async def action_disconnect(self, device_id: str) -> Any: + return await self.invoke_action(device_id, "disconnect") + + async def synchronize_time(self, device_id: str) -> Any: + return await self.invoke_action(device_id, "synchronize_time") + + async def list_sensors(self, device_id: str) -> list[SensorInfo]: + payload = await self.invoke_action(device_id, "list_sensors") + return [SensorInfo.from_payload(item) for item in payload] + + async def list_sensor_configurations( + self, + device_id: str, + ) -> list[SensorConfiguration]: + payload = await self.invoke_action(device_id, "list_sensor_configurations") + return [SensorConfiguration.from_payload(item) for item in payload] + + async def set_sensor_configuration( + self, + device_id: str, + *, + configuration_name: str, + value_key: str, + ) -> Any: + return await self.invoke_action( + device_id=device_id, + action="set_sensor_configuration", + args={ + "configuration_name": configuration_name, + "value_key": value_key, + }, + ) + + def wearable(self, device_id: str) -> Wearable: + """Return a typed handle with `.actions` and `.streams` sugar.""" + return Wearable(client=self, device_id=device_id) + async def subscribe( self, device_id: str, - stream: str, + stream: AnyStreamName, args: Optional[dict[str, Any]] = None, ) -> StreamSubscription: params: dict[str, Any] = { @@ -278,6 +490,26 @@ async def subscribe( params["args"] = args result = await self.call("subscribe", params) + return self._build_stream_subscription( + result=result, + fallback_stream=stream, + fallback_device_id=device_id, + ) + + async def unsubscribe(self, subscription_id: int) -> dict[str, Any]: + result = await self.call("unsubscribe", {"subscription_id": subscription_id}) + queue = self._subscriptions.pop(subscription_id, None) + if queue is not None: + await queue.put(_STREAM_END) + return result + + def _build_stream_subscription( + self, + *, + result: dict[str, Any], + fallback_stream: str, + fallback_device_id: str, + ) -> StreamSubscription: subscription_id = int(result["subscription_id"]) queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=self.subscription_queue_size) self._subscriptions[subscription_id] = queue @@ -285,18 +517,11 @@ async def subscribe( return StreamSubscription( client=self, subscription_id=subscription_id, - stream=str(result.get("stream", stream)), - device_id=str(result.get("device_id", device_id)), + stream=str(result.get("stream", fallback_stream)), + device_id=str(result.get("device_id", fallback_device_id)), queue=queue, ) - async def unsubscribe(self, subscription_id: int) -> dict[str, Any]: - result = await self.call("unsubscribe", {"subscription_id": subscription_id}) - queue = self._subscriptions.pop(subscription_id, None) - if queue is not None: - await queue.put(_STREAM_END) - return result - async def _receiver_loop(self) -> None: ws = self._ws if ws is None: diff --git a/src/open_wearables/ipc/models.py b/src/open_wearables/ipc/models.py index 25a7598..0555e17 100644 --- a/src/open_wearables/ipc/models.py +++ b/src/open_wearables/ipc/models.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Any +from typing import Any, Optional @dataclass(frozen=True) @@ -13,3 +13,130 @@ class StreamEvent: device_id: str data: Any raw: dict[str, Any] + + +@dataclass(frozen=True) +class DiscoveredDevice: + """Device metadata returned by discovery scans.""" + + id: str + name: str + service_uuids: list[str] + manufacturer_data: list[int] + rssi: Optional[int] + raw: dict[str, Any] + + @classmethod + def from_payload(cls, payload: dict[str, Any]) -> "DiscoveredDevice": + return cls( + id=str(payload.get("id", "")), + name=str(payload.get("name", "")), + service_uuids=[str(value) for value in payload.get("service_uuids", [])], + manufacturer_data=[int(value) for value in payload.get("manufacturer_data", [])], + rssi=int(payload["rssi"]) if payload.get("rssi") is not None else None, + raw=payload, + ) + + +@dataclass(frozen=True) +class WearableSummary: + """Connected wearable summary returned by connect/list methods.""" + + device_id: str + name: str + type: str + capabilities: list[str] + raw: dict[str, Any] + + @classmethod + def from_payload(cls, payload: dict[str, Any]) -> "WearableSummary": + return cls( + device_id=str(payload.get("device_id", "")), + name=str(payload.get("name", "")), + type=str(payload.get("type", "")), + capabilities=[str(value) for value in payload.get("capabilities", [])], + raw=payload, + ) + + +@dataclass(frozen=True) +class SensorInfo: + """A sensor entry returned by `invoke_action(action='list_sensors')`.""" + + sensor_id: str + sensor_index: int + name: str + chart_title: str + short_chart_title: str + axis_names: list[str] + axis_units: list[str] + timestamp_exponent: int + raw: dict[str, Any] + + @classmethod + def from_payload(cls, payload: dict[str, Any]) -> "SensorInfo": + return cls( + sensor_id=str(payload.get("sensor_id", "")), + sensor_index=int(payload.get("sensor_index", 0)), + name=str(payload.get("name", "")), + chart_title=str(payload.get("chart_title", "")), + short_chart_title=str(payload.get("short_chart_title", "")), + axis_names=[str(value) for value in payload.get("axis_names", [])], + axis_units=[str(value) for value in payload.get("axis_units", [])], + timestamp_exponent=int(payload.get("timestamp_exponent", 0)), + raw=payload, + ) + + +@dataclass(frozen=True) +class SensorConfigurationValue: + """A selectable configuration value returned by `list_sensor_configurations`.""" + + key: str + frequency_hz: Optional[float] + options: list[str] + raw: dict[str, Any] + + @classmethod + def from_payload(cls, payload: dict[str, Any]) -> "SensorConfigurationValue": + frequency_raw = payload.get("frequency_hz") + frequency_hz: Optional[float] + if frequency_raw is None: + frequency_hz = None + else: + frequency_hz = float(frequency_raw) + + return cls( + key=str(payload.get("key", "")), + frequency_hz=frequency_hz, + options=[str(value) for value in payload.get("options", [])], + raw=payload, + ) + + +@dataclass(frozen=True) +class SensorConfiguration: + """Sensor configuration definition for one sensor.""" + + name: str + unit: str + values: list[SensorConfigurationValue] + off_value: Optional[str] + raw: dict[str, Any] + + @classmethod + def from_payload(cls, payload: dict[str, Any]) -> "SensorConfiguration": + return cls( + name=str(payload.get("name", "")), + unit=str(payload.get("unit", "")), + values=[ + SensorConfigurationValue.from_payload(value) + for value in payload.get("values", []) + ], + off_value=( + str(payload["off_value"]) + if payload.get("off_value") is not None + else None + ), + raw=payload, + )