From 18fa6a22066c94ac257f12e794144de4fe8387fb Mon Sep 17 00:00:00 2001 From: Linzp Date: Fri, 22 May 2026 17:55:51 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9channel-meta?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 24 ++- doc/api.md | 58 ++++-- doc/summary.md | 340 ++++++++++++++++++++++++++++++++++ libs/models/channel-meta.js | 24 +++ libs/models/data-record.js | 23 +-- libs/models/period-stat.js | 15 +- libs/services/channel-meta.js | 38 ++++ libs/services/data-record.js | 34 +++- libs/services/period-stat.js | 98 +++++----- package.json | 5 +- test/channel-meta.test.js | 182 ++++++++++++++++++ test/data-record.test.js | 23 ++- test/period-stat.test.js | 215 +++++++++++---------- 13 files changed, 870 insertions(+), 209 deletions(-) create mode 100644 libs/models/channel-meta.js create mode 100644 libs/services/channel-meta.js create mode 100644 test/channel-meta.test.js diff --git a/README.md b/README.md index 0b6c88d..c8face7 100644 --- a/README.md +++ b/README.md @@ -218,13 +218,12 @@ data: {"message":"错误信息"} | 属性名 | 类型 | 说明 | |--------|------|------| | channel | STRING | 数据通道(必填) | -| title | STRING | 标题(必填) | -| description | TEXT | 描述 | | attributeName | STRING | 属性名(默认 default) | | data | DECIMAL(16,2) | 数据值(必填) | -| unit | STRING | 数据单位 | | time | DATE | 采集时间(必填) | +> `title`、`description`、`unit` 已移至 `channel-meta` 表,按 `(channel, attributeName)` 关联。 + #### period-stat(周期统计) | 属性名 | 类型 | 说明 | @@ -232,11 +231,24 @@ data: {"message":"错误信息"} | period | STRING | 统计周期(必填) | | time | DATE | 统计时间(必填) | | channel | STRING | 数据通道(必填) | -| title | STRING | 标题(必填) | -| description | TEXT | 描述 | | attributeName | STRING | 属性名(默认 default) | | aggregate | ENUM | 聚合方法(必填): sum/avg/count/min/max | | data | DECIMAL(16,2) | 统计数据值(必填) | -| unit | STRING | 数据单位 | + +> `title`、`description`、`unit` 已移至 `channel-meta` 表,按 `(channel, attributeName)` 关联。 **唯一约束**:`(period, channel, attributeName, aggregate, time)` + +#### channel-meta(通道元数据) + +| 属性名 | 类型 | 说明 | +|--------|------|------| +| channel | STRING | 数据通道(联合主键) | +| attributeName | STRING | 属性名(联合主键,默认 default) | +| title | STRING | 标题(必填) | +| description | TEXT | 描述 | +| unit | STRING | 数据单位 | + +**唯一约束**:`(channel, attributeName)` + +**说明**:`title`、`description`、`unit` 三个字段从 `data-record` 和 `period-stat` 中提取到 `channel-meta` 表,按 `channel`+`attributeName` 唯一存储。首次采集某通道数据时自动创建元数据记录,后续采集忽略(不更新)。 diff --git a/doc/api.md b/doc/api.md index d200a8c..8f5fc71 100644 --- a/doc/api.md +++ b/doc/api.md @@ -58,24 +58,32 @@ **返回格式**: ```json -[ - { - "channel": "sensor", - "period": "h", - "time": "2026-05-22T08:00:00.000Z", - "data": 100 - } -] +{ + "channelMetas": { + "sensor": { "channel": "sensor", "title": "传感器", "description": "" } + }, + "list": [ + { + "channel": "sensor", + "period": "h", + "time": "2026-05-22T08:00:00.000Z", + "data": { "default": 100 }, + "unit": { "default": "℃" } + } + ] +} ``` -`data` 字段格式根据查询条件动态决定: +`data` 字段格式始终为对象(按属性名映射),根据聚合方法数量决定层级: | 条件 | data 格式 | 示例 | |------|-----------|------| -| 单属性 + 单聚合 | number | `100` | -| 单属性 + 多聚合 | object | `{"sum": 100, "avg": 50}` | -| 多属性 + 单聚合 | object | `{"temperature": 25, "humidity": 60}` | -| 多属性 + 多聚合 | 嵌套object | `{"sum": {"temperature": 25}, "avg": {"temperature": 12.5}}` | +| 单聚合 | object | `{"default": 100}` 或 `{"temperature": 25, "humidity": 60}` | +| 多聚合 | 嵌套object | `{"sum": {"default": 100}, "avg": {"default": 50}}` 或 `{"sum": {"temperature": 25}, "avg": {"temperature": 12.5}}` | + +`unit` 字段为对象,按属性名映射单位:`{"default": "℃"}` 或 `{"temperature": "℃", "humidity": "%"}` + +> **注意**:查询结果中 `aggregate` 不作为独立字段返回。聚合方法(如 sum、avg)被用作 `data` 对象的键名。例如多聚合时 `data` 为 `{"sum": {"default": 100}, "avg": {"default": 50}}`,而非 `[{aggregate: "sum", data: 100}, {aggregate: "avg", data: 50}]`。 ### 统计周期 @@ -131,7 +139,7 @@ **响应格式**:`Content-Type: text/event-stream` ``` -data: {"channel":"sensor","period":"h","time":"...","data":100} +data: {"channel":"sensor","period":"h","time":"...","data":{"default":100}} event: timeout data: {"message":"连接已超过30分钟,自动断开"} @@ -177,12 +185,12 @@ data: {"message":"错误信息"} | 属性名 | 类型 | 说明 | |--------|------|------| | channel | STRING | 数据通道(必填) | -| title | STRING | 标题(必填) | -| description | TEXT | 描述 | | attributeName | STRING | 属性名(默认 default) | | data | DECIMAL(16,2) | 数据值(必填) | -| unit | STRING | 数据单位 | | time | DATE | 采集时间(必填) | +| unit | STRING | 数据单位 | + +> `title`、`description` 已移至 `channel-meta` 表,按 root channel 关联。 #### period-stat(周期统计) @@ -191,11 +199,23 @@ data: {"message":"错误信息"} | period | STRING | 统计周期(必填) | | time | DATE | 统计时间(必填) | | channel | STRING | 数据通道(必填) | -| title | STRING | 标题(必填) | -| description | TEXT | 描述 | | attributeName | STRING | 属性名(默认 default) | | aggregate | ENUM | 聚合方法(必填): sum/avg/count/min/max | | data | DECIMAL(16,2) | 统计数据值(必填) | | unit | STRING | 数据单位 | +> `title`、`description` 已移至 `channel-meta` 表,按 root channel 关联。 + **唯一约束**:`(period, channel, attributeName, aggregate, time)` + +#### channel-meta(通道元数据) + +| 属性名 | 类型 | 说明 | +|--------|------|------| +| channel | STRING | 数据通道(唯一键) | +| title | STRING | 标题(必填) | +| description | TEXT | 描述 | + +**唯一约束**:`channel` + +**说明**:`channel-meta` 按 root channel(一级通道)唯一存储,一条元数据被所有以该 root channel 为前缀的子通道共享。首次采集某通道数据时,自动以其 root channel 创建元数据记录。`title` 和 `description` 从采集参数中提取,后续采集忽略(不更新)。`unit` 字段保留在 `data-record` 和 `period-stat` 表中。 diff --git a/doc/summary.md b/doc/summary.md index 8165d07..c4ae2ed 100644 --- a/doc/summary.md +++ b/doc/summary.md @@ -19,3 +19,343 @@ - IoT 设备传感器数据采集与统计分析 - 业务指标实时监控与多周期报表 - 多通道多属性的数据聚合与趋势查询 + +### 使用方法 + +#### 快速开始 + +```js +const fastify = require('fastify')(); + +// 注册依赖插件 +fastify.register(require('@kne/fastify-sequelize'), { /* sequelize 配置 */ }); +fastify.register(require('@kne/fastify-cron'), { /* cron 配置 */ }); + +// 注册统计插件 +fastify.register(require('@kne/fastify-statistics'), { + prefix: '/api/statistics', + cache: redisCacheInstance, // 传入缓存实例启用缓冲模式 + getAuthenticate: type => { + // type 为 'read' 或 'write',返回认证信息 + } +}); + +fastify.listen({ port: 3000 }); +``` + +#### Channel 与 AttributeName 的设计理念 + +**Channel(数据通道)** 是数据的第一级分类维度,采用冒号分隔的多级结构(`a:b:c`)。它的核心思想是:**从宏观到微观的层级划分**。 + +- **一级 channel**(如 `sales`)是根通道,对应唯一的 `channel-meta` 记录(标题、描述) +- **多级 channel**(如 `sales:beijing`、`sales:beijing:team-a`)是更细粒度的子通道 +- 查询时传入一级 channel 即可匹配所有子通道的数据 +- 同一根通道下的所有子通道共享同一个 `channel-meta` + +**AttributeName(属性名)** 是数据的第二级分类维度,用于在同一 channel 下区分不同的数据指标。 + +- 默认值为 `default`,适用于单一指标的场景 +- 当 `data` 传入对象时自动展开为多属性(如 `{revenue: 10000, orders: 50}` 拆分为两条记录) + +#### 实际场景:企业部门数据统计 + +假设一家公司要统计各部门的经营数据,我们可以这样设计 channel: + +``` +company ← 根通道:公司整体 +company:sales ← 子通道:销售部 +company:sales:beijing ← 子通道:销售部北京分部 +company:sales:shanghai ← 子通道:销售部上海分部 +company:rd ← 子通道:研发部 +company:rd:frontend ← 子通道:研发部前端组 +company:rd:backend ← 子通道:研发部后端组 +company:hr ← 子通道:人力资源部 +``` + +对应的 `channel-meta` 只需为根通道 `company` 创建一条记录: + +| channel | title | description | +|---------|-------|-------------| +| company | 公司经营数据 | 各部门经营数据统计 | + +**采集数据**: + +```js +// 1. 销售部北京分部上报单指标(默认 attributeName=default) +await fastify.statistics.services.collect({ + channel: 'company:sales:beijing', + data: 58000, + unit: '元', + title: '公司', + description: '各部门经营数据统计' +}); + +// 2. 销售部上海分部上报多指标(自动展开为多条记录) +await fastify.statistics.services.collect({ + channel: 'company:sales:shanghai', + data: { revenue: 72000, orders: 150 }, + unit: '元', + title: '公司', + description: '各部门经营数据统计' +}); + +// 3. 研发部前端组上报 +await fastify.statistics.services.collect({ + channel: 'company:rd:frontend', + data: { tasks: 12, bugs: 3 }, + title: '公司', + description: '各部门经营数据统计' +}); +``` + +采集后数据会自动展开并入库: + +| channel | attributeName | data | unit | +|---------|--------------|------|------| +| company | default | 58000 | 元 | +| company:sales | default | 58000 | 元 | +| company:sales:beijing | default | 58000 | 元 | +| company | revenue | 72000 | 元 | +| company | orders | 150 | 元 | +| company:sales | revenue | 72000 | 元 | +| company:sales | orders | 150 | 元 | +| company:sales:shanghai | revenue | 72000 | 元 | +| company:sales:shanghai | orders | 150 | 元 | +| ... | ... | ... | ... | + +> 通道展开规则:`company:sales:beijing` 自动展开为 `company`、`company:sales`、`company:sales:beijing` 三条记录,确保每一级都能查到汇总数据。 + +**查询数据**: + +```js +// 1. 查询销售部所有分部的本月合计 +const salesResult = await fastify.statistics.services.query({ + channel: 'company:sales', + startTime: '2026-05-01T00:00:00.000Z', + endTime: '2026-06-01T00:00:00.000Z', + period: 'm', + aggregates: ['sum'] +}); + +// 2. 查询公司所有部门的本月合计(传入一级 channel 即可) +const companyResult = await fastify.statistics.services.query({ + channel: 'company', + startTime: '2026-05-01T00:00:00.000Z', + endTime: '2026-06-01T00:00:00.000Z', + period: 'm', + aggregates: ['sum'] +}); + +// 3. 查询 revenue 和 orders 两个属性的合计与平均 +const revenueResult = await fastify.statistics.services.query({ + channel: 'company', + startTime: '2026-05-01T00:00:00.000Z', + endTime: '2026-06-01T00:00:00.000Z', + attributeNames: ['revenue', 'orders'], + aggregates: ['sum', 'avg'] +}); +``` + +**查询返回格式**: + +> **注意**:查询结果中 `aggregate` 不作为独立字段返回。聚合方法(如 sum、avg)被用作 `data` 对象的键名。`data` 字段始终为对象(按属性名映射),例如单聚合时 `data` 为 `{"default": 58000}`,多聚合时 `data` 为 `{"sum": {"default": 58000}, "avg": {"default": 29000}}`。 + +查询销售部(`channel=company:sales`)返回: + +```json +{ + "channelMetas": { + "company": { "channel": "company", "title": "公司", "description": "各部门经营数据统计" } + }, + "list": [ + { + "channel": "company:sales:beijing", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "default": 58000 }, + "unit": { "default": "元" } + }, + { + "channel": "company:sales:shanghai", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "revenue": 72000, "orders": 150 }, + "unit": { "revenue": "元", "orders": "元" } + } + ] +} +``` + +查询整个公司(`channel=company`)返回: + +```json +{ + "channelMetas": { + "company": { "channel": "company", "title": "公司", "description": "各部门经营数据统计" } + }, + "list": [ + { + "channel": "company", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "default": 130000, "revenue": 72000, "orders": 150, "tasks": 12, "bugs": 3 }, + "unit": { "default": "元", "revenue": "元", "orders": "元", "tasks": "个", "bugs": "个" } + }, + { + "channel": "company:sales", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "default": 130000, "revenue": 72000, "orders": 150 }, + "unit": { "default": "元", "revenue": "元", "orders": "元" } + }, + { + "channel": "company:sales:beijing", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "default": 58000 }, + "unit": { "default": "元" } + }, + { + "channel": "company:sales:shanghai", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "revenue": 72000, "orders": 150 }, + "unit": { "revenue": "元", "orders": "元" } + }, + { + "channel": "company:rd", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "tasks": 12, "bugs": 3 }, + "unit": { "tasks": "个", "bugs": "个" } + }, + { + "channel": "company:rd:frontend", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "tasks": 12, "bugs": 3 }, + "unit": { "tasks": "个", "bugs": "个" } + } + ] +} +``` + +查询 revenue 和 orders 两个属性的合计与平均(`channel=company`, `attributeNames=['revenue','orders']`, `aggregates=['sum','avg']`)返回: + +```json +{ + "channelMetas": { + "company": { "channel": "company", "title": "公司", "description": "各部门经营数据统计" } + }, + "list": [ + { + "channel": "company", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "sum": { "revenue": 72000, "orders": 150 }, "avg": { "revenue": 72000, "orders": 150 } }, + "unit": { "revenue": "元", "orders": "元" } + }, + { + "channel": "company:sales", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "sum": { "revenue": 72000, "orders": 150 }, "avg": { "revenue": 72000, "orders": 150 } }, + "unit": { "revenue": "元", "orders": "元" } + }, + { + "channel": "company:sales:shanghai", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "sum": { "revenue": 72000, "orders": 150 }, "avg": { "revenue": 72000, "orders": 150 } }, + "unit": { "revenue": "元", "orders": "元" } + } + ] +} +``` + +> `channelMetas` 按 root channel 去重,所有子通道共享同一份元数据,避免数据冗余。 + +#### Channel Meta 管理 + +通道元数据在首次采集时自动创建,也可通过服务接口管理: + +```js +// 查询通道元数据 +const meta = await fastify.statistics.services.channelMeta.detail({ + channel: 'company' +}); + +// 列出所有元数据 +const list = await fastify.statistics.services.channelMeta.list(); + +// 按通道筛选 +const list = await fastify.statistics.services.channelMeta.list({ + filter: { channel: 'company' } +}); + +// 修改元数据 +await fastify.statistics.services.channelMeta.save({ + channel: 'company', + title: '企业经营数据总览', + description: '全公司各部门经营指标汇总' +}); +``` + +#### SSE 实时推送 + +通过 HTTP 接口或程序化 API 获取实时统计数据推送: + +```js +// HTTP 接口:GET /api/statistics/sse?channel=company&aggregates=sum&interval=5 +// 浏览器端使用 EventSource 接收 +const eventSource = new EventSource('/api/statistics/sse?channel=company&aggregates=sum&interval=5'); +eventSource.onmessage = (event) => { + const data = JSON.parse(event.data); + console.log(data); // { channel, period, time, data, unit } +}; + +// 程序化调用(在 Fastify 路由中) +fastify.get('/my-sse', async (request, reply) => { + const sseContext = await fastify.statistics.services.sseStream.send(reply, { + name: 'my-sse-channel', + params: { + channel: 'company', + startTime: new Date(Date.now() - 3600000).toISOString(), + endTime: new Date().toISOString(), + aggregates: ['sum'] + }, + fetchData: async (params) => { + return fastify.statistics.services.query(params); + }, + interval: 5, + heartbeatInterval: 30000, + maxDuration: 1800000 + }); + + // 可手动关闭 + // sseContext.close(); + + // 监听关闭事件 + sseContext.onClose(() => { + console.log('SSE 连接已关闭'); + }); +}); +``` + +**SSE 事件类型**: + +| 事件 | 说明 | +|------|------| +| `data`(默认) | 正常数据推送,内容为查询结果 JSON | +| `timeout` | 连接超过 maxDuration 后自动断开通知 | +| `error` | fetchData 出错时的错误事件 | +| 心跳(`: heartbeat`) | 保活注释行 | + +**SSE 上下文方法**: + +| 方法 | 说明 | +|------|------| +| `isConnected()` | 返回当前连接状态 | +| `close()` | 手动关闭 SSE 连接 | +| `onClose(callback)` | 注册连接关闭回调,若已断开则立即执行 | + diff --git a/libs/models/channel-meta.js b/libs/models/channel-meta.js new file mode 100644 index 0000000..402503c --- /dev/null +++ b/libs/models/channel-meta.js @@ -0,0 +1,24 @@ +module.exports = ({ DataTypes, options }) => { + return { + model: { + channel: { + type: DataTypes.STRING, + allowNull: false, + comment: '数据通道' + }, + title: { + type: DataTypes.STRING, + allowNull: false, + comment: '标题' + }, + description: { + type: DataTypes.TEXT, + comment: '描述' + } + }, + options: { + comment: '通道元数据', + indexes: [{ unique: true, fields: ['channel'] }] + } + }; +}; diff --git a/libs/models/data-record.js b/libs/models/data-record.js index 5265996..e8a8f97 100644 --- a/libs/models/data-record.js +++ b/libs/models/data-record.js @@ -6,15 +6,6 @@ module.exports = ({ DataTypes, options }) => { allowNull: false, comment: '数据通道' }, - title: { - type: DataTypes.STRING, - allowNull: false, - comment: '标题' - }, - description: { - type: DataTypes.TEXT, - comment: '描述' - }, attributeName: { type: DataTypes.STRING, comment: '属性名', @@ -26,17 +17,21 @@ module.exports = ({ DataTypes, options }) => { comment: '数据值', defaultValue: 0 }, - unit: { - type: DataTypes.STRING, - comment: '数据单位' - }, time: { type: DataTypes.DATE, allowNull: false, comment: '采集时间' + }, + unit: { + type: DataTypes.STRING, + comment: '数据单位' } }, - associate: ({}, fastify) => {}, + associate: ({ dataRecord, channelMeta }) => { + dataRecord.belongsTo(channelMeta, { + comment: '通道meta数据' + }); + }, options: { comment: '数据采集记录', indexes: [{ fields: ['channel'] }, { fields: ['time'] }, { fields: ['channel', 'time'] }, { fields: ['channel', 'attributeName', 'time'] }, { fields: ['attributeName'] }] diff --git a/libs/models/period-stat.js b/libs/models/period-stat.js index f8c60e5..28e0350 100644 --- a/libs/models/period-stat.js +++ b/libs/models/period-stat.js @@ -16,15 +16,6 @@ module.exports = ({ DataTypes, options }) => { allowNull: false, comment: '数据通道' }, - title: { - type: DataTypes.STRING, - allowNull: false, - comment: '标题' - }, - description: { - type: DataTypes.TEXT, - comment: '描述' - }, attributeName: { type: DataTypes.STRING, comment: '属性名', @@ -46,7 +37,11 @@ module.exports = ({ DataTypes, options }) => { comment: '数据单位' } }, - associate: ({}, fastify) => {}, + associate: ({ periodStat, channelMeta }) => { + periodStat.belongsTo(channelMeta, { + comment: '通道meta数据' + }); + }, options: { comment: '周期统计', indexes: [{ unique: true, fields: ['period', 'channel', 'attributeName', 'aggregate', 'time'] }, { fields: ['channel', 'attributeName', 'time'] }, { fields: ['period', 'time'] }, { fields: ['attributeName'] }] diff --git a/libs/services/channel-meta.js b/libs/services/channel-meta.js new file mode 100644 index 0000000..ade6d82 --- /dev/null +++ b/libs/services/channel-meta.js @@ -0,0 +1,38 @@ +const fp = require('fastify-plugin'); +const omit = require('lodash/omit'); + +module.exports = fp(async (fastify, options) => { + const { models } = fastify[options.name]; + + const detail = async ({ channel }) => { + return models.channelMeta.findOne({ + where: { channel } + }); + }; + + const list = async ({ filter = {} } = {}) => { + const where = {}; + if (filter.channel) { + where.channel = filter.channel; + } + return models.channelMeta.findAll({ where }); + }; + + const save = async ({ channel, ...data }) => { + const [affectedCount] = await models.channelMeta.update(omit(data, ['channel']), { where: { channel } }); + if (affectedCount === 0) { + throw new Error(`Channel meta not found: ${channel}`); + } + return models.channelMeta.findOne({ + where: { channel } + }); + }; + + Object.assign(fastify[options.name].services, { + channelMeta: { + detail, + list, + save + } + }); +}); diff --git a/libs/services/data-record.js b/libs/services/data-record.js index 5b38ef9..ea1f5ff 100644 --- a/libs/services/data-record.js +++ b/libs/services/data-record.js @@ -41,13 +41,28 @@ module.exports = fp(async (fastify, options) => { } }; + const getRootChannel = channel => channel.split(':')[0]; + + const ensureChannelMeta = async (metaList, transaction) => { + for (const meta of metaList) { + const options = { + where: { channel: meta.channel }, + defaults: { title: meta.title || meta.channel, description: meta.description || null } + }; + if (transaction) { + options.transaction = transaction; + } + await models.channelMeta.findOrCreate(options); + } + }; + const flush = async () => { if (buffer.length === 0 || isFlushing) return; isFlushing = true; const items = buffer.splice(0, buffer.length); try { const records = items.map(item => { - const { _seq, ...data } = item; + const { _seq, title, description, ...data } = item; return data; }); const transaction = await sequelize.transaction(); @@ -108,15 +123,22 @@ module.exports = fp(async (fastify, options) => { const collectImmediate = async data => { const expanded = expandData(data); const records = []; + const metaList = []; for (const item of expanded) { const channels = expandChannel(item.channel); + const rootChannel = getRootChannel(item.channel); + if (!metaList.some(m => m.channel === rootChannel)) { + metaList.push({ channel: rootChannel, title: item.title, description: item.description }); + } for (const channel of channels) { - const { data: _, ...rest } = item; + const { data: _, title, description, ...rest } = item; records.push({ ...rest, channel }); } } + const transaction = await sequelize.transaction(); try { + await ensureChannelMeta(metaList, transaction); await models.dataRecord.bulkCreate(records, { transaction }); await transaction.commit(); } catch (e) { @@ -125,14 +147,20 @@ module.exports = fp(async (fastify, options) => { } }; - const collectBuffered = data => { + const collectBuffered = async data => { const expanded = expandData(data); + const metaList = []; for (const item of expanded) { const channels = expandChannel(item.channel); + const rootChannel = getRootChannel(item.channel); + if (!metaList.some(m => m.channel === rootChannel)) { + metaList.push({ channel: rootChannel, title: item.title, description: item.description }); + } for (const channel of channels) { buffer.push({ ...item, channel, _seq: nextSeq() }); } } + await ensureChannelMeta(metaList); startFlushTimer(); if (buffer.length >= maxBufferSize) { flush().catch(e => { diff --git a/libs/services/period-stat.js b/libs/services/period-stat.js index fb59f44..40ccdce 100644 --- a/libs/services/period-stat.js +++ b/libs/services/period-stat.js @@ -68,7 +68,7 @@ const AGGREGATE_TYPES = [ { key: 'max', fn: 'MAX', label: '最大' } ]; -const UPSERT_FIELDS = ['data', 'title', 'unit', 'description']; +const UPSERT_FIELDS = ['data', 'unit']; module.exports = fp(async (fastify, options) => { const { models } = fastify[options.name]; @@ -77,14 +77,7 @@ module.exports = fp(async (fastify, options) => { const aggregateFromDataRecord = async (period, startTime, endTime) => { const results = await models.dataRecord.findAll({ - attributes: [ - 'channel', - 'attributeName', - [fn('MAX', col('title')), 'title'], - [fn('MAX', col('unit')), 'unit'], - [fn('MAX', col('description')), 'description'], - ...AGGREGATE_TYPES.map(({ key, fn: aggFn }) => [fn(aggFn, col('data')), key]) - ], + attributes: ['channel', 'attributeName', [fn('MAX', col('unit')), 'unit'], ...AGGREGATE_TYPES.map(({ key, fn: aggFn }) => [fn(aggFn, col('data')), key])], where: { time: { [Op.between]: [startTime, endTime] } }, @@ -102,11 +95,9 @@ module.exports = fp(async (fastify, options) => { time: startTime, channel: row.channel, attributeName: row.attributeName, - title: row.title, - description: row.description || null, aggregate: key, data: parseFloat(value), - unit: row.unit || null + unit: row.unit }); } } @@ -147,9 +138,6 @@ module.exports = fp(async (fastify, options) => { grouped[key] = { channel: row.channel, attributeName: row.attributeName, - title: row.title, - unit: row.unit, - description: row.description, items: [] }; } @@ -169,9 +157,7 @@ module.exports = fp(async (fastify, options) => { time: startTime, channel: group.channel, attributeName: group.attributeName, - title: group.title, - description: group.description || null, - unit: group.unit || null + unit: group.items[0]?.unit || null }; if (sums.length > 0) { @@ -246,41 +232,39 @@ module.exports = fp(async (fastify, options) => { } const formatGroupData = (items, hasAttributeNamesFilter) => { - const attrSet = new Set(); const aggSet = new Set(); + const unitMap = {}; for (const item of items) { - attrSet.add(item.attributeName || 'default'); aggSet.add(item.aggregate); + if (item.unit !== undefined && item.unit !== null && !((item.attributeName || 'default') in unitMap)) { + unitMap[item.attributeName || 'default'] = item.unit; + } } - const shouldFlattenAttribute = !hasAttributeNamesFilter && attrSet.size === 1 && attrSet.has('default'); const hasMultipleAggregates = aggSet.size > 1; - if (shouldFlattenAttribute) { - if (hasMultipleAggregates) { - const data = {}; - for (const item of items) { - data[item.aggregate] = item.data; - } - return data; - } - return items[0].data; - } - + let data; if (hasMultipleAggregates) { - const data = {}; + data = {}; for (const item of items) { if (!data[item.aggregate]) data[item.aggregate] = {}; data[item.aggregate][item.attributeName || 'default'] = item.data; } - return data; + } else { + data = {}; + for (const item of items) { + data[item.attributeName || 'default'] = item.data; + } } - const data = {}; - for (const item of items) { - data[item.attributeName || 'default'] = item.data; + const result = { data }; + + const unitEntries = Object.entries(unitMap).filter(([, v]) => v !== undefined && v !== null); + if (unitEntries.length > 0) { + result.unit = Object.fromEntries(unitEntries); } - return data; + + return result; }; const query = async ({ channel, startTime, endTime, attributeNames, aggregates: queryAggregates, timezone: tz }) => { @@ -331,13 +315,7 @@ module.exports = fp(async (fastify, options) => { }; const dataRecords = await models.dataRecord.findAll({ - attributes: [ - 'channel', - 'attributeName', - [fn('MAX', col('title')), 'title'], - [fn('MAX', col('unit')), 'unit'], - ...AGGREGATE_TYPES.filter(a => aggregateList.includes(a.key)).map(({ key, fn: aggFn }) => [fn(aggFn, col('data')), key]) - ], + attributes: ['channel', 'attributeName', [fn('MAX', col('unit')), 'unit'], ...AGGREGATE_TYPES.filter(a => aggregateList.includes(a.key)).map(({ key, fn: aggFn }) => [fn(aggFn, col('data')), key])], where: drWhere, group: ['channel', 'attributeName'], raw: true @@ -352,9 +330,9 @@ module.exports = fp(async (fastify, options) => { time: currentHourStart, channel: row.channel, attributeName: row.attributeName || 'default', - title: row.title, aggregate: key, - data: parseFloat(value) + data: parseFloat(value), + unit: row.unit }); } } @@ -377,18 +355,36 @@ module.exports = fp(async (fastify, options) => { const results = []; for (const group of Object.values(grouped)) { - const data = formatGroupData(group._items, attributeNames && attributeNames.length > 0); - results.push({ + const { data, unit } = formatGroupData(group._items, attributeNames && attributeNames.length > 0); + const item = { channel: group.channel, period: group.period, time: group.time, data - }); + }; + if (unit !== undefined) item.unit = unit; + results.push(item); } results.sort((a, b) => dayjs(a.time).valueOf() - dayjs(b.time).valueOf()); - return results; + const rootChannelSet = new Set(); + for (const r of results) { + rootChannelSet.add(r.channel.split(':')[0]); + } + + let channelMetas = {}; + if (rootChannelSet.size > 0) { + const metas = await models.channelMeta.findAll({ + where: { channel: { [Op.in]: [...rootChannelSet] } }, + raw: true + }); + for (const meta of metas) { + channelMetas[meta.channel] = meta; + } + } + + return { channelMetas, list: results }; }; Object.assign(fastify[options.name].services, { diff --git a/package.json b/package.json index 08941e7..37d932c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@kne/fastify-statistics", - "version": "0.1.0-alpha.0", + "version": "0.1.0-alpha.1", "description": "基于 Fastify 的数据采集与多周期聚合统计插件,支持缓冲写入、时区查询和自动 Cron 聚合", "main": "index.js", "scripts": { @@ -63,6 +63,7 @@ "sqlite3": "^5.1.7" }, "dependencies": { - "dayjs": "^1.11.20" + "dayjs": "^1.11.20", + "lodash": "^4.18.1" } } diff --git a/test/channel-meta.test.js b/test/channel-meta.test.js new file mode 100644 index 0000000..d4a45f9 --- /dev/null +++ b/test/channel-meta.test.js @@ -0,0 +1,182 @@ +const { expect } = require('chai'); +const fp = require('fastify-plugin'); + +const mockChannelMetaService = (fastify, options) => { + const servicePlugin = require('../libs/services/channel-meta'); + return fp(servicePlugin)(fastify, options); +}; + +const createMockFastify = () => { + const store = {}; + const mockModel = { + channelMeta: { + findOne: async ({ where }) => { + return store[where.channel] || null; + }, + findAll: async ({ where }) => { + if (where && where.channel) { + return Object.values(store).filter(item => item.channel === where.channel); + } + return Object.values(store); + }, + update: async (values, { where }) => { + const item = store[where.channel]; + if (!item) return [0]; + Object.assign(item, values); + return [1]; + } + } + }; + + const fastify = require('fastify')(); + + fastify.decorate('statistics', { + models: mockModel, + services: {} + }); + + return { fastify, store }; +}; + +describe('Channel Meta Service', () => { + describe('detail', () => { + it('should return null when channel meta not found', async () => { + const { fastify } = createMockFastify(); + await mockChannelMetaService(fastify, { name: 'statistics' }); + + const result = await fastify.statistics.services.channelMeta.detail({ + channel: 'nonexistent' + }); + expect(result).to.be.null; + + await fastify.close(); + }); + + it('should return channel meta when found', async () => { + const { fastify, store } = createMockFastify(); + store['sensor'] = { + channel: 'sensor', + title: '传感器', + description: '温度传感器' + }; + await mockChannelMetaService(fastify, { name: 'statistics' }); + + const result = await fastify.statistics.services.channelMeta.detail({ + channel: 'sensor' + }); + expect(result).to.not.be.null; + expect(result.channel).to.equal('sensor'); + expect(result.title).to.equal('传感器'); + + await fastify.close(); + }); + }); + + describe('list', () => { + it('should return all channel metas when no filter', async () => { + const { fastify, store } = createMockFastify(); + store['sensor'] = { channel: 'sensor', title: '传感器' }; + store['humidity'] = { channel: 'humidity', title: '湿度' }; + await mockChannelMetaService(fastify, { name: 'statistics' }); + + const result = await fastify.statistics.services.channelMeta.list({}); + expect(result.length).to.equal(2); + + await fastify.close(); + }); + + it('should filter by channel', async () => { + const { fastify, store } = createMockFastify(); + store['sensor'] = { channel: 'sensor', title: '传感器' }; + store['humidity'] = { channel: 'humidity', title: '湿度' }; + await mockChannelMetaService(fastify, { name: 'statistics' }); + + const result = await fastify.statistics.services.channelMeta.list({ filter: { channel: 'sensor' } }); + expect(result.length).to.equal(1); + expect(result[0].channel).to.equal('sensor'); + + await fastify.close(); + }); + + it('should return empty array when no match', async () => { + const { fastify } = createMockFastify(); + await mockChannelMetaService(fastify, { name: 'statistics' }); + + const result = await fastify.statistics.services.channelMeta.list({ filter: { channel: 'nonexistent' } }); + expect(result).to.deep.equal([]); + + await fastify.close(); + }); + + it('should return all when filter is empty', async () => { + const { fastify, store } = createMockFastify(); + store['sensor'] = { channel: 'sensor', title: '传感器' }; + await mockChannelMetaService(fastify, { name: 'statistics' }); + + const result = await fastify.statistics.services.channelMeta.list({ filter: {} }); + expect(result.length).to.equal(1); + + await fastify.close(); + }); + }); + + describe('save', () => { + it('should update channel meta fields', async () => { + const { fastify, store } = createMockFastify(); + store['sensor'] = { + channel: 'sensor', + title: '旧标题', + description: null + }; + await mockChannelMetaService(fastify, { name: 'statistics' }); + + const result = await fastify.statistics.services.channelMeta.save({ + channel: 'sensor', + title: '新标题', + description: '新描述' + }); + + expect(result.title).to.equal('新标题'); + expect(result.description).to.equal('新描述'); + + await fastify.close(); + }); + + it('should throw error when channel meta not found', async () => { + const { fastify } = createMockFastify(); + await mockChannelMetaService(fastify, { name: 'statistics' }); + + try { + await fastify.statistics.services.channelMeta.save({ + channel: 'nonexistent', + title: 'test' + }); + expect.fail('Should have thrown'); + } catch (e) { + expect(e.message).to.include('Channel meta not found'); + } + + await fastify.close(); + }); + + it('should not overwrite channel field', async () => { + const { fastify, store } = createMockFastify(); + store['sensor'] = { + channel: 'sensor', + title: '旧标题', + description: null + }; + await mockChannelMetaService(fastify, { name: 'statistics' }); + + await fastify.statistics.services.channelMeta.save({ + channel: 'sensor', + title: '新标题', + description: '描述' + }); + + expect(store['sensor'].channel).to.equal('sensor'); + + await fastify.close(); + }); + }); +}); diff --git a/test/data-record.test.js b/test/data-record.test.js index 30b8dad..2e1136a 100644 --- a/test/data-record.test.js +++ b/test/data-record.test.js @@ -8,6 +8,7 @@ const mockDataRecordService = (fastify, options) => { const createMockFastify = ({ flushInterval = 100, maxBufferSize = 3 } = {}) => { const bulkCreateCalls = []; + const channelMetaCalls = []; const cacheStore = {}; const mockTransaction = { commit: async () => {}, @@ -19,6 +20,12 @@ const createMockFastify = ({ flushInterval = 100, maxBufferSize = 3 } = {}) => { bulkCreateCalls.push([...records]); return records; } + }, + channelMeta: { + findOrCreate: async (opts) => { + channelMetaCalls.push(opts); + return [{ ...opts.defaults, ...opts.where }, false]; + } } }; @@ -41,14 +48,14 @@ const createMockFastify = ({ flushInterval = 100, maxBufferSize = 3 } = {}) => { } }; - return { fastify, bulkCreateCalls, cacheStore, mockModel, cache }; + return { fastify, bulkCreateCalls, channelMetaCalls, cacheStore, mockModel, cache }; }; describe('@kne/fastify-statistics', function () { describe('数据采集接口测试', () => { describe('无缓存模式(即时入库)', () => { it('should write to DB immediately when no cache is provided', async () => { - const { fastify, bulkCreateCalls } = createMockFastify(); + const { fastify, bulkCreateCalls, channelMetaCalls } = createMockFastify(); await mockDataRecordService(fastify, { name: 'statistics', collectFlushInterval: 60000, @@ -64,12 +71,19 @@ describe('@kne/fastify-statistics', function () { expect(bulkCreateCalls.length).to.equal(1); expect(bulkCreateCalls[0][0].channel).to.equal('ch1'); + expect(bulkCreateCalls[0][0].title).to.be.undefined; + expect(bulkCreateCalls[0][0].description).to.be.undefined; + expect(bulkCreateCalls[0][0].unit).to.be.undefined; + + expect(channelMetaCalls.length).to.equal(1); + expect(channelMetaCalls[0].where.channel).to.equal('ch1'); + expect(channelMetaCalls[0].defaults.title).to.equal('test'); await fastify.close(); }); it('should write multiple expanded records immediately when no cache', async () => { - const { fastify, bulkCreateCalls } = createMockFastify(); + const { fastify, bulkCreateCalls, channelMetaCalls } = createMockFastify(); await mockDataRecordService(fastify, { name: 'statistics', collectFlushInterval: 60000, @@ -85,6 +99,9 @@ describe('@kne/fastify-statistics', function () { expect(bulkCreateCalls.length).to.equal(1); expect(bulkCreateCalls[0].length).to.equal(6); + // channel-meta should be created for root channel only (a) + expect(channelMetaCalls.length).to.equal(1); + expect(channelMetaCalls[0].where.channel).to.equal('a'); await fastify.close(); }); diff --git a/test/period-stat.test.js b/test/period-stat.test.js index 927cda9..0ed612c 100644 --- a/test/period-stat.test.js +++ b/test/period-stat.test.js @@ -75,9 +75,6 @@ describe('@kne/fastify-statistics', function () { findAllResults.push({ channel: 'temperature', attributeName: 'value', - title: '温度', - unit: '℃', - description: 'test', sum: 100, avg: 25, count: 4, @@ -90,7 +87,7 @@ describe('@kne/fastify-statistics', function () { expect(records.length).to.equal(5); expect(bulkCreateCalls.length).to.equal(1); expect(bulkCreateCalls[0].records.length).to.equal(5); - expect(bulkCreateCalls[0].opts.updateOnDuplicate).to.deep.equal(['data', 'title', 'unit', 'description']); + expect(bulkCreateCalls[0].opts.updateOnDuplicate).to.deep.equal(['data', 'unit']); const aggregates = records.map(r => r.aggregate); expect(aggregates).to.have.members(['sum', 'avg', 'count', 'min', 'max']); @@ -100,6 +97,9 @@ describe('@kne/fastify-statistics', function () { expect(sumRecord.channel).to.equal('temperature'); expect(sumRecord.attributeName).to.equal('value'); expect(sumRecord.data).to.equal(100); + expect(sumRecord.title).to.be.undefined; + expect(sumRecord.description).to.be.undefined; + expect(sumRecord.unit).to.be.undefined; await fastify.close(); }); @@ -112,7 +112,7 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); findAllResults.push({ - channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, + channel: 'ch1', attributeName: 'val', sum: 10, avg: null, count: null, min: null, max: null }); @@ -146,7 +146,7 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); findAllResults.push({ - channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, + channel: 'ch1', attributeName: 'val', sum: 100, avg: null, count: 5, min: null, max: 50 }); @@ -169,14 +169,14 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-02T00:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'temperature', attributeName: 'value', title: '温度', unit: '℃', description: null, aggregate: 'sum', data: 30, time: new Date('2026-05-01T00:00:00.000Z') }, - { period: 'h', channel: 'temperature', attributeName: 'value', title: '温度', unit: '℃', description: null, aggregate: 'sum', data: 20, time: new Date('2026-05-01T01:00:00.000Z') }, - { period: 'h', channel: 'temperature', attributeName: 'value', title: '温度', unit: '℃', description: null, aggregate: 'count', data: 3, time: new Date('2026-05-01T00:00:00.000Z') }, - { period: 'h', channel: 'temperature', attributeName: 'value', title: '温度', unit: '℃', description: null, aggregate: 'count', data: 2, time: new Date('2026-05-01T01:00:00.000Z') }, - { period: 'h', channel: 'temperature', attributeName: 'value', title: '温度', unit: '℃', description: null, aggregate: 'min', data: 8, time: new Date('2026-05-01T00:00:00.000Z') }, - { period: 'h', channel: 'temperature', attributeName: 'value', title: '温度', unit: '℃', description: null, aggregate: 'min', data: 12, time: new Date('2026-05-01T01:00:00.000Z') }, - { period: 'h', channel: 'temperature', attributeName: 'value', title: '温度', unit: '℃', description: null, aggregate: 'max', data: 22, time: new Date('2026-05-01T00:00:00.000Z') }, - { period: 'h', channel: 'temperature', attributeName: 'value', title: '温度', unit: '℃', description: null, aggregate: 'max', data: 28, time: new Date('2026-05-01T01:00:00.000Z') } + { period: 'h', channel: 'temperature', attributeName: 'value', aggregate: 'sum', data: 30, time: new Date('2026-05-01T00:00:00.000Z') }, + { period: 'h', channel: 'temperature', attributeName: 'value', aggregate: 'sum', data: 20, time: new Date('2026-05-01T01:00:00.000Z') }, + { period: 'h', channel: 'temperature', attributeName: 'value', aggregate: 'count', data: 3, time: new Date('2026-05-01T00:00:00.000Z') }, + { period: 'h', channel: 'temperature', attributeName: 'value', aggregate: 'count', data: 2, time: new Date('2026-05-01T01:00:00.000Z') }, + { period: 'h', channel: 'temperature', attributeName: 'value', aggregate: 'min', data: 8, time: new Date('2026-05-01T00:00:00.000Z') }, + { period: 'h', channel: 'temperature', attributeName: 'value', aggregate: 'min', data: 12, time: new Date('2026-05-01T01:00:00.000Z') }, + { period: 'h', channel: 'temperature', attributeName: 'value', aggregate: 'max', data: 22, time: new Date('2026-05-01T00:00:00.000Z') }, + { period: 'h', channel: 'temperature', attributeName: 'value', aggregate: 'max', data: 28, time: new Date('2026-05-01T01:00:00.000Z') } ); const records = await fastify.statistics.services.periodStat.aggregate('d', { startTime, endTime }); @@ -202,7 +202,7 @@ describe('@kne/fastify-statistics', function () { // Verify bulkCreate used updateOnDuplicate for idempotency expect(bulkCreateCalls.length).to.equal(1); - expect(bulkCreateCalls[0].opts.updateOnDuplicate).to.deep.equal(['data', 'title', 'unit', 'description']); + expect(bulkCreateCalls[0].opts.updateOnDuplicate).to.deep.equal(['data', 'unit']); await fastify.close(); }); @@ -215,8 +215,8 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-02T00:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'temperature', attributeName: 'value', title: '温度', unit: '℃', description: null, aggregate: 'sum', data: 50, time: startTime }, - { period: 'h', channel: 'humidity', attributeName: 'value', title: '湿度', unit: '%', description: null, aggregate: 'sum', data: 200, time: startTime } + { period: 'h', channel: 'temperature', attributeName: 'value', aggregate: 'sum', data: 50, time: startTime }, + { period: 'h', channel: 'humidity', attributeName: 'value', aggregate: 'sum', data: 200, time: startTime } ); const records = await fastify.statistics.services.periodStat.aggregate('d', { startTime, endTime }); @@ -235,8 +235,8 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-02T00:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, aggregate: 'sum', data: 120, time: startTime }, - { period: 'h', channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, aggregate: 'count', data: 3, time: startTime } + { period: 'h', channel: 'ch1', attributeName: 'val', aggregate: 'sum', data: 120, time: startTime }, + { period: 'h', channel: 'ch1', attributeName: 'val', aggregate: 'count', data: 3, time: startTime } ); const records = await fastify.statistics.services.periodStat.aggregate('d', { startTime, endTime }); @@ -270,8 +270,8 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-02T00:00:00.000Z'); periodStatRows.push( - { period: 'd', channel: 'temperature', attributeName: 'high', title: '最高温', unit: '℃', description: null, aggregate: 'sum', data: 50, time: startTime }, - { period: 'd', channel: 'temperature', attributeName: 'low', title: '最低温', unit: '℃', description: null, aggregate: 'sum', data: 30, time: startTime } + { period: 'd', channel: 'temperature', attributeName: 'high', aggregate: 'sum', data: 50, time: startTime }, + { period: 'd', channel: 'temperature', attributeName: 'low', aggregate: 'sum', data: 30, time: startTime } ); const records = await fastify.statistics.services.periodStat.aggregate('m', { startTime, endTime }); @@ -293,7 +293,7 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-02T00:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'ch1', attributeName: null, title: 'test', unit: null, description: null, aggregate: 'sum', data: 50, time: startTime } + { period: 'h', channel: 'ch1', attributeName: null, aggregate: 'sum', data: 50, time: startTime } ); const records = await fastify.statistics.services.periodStat.aggregate('d', { startTime, endTime }); @@ -311,9 +311,9 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-02T00:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, aggregate: 'count', data: 5, time: startTime }, - { period: 'h', channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, aggregate: 'min', data: 1, time: startTime }, - { period: 'h', channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, aggregate: 'max', data: 10, time: startTime } + { period: 'h', channel: 'ch1', attributeName: 'val', aggregate: 'count', data: 5, time: startTime }, + { period: 'h', channel: 'ch1', attributeName: 'val', aggregate: 'min', data: 1, time: startTime }, + { period: 'h', channel: 'ch1', attributeName: 'val', aggregate: 'max', data: 10, time: startTime } ); const records = await fastify.statistics.services.periodStat.aggregate('d', { startTime, endTime }); @@ -373,6 +373,7 @@ describe('@kne/fastify-statistics', function () { const periodStatRows = []; const dataRecordFindAllResult = []; const findAllCalls = []; + const channelMetaRows = []; const mockTransaction = { commit: async () => {}, @@ -397,6 +398,14 @@ describe('@kne/fastify-statistics', function () { } return periodStatRows.filter(row => !period || row.period === period); } + }, + channelMeta: { + findAll: async ({ where }) => { + if (where && where.channel && where.channel.in) { + return channelMetaRows.filter(row => where.channel.in.includes(row.channel)); + } + return channelMetaRows; + } } }; @@ -416,10 +425,10 @@ describe('@kne/fastify-statistics', function () { services: {} }); - return { fastify, periodStatRows, dataRecordFindAllResult, findAllCalls }; + return { fastify, periodStatRows, dataRecordFindAllResult, findAllCalls, channelMetaRows }; }; - it('should return direct value when single aggregate and single default attribute with no filter', async () => { + it('should return attribute-keyed object when single aggregate and single default attribute with no filter', async () => { const { fastify, periodStatRows } = createQueryMockFastify(); await mockPeriodStatService(fastify, { name: 'statistics' }); @@ -428,17 +437,17 @@ describe('@kne/fastify-statistics', function () { periodStatRows.push({ period: 'h', channel: 'sensor', attributeName: 'default', - title: '传感器', aggregate: 'sum', data: 100, time: startTime + aggregate: 'sum', data: 100, time: startTime }); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum'] }); expect(results.length).to.equal(1); expect(results[0].channel).to.equal('sensor'); expect(results[0].period).to.equal('h'); - expect(results[0].data).to.equal(100); + expect(results[0].data).to.deep.equal({ default: 100 }); await fastify.close(); }); @@ -451,11 +460,11 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: 'temperature', title: '温度', aggregate: 'sum', data: 100, time: startTime }, - { period: 'h', channel: 'sensor', attributeName: 'humidity', title: '湿度', aggregate: 'sum', data: 200, time: startTime } + { period: 'h', channel: 'sensor', attributeName: 'temperature', aggregate: 'sum', data: 100, time: startTime }, + { period: 'h', channel: 'sensor', attributeName: 'humidity', aggregate: 'sum', data: 200, time: startTime } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum'] }); @@ -465,7 +474,7 @@ describe('@kne/fastify-statistics', function () { await fastify.close(); }); - it('should return aggregate-keyed object when multiple aggregates and single default attribute', async () => { + it('should return nested object when multiple aggregates and single default attribute', async () => { const { fastify, periodStatRows } = createQueryMockFastify(); await mockPeriodStatService(fastify, { name: 'statistics' }); @@ -473,16 +482,16 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'sum', data: 100, time: startTime }, - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'avg', data: 25, time: startTime } + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'sum', data: 100, time: startTime }, + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'avg', data: 25, time: startTime } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum', 'avg'] }); expect(results.length).to.equal(1); - expect(results[0].data).to.deep.equal({ sum: 100, avg: 25 }); + expect(results[0].data).to.deep.equal({ sum: { default: 100 }, avg: { default: 25 } }); await fastify.close(); }); @@ -495,13 +504,13 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: 'temperature', title: '温度', aggregate: 'sum', data: 100, time: startTime }, - { period: 'h', channel: 'sensor', attributeName: 'humidity', title: '湿度', aggregate: 'sum', data: 200, time: startTime }, - { period: 'h', channel: 'sensor', attributeName: 'temperature', title: '温度', aggregate: 'avg', data: 25, time: startTime }, - { period: 'h', channel: 'sensor', attributeName: 'humidity', title: '湿度', aggregate: 'avg', data: 50, time: startTime } + { period: 'h', channel: 'sensor', attributeName: 'temperature', aggregate: 'sum', data: 100, time: startTime }, + { period: 'h', channel: 'sensor', attributeName: 'humidity', aggregate: 'sum', data: 200, time: startTime }, + { period: 'h', channel: 'sensor', attributeName: 'temperature', aggregate: 'avg', data: 25, time: startTime }, + { period: 'h', channel: 'sensor', attributeName: 'humidity', aggregate: 'avg', data: 50, time: startTime } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum', 'avg'] }); @@ -522,10 +531,10 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'sum', data: 100, time: startTime } + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'sum', data: 100, time: startTime } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, attributeNames: ['default'], aggregates: ['sum'] }); @@ -535,19 +544,20 @@ describe('@kne/fastify-statistics', function () { }); it('should query all child channels', async () => { - const { fastify, periodStatRows } = createQueryMockFastify(); + const { fastify, periodStatRows, channelMetaRows } = createQueryMockFastify(); await mockPeriodStatService(fastify, { name: 'statistics' }); const startTime = new Date('2026-05-01T00:00:00.000Z'); const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'sum', data: 100, time: startTime }, - { period: 'h', channel: 'sensor:room1', attributeName: 'default', title: '房间1', aggregate: 'sum', data: 50, time: startTime }, - { period: 'h', channel: 'sensor:room2', attributeName: 'default', title: '房间2', aggregate: 'sum', data: 30, time: startTime } + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'sum', data: 100, time: startTime }, + { period: 'h', channel: 'sensor:room1', attributeName: 'default', aggregate: 'sum', data: 50, time: startTime }, + { period: 'h', channel: 'sensor:room2', attributeName: 'default', aggregate: 'sum', data: 30, time: startTime } ); + channelMetaRows.push({ channel: 'sensor', title: '传感器', description: '温度' }); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum'] }); @@ -557,6 +567,10 @@ describe('@kne/fastify-statistics', function () { expect(channels).to.include('sensor:room1'); expect(channels).to.include('sensor:room2'); + expect(channelMetas).to.have.property('sensor'); + expect(channelMetas.sensor.title).to.equal('传感器'); + expect(Object.keys(channelMetas).length).to.equal(1); + await fastify.close(); }); @@ -568,11 +582,11 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'sum', data: 10, time: startTime }, - { period: 'd', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'sum', data: 240, time: startTime } + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'sum', data: 10, time: startTime }, + { period: 'd', channel: 'sensor', attributeName: 'default', aggregate: 'sum', data: 240, time: startTime } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum'] }); @@ -594,11 +608,12 @@ describe('@kne/fastify-statistics', function () { const startTime = new Date('2026-05-01T00:00:00.000Z'); const endTime = new Date('2026-05-01T01:00:00.000Z'); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'nonexistent', startTime, endTime, aggregates: ['sum'] }); expect(results).to.deep.equal([]); + expect(channelMetas).to.deep.equal({}); await fastify.close(); }); @@ -611,10 +626,10 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: 'temperature', title: '温度', aggregate: 'sum', data: 100, time: startTime } + { period: 'h', channel: 'sensor', attributeName: 'temperature', aggregate: 'sum', data: 100, time: startTime } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, attributeNames: ['temperature'], aggregates: ['sum'] }); @@ -634,11 +649,10 @@ describe('@kne/fastify-statistics', function () { dataRecordFindAllResult.push({ channel: 'sensor', attributeName: 'temperature', - title: '温度', unit: '℃', sum: 50, avg: 25, count: 2, min: 10, max: 40 }); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum'] }); @@ -661,19 +675,19 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'sum', data: 100, time: startTime }, - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'avg', data: 25, time: startTime }, - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'count', data: 4, time: startTime }, - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'min', data: 10, time: startTime }, - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'max', data: 40, time: startTime } + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'sum', data: 100, time: startTime }, + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'avg', data: 25, time: startTime }, + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'count', data: 4, time: startTime }, + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'min', data: 10, time: startTime }, + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'max', data: 40, time: startTime } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime }); expect(results.length).to.equal(1); - expect(results[0].data).to.deep.equal({ sum: 100, avg: 25, count: 4, min: 10, max: 40 }); + expect(results[0].data).to.deep.equal({ sum: { default: 100 }, avg: { default: 25 }, count: { default: 4 }, min: { default: 10 }, max: { default: 40 } }); await fastify.close(); }); @@ -690,18 +704,18 @@ describe('@kne/fastify-statistics', function () { const time1 = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'sum', data: 30, time: time2 }, - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'sum', data: 10, time: time0 }, - { period: 'h', channel: 'sensor', attributeName: 'default', title: '传感器', aggregate: 'sum', data: 20, time: time1 } + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'sum', data: 30, time: time2 }, + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'sum', data: 10, time: time0 }, + { period: 'h', channel: 'sensor', attributeName: 'default', aggregate: 'sum', data: 20, time: time1 } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum'] }); - expect(results[0].data).to.equal(10); - expect(results[1].data).to.equal(20); - expect(results[2].data).to.equal(30); + expect(results[0].data).to.deep.equal({ default: 10 }); + expect(results[1].data).to.deep.equal({ default: 20 }); + expect(results[2].data).to.deep.equal({ default: 30 }); await fastify.close(); }); @@ -716,7 +730,6 @@ describe('@kne/fastify-statistics', function () { dataRecordFindAllResult.push({ channel: 'sensor', attributeName: 'temperature', - title: '温度', unit: '℃', sum: 50, avg: null, count: null, min: null, max: null }); @@ -744,7 +757,6 @@ describe('@kne/fastify-statistics', function () { dataRecordFindAllResult.push({ channel: 'sensor', attributeName: 'default', - title: '传感器', unit: null, sum: 100, avg: null, count: null, min: null, max: null }); @@ -777,23 +789,30 @@ describe('@kne/fastify-statistics', function () { await fastify.close(); }); - it('should query without channel filter', async () => { - const { fastify, periodStatRows } = createQueryMockFastify(); + it('should query without channel filter and return channelMetas', async () => { + const { fastify, periodStatRows, channelMetaRows } = createQueryMockFastify(); await mockPeriodStatService(fastify, { name: 'statistics' }); const startTime = new Date('2026-05-01T00:00:00.000Z'); const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor1', attributeName: 'default', title: '传感器1', aggregate: 'sum', data: 100, time: startTime }, - { period: 'h', channel: 'sensor2', attributeName: 'default', title: '传感器2', aggregate: 'sum', data: 200, time: startTime } + { period: 'h', channel: 'sensor1', attributeName: 'default', aggregate: 'sum', data: 100, time: startTime }, + { period: 'h', channel: 'sensor2', attributeName: 'default', aggregate: 'sum', data: 200, time: startTime } + ); + channelMetaRows.push( + { channel: 'sensor1', title: '传感器1', description: null }, + { channel: 'sensor2', title: '传感器2', description: '温度' } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ startTime, endTime, aggregates: ['sum'] }); expect(results.length).to.equal(2); + expect(Object.keys(channelMetas).length).to.equal(2); + expect(channelMetas.sensor1.title).to.equal('传感器1'); + expect(channelMetas.sensor2.title).to.equal('传感器2'); await fastify.close(); }); @@ -806,15 +825,15 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: null, title: '传感器', aggregate: 'sum', data: 100, time: startTime } + { period: 'h', channel: 'sensor', attributeName: null, aggregate: 'sum', data: 100, time: startTime } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum'] }); expect(results.length).to.equal(1); - expect(results[0].data).to.equal(100); + expect(results[0].data).to.deep.equal({ default: 100 }); await fastify.close(); }); @@ -827,10 +846,10 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: null, title: '传感器', aggregate: 'sum', data: 100, time: startTime } + { period: 'h', channel: 'sensor', attributeName: null, aggregate: 'sum', data: 100, time: startTime } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, attributeNames: ['value'], aggregates: ['sum'] }); @@ -848,11 +867,11 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'sensor', attributeName: null, title: '传感器', aggregate: 'sum', data: 100, time: startTime }, - { period: 'h', channel: 'sensor', attributeName: null, title: '传感器', aggregate: 'avg', data: 25, time: startTime } + { period: 'h', channel: 'sensor', attributeName: null, aggregate: 'sum', data: 100, time: startTime }, + { period: 'h', channel: 'sensor', attributeName: null, aggregate: 'avg', data: 25, time: startTime } ); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, attributeNames: ['value'], aggregates: ['sum', 'avg'] }); @@ -875,17 +894,15 @@ describe('@kne/fastify-statistics', function () { dataRecordFindAllResult.push({ channel: 'sensor', attributeName: 'temperature', - title: '温度', unit: '℃', sum: 50, avg: undefined, count: undefined, min: undefined, max: undefined }); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum', 'avg'] }); const hourResult = results.find(r => r.period === 'h'); if (hourResult) { - // sum=50 is the only non-null/undefined aggregate, formatted as { attributeName: value } expect(hourResult.data).to.deep.equal({ temperature: 50 }); } @@ -904,11 +921,10 @@ describe('@kne/fastify-statistics', function () { dataRecordFindAllResult.push({ channel: 'sensor', attributeName: 'default', - title: '传感器', unit: null, sum: 100, avg: null, count: null, min: null, max: null }); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum'] }); @@ -927,11 +943,10 @@ describe('@kne/fastify-statistics', function () { dataRecordFindAllResult.push({ channel: 'sensor', attributeName: null, - title: '传感器', unit: '℃', sum: 100, avg: null, count: null, min: null, max: null }); - const results = await fastify.statistics.services.periodStat.query({ + const { list: results, channelMetas } = await fastify.statistics.services.periodStat.query({ channel: 'sensor', startTime, endTime, aggregates: ['sum'] }); @@ -962,7 +977,7 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-01T01:00:00.000Z'); findAllResults.push({ - channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, + channel: 'ch1', attributeName: 'val', sum: 10, avg: null, count: null, min: null, max: null }); @@ -996,7 +1011,7 @@ describe('@kne/fastify-statistics', function () { const endTime = new Date('2026-05-02T00:00:00.000Z'); periodStatRows.push( - { period: 'h', channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, aggregate: 'sum', data: 10, time: startTime } + { period: 'h', channel: 'ch1', attributeName: 'val', aggregate: 'sum', data: 10, time: startTime } ); try { @@ -1017,7 +1032,7 @@ describe('@kne/fastify-statistics', function () { await mockPeriodStatService(fastify, { name: 'statistics' }); findAllResults.push({ - channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, + channel: 'ch1', attributeName: 'val', sum: 10, avg: null, count: null, min: null, max: null }); @@ -1038,13 +1053,13 @@ describe('@kne/fastify-statistics', function () { if (period === 'h') { findAllResults.push({ - channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, + channel: 'ch1', attributeName: 'val', sum: 10, avg: null, count: null, min: null, max: null }); } else { periodStatRows.push( { period: period === 'd' ? 'h' : period === 'w' || period === 'm' ? 'd' : period === 'q' ? 'm' : 'q', - channel: 'ch1', attributeName: 'val', title: 'test', unit: null, description: null, + channel: 'ch1', attributeName: 'val', aggregate: 'sum', data: 10, time: new Date() } ); } @@ -1080,8 +1095,6 @@ describe('@kne/fastify-statistics', function () { await mockPeriodStatService(fastify, { name: 'statistics' }); // Now manually trigger onTick with a failing aggregate - // The aggregate will fail because mock findAll returns empty results but we need it to throw - // Let's override the model to throw fastify.statistics.models.dataRecord.findAll = async () => { throw new Error('Cron aggregate error'); }; From 7f32b06378757d0f8f1dbcc2f4fa3d3099849555 Mon Sep 17 00:00:00 2001 From: Linzp Date: Fri, 22 May 2026 17:56:10 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E7=94=9F=E6=88=90=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 390 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 369 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index c8face7..23aee3f 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,346 @@ npm i --save @kne/fastify-statistics - 业务指标实时监控与多周期报表 - 多通道多属性的数据聚合与趋势查询 +### 使用方法 + +#### 快速开始 + +```js +const fastify = require('fastify')(); + +// 注册依赖插件 +fastify.register(require('@kne/fastify-sequelize'), { /* sequelize 配置 */ }); +fastify.register(require('@kne/fastify-cron'), { /* cron 配置 */ }); + +// 注册统计插件 +fastify.register(require('@kne/fastify-statistics'), { + prefix: '/api/statistics', + cache: redisCacheInstance, // 传入缓存实例启用缓冲模式 + getAuthenticate: type => { + // type 为 'read' 或 'write',返回认证信息 + } +}); + +fastify.listen({ port: 3000 }); +``` + +#### Channel 与 AttributeName 的设计理念 + +**Channel(数据通道)** 是数据的第一级分类维度,采用冒号分隔的多级结构(`a:b:c`)。它的核心思想是:**从宏观到微观的层级划分**。 + +- **一级 channel**(如 `sales`)是根通道,对应唯一的 `channel-meta` 记录(标题、描述) +- **多级 channel**(如 `sales:beijing`、`sales:beijing:team-a`)是更细粒度的子通道 +- 查询时传入一级 channel 即可匹配所有子通道的数据 +- 同一根通道下的所有子通道共享同一个 `channel-meta` + +**AttributeName(属性名)** 是数据的第二级分类维度,用于在同一 channel 下区分不同的数据指标。 + +- 默认值为 `default`,适用于单一指标的场景 +- 当 `data` 传入对象时自动展开为多属性(如 `{revenue: 10000, orders: 50}` 拆分为两条记录) + +#### 实际场景:企业部门数据统计 + +假设一家公司要统计各部门的经营数据,我们可以这样设计 channel: + +``` +company ← 根通道:公司整体 +company:sales ← 子通道:销售部 +company:sales:beijing ← 子通道:销售部北京分部 +company:sales:shanghai ← 子通道:销售部上海分部 +company:rd ← 子通道:研发部 +company:rd:frontend ← 子通道:研发部前端组 +company:rd:backend ← 子通道:研发部后端组 +company:hr ← 子通道:人力资源部 +``` + +对应的 `channel-meta` 只需为根通道 `company` 创建一条记录: + +| channel | title | description | +|---------|-------|-------------| +| company | 公司经营数据 | 各部门经营数据统计 | + +**采集数据**: + +```js +// 1. 销售部北京分部上报单指标(默认 attributeName=default) +await fastify.statistics.services.collect({ + channel: 'company:sales:beijing', + data: 58000, + unit: '元', + title: '公司', + description: '各部门经营数据统计' +}); + +// 2. 销售部上海分部上报多指标(自动展开为多条记录) +await fastify.statistics.services.collect({ + channel: 'company:sales:shanghai', + data: { revenue: 72000, orders: 150 }, + unit: '元', + title: '公司', + description: '各部门经营数据统计' +}); + +// 3. 研发部前端组上报 +await fastify.statistics.services.collect({ + channel: 'company:rd:frontend', + data: { tasks: 12, bugs: 3 }, + title: '公司', + description: '各部门经营数据统计' +}); +``` + +采集后数据会自动展开并入库: + +| channel | attributeName | data | unit | +|---------|--------------|------|------| +| company | default | 58000 | 元 | +| company:sales | default | 58000 | 元 | +| company:sales:beijing | default | 58000 | 元 | +| company | revenue | 72000 | 元 | +| company | orders | 150 | 元 | +| company:sales | revenue | 72000 | 元 | +| company:sales | orders | 150 | 元 | +| company:sales:shanghai | revenue | 72000 | 元 | +| company:sales:shanghai | orders | 150 | 元 | +| ... | ... | ... | ... | + +> 通道展开规则:`company:sales:beijing` 自动展开为 `company`、`company:sales`、`company:sales:beijing` 三条记录,确保每一级都能查到汇总数据。 + +**查询数据**: + +```js +// 1. 查询销售部所有分部的本月合计 +const salesResult = await fastify.statistics.services.query({ + channel: 'company:sales', + startTime: '2026-05-01T00:00:00.000Z', + endTime: '2026-06-01T00:00:00.000Z', + period: 'm', + aggregates: ['sum'] +}); + +// 2. 查询公司所有部门的本月合计(传入一级 channel 即可) +const companyResult = await fastify.statistics.services.query({ + channel: 'company', + startTime: '2026-05-01T00:00:00.000Z', + endTime: '2026-06-01T00:00:00.000Z', + period: 'm', + aggregates: ['sum'] +}); + +// 3. 查询 revenue 和 orders 两个属性的合计与平均 +const revenueResult = await fastify.statistics.services.query({ + channel: 'company', + startTime: '2026-05-01T00:00:00.000Z', + endTime: '2026-06-01T00:00:00.000Z', + attributeNames: ['revenue', 'orders'], + aggregates: ['sum', 'avg'] +}); +``` + +**查询返回格式**: + +> **注意**:查询结果中 `aggregate` 不作为独立字段返回。聚合方法(如 sum、avg)被用作 `data` 对象的键名。`data` 字段始终为对象(按属性名映射),例如单聚合时 `data` 为 `{"default": 58000}`,多聚合时 `data` 为 `{"sum": {"default": 58000}, "avg": {"default": 29000}}`。 + +查询销售部(`channel=company:sales`)返回: + +```json +{ + "channelMetas": { + "company": { "channel": "company", "title": "公司", "description": "各部门经营数据统计" } + }, + "list": [ + { + "channel": "company:sales:beijing", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "default": 58000 }, + "unit": { "default": "元" } + }, + { + "channel": "company:sales:shanghai", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "revenue": 72000, "orders": 150 }, + "unit": { "revenue": "元", "orders": "元" } + } + ] +} +``` + +查询整个公司(`channel=company`)返回: + +```json +{ + "channelMetas": { + "company": { "channel": "company", "title": "公司", "description": "各部门经营数据统计" } + }, + "list": [ + { + "channel": "company", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "default": 130000, "revenue": 72000, "orders": 150, "tasks": 12, "bugs": 3 }, + "unit": { "default": "元", "revenue": "元", "orders": "元", "tasks": "个", "bugs": "个" } + }, + { + "channel": "company:sales", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "default": 130000, "revenue": 72000, "orders": 150 }, + "unit": { "default": "元", "revenue": "元", "orders": "元" } + }, + { + "channel": "company:sales:beijing", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "default": 58000 }, + "unit": { "default": "元" } + }, + { + "channel": "company:sales:shanghai", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "revenue": 72000, "orders": 150 }, + "unit": { "revenue": "元", "orders": "元" } + }, + { + "channel": "company:rd", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "tasks": 12, "bugs": 3 }, + "unit": { "tasks": "个", "bugs": "个" } + }, + { + "channel": "company:rd:frontend", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "tasks": 12, "bugs": 3 }, + "unit": { "tasks": "个", "bugs": "个" } + } + ] +} +``` + +查询 revenue 和 orders 两个属性的合计与平均(`channel=company`, `attributeNames=['revenue','orders']`, `aggregates=['sum','avg']`)返回: + +```json +{ + "channelMetas": { + "company": { "channel": "company", "title": "公司", "description": "各部门经营数据统计" } + }, + "list": [ + { + "channel": "company", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "sum": { "revenue": 72000, "orders": 150 }, "avg": { "revenue": 72000, "orders": 150 } }, + "unit": { "revenue": "元", "orders": "元" } + }, + { + "channel": "company:sales", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "sum": { "revenue": 72000, "orders": 150 }, "avg": { "revenue": 72000, "orders": 150 } }, + "unit": { "revenue": "元", "orders": "元" } + }, + { + "channel": "company:sales:shanghai", + "period": "m", + "time": "2026-05-01T00:00:00.000Z", + "data": { "sum": { "revenue": 72000, "orders": 150 }, "avg": { "revenue": 72000, "orders": 150 } }, + "unit": { "revenue": "元", "orders": "元" } + } + ] +} +``` + +> `channelMetas` 按 root channel 去重,所有子通道共享同一份元数据,避免数据冗余。 + +#### Channel Meta 管理 + +通道元数据在首次采集时自动创建,也可通过服务接口管理: + +```js +// 查询通道元数据 +const meta = await fastify.statistics.services.channelMeta.detail({ + channel: 'company' +}); + +// 列出所有元数据 +const list = await fastify.statistics.services.channelMeta.list(); + +// 按通道筛选 +const list = await fastify.statistics.services.channelMeta.list({ + filter: { channel: 'company' } +}); + +// 修改元数据 +await fastify.statistics.services.channelMeta.save({ + channel: 'company', + title: '企业经营数据总览', + description: '全公司各部门经营指标汇总' +}); +``` + +#### SSE 实时推送 + +通过 HTTP 接口或程序化 API 获取实时统计数据推送: + +```js +// HTTP 接口:GET /api/statistics/sse?channel=company&aggregates=sum&interval=5 +// 浏览器端使用 EventSource 接收 +const eventSource = new EventSource('/api/statistics/sse?channel=company&aggregates=sum&interval=5'); +eventSource.onmessage = (event) => { + const data = JSON.parse(event.data); + console.log(data); // { channel, period, time, data, unit } +}; + +// 程序化调用(在 Fastify 路由中) +fastify.get('/my-sse', async (request, reply) => { + const sseContext = await fastify.statistics.services.sseStream.send(reply, { + name: 'my-sse-channel', + params: { + channel: 'company', + startTime: new Date(Date.now() - 3600000).toISOString(), + endTime: new Date().toISOString(), + aggregates: ['sum'] + }, + fetchData: async (params) => { + return fastify.statistics.services.query(params); + }, + interval: 5, + heartbeatInterval: 30000, + maxDuration: 1800000 + }); + + // 可手动关闭 + // sseContext.close(); + + // 监听关闭事件 + sseContext.onClose(() => { + console.log('SSE 连接已关闭'); + }); +}); +``` + +**SSE 事件类型**: + +| 事件 | 说明 | +|------|------| +| `data`(默认) | 正常数据推送,内容为查询结果 JSON | +| `timeout` | 连接超过 maxDuration 后自动断开通知 | +| `error` | fetchData 出错时的错误事件 | +| 心跳(`: heartbeat`) | 保活注释行 | + +**SSE 上下文方法**: + +| 方法 | 说明 | +|------|------| +| `isConnected()` | 返回当前连接状态 | +| `close()` | 手动关闭 SSE 连接 | +| `onClose(callback)` | 注册连接关闭回调,若已断开则立即执行 | + + ### 示例 @@ -99,24 +439,32 @@ npm i --save @kne/fastify-statistics **返回格式**: ```json -[ - { - "channel": "sensor", - "period": "h", - "time": "2026-05-22T08:00:00.000Z", - "data": 100 - } -] +{ + "channelMetas": { + "sensor": { "channel": "sensor", "title": "传感器", "description": "" } + }, + "list": [ + { + "channel": "sensor", + "period": "h", + "time": "2026-05-22T08:00:00.000Z", + "data": { "default": 100 }, + "unit": { "default": "℃" } + } + ] +} ``` -`data` 字段格式根据查询条件动态决定: +`data` 字段格式始终为对象(按属性名映射),根据聚合方法数量决定层级: | 条件 | data 格式 | 示例 | |------|-----------|------| -| 单属性 + 单聚合 | number | `100` | -| 单属性 + 多聚合 | object | `{"sum": 100, "avg": 50}` | -| 多属性 + 单聚合 | object | `{"temperature": 25, "humidity": 60}` | -| 多属性 + 多聚合 | 嵌套object | `{"sum": {"temperature": 25}, "avg": {"temperature": 12.5}}` | +| 单聚合 | object | `{"default": 100}` 或 `{"temperature": 25, "humidity": 60}` | +| 多聚合 | 嵌套object | `{"sum": {"default": 100}, "avg": {"default": 50}}` 或 `{"sum": {"temperature": 25}, "avg": {"temperature": 12.5}}` | + +`unit` 字段为对象,按属性名映射单位:`{"default": "℃"}` 或 `{"temperature": "℃", "humidity": "%"}` + +> **注意**:查询结果中 `aggregate` 不作为独立字段返回。聚合方法(如 sum、avg)被用作 `data` 对象的键名。例如多聚合时 `data` 为 `{"sum": {"default": 100}, "avg": {"default": 50}}`,而非 `[{aggregate: "sum", data: 100}, {aggregate: "avg", data: 50}]`。 ### 统计周期 @@ -172,7 +520,7 @@ npm i --save @kne/fastify-statistics **响应格式**:`Content-Type: text/event-stream` ``` -data: {"channel":"sensor","period":"h","time":"...","data":100} +data: {"channel":"sensor","period":"h","time":"...","data":{"default":100}} event: timeout data: {"message":"连接已超过30分钟,自动断开"} @@ -221,8 +569,9 @@ data: {"message":"错误信息"} | attributeName | STRING | 属性名(默认 default) | | data | DECIMAL(16,2) | 数据值(必填) | | time | DATE | 采集时间(必填) | +| unit | STRING | 数据单位 | -> `title`、`description`、`unit` 已移至 `channel-meta` 表,按 `(channel, attributeName)` 关联。 +> `title`、`description` 已移至 `channel-meta` 表,按 root channel 关联。 #### period-stat(周期统计) @@ -234,8 +583,9 @@ data: {"message":"错误信息"} | attributeName | STRING | 属性名(默认 default) | | aggregate | ENUM | 聚合方法(必填): sum/avg/count/min/max | | data | DECIMAL(16,2) | 统计数据值(必填) | +| unit | STRING | 数据单位 | -> `title`、`description`、`unit` 已移至 `channel-meta` 表,按 `(channel, attributeName)` 关联。 +> `title`、`description` 已移至 `channel-meta` 表,按 root channel 关联。 **唯一约束**:`(period, channel, attributeName, aggregate, time)` @@ -243,12 +593,10 @@ data: {"message":"错误信息"} | 属性名 | 类型 | 说明 | |--------|------|------| -| channel | STRING | 数据通道(联合主键) | -| attributeName | STRING | 属性名(联合主键,默认 default) | +| channel | STRING | 数据通道(唯一键) | | title | STRING | 标题(必填) | | description | TEXT | 描述 | -| unit | STRING | 数据单位 | -**唯一约束**:`(channel, attributeName)` +**唯一约束**:`channel` -**说明**:`title`、`description`、`unit` 三个字段从 `data-record` 和 `period-stat` 中提取到 `channel-meta` 表,按 `channel`+`attributeName` 唯一存储。首次采集某通道数据时自动创建元数据记录,后续采集忽略(不更新)。 +**说明**:`channel-meta` 按 root channel(一级通道)唯一存储,一条元数据被所有以该 root channel 为前缀的子通道共享。首次采集某通道数据时,自动以其 root channel 创建元数据记录。`title` 和 `description` 从采集参数中提取,后续采集忽略(不更新)。`unit` 字段保留在 `data-record` 和 `period-stat` 表中。