Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
759 changes: 283 additions & 476 deletions README.md

Large diffs are not rendered by default.

64 changes: 11 additions & 53 deletions benchmark/aggregate-bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
*/
const dayjs = require('dayjs');
const {
createRealFastify, seedDataRecords, seedPeriodStats, ensureChannelMetas,
createRealFastify, createCronFastify, setWatermark,
seedDataRecords, seedPeriodStats, ensureChannelMetas,
measure, formatStatsHeader, formatStats, memorySnapshot, formatMemoryDelta,
createResultCollector, getResultFilePath
} = require('./helpers');
Expand Down Expand Up @@ -128,7 +129,7 @@ async function run() {
console.log('\n--- 4a. 小窗口补偿 (h 周期, 从 dataRecord) ---');

for (const windowCount of windowCounts) {
const { fastify, cleanup } = await createRealFastify();
const { fastify, cleanup, createdJobs } = await createCronFastify({ pluginOptions: { compensationBatchSize: 100 } });
const now = dayjs().startOf('hour').toDate();
const pastStart = dayjs(now).subtract(windowCount, 'hour').toDate();
const channels = ['sensor'];
Expand All @@ -143,29 +144,13 @@ async function run() {
});

// 设置 watermark 到 pastStart(需要补偿 windowCount 个窗口)
await fastify.statistics.models.aggregationWatermark.upsert({
period: 'h', nextTime: pastStart
});

// 通过 cron onTick 触发 compensate
const createdJobs = [];
fastify.decorate('cron', {
createJob: (jobConfig) => { createdJobs.push(jobConfig); }
});

// 重新注册 periodStat 服务(带 cron)
const fp = require('fastify-plugin');
await fp(require('../libs/services/period-stat'))(fastify, {
name: 'statistics', compensationEnabled: false, compensationBatchSize: 100
});
await setWatermark(fastify, 'h', pastStart);

const hJob = createdJobs.find(j => j.name === 'statistics-period-stat-h');

const stats = await measure(async () => {
// 重置 watermark
await fastify.statistics.models.aggregationWatermark.upsert({
period: 'h', nextTime: pastStart
});
await setWatermark(fastify, 'h', pastStart);
await hJob.onTick();
}, { iterations: 10, warmup: 2 });

Expand All @@ -181,7 +166,7 @@ async function run() {
console.log('\n--- 4b. 大批量补偿 (h 周期, compensationBatchSize 变化) ---');

for (const batchSize of batchSizes) {
const { fastify, cleanup } = await createRealFastify();
const { fastify, cleanup, createdJobs } = await createCronFastify({ pluginOptions: { compensationBatchSize: batchSize } });
const now = dayjs().startOf('hour').toDate();
const dayAgo = dayjs(now).subtract(24, 'hour').toDate();
const channels = ['sensor'];
Expand All @@ -193,19 +178,7 @@ async function run() {
startTime: dayAgo, endTime: now
});

await fastify.statistics.models.aggregationWatermark.upsert({
period: 'h', nextTime: dayAgo
});

const createdJobs = [];
fastify.decorate('cron', {
createJob: (jobConfig) => { createdJobs.push(jobConfig); }
});

const fp = require('fastify-plugin');
await fp(require('../libs/services/period-stat'))(fastify, {
name: 'statistics', compensationEnabled: false, compensationBatchSize: batchSize
});
await setWatermark(fastify, 'h', dayAgo);

const hJob = createdJobs.find(j => j.name === 'statistics-period-stat-h');

Expand All @@ -224,7 +197,7 @@ async function run() {
{
console.log('\n--- 4c. 级联补偿 (d → h) ---');

const { fastify, cleanup } = await createRealFastify();
const { fastify, cleanup, createdJobs } = await createCronFastify({ pluginOptions: { compensationBatchSize: 100 } });
const now = dayjs().startOf('day').toDate();
const dayAgo = dayjs(now).subtract(1, 'day').toDate();
const channels = ['sensor'];
Expand All @@ -236,24 +209,9 @@ async function run() {
startTime: dayAgo, endTime: now
});

// d 的 watermark 在过去
await fastify.statistics.models.aggregationWatermark.upsert({
period: 'd', nextTime: dayAgo
});
// h 的 watermark 也在过去,需要先补偿 h
await fastify.statistics.models.aggregationWatermark.upsert({
period: 'h', nextTime: dayAgo
});

const createdJobs = [];
fastify.decorate('cron', {
createJob: (jobConfig) => { createdJobs.push(jobConfig); }
});

const fp = require('fastify-plugin');
await fp(require('../libs/services/period-stat'))(fastify, {
name: 'statistics', compensationEnabled: false, compensationBatchSize: 100
});
// d 和 h 的 watermark 都在过去
await setWatermark(fastify, 'd', dayAgo);
await setWatermark(fastify, 'h', dayAgo);

const dJob = createdJobs.find(j => j.name === 'statistics-period-stat-d');

Expand Down
101 changes: 101 additions & 0 deletions benchmark/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ async function createRealFastify({ dbPath, pluginOptions = {} } = {}) {
]
});

// 初始化 periodStat 服务(必须在 ready 之后调用 init)
await fastify.ready();
await fastify.statistics.services.periodStat.init();

const cleanup = async () => {
await fastify.close();
try { fs.unlinkSync(dbPath); } catch (e) { /* ignore */ }
Expand Down Expand Up @@ -421,8 +425,105 @@ function printSummaryReport(scenarioKeys) {
}
}

/**
* 创建带 cron 支持的 Fastify 实例(用于 compensate 基准测试)
* cron 任务被收集到 createdJobs 数组,便于手动触发 onTick
* @param {object} [options]
* @param {string} [options.dbPath] - SQLite 文件路径
* @param {object} [options.pluginOptions] - 传给 services 的选项
* @returns {Promise<{fastify, dbPath, cleanup, createdJobs}>}
*/
async function createCronFastify({ dbPath, pluginOptions = {} } = {}) {
if (!dbPath) {
dbPath = path.join(os.tmpdir(), `benchmark-${Date.now()}-${Math.random().toString(36).slice(2)}.db`);
}

const fastify = require('fastify')({ logger: false });

// 注册 sequelize
await fastify.register(require('@kne/fastify-sequelize'), {
db: {
dialect: 'sqlite',
storage: dbPath,
logging: false,
dialectOptions: {
mode: require('sqlite3').OPEN_READWRITE | require('sqlite3').OPEN_CREATE
}
},
syncOptions: { alter: true }
});

// 加载模型
const models = await fastify.sequelize.addModels(
path.resolve(__dirname, '../libs/models'),
{ prefix: 't_', modelPrefix: 'statistics' }
);

// 同步数据库
await fastify.sequelize.sync();

// 启用 WAL 模式
const seqInstance = fastify.sequelize.instance;
await seqInstance.query('PRAGMA journal_mode=WAL');
await seqInstance.query('PRAGMA busy_timeout=5000');

// 收集 cron jobs
const createdJobs = [];

// 注册 namespace + services(带 cron 收集器)
await fastify.register(require('@kne/fastify-namespace'), {
options: Object.assign({
prefix: '/api/statistics',
dbTableNamePrefix: 't_',
name: 'statistics',
compensationEnabled: false,
queryCacheEnabled: true,
dataRetentionDays: 365
}, pluginOptions),
name: 'statistics',
modules: [
['models', models],
['services', path.resolve(__dirname, '../libs/services')]
]
});

// 注册 cron 收集器(必须在 namespace 之后,因为 periodStat init() 会检查 fastify.cron)
fastify.decorate('cron', {
createJob: (jobConfig) => { createdJobs.push(jobConfig); }
});

// 初始化 periodStat 服务
await fastify.ready();
await fastify.statistics.services.periodStat.init();

const cleanup = async () => {
await fastify.close();
try { fs.unlinkSync(dbPath); } catch (e) { /* ignore */ }
};

return { fastify, dbPath, cleanup, createdJobs };
}

/**
* 设置指定周期的 watermark(兼容 findOne+update/create 模式)
* @param {object} fastify
* @param {string} period
* @param {Date} nextTime
*/
async function setWatermark(fastify, period, nextTime) {
const models = fastify.statistics.models;
const existing = await models.aggregationWatermark.findOne({ where: { period } });
if (existing) {
await existing.update({ nextTime });
} else {
await models.aggregationWatermark.create({ period, nextTime });
}
}

module.exports = {
createRealFastify,
createCronFastify,
setWatermark,
seedDataRecords,
seedPeriodStats,
ensureChannelMetas,
Expand Down
Loading
Loading