From 1641f3f474ea516a54e199067bbd2755a4712026 Mon Sep 17 00:00:00 2001 From: rusty-art Date: Sun, 1 Mar 2026 15:41:12 +1100 Subject: [PATCH] feat: Transaction Response API plumbing Route device commands through /request/set and /request/get topics and handle responses on /response/set and /response/get with z2m_transaction correlation. - Add CommandResponse type for transaction responses - Add callback infrastructure in WebSocketManager (register, unregister, timeout handling, transaction ID generation) - Intercept /response/* messages to prevent device state corruption - Extract useDeviceCommands hook for centralized command routing - Update Exposes, DashboardItem, GroupMembers to use new topics - Suppress "request superseded" error toasts for sleepy devices - Update mock WS server to support new topic format --- mocks/ws.ts | 160 +++++++++++++++++- .../dashboard-page/DashboardItem.tsx | 16 +- src/components/device-page/tabs/Exposes.tsx | 28 +-- src/components/group-page/GroupMembers.tsx | 2 +- src/hooks/useDeviceCommands.ts | 35 ++++ src/store.ts | 20 ++- src/types.ts | 15 ++ src/websocket/WebSocketManager.ts | 108 +++++++++++- 8 files changed, 326 insertions(+), 58 deletions(-) create mode 100644 src/hooks/useDeviceCommands.ts diff --git a/mocks/ws.ts b/mocks/ws.ts index 25a9582cc..0f0517135 100644 --- a/mocks/ws.ts +++ b/mocks/ws.ts @@ -17,14 +17,37 @@ import { NETWORK_MAP_RESPONSE } from "./networkMapResponse.js"; import { PERMIT_JOIN_RESPONSE } from "./permitJoinResponse.js"; import { TOUCHLINK_RESPONSE } from "./touchlinkResponse.js"; +// Mutable device state cache — updated on successful SETs, used for GET and reconnect +const deviceStateCache = new Map>(); +for (const ds of DEVICE_STATES) { + deviceStateCache.set(ds.topic, merge({}, ds.payload)); +} + const cloneDeviceState = (ieee: string) => { const device = BRIDGE_DEVICES.payload.find((d) => d.ieee_address === ieee); if (device) { - const deviceState = DEVICE_STATES.find((state) => state.topic === device.friendly_name || state.topic === device.ieee_address); + const topic = deviceStateCache.has(device.friendly_name) ? device.friendly_name : device.ieee_address; + const cached = deviceStateCache.get(topic); + + if (cached) { + return { topic, payload: merge({}, cached) }; + } + } +}; - return merge({}, deviceState); +/** + * Resolve a topic (IEEE address or friendly name) to the friendly name. + * In real Z2M, state updates are always published to the friendly_name topic. + */ +const resolveToFriendlyName = (topic: string): string => { + if (topic.startsWith("0x")) { + const device = BRIDGE_DEVICES.payload.find((d) => d.ieee_address === topic); + if (device) { + return device.friendly_name; + } } + return topic; }; const randomString = (len: number): string => @@ -35,8 +58,9 @@ const randomString = (len: number): string => // const randomIntInclusive = (min: number, max: number) => Math.floor(Math.random() * (max - min + 1)) + min; export function startServer() { + const port = Number.parseInt(process.env.MOCK_WS_PORT || "8579", 10); const wss = new WebSocketServer({ - port: 8579, + port, }); wss.on("connection", (ws) => { @@ -59,8 +83,8 @@ export function startServer() { ws.send(JSON.stringify(message)); } - for (const message of DEVICE_STATES) { - ws.send(JSON.stringify(message)); + for (const [topic, payload] of deviceStateCache) { + ws.send(JSON.stringify({ topic, payload })); } for (const ds of DEVICE_STATES) { @@ -331,15 +355,133 @@ export function startServer() { break; } default: { - if (msg.topic.endsWith("/set")) { - if ("command" in msg.payload) { + const isDeviceSet = msg.topic.endsWith("/request/set") || msg.topic.endsWith("/set"); + const isDeviceGet = msg.topic.endsWith("/request/get") || msg.topic.endsWith("/get"); + + if (isDeviceSet || isDeviceGet) { + const deviceTopic = msg.topic.replace(/\/(?:request\/)?(set|get)$/, ""); + const commandType = isDeviceGet ? "get" : "set"; + + // SET-only special cases: command execution and attribute reading + if (isDeviceSet && "command" in msg.payload) { setTimeout(() => { ws.send(JSON.stringify(BRIDGE_LOGGING_EXECUTE_COMMAND)); }, 500); - } else if ("read" in msg.payload) { + } else if (isDeviceSet && "read" in msg.payload) { setTimeout(() => { ws.send(JSON.stringify(BRIDGE_LOGGING_READ_ATTR)); }, 500); + } else { + // Transaction Response API: Send response on {device}/response/{set|get} + const requestId = msg.payload?.z2m_transaction ?? msg.payload?.z2m?.request_id; + + if (requestId) { + const friendlyName = resolveToFriendlyName(deviceTopic); + const sleepyDelays: Record = { + "test/sleepy-device-fast": [5000, 0], + "test/sleepy-device-slow": [30000, 5000], + }; + const sleepyDelay = sleepyDelays[friendlyName]; + + // Strip z2m_transaction from payload (real backend strips before converter processing) + const { z2m_transaction: _tx, z2m: _z2m, ...dataPayload } = msg.payload; + + // Ping: no attribute keys beyond z2m_transaction + const isPing = Object.keys(dataPayload).length === 0; + + /** Send state topic update after successful command */ + const sendStateUpdate = () => { + const stateTopic = resolveToFriendlyName(deviceTopic); + + if (!isDeviceGet) { + // SET: merge set values into persistent cache + const cached = deviceStateCache.get(stateTopic) || deviceStateCache.get(deviceTopic); + if (cached) { + merge(cached, dataPayload); + } + } + + // Send full cached state (GET or SET) + const cached = deviceStateCache.get(stateTopic) || deviceStateCache.get(deviceTopic); + if (cached) { + cached.last_seen = new Date().toISOString(); + ws.send(JSON.stringify({ topic: stateTopic, payload: { ...cached } })); + } + }; + + if (isPing) { + // Ping response — immediate + setTimeout(() => { + ws.send( + JSON.stringify({ + topic: `${deviceTopic}/response/${commandType}`, + payload: { + data: {}, + status: "ok", + z2m_transaction: requestId, + }, + }), + ); + }, 25); + } else if (sleepyDelay) { + // Sleepy device: converter blocks until device wakes up. + // Fast variant (~5s) responds before frontend's 10s timeout. + // Slow variant (~30-35s) triggers "queued" UX after timeout. + const wakeupDelay = sleepyDelay[0] + Math.random() * sleepyDelay[1]; + + setTimeout(() => { + ws.send( + JSON.stringify({ + topic: `${deviceTopic}/response/${commandType}`, + payload: { + status: "ok", + z2m_transaction: requestId, + data: isDeviceGet ? {} : dataPayload, + }, + }), + ); + + sendStateUpdate(); + }, wakeupDelay); + } else { + // Regular device: 50-200ms delay, 90% success + const delay = 50 + Math.random() * 150; + + setTimeout(() => { + const isSuccess = Math.random() > 0.1; + + if (isSuccess) { + ws.send( + JSON.stringify({ + topic: `${deviceTopic}/response/${commandType}`, + payload: { + status: "ok", + z2m_transaction: requestId, + data: isDeviceGet ? {} : dataPayload, + }, + }), + ); + + sendStateUpdate(); + } else { + // Error response with actual failed key names + const failedKeys = Object.keys(dataPayload).join(","); + + ws.send( + JSON.stringify({ + topic: `${deviceTopic}/response/${commandType}`, + payload: { + data: {}, + status: "error", + z2m_transaction: requestId, + error: `failed:${failedKeys || "unknown"}`, + }, + }), + ); + } + }, delay); + } + } } } else if (msg.topic.startsWith("bridge/request/")) { sendResponseOK(); @@ -351,5 +493,5 @@ export function startServer() { }); }); - console.log("Started WebSocket server"); + console.log(`Started WebSocket server on port ${port}`); } diff --git a/src/components/dashboard-page/DashboardItem.tsx b/src/components/dashboard-page/DashboardItem.tsx index e33611562..6a1bebad7 100644 --- a/src/components/dashboard-page/DashboardItem.tsx +++ b/src/components/dashboard-page/DashboardItem.tsx @@ -2,10 +2,9 @@ import NiceModal from "@ebay/nice-modal-react"; import { faTrash } from "@fortawesome/free-solid-svg-icons"; import { FontAwesomeIcon } from "@fortawesome/react-fontawesome"; import type { Row } from "@tanstack/react-table"; -import { useCallback } from "react"; import { useTranslation } from "react-i18next"; +import { useDeviceCommands } from "../../hooks/useDeviceCommands.js"; import type { DashboardTableData } from "../../pages/Dashboard.js"; -import { sendMessage } from "../../websocket/WebSocketManager.js"; import Button from "../Button.js"; import DeviceCard from "../device/DeviceCard.js"; import { RemoveDeviceModal } from "../modal/components/RemoveDeviceModal.js"; @@ -15,18 +14,7 @@ const DashboardItem = ({ original: { sourceIdx, device, deviceState, deviceAvailability, features, lastSeenConfig, removeDevice }, }: Row) => { const { t } = useTranslation("zigbee"); - - const onCardChange = useCallback( - async (value: unknown) => { - await sendMessage<"{friendlyNameOrId}/set">( - sourceIdx, - // @ts-expect-error templated API endpoint - `${device.ieee_address}/set`, - value, - ); - }, - [sourceIdx, device.ieee_address], - ); + const { onChange: onCardChange } = useDeviceCommands(sourceIdx, device); return (
state.deviceStates[sourceIdx][device.friendly_name] ?? {})); - - const onChange = useCallback( - async (value: Record) => { - await sendMessage<"{friendlyNameOrId}/set">( - sourceIdx, - // @ts-expect-error templated API endpoint - `${device.ieee_address}/set`, - value, - ); - }, - [sourceIdx, device.ieee_address], - ); - - const onRead = useCallback( - async (value: Record) => { - await sendMessage<"{friendlyNameOrId}/get">( - sourceIdx, - // @ts-expect-error templated API endpoint - `${device.ieee_address}/get`, - value, - ); - }, - [sourceIdx, device.ieee_address], - ); + const { onChange, onRead } = useDeviceCommands(sourceIdx, device); return device.definition?.exposes?.length ? (
diff --git a/src/components/group-page/GroupMembers.tsx b/src/components/group-page/GroupMembers.tsx index 80b4ad83a..c712d88b5 100644 --- a/src/components/group-page/GroupMembers.tsx +++ b/src/components/group-page/GroupMembers.tsx @@ -31,7 +31,7 @@ const GroupMembers = memo(({ sourceIdx, devices, group }: GroupMembersProps) => await sendMessage<"{friendlyNameOrId}/set">( sourceIdx, // @ts-expect-error templated API endpoint - `${ieee}/set`, + `${ieee}/request/set`, value, ); }, diff --git a/src/hooks/useDeviceCommands.ts b/src/hooks/useDeviceCommands.ts new file mode 100644 index 000000000..d0cbcb7a4 --- /dev/null +++ b/src/hooks/useDeviceCommands.ts @@ -0,0 +1,35 @@ +import { useCallback } from "react"; +import type { Device } from "../types.js"; +import { sendMessage } from "../websocket/WebSocketManager.js"; + +/** + * Hook that routes device set/get commands through the Transaction Response API topics. + * Sends to {ieee}/request/set and {ieee}/request/get instead of the legacy {ieee}/set and {ieee}/get. + */ +export function useDeviceCommands(sourceIdx: number, device: Device) { + const onChange = useCallback( + async (value: unknown) => { + await sendMessage<"{friendlyNameOrId}/set">( + sourceIdx, + // @ts-expect-error templated API endpoint + `${device.ieee_address}/request/set`, + value, + ); + }, + [sourceIdx, device.ieee_address], + ); + + const onRead = useCallback( + async (value: Record) => { + await sendMessage<"{friendlyNameOrId}/get">( + sourceIdx, + // @ts-expect-error templated API endpoint + `${device.ieee_address}/request/get`, + value, + ); + }, + [sourceIdx, device.ieee_address], + ); + + return { onChange, onRead }; +} diff --git a/src/store.ts b/src/store.ts index de79bcdff..c7d6663be 100644 --- a/src/store.ts +++ b/src/store.ts @@ -466,15 +466,21 @@ export const useAppStore = create((set, _get, store) => ( const match = newEntry.message.match(PUBLISH_GET_SET_REGEX); if (match) { - addedToasts = true; const [, type, key, name, error] = match; - newToasts.push({ - sourceIdx, - topic: `${name}/${type}(${key})`, - status: "error", - error, - }); + // Suppress toast for superseded commands. When rapidly sending commands + // to sleepy devices, herdsman cancels the older queued command and rejects + // it with "Request superseded". This is expected behavior, not an error. + // The log entry still appears in notifications for debugging. + if (!/request superseded/i.test(newEntry.message)) { + addedToasts = true; + newToasts.push({ + sourceIdx, + topic: `${name}/${type}(${key})`, + status: "error", + error, + }); + } } } } diff --git a/src/types.ts b/src/types.ts index b4365990b..d248b7099 100644 --- a/src/types.ts +++ b/src/types.ts @@ -165,6 +165,21 @@ export type AnySubFeature = BasicFeature | WithAnySubFeatures; + /** Result status */ + status: "ok" | "error"; + /** Error message — "group:key1,key2|group:key3" format (present when status is "error") */ + error?: string; + /** Echoed correlation ID from request */ + z2m_transaction?: string; +}; + export type RGBColor = { r: number; g: number; diff --git a/src/websocket/WebSocketManager.ts b/src/websocket/WebSocketManager.ts index b905fa8e2..fe20a7f70 100644 --- a/src/websocket/WebSocketManager.ts +++ b/src/websocket/WebSocketManager.ts @@ -4,7 +4,7 @@ import { AVAILABILITY_FEATURE_TOPIC_ENDING } from "../consts.js"; import { USE_PROXY } from "../envs.js"; import { AUTH_FLAG_KEY, AUTH_TOKEN_KEY } from "../localStoreConsts.js"; import { API_NAMES, API_URLS, useAppStore } from "../store.js"; -import type { LogMessage, Message, RecursiveMutable, ResponseMessage } from "../types.js"; +import type { CommandResponse, LogMessage, Message, RecursiveMutable, ResponseMessage } from "../types.js"; import { randomString, stringifyWithUndefinedAsNull } from "../utils.js"; // prevent stripping @@ -22,6 +22,8 @@ type PendingRequest = { timeoutId: number; }; +type DeviceSetCallback = (response: CommandResponse) => void; + type Connection = { idx: number; socket: WebSocket | undefined; @@ -30,6 +32,8 @@ type Connection = { transactionPrefix: string; transactionNumber: number; pending: Map; + /** Callbacks for transaction response messages (keyed by z2m_transaction) */ + deviceSetCallbacks: Map; deviceQueue: Message[]; logQueue: LogMessage[]; @@ -62,6 +66,7 @@ class WebSocketManager { transactionPrefix: randomString(5), transactionNumber: 1, pending: new Map(), + deviceSetCallbacks: new Map(), deviceQueue: [], logQueue: [], metricsMessagesSent: 0, @@ -174,6 +179,61 @@ class WebSocketManager { return this.#connections[idx].transactionPrefix; } + /** + * Generate a unique transaction ID for device commands. + * Use this when you want to receive a response via registerDeviceSetCallback. + */ + generateTransactionId(sourceIdx: number): string { + const conn = this.#connections[sourceIdx]; + return `${conn.transactionPrefix}-${conn.transactionNumber++}`; + } + + /** + * Register a callback for a transaction response. + * The callback will be called when a {device}/response/set or /response/get + * message arrives with a matching z2m_transaction. + * + * @param sourceIdx - The source/connection index + * @param requestId - The z2m_transaction value to listen for + * @param callback - Function to call with the CommandResponse + * @param timeoutMs - Optional timeout (default: 10000ms). Callback receives error response on timeout. + */ + registerDeviceSetCallback(sourceIdx: number, requestId: string, callback: DeviceSetCallback, timeoutMs = 10000): void { + const conn = this.#connections[sourceIdx]; + if (!conn) return; + + // Set up timeout to auto-cleanup if no response arrives + const timeoutId = window.setTimeout(() => { + const cb = conn.deviceSetCallbacks.get(requestId); + if (cb) { + conn.deviceSetCallbacks.delete(requestId); + cb({ + data: {}, + status: "error", + error: "Response timeout (frontend)", + z2m_transaction: requestId, + }); + } + }, timeoutMs); + + // Wrap callback to clear timeout when called + const wrappedCallback: DeviceSetCallback = (response) => { + clearTimeout(timeoutId); + callback(response); + }; + + conn.deviceSetCallbacks.set(requestId, wrappedCallback); + } + + /** + * Unregister a transaction response callback (e.g., on component unmount). + */ + unregisterDeviceSetCallback(sourceIdx: number, requestId: string): void { + const conn = this.#connections[sourceIdx]; + if (!conn) return; + conn.deviceSetCallbacks.delete(requestId); + } + async sendMessage(sourceIdx: number, topic: T, payload: Zigbee2MQTTAPI[T]): Promise { if (this.#destroyed) { return; @@ -500,12 +560,41 @@ class WebSocketManager { return; } + // Handle transaction response messages ({device}/response/set or /response/get) + if (parsed.topic.endsWith("/response/set") || parsed.topic.endsWith("/response/get")) { + this.#handleDeviceSetResponse(conn, parsed); + this.#scheduleFlush(); // ensure metrics commit + + return; + } + conn.metricsMessagesDevice++; // this.#scheduleFlush() called inside this.#queueUpdateDeviceState(conn, parsed as Message); } + /** + * Handle transaction response messages. + * These arrive on {device}/response/set or /response/get when the + * frontend sent to {device}/request/set or /request/get with z2m_transaction. + */ + #handleDeviceSetResponse(conn: Connection, msg: Message): void { + const payload = msg.payload as CommandResponse; + const txId = payload?.z2m_transaction; + + if (!txId) { + return; + } + + const callback = conn.deviceSetCallbacks.get(txId); + + if (callback) { + conn.deviceSetCallbacks.delete(txId); + callback(payload); + } + } + #handleBridge(conn: Connection, msg: Message): void { const store = useAppStore.getState(); @@ -754,3 +843,20 @@ export async function sendMessage(sourceI export function getTransactionPrefix(sourceIdx: number): string { return manager.getTransactionPrefix(sourceIdx); } + +export function generateTransactionId(sourceIdx: number): string { + return manager.generateTransactionId(sourceIdx); +} + +export function registerDeviceSetCallback( + sourceIdx: number, + requestId: string, + callback: (response: CommandResponse) => void, + timeoutMs?: number, +): void { + manager.registerDeviceSetCallback(sourceIdx, requestId, callback, timeoutMs); +} + +export function unregisterDeviceSetCallback(sourceIdx: number, requestId: string): void { + manager.unregisterDeviceSetCallback(sourceIdx, requestId); +}