Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,855 changes: 937 additions & 918 deletions monitoring/grafana/dashboards/storage-otel.json

Large diffs are not rendered by default.

1,429 changes: 170 additions & 1,259 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"@aws-sdk/s3-request-presigner": "^3.979.0",
"@fastify/accepts": "^5.0.4",
"@fastify/multipart": "^9.4.0",
"@fastify/otel": "^0.17.1",
"@fastify/rate-limit": "^10.3.0",
"@fastify/swagger": "^9.7.0",
"@fastify/swagger-ui": "^5.2.5",
Expand All @@ -48,7 +49,6 @@
"@opentelemetry/exporter-prometheus": "^0.213.0",
"@opentelemetry/host-metrics": "^0.38.3",
"@opentelemetry/instrumentation-aws-sdk": "^0.59.0",
"@opentelemetry/instrumentation-fastify": "^0.57.0",
"@opentelemetry/instrumentation-http": "^0.213.0",
"@opentelemetry/instrumentation-knex": "^0.53.1",
"@opentelemetry/instrumentation-pg": "^0.64.0",
Expand Down
9 changes: 5 additions & 4 deletions src/http/plugins/log-request.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { logger, logSchema, redactQueryParamFromRequest } from '@internal/monitoring'
import { trace } from '@opentelemetry/api'
import { FastifyReply } from 'fastify/types/reply'
import { FastifyRequest } from 'fastify/types/request'
import fastifyPlugin from 'fastify-plugin'
Expand Down Expand Up @@ -77,8 +76,8 @@ export const logRequest = (options: RequestLoggerOptions) =>
req.resources = resources
req.operation = req.routeOptions.config.operation

if (req.operation) {
trace.getActiveSpan()?.setAttribute('http.operation', req.operation.type)
if (req.operation && typeof req.opentelemetry === 'function') {
req.opentelemetry()?.span?.setAttribute('http.operation', req.operation.type)
}
})

Expand Down Expand Up @@ -135,7 +134,9 @@ function doRequestLog(req: FastifyRequest, options: LogRequestOptions) {

if (reqMetadata) {
try {
trace.getActiveSpan()?.setAttribute('http.metadata', JSON.stringify(reqMetadata))
if (typeof req.opentelemetry === 'function') {
req.opentelemetry()?.span?.setAttribute('http.metadata', JSON.stringify(reqMetadata))
}
} catch (e) {
// do nothing
logSchema.warning(logger, 'Failed to serialize log metadata', {
Expand Down
1 change: 0 additions & 1 deletion src/http/plugins/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ export const httpMetrics = (options: HttpMetricsOptions = {}) =>

const attributes = {
method,
route,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the reason? cardinality + covered by operation?

sounds good but there are still references to it in dashboard though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, cardinality, we do use the operation only, i don't think we reference the route

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Top 10 panels refer it

operation: request.operation?.type || 'unknown',
status_code: statusCode,
tenantId: request.tenantId || '',
Expand Down
13 changes: 13 additions & 0 deletions src/http/plugins/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ export const tracing = fastifyPlugin(
} else {
request.tracingMode = defaultTracingMode
}

// Use request.opentelemetry().span to get the root request span,
// not trace.getActiveSpan() which returns a child hook span.
const span =
typeof request.opentelemetry === 'function' ? request.opentelemetry()?.span : undefined
if (span) {
if (request.tenantId) {
span.setAttribute('tenant.ref', request.tenantId)
}
if (request.tracingMode) {
span.setAttribute('trace.mode', request.tracingMode)
}
}
} catch (e) {
logSchema.error(request.log, 'failed setting tracing mode', { error: e, type: 'tracing' })
}
Expand Down
5 changes: 2 additions & 3 deletions src/http/routes/s3/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import fastifyMultipart from '@fastify/multipart'
import { trace } from '@opentelemetry/api'
import { FastifyInstance, RouteHandlerMethod } from 'fastify'
import { JSONSchema } from 'json-schema-to-ts'
import { getConfig } from '../../../config'
Expand Down Expand Up @@ -63,8 +62,8 @@ export default async function routes(fastify: FastifyInstance) {

req.operation = { type: operation }

if (req.operation.type) {
trace.getActiveSpan()?.setAttribute('http.operation', req.operation.type)
if (req.operation.type && typeof req.opentelemetry === 'function') {
req.opentelemetry()?.span?.setAttribute('http.operation', req.operation.type)
}

const data: RequestInput<any> = {
Expand Down
1 change: 0 additions & 1 deletion src/http/routes/tus/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ export async function onIncomingRequest(rawReq: Request, id: string) {
}

res.on('finish', () => {
console.log('Tus request finished')
req.upload.db.dispose().catch((e) => {
req.log.error({ error: e }, 'Error disposing db connection')
})
Expand Down
124 changes: 91 additions & 33 deletions src/internal/database/pool.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { wait } from '@internal/concurrency'
import { getSslSettings } from '@internal/database/ssl'
import { logger, logSchema } from '@internal/monitoring'
import { dbActiveConnection, dbActivePool, dbInUseConnection } from '@internal/monitoring/metrics'
import {
dbActiveConnection,
dbActivePool,
dbInUseConnection,
isMetricEnabled,
meter,
} from '@internal/monitoring/metrics'
import TTLCache from '@isaacs/ttlcache'
import { JWTPayload } from 'jose'
import { Knex, knex } from 'knex'
Expand Down Expand Up @@ -41,10 +47,16 @@ export interface User {
payload: { role?: string } & JWTPayload
}

export interface PoolStats {
used: number
total: number
}

export interface PoolStrategy {
acquire(): Knex
rebalance(options: { clusterSize: number }): void
destroy(): Promise<void>
getPoolStats(): PoolStats | null
}

export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.split(',')].filter(
Expand Down Expand Up @@ -72,22 +84,84 @@ const tenantPools = new TTLCache<string, PoolStrategy>({
},
})

// ============================================================================
// Pool stats collection — chunked to avoid blocking the event loop
// ============================================================================
interface PoolStatsSnapshot {
poolCount: number
totalConnections: number
totalInUse: number
}

const STATS_CHUNK_SIZE = 100
const STATS_INTERVAL_MS = 5_000

let cachedPoolStats: PoolStatsSnapshot = {
poolCount: 0,
totalConnections: 0,
totalInUse: 0,
}
let collectInProgress = false

async function collectPoolStats() {
if (collectInProgress) return
collectInProgress = true

try {
let poolCount = 0
let totalConnections = 0
let totalInUse = 0
let chunkCount = 0

for (const [, pool] of tenantPools.entries()) {
poolCount++
const stats = pool.getPoolStats()
if (stats) {
totalConnections += stats.total
totalInUse += stats.used
}
// Yield to the event loop between chunks
if (++chunkCount % STATS_CHUNK_SIZE === 0) {
await new Promise<void>((resolve) => setImmediate(resolve))
}
}

cachedPoolStats = {
poolCount,
totalConnections,
totalInUse,
}
} finally {
collectInProgress = false
}
}

/**
* PoolManager is a class that manages a pool of Knex connections.
* It creates a new pool for each tenant and reuses existing pools.
*/
export class PoolManager {
monitor(signal: AbortSignal) {
const monitorInterval = setInterval(() => {
dbActivePool.record(tenantPools.size)
}, 2000)

signal.addEventListener(
'abort',
() => {
clearInterval(monitorInterval)
monitor() {
// Periodically collect stats in a non-blocking way
const interval = setInterval(() => {
void collectPoolStats()
}, STATS_INTERVAL_MS)
interval.unref()

// Observable callback reads the cached snapshot — O(1)
meter.addBatchObservableCallback(
(observer) => {
if (isMetricEnabled('db_active_local_pools')) {
observer.observe(dbActivePool, cachedPoolStats.poolCount)
}
if (isMetricEnabled('db_connections')) {
observer.observe(dbActiveConnection, cachedPoolStats.totalConnections)
}
if (isMetricEnabled('db_connections_in_use')) {
observer.observe(dbInUseConnection, cachedPoolStats.totalInUse)
}
},
{ once: true }
[dbActivePool, dbActiveConnection, dbInUseConnection]
)
}

Expand Down Expand Up @@ -152,7 +226,6 @@ export class PoolManager {
*/
class TenantPool implements PoolStrategy {
protected pool?: Knex
protected monitorHandle?: ReturnType<typeof setInterval>

constructor(protected readonly options: TenantConnectionOptions) {}

Expand All @@ -162,12 +235,10 @@ class TenantPool implements PoolStrategy {
}

this.pool = this.createKnexPool()
this.startMonitor()
return this.pool
}

destroy(): Promise<void> {
this.stopMonitor()
const originalPool = this.pool

if (!originalPool) {
Expand All @@ -178,24 +249,12 @@ class TenantPool implements PoolStrategy {
return this.drainPool(originalPool)
}

protected startMonitor() {
this.monitorHandle = setInterval(() => {
const tarnPool = this.pool?.client?.pool
if (!tarnPool) return

dbInUseConnection.record(tarnPool.numUsed(), { tenant_id: this.options.tenantId })
dbActiveConnection.record(tarnPool.numUsed() + tarnPool.numFree(), {
tenant_id: this.options.tenantId,
})
}, 2000)

this.monitorHandle.unref()
}

protected stopMonitor() {
if (this.monitorHandle) {
clearInterval(this.monitorHandle)
this.monitorHandle = undefined
getPoolStats(): PoolStats | null {
const tarnPool = this.pool?.client?.pool
if (!tarnPool) return null
return {
used: tarnPool.numUsed(),
total: tarnPool.numUsed() + tarnPool.numFree(),
}
}

Expand Down Expand Up @@ -227,7 +286,6 @@ class TenantPool implements PoolStrategy {
return
}

this.stopMonitor()
const originalPool = this.pool

this.options.clusterSize = options.clusterSize
Expand Down
Loading