diff --git a/lib/extension/frontend.ts b/lib/extension/frontend.ts index a7fc48ea12..c571c384ee 100644 --- a/lib/extension/frontend.ts +++ b/lib/extension/frontend.ts @@ -10,6 +10,7 @@ import bind from "bind-decorator"; import expressStaticGzip from "express-static-gzip"; import finalhandler from "finalhandler"; import stringify from "json-stable-stringify-without-jsonify"; +import type {IPublishPacket} from "mqtt"; import WebSocket from "ws"; import data from "../util/data"; @@ -174,7 +175,7 @@ export class Frontend extends Extension { if (!isBinary && data) { const message = data.toString(); const {topic, payload} = JSON.parse(message); - this.mqtt.onMessage(`${this.mqttBaseTopic}/${topic}`, Buffer.from(stringify(payload))); + this.mqtt.onMessage(`${this.mqttBaseTopic}/${topic}`, Buffer.from(stringify(payload)), {qos: 0} as IPublishPacket); } }); diff --git a/lib/extension/publish.ts b/lib/extension/publish.ts index d0998292ec..91be40c0a1 100644 --- a/lib/extension/publish.ts +++ b/lib/extension/publish.ts @@ -13,7 +13,7 @@ import Extension from "./extension"; let topicGetSetRegex: RegExp; // Used by `publish.test.ts` to reload regex when changing `mqtt.base_topic`. export const loadTopicGetSetRegex = (): void => { - topicGetSetRegex = new RegExp(`^${settings.get().mqtt.base_topic}/(?!bridge)(.+?)/(get|set)(?:/(.+))?$`); + topicGetSetRegex = new RegExp(`^${settings.get().mqtt.base_topic}/(?!bridge)(.+?)/(request/)?(get|set)(?:/(.+))?$`); }; const STATE_VALUES: ReadonlyArray = ["on", "off", "toggle", "open", "close", "stop", "lock", "unlock"]; @@ -24,6 +24,7 @@ interface ParsedTopic { endpoint: string | undefined; attribute: string; type: "get" | "set"; + isRequest: boolean; } export default class Publish extends Extension { @@ -50,11 +51,13 @@ export default class Publish extends Extension { } const deviceNameAndEndpoint = match[1]; - const attribute = match[3]; + const isRequest = match[2] !== undefined; + const type = match[3] as "get" | "set"; + const attribute = match[4]; // Now parse the device/group name, and endpoint name const entity = this.zigbee.resolveEntityAndEndpoint(deviceNameAndEndpoint); - return {ID: entity.ID, endpoint: entity.endpointID, type: match[2] as "get" | "set", attribute: attribute}; + return {ID: entity.ID, endpoint: entity.endpointID, type, attribute, isRequest}; } parseMessage(parsedTopic: ParsedTopic, data: eventdata.MQTTMessage): KeyValue | undefined { @@ -130,6 +133,22 @@ export default class Publish extends Extension { return; } + // Extract and strip z2m_transaction before forwarding to converters + const z2mTransaction = parsedTopic.isRequest ? message.z2m_transaction : undefined; + if (message.z2m_transaction !== undefined) { + delete message.z2m_transaction; + } + + // Ping: /request/ topic with empty payload after stripping z2m_transaction + if (parsedTopic.isRequest && Object.keys(message).length === 0) { + const response: KeyValue = {data: {}, status: "ok"}; + if (z2mTransaction !== undefined) response.z2m_transaction = z2mTransaction; + await this.mqtt.publish(`${re.name}/response/${parsedTopic.type}`, stringify(response), { + clientOptions: {qos: /* v8 ignore next */ data.qos ?? 0, retain: false}, + }); + return; + } + const device = re instanceof Device ? re.zh : undefined; const entitySettings = re.options; const entityState = this.state.get(re); @@ -173,8 +192,14 @@ export default class Publish extends Extension { const endpointNames = re instanceof Device ? re.getEndpointNames() : []; const propertyEndpointRegex = new RegExp(`^(.*?)_(${endpointNames.join("|")})$`); let scenesChanged = false; + const responseData: KeyValue = {}; + const supersededKeys: string[] = []; + const failedKeys: string[] = []; + // Future: send responses per attribute as each settles, rather than waiting for all. + // Currently, multi-attribute requests to sleepy devices block sequentially per attribute. for (const entry of entries) { + const originalKey = entry[0]; let key = entry[0]; const value = entry[1]; let endpointName = parsedTopic.endpoint; @@ -280,6 +305,14 @@ export default class Publish extends Extension { logger.error(message); // biome-ignore lint/style/noNonNullAssertion: always Error logger.debug((error as Error).stack!); + if (parsedTopic.isRequest) { + ((error as Error).message?.includes("Request superseded") ? supersededKeys : failedKeys).push(originalKey); + } + } + + // Track result for response (set echoes requested value; get returns status only) + if (parsedTopic.isRequest && parsedTopic.type === "set" && !failedKeys.includes(originalKey) && !supersededKeys.includes(originalKey)) { + responseData[originalKey] = value; } usedConverters[endpointOrGroupID].push(converter); @@ -289,6 +322,18 @@ export default class Publish extends Extension { } } + if (parsedTopic.isRequest) { + const errorParts: string[] = []; + if (supersededKeys.length > 0) errorParts.push(`superseded:${supersededKeys.join(",")}`); + if (failedKeys.length > 0) errorParts.push(`failed:${failedKeys.join(",")}`); + const response: KeyValue = + errorParts.length > 0 ? {data: responseData, status: "error", error: errorParts.join("|")} : {data: responseData, status: "ok"}; + if (z2mTransaction !== undefined) response.z2m_transaction = z2mTransaction; + await this.mqtt.publish(`${re.name}/response/${parsedTopic.type}`, stringify(response), { + clientOptions: {qos: /* v8 ignore next */ data.qos ?? 0, retain: false}, + }); + } + for (const [ID, payload] of Object.entries(toPublish)) { if (!utils.objectIsEmpty(payload)) { await this.publishEntityState(toPublishEntity[ID], payload); diff --git a/lib/mqtt.ts b/lib/mqtt.ts index 8104ad6f55..3df16f7745 100644 --- a/lib/mqtt.ts +++ b/lib/mqtt.ts @@ -1,6 +1,6 @@ import fs from "node:fs"; import bind from "bind-decorator"; -import type {IClientOptions, IClientPublishOptions, MqttClient} from "mqtt"; +import type {IClientOptions, IClientPublishOptions, IPublishPacket, MqttClient} from "mqtt"; import {connectAsync} from "mqtt"; import type {Zigbee2MQTTAPI} from "./types/api"; @@ -179,11 +179,11 @@ export default class Mqtt { await this.subscribe(`${settings.get().mqtt.base_topic}/#`); } - @bind public onMessage(topic: string, message: Buffer): void { + @bind public onMessage(topic: string, message: Buffer, packet: IPublishPacket): void { // Since we subscribe to zigbee2mqtt/# we also receive the message we send ourselves, skip these. if (!this.publishedTopics.has(topic)) { logger.debug(() => `Received MQTT message on '${topic}' with data '${message.toString()}'`, NS); - this.eventBus.emitMQTTMessage({topic, message: message.toString()}); + this.eventBus.emitMQTTMessage({topic, message: message.toString(), qos: /* v8 ignore next */ packet?.qos ?? 0}); } if (this.republishRetainedTimer && topic === `${settings.get().mqtt.base_topic}/bridge/info`) { diff --git a/lib/types/types.d.ts b/lib/types/types.d.ts index cc09c32d3f..995f866ea3 100644 --- a/lib/types/types.d.ts +++ b/lib/types/types.d.ts @@ -47,7 +47,7 @@ declare global { namespace eventdata { type EntityRenamed = {entity: Device | Group; homeAssisantRename: boolean; from: string; to: string}; type EntityRemoved = {entity: Device | Group; name: string}; - type MQTTMessage = {topic: string; message: string}; + type MQTTMessage = {topic: string; message: string; qos?: 0 | 1 | 2}; type MQTTMessagePublished = {topic: string; payload: string; options: MqttPublishOptions}; type StateChange = { entity: Device | Group; diff --git a/test/controller.test.ts b/test/controller.test.ts index 6fdcd1923d..8cf32bcccc 100644 --- a/test/controller.test.ts +++ b/test/controller.test.ts @@ -761,7 +761,7 @@ describe("Controller", () => { await controller.start(); mockLogger.debug.mockClear(); await mockMQTTEvents.message("dummytopic", "dummymessage"); - expect(spyEventbusEmitMQTTMessage).toHaveBeenCalledWith({topic: "dummytopic", message: "dummymessage"}); + expect(spyEventbusEmitMQTTMessage).toHaveBeenCalledWith({topic: "dummytopic", message: "dummymessage", qos: 0}); expect(mockLogger.log).toHaveBeenCalledWith("debug", "Received MQTT message on 'dummytopic' with data 'dummymessage'", LOG_MQTT_NS); }); @@ -771,7 +771,7 @@ describe("Controller", () => { await controller.start(); mockLogger.debug.mockClear(); await mockMQTTEvents.message("zigbee2mqtt/skip-this-topic", "skipped"); - expect(spyEventbusEmitMQTTMessage).toHaveBeenCalledWith({topic: "zigbee2mqtt/skip-this-topic", message: "skipped"}); + expect(spyEventbusEmitMQTTMessage).toHaveBeenCalledWith({topic: "zigbee2mqtt/skip-this-topic", message: "skipped", qos: 0}); mockLogger.debug.mockClear(); await controller.mqtt.publish("skip-this-topic", "", {}); await mockMQTTEvents.message("zigbee2mqtt/skip-this-topic", "skipped"); diff --git a/test/extensions/publish.test.ts b/test/extensions/publish.test.ts index c9b284e24d..12e8fb4ddb 100644 --- a/test/extensions/publish.test.ts +++ b/test/extensions/publish.test.ts @@ -2024,4 +2024,135 @@ describe("Extension: Publish", () => { await flushPromises(); expect(mockLogger.error).toHaveBeenCalledWith("Entity 'an_unknown_entity' is unknown"); }); + + describe("Transaction Response", () => { + it("Should respond on /request/set with success and echo z2m_transaction", async () => { + const endpoint = devices.bulb_color.getEndpoint(1)!; + await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({brightness: "200", z2m_transaction: "tx1"})); + await flushPromises(); + expect(endpoint.command).toHaveBeenCalledTimes(1); + expect(endpoint.command).toHaveBeenCalledWith( + "genLevelCtrl", + "moveToLevelWithOnOff", + {level: 200, transtime: 0, optionsMask: 0, optionsOverride: 0}, + {}, + ); + expect(mockMQTTPublishAsync).toHaveBeenCalledWith( + "zigbee2mqtt/bulb_color/response/set", + stringify({data: {brightness: "200"}, status: "ok", z2m_transaction: "tx1"}), + {qos: 0, retain: false}, + ); + // z2m_transaction must be stripped before converters + expect(mockLogger.error).not.toHaveBeenCalledWith(expect.stringContaining("z2m_transaction")); + }); + + it("Should respond on /request/set without z2m_transaction when not provided", async () => { + await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({brightness: "200"})); + await flushPromises(); + expect(mockMQTTPublishAsync).toHaveBeenCalledWith( + "zigbee2mqtt/bulb_color/response/set", + stringify({data: {brightness: "200"}, status: "ok"}), + {qos: 0, retain: false}, + ); + }); + + it("Should respond on /request/get with status only (no data)", async () => { + const device = controller.zigbee.resolveEntity(devices.bulb_color.ieeeAddr)!; + controller.state.set(device, {state: "ON", brightness: 200}); + await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/get", stringify({state: ""})); + await flushPromises(); + expect(mockMQTTPublishAsync).toHaveBeenCalledWith("zigbee2mqtt/bulb_color/response/get", stringify({data: {}, status: "ok"}), { + qos: 0, + retain: false, + }); + }); + + it("Should respond with error on /request/set when converter fails", async () => { + const endpoint = devices.bulb_color.getEndpoint(1)!; + endpoint.command.mockRejectedValueOnce(new Error("Zigbee error")); + await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({brightness: "200", z2m_transaction: "tx2"})); + await flushPromises(); + expect(mockMQTTPublishAsync).toHaveBeenCalledWith( + "zigbee2mqtt/bulb_color/response/set", + stringify({data: {}, error: "failed:brightness", status: "error", z2m_transaction: "tx2"}), + {qos: 0, retain: false}, + ); + }); + + it("Should respond with superseded error when herdsman reports Request superseded", async () => { + const endpoint = devices.bulb_color.getEndpoint(1)!; + endpoint.command.mockRejectedValueOnce(new Error("Request superseded")); + await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({brightness: "200", z2m_transaction: "tx3"})); + await flushPromises(); + expect(mockMQTTPublishAsync).toHaveBeenCalledWith( + "zigbee2mqtt/bulb_color/response/set", + stringify({data: {}, error: "superseded:brightness", status: "error", z2m_transaction: "tx3"}), + {qos: 0, retain: false}, + ); + }); + + it("Should NOT publish response for legacy /set topic", async () => { + await mockMQTTEvents.message("zigbee2mqtt/bulb_color/set", stringify({brightness: "200"})); + await flushPromises(); + // Verify the message was processed (optimistic state published) + expect(mockMQTTPublishAsync).toHaveBeenCalled(); + // But no response topic was published + const responseCall = mockMQTTPublishAsync.mock.calls.find((c: unknown[]) => String(c[0]).includes("/response/")); + expect(responseCall).toBeUndefined(); + }); + + it("Should respond to ping (empty payload after z2m_transaction strip)", async () => { + const endpoint = devices.bulb_color.getEndpoint(1)!; + await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({z2m_transaction: "ping1"})); + await flushPromises(); + expect(endpoint.command).toHaveBeenCalledTimes(0); + expect(mockMQTTPublishAsync).toHaveBeenCalledWith( + "zigbee2mqtt/bulb_color/response/set", + stringify({data: {}, status: "ok", z2m_transaction: "ping1"}), + {qos: 0, retain: false}, + ); + }); + + it("Should match QoS of request in response", async () => { + await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({brightness: "200"}), {qos: 1}); + await flushPromises(); + expect(mockMQTTPublishAsync).toHaveBeenCalledWith("zigbee2mqtt/bulb_color/response/set", expect.any(String), {qos: 1, retain: false}); + }); + + it("Should respond on group /request/set", async () => { + const group = groups.group_1; + group.members.push(devices.bulb_color.getEndpoint(1)!); + await mockMQTTEvents.message("zigbee2mqtt/group_1/request/set", stringify({state: "ON", z2m_transaction: "grp1"})); + await flushPromises(); + expect(group.command).toHaveBeenCalledTimes(1); + expect(mockMQTTPublishAsync).toHaveBeenCalledWith( + "zigbee2mqtt/group_1/response/set", + stringify({data: {state: "ON"}, status: "ok", z2m_transaction: "grp1"}), + {qos: 0, retain: false}, + ); + group.members.pop(); + }); + + it("Should respond on /request/set with attribute in topic", async () => { + await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set/brightness", "200"); + await flushPromises(); + expect(mockMQTTPublishAsync).toHaveBeenCalledWith( + "zigbee2mqtt/bulb_color/response/set", + stringify({data: {brightness: 200}, status: "ok"}), + {qos: 0, retain: false}, + ); + }); + + it("Should respond on /request/set with endpoint in topic", async () => { + await mockMQTTEvents.message("zigbee2mqtt/0x0017880104e45542/left/request/set", stringify({state: "ON", z2m_transaction: "ep1"})); + await flushPromises(); + const endpoint = devices.QBKG03LM.getEndpoint(2)!; + expect(endpoint.command).toHaveBeenCalledTimes(1); + expect(mockMQTTPublishAsync).toHaveBeenCalledWith( + "zigbee2mqtt/wall_switch_double/response/set", + stringify({data: {state: "ON"}, status: "ok", z2m_transaction: "ep1"}), + {qos: 0, retain: false}, + ); + }); + }); });