Skip to content

feat(mqtt): topic-based measurement + tag extraction in topic_mapping #479

@xe-nvdk

Description

@xe-nvdk

Summary

Implement the topic-pattern → measurement + positional tag-extraction feature for MQTT subscriptions. This was documented but never implemented — the docs described a rich topic_mapping schema while the code only supports a flat per-topic database override. The docs were corrected to match current behavior in docs.basekick.net#11; this issue tracks building the feature the docs originally promised.

Background

Reported on Discord: a user followed the MQTT docs verbatim and got 400 "Invalid request body" on POST /api/v1/mqtt/subscriptions because the documented request body couldn't deserialize.

What the docs described (and users expect):

"topic_mapping": {
  "factory/+/+/metrics": {
    "measurement": "machine_metrics",
    "tags_from_topic": [
      {"position": 1, "tag_name": "line"},
      {"position": 2, "tag_name": "machine_id"}
    ]
  }
}

i.e. route a topic pattern to a measurement and extract tag values from topic path segments.

What the code actually does today:

  • CreateSubscriptionRequest.TopicMapping is map[string]string (internal/mqtt/subscription.go:78).
  • onMessage uses it only as a topic→database override, by exact-string match (internal/mqtt/subscriber.go:302-305).
  • Measurement comes from the payload m/measurement field, defaulting to "mqtt" (subscriber.go:413-418); tags come from the payload tags object (subscriber.go:425-431). The topic is never parsed for measurement or tags.

Proposed behavior

For a topic-mapping rule matched against an incoming message topic, allow:

  • database (string, optional) — overrides the subscription default (preserves today's capability).
  • measurement (string, optional) — sets the measurement, overriding the payload-derived value.
  • tags_from_topic (array, optional) — [{ "position": <int>, "tag_name": "<string>" }], extracting topic path segments (split on /) into tags. Position indexing convention must be documented (the old docs implied 0-based for the first wildcard segment — pin this down).

Matching should support MQTT wildcard patterns (+, #) against the concrete message topic, not just exact strings, so a single rule like factory/+/+/metrics covers many topics.

Payload-derived measurement/tags remain the default when no rule matches or a rule omits a field.

Design notes / open questions

  1. Backward-compatible request schema. topic_mapping is map[string]string today. Changing it to map[string]<object> is a breaking change to the request body and the persisted form (repository.go stores topic_mapping as TEXT/JSON). Options:
    • Accept both shapes (string value = db-override shorthand; object value = full rule) via a custom UnmarshalJSON. Keeps existing subscriptions working.
    • Or introduce a new field (e.g. topic_rules) and keep topic_mapping as the string-map db-override, deprecating nothing.
      Decide before implementing — option (a) matches the original docs’ key shape.
  2. Persistence/migration. Existing rows store the flat {topic: db} JSON; the loader must tolerate both shapes.
  3. Wildcard matching cost. Per-message topic→rule matching runs on the hot ingest path (onMessage); precompile/normalize rules at subscription start, don’t parse per message.
  4. Position semantics + bounds. Define whether position is over the full topic segments or only wildcard segments, and handle out-of-range positions (skip vs error).
  5. Tests for: exact + wildcard match, multi-segment extraction, position out of range, rule with only database, rule with only measurement, payload value vs rule precedence, and the both-shapes unmarshal.

References

Out of scope

  • Re-introducing the old (incorrect) docs — already fixed in docs#11.

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions