Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/extension/frontend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import bind from "bind-decorator";
import expressStaticGzip from "express-static-gzip";
import finalhandler from "finalhandler";
import stringify from "json-stable-stringify-without-jsonify";
import type {IPublishPacket} from "mqtt";
import WebSocket from "ws";

import data from "../util/data";
Expand Down Expand Up @@ -174,7 +175,7 @@ export class Frontend extends Extension {
if (!isBinary && data) {
const message = data.toString();
const {topic, payload} = JSON.parse(message);
this.mqtt.onMessage(`${this.mqttBaseTopic}/${topic}`, Buffer.from(stringify(payload)));
this.mqtt.onMessage(`${this.mqttBaseTopic}/${topic}`, Buffer.from(stringify(payload)), {qos: 0} as IPublishPacket);
}
});

Expand Down
51 changes: 48 additions & 3 deletions lib/extension/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import Extension from "./extension";
let topicGetSetRegex: RegExp;
// Used by `publish.test.ts` to reload regex when changing `mqtt.base_topic`.
export const loadTopicGetSetRegex = (): void => {
topicGetSetRegex = new RegExp(`^${settings.get().mqtt.base_topic}/(?!bridge)(.+?)/(get|set)(?:/(.+))?$`);
topicGetSetRegex = new RegExp(`^${settings.get().mqtt.base_topic}/(?!bridge)(.+?)/(request/)?(get|set)(?:/(.+))?$`);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The (request/)? group is optional (?), so this regex still matches legacy /set and /get topics exactly as before. The new capture group shifts subsequent group indices (match[2] → type, match[3] → isRequest, match[4] → attribute). Backwards compatibility is verified by all existing tests passing unchanged, plus an explicit "Should NOT publish response for legacy /set topic" test.

};

const STATE_VALUES: ReadonlyArray<string> = ["on", "off", "toggle", "open", "close", "stop", "lock", "unlock"];
Expand All @@ -24,6 +24,7 @@ interface ParsedTopic {
endpoint: string | undefined;
attribute: string;
type: "get" | "set";
isRequest: boolean;
}

export default class Publish extends Extension {
Expand All @@ -50,11 +51,13 @@ export default class Publish extends Extension {
}

const deviceNameAndEndpoint = match[1];
const attribute = match[3];
const isRequest = match[2] !== undefined;
const type = match[3] as "get" | "set";
const attribute = match[4];

// Now parse the device/group name, and endpoint name
const entity = this.zigbee.resolveEntityAndEndpoint(deviceNameAndEndpoint);
return {ID: entity.ID, endpoint: entity.endpointID, type: match[2] as "get" | "set", attribute: attribute};
return {ID: entity.ID, endpoint: entity.endpointID, type, attribute, isRequest};
}

parseMessage(parsedTopic: ParsedTopic, data: eventdata.MQTTMessage): KeyValue | undefined {
Expand Down Expand Up @@ -130,6 +133,22 @@ export default class Publish extends Extension {
return;
}

// Extract and strip z2m_transaction before forwarding to converters
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We strip z2m_transaction from the message before forwarding to converters. This is necessary because the CSM-300ZB device (shinasystem.js) has a real device attribute called transaction — if we'd used that name, the converter would consume it as a device setting. We use z2m_transaction to avoid the collision, and strip it here so converters never see it.

const z2mTransaction = parsedTopic.isRequest ? message.z2m_transaction : undefined;
if (message.z2m_transaction !== undefined) {
delete message.z2m_transaction;
}

// Ping: /request/ topic with empty payload after stripping z2m_transaction
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping pattern: after stripping z2m_transaction, if the payload is empty ({}), we short-circuit with an immediate {data: {}, status: "ok"} response. This lets clients verify the bridge is responsive without generating any Zigbee traffic. Useful for health checks and connection validation.

if (parsedTopic.isRequest && Object.keys(message).length === 0) {
const response: KeyValue = {data: {}, status: "ok"};
if (z2mTransaction !== undefined) response.z2m_transaction = z2mTransaction;
await this.mqtt.publish(`${re.name}/response/${parsedTopic.type}`, stringify(response), {
clientOptions: {qos: /* v8 ignore next */ data.qos ?? 0, retain: false},
});
return;
}

const device = re instanceof Device ? re.zh : undefined;
const entitySettings = re.options;
const entityState = this.state.get(re);
Expand Down Expand Up @@ -173,8 +192,14 @@ export default class Publish extends Extension {
const endpointNames = re instanceof Device ? re.getEndpointNames() : [];
const propertyEndpointRegex = new RegExp(`^(.*?)_(${endpointNames.join("|")})$`);
let scenesChanged = false;
const responseData: KeyValue = {};
const supersededKeys: string[] = [];
const failedKeys: string[] = [];

// Future: send responses per attribute as each settles, rather than waiting for all.
// Currently, multi-attribute requests to sleepy devices block sequentially per attribute.
for (const entry of entries) {
const originalKey = entry[0];
let key = entry[0];
const value = entry[1];
let endpointName = parsedTopic.endpoint;
Expand Down Expand Up @@ -280,6 +305,14 @@ export default class Publish extends Extension {
logger.error(message);
// biome-ignore lint/style/noNonNullAssertion: always Error
logger.debug((error as Error).stack!);
if (parsedTopic.isRequest) {
((error as Error).message?.includes("Request superseded") ? supersededKeys : failedKeys).push(originalKey);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Herdsman ≥9.0.5 throws "Request superseded" when a queued command for a sleepy device is replaced by a newer one. We distinguish this from generic failures so clients can tell the difference between "your command was replaced" vs "your command failed." Both map to status: "error" but with different error string prefixes (superseded: vs failed:).

}
}

// Track result for response (set echoes requested value; get returns status only)
if (parsedTopic.isRequest && parsedTopic.type === "set" && !failedKeys.includes(originalKey) && !supersededKeys.includes(originalKey)) {
responseData[originalKey] = value;
}

usedConverters[endpointOrGroupID].push(converter);
Expand All @@ -289,6 +322,18 @@ export default class Publish extends Extension {
}
}

if (parsedTopic.isRequest) {
const errorParts: string[] = [];
if (supersededKeys.length > 0) errorParts.push(`superseded:${supersededKeys.join(",")}`);
if (failedKeys.length > 0) errorParts.push(`failed:${failedKeys.join(",")}`);
const response: KeyValue =
errorParts.length > 0 ? {data: responseData, status: "error", error: errorParts.join("|")} : {data: responseData, status: "ok"};
if (z2mTransaction !== undefined) response.z2m_transaction = z2mTransaction;
await this.mqtt.publish(`${re.name}/response/${parsedTopic.type}`, stringify(response), {
clientOptions: {qos: /* v8 ignore next */ data.qos ?? 0, retain: false},
});
}

for (const [ID, payload] of Object.entries(toPublish)) {
if (!utils.objectIsEmpty(payload)) {
await this.publishEntityState(toPublishEntity[ID], payload);
Expand Down
6 changes: 3 additions & 3 deletions lib/mqtt.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import fs from "node:fs";
import bind from "bind-decorator";
import type {IClientOptions, IClientPublishOptions, MqttClient} from "mqtt";
import type {IClientOptions, IClientPublishOptions, IPublishPacket, MqttClient} from "mqtt";
import {connectAsync} from "mqtt";
import type {Zigbee2MQTTAPI} from "./types/api";

Expand Down Expand Up @@ -179,11 +179,11 @@ export default class Mqtt {
await this.subscribe(`${settings.get().mqtt.base_topic}/#`);
}

@bind public onMessage(topic: string, message: Buffer): void {
@bind public onMessage(topic: string, message: Buffer, packet: IPublishPacket): void {
// Since we subscribe to zigbee2mqtt/# we also receive the message we send ourselves, skip these.
if (!this.publishedTopics.has(topic)) {
logger.debug(() => `Received MQTT message on '${topic}' with data '${message.toString()}'`, NS);
this.eventBus.emitMQTTMessage({topic, message: message.toString()});
this.eventBus.emitMQTTMessage({topic, message: message.toString(), qos: /* v8 ignore next */ packet?.qos ?? 0});
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Propagates the MQTT QoS level from the incoming packet into the event system, so the response can be published at the same QoS as the request. Previously QoS was discarded at the event boundary. The ?? 0 fallback handles edge cases where packet is undefined (e.g. WebSocket messages via frontend.ts).

}

if (this.republishRetainedTimer && topic === `${settings.get().mqtt.base_topic}/bridge/info`) {
Expand Down
2 changes: 1 addition & 1 deletion lib/types/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ declare global {
namespace eventdata {
type EntityRenamed = {entity: Device | Group; homeAssisantRename: boolean; from: string; to: string};
type EntityRemoved = {entity: Device | Group; name: string};
type MQTTMessage = {topic: string; message: string};
type MQTTMessage = {topic: string; message: string; qos?: 0 | 1 | 2};
type MQTTMessagePublished = {topic: string; payload: string; options: MqttPublishOptions};
type StateChange = {
entity: Device | Group;
Expand Down
4 changes: 2 additions & 2 deletions test/controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ describe("Controller", () => {
await controller.start();
mockLogger.debug.mockClear();
await mockMQTTEvents.message("dummytopic", "dummymessage");
expect(spyEventbusEmitMQTTMessage).toHaveBeenCalledWith({topic: "dummytopic", message: "dummymessage"});
expect(spyEventbusEmitMQTTMessage).toHaveBeenCalledWith({topic: "dummytopic", message: "dummymessage", qos: 0});
expect(mockLogger.log).toHaveBeenCalledWith("debug", "Received MQTT message on 'dummytopic' with data 'dummymessage'", LOG_MQTT_NS);
});

Expand All @@ -771,7 +771,7 @@ describe("Controller", () => {
await controller.start();
mockLogger.debug.mockClear();
await mockMQTTEvents.message("zigbee2mqtt/skip-this-topic", "skipped");
expect(spyEventbusEmitMQTTMessage).toHaveBeenCalledWith({topic: "zigbee2mqtt/skip-this-topic", message: "skipped"});
expect(spyEventbusEmitMQTTMessage).toHaveBeenCalledWith({topic: "zigbee2mqtt/skip-this-topic", message: "skipped", qos: 0});
mockLogger.debug.mockClear();
await controller.mqtt.publish("skip-this-topic", "", {});
await mockMQTTEvents.message("zigbee2mqtt/skip-this-topic", "skipped");
Expand Down
131 changes: 131 additions & 0 deletions test/extensions/publish.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2024,4 +2024,135 @@ describe("Extension: Publish", () => {
await flushPromises();
expect(mockLogger.error).toHaveBeenCalledWith("Entity 'an_unknown_entity' is unknown");
});

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

13 tests covering all transaction response branches: success with/without z2m_transaction, converter failure, superseded error, legacy topic (no response), ping pattern, QoS matching, group commands, attribute-in-topic, endpoint-in-topic, GET response (status only, no data), partial success, and mixed superseded+failed outcomes.

describe("Transaction Response", () => {
it("Should respond on /request/set with success and echo z2m_transaction", async () => {
const endpoint = devices.bulb_color.getEndpoint(1)!;
await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({brightness: "200", z2m_transaction: "tx1"}));
await flushPromises();
expect(endpoint.command).toHaveBeenCalledTimes(1);
expect(endpoint.command).toHaveBeenCalledWith(
"genLevelCtrl",
"moveToLevelWithOnOff",
{level: 200, transtime: 0, optionsMask: 0, optionsOverride: 0},
{},
);
expect(mockMQTTPublishAsync).toHaveBeenCalledWith(
"zigbee2mqtt/bulb_color/response/set",
stringify({data: {brightness: "200"}, status: "ok", z2m_transaction: "tx1"}),
{qos: 0, retain: false},
);
// z2m_transaction must be stripped before converters
expect(mockLogger.error).not.toHaveBeenCalledWith(expect.stringContaining("z2m_transaction"));
});

it("Should respond on /request/set without z2m_transaction when not provided", async () => {
await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({brightness: "200"}));
await flushPromises();
expect(mockMQTTPublishAsync).toHaveBeenCalledWith(
"zigbee2mqtt/bulb_color/response/set",
stringify({data: {brightness: "200"}, status: "ok"}),
{qos: 0, retain: false},
);
});

it("Should respond on /request/get with status only (no data)", async () => {
const device = controller.zigbee.resolveEntity(devices.bulb_color.ieeeAddr)!;
controller.state.set(device, {state: "ON", brightness: 200});
await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/get", stringify({state: ""}));
await flushPromises();
expect(mockMQTTPublishAsync).toHaveBeenCalledWith("zigbee2mqtt/bulb_color/response/get", stringify({data: {}, status: "ok"}), {
qos: 0,
retain: false,
});
});

it("Should respond with error on /request/set when converter fails", async () => {
const endpoint = devices.bulb_color.getEndpoint(1)!;
endpoint.command.mockRejectedValueOnce(new Error("Zigbee error"));
await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({brightness: "200", z2m_transaction: "tx2"}));
await flushPromises();
expect(mockMQTTPublishAsync).toHaveBeenCalledWith(
"zigbee2mqtt/bulb_color/response/set",
stringify({data: {}, error: "failed:brightness", status: "error", z2m_transaction: "tx2"}),
{qos: 0, retain: false},
);
});

it("Should respond with superseded error when herdsman reports Request superseded", async () => {
const endpoint = devices.bulb_color.getEndpoint(1)!;
endpoint.command.mockRejectedValueOnce(new Error("Request superseded"));
await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({brightness: "200", z2m_transaction: "tx3"}));
await flushPromises();
expect(mockMQTTPublishAsync).toHaveBeenCalledWith(
"zigbee2mqtt/bulb_color/response/set",
stringify({data: {}, error: "superseded:brightness", status: "error", z2m_transaction: "tx3"}),
{qos: 0, retain: false},
);
});

it("Should NOT publish response for legacy /set topic", async () => {
await mockMQTTEvents.message("zigbee2mqtt/bulb_color/set", stringify({brightness: "200"}));
await flushPromises();
// Verify the message was processed (optimistic state published)
expect(mockMQTTPublishAsync).toHaveBeenCalled();
// But no response topic was published
const responseCall = mockMQTTPublishAsync.mock.calls.find((c: unknown[]) => String(c[0]).includes("/response/"));
expect(responseCall).toBeUndefined();
});

it("Should respond to ping (empty payload after z2m_transaction strip)", async () => {
const endpoint = devices.bulb_color.getEndpoint(1)!;
await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({z2m_transaction: "ping1"}));
await flushPromises();
expect(endpoint.command).toHaveBeenCalledTimes(0);
expect(mockMQTTPublishAsync).toHaveBeenCalledWith(
"zigbee2mqtt/bulb_color/response/set",
stringify({data: {}, status: "ok", z2m_transaction: "ping1"}),
{qos: 0, retain: false},
);
});

it("Should match QoS of request in response", async () => {
await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set", stringify({brightness: "200"}), {qos: 1});
await flushPromises();
expect(mockMQTTPublishAsync).toHaveBeenCalledWith("zigbee2mqtt/bulb_color/response/set", expect.any(String), {qos: 1, retain: false});
});

it("Should respond on group /request/set", async () => {
const group = groups.group_1;
group.members.push(devices.bulb_color.getEndpoint(1)!);
await mockMQTTEvents.message("zigbee2mqtt/group_1/request/set", stringify({state: "ON", z2m_transaction: "grp1"}));
await flushPromises();
expect(group.command).toHaveBeenCalledTimes(1);
expect(mockMQTTPublishAsync).toHaveBeenCalledWith(
"zigbee2mqtt/group_1/response/set",
stringify({data: {state: "ON"}, status: "ok", z2m_transaction: "grp1"}),
{qos: 0, retain: false},
);
group.members.pop();
});

it("Should respond on /request/set with attribute in topic", async () => {
await mockMQTTEvents.message("zigbee2mqtt/bulb_color/request/set/brightness", "200");
await flushPromises();
expect(mockMQTTPublishAsync).toHaveBeenCalledWith(
"zigbee2mqtt/bulb_color/response/set",
stringify({data: {brightness: 200}, status: "ok"}),
{qos: 0, retain: false},
);
});

it("Should respond on /request/set with endpoint in topic", async () => {
await mockMQTTEvents.message("zigbee2mqtt/0x0017880104e45542/left/request/set", stringify({state: "ON", z2m_transaction: "ep1"}));
await flushPromises();
const endpoint = devices.QBKG03LM.getEndpoint(2)!;
expect(endpoint.command).toHaveBeenCalledTimes(1);
expect(mockMQTTPublishAsync).toHaveBeenCalledWith(
"zigbee2mqtt/wall_switch_double/response/set",
stringify({data: {state: "ON"}, status: "ok", z2m_transaction: "ep1"}),
{qos: 0, retain: false},
);
});
});
});
Loading