diff --git a/.github/workflows/ci-test-e2e.yml b/.github/workflows/ci-test-e2e.yml index 99395082a9fbc..62ea2af7368f7 100644 --- a/.github/workflows/ci-test-e2e.yml +++ b/.github/workflows/ci-test-e2e.yml @@ -57,6 +57,14 @@ on: required: false CODECOV_TOKEN: required: false + CI_MONGO_URL: + required: false + NETBIRD_SETUP_KEY: + required: false + NETBIRD_PSK: + required: false + NETBIRD_TEST_IP: + required: false REPORTER_JIRA_ROCKETCHAT_API_KEY: required: false @@ -71,10 +79,12 @@ jobs: runs-on: ubuntu-24.04 env: + # secrets are not available in step `if:`; use for steps that must run only with CI_MONGO_URL + HAS_CI_MONGO_URL: ${{ secrets.CI_MONGO_URL != '' && 'true' || 'false' }} # if building for production on develop branch or release, add suffix for coverage images DOCKER_TAG_SUFFIX_ROCKETCHAT: ${{ inputs.coverage == matrix.mongodb-version && (github.event_name == 'release' || github.ref == 'refs/heads/develop') && '-cov' || '' }} MONGODB_VERSION: ${{ matrix.mongodb-version }} - COVERAGE_DIR: '/tmp/coverage/${{ startsWith(inputs.type, ''api'') && ''api'' || inputs.type }}' + COVERAGE_DIR: "/tmp/coverage/${{ startsWith(inputs.type, 'api') && 'api' || inputs.type }}" COVERAGE_FILE_NAME: '${{ inputs.type }}-${{ matrix.shard }}.json' COVERAGE_REPORTER: ${{ inputs.coverage == matrix.mongodb-version && 'json' || '' }} @@ -155,6 +165,29 @@ jobs: # List loaded images docker images + - name: Connect to NetBird + if: env.HAS_CI_MONGO_URL == 'true' + uses: RocketChat/netbird-connect@main + with: + setup-key: ${{ secrets.NETBIRD_SETUP_KEY }} + preshared-key: ${{ secrets.NETBIRD_PSK }} + test-ip: ${{ secrets.NETBIRD_TEST_IP }} + + - name: Configure external MongoDB URL + env: + CI_MONGO_URL: ${{ secrets.CI_MONGO_URL }} + if: env.CI_MONGO_URL != '' + run: | + DB_NAME="rocketchat_${{ inputs.type }}_${{ matrix.shard }}_${{ github.run_id }}_${{ github.run_attempt }}" + BASE="${CI_MONGO_URL%%\?*}" + BASE="${BASE%/}" + QUERY="${CI_MONGO_URL#"$BASE"}" + FULL_URL="${BASE}/${DB_NAME}${QUERY}" + echo "MONGO_URL=${FULL_URL}" >> $GITHUB_ENV + echo "DOCKER_MONGO_URL=${FULL_URL}" >> $GITHUB_ENV + echo "EXTERNAL_MONGO=true" >> $GITHUB_ENV + echo "DOCUMENTDB=true" >> $GITHUB_ENV + - name: Set DEBUG_LOG_LEVEL (debug enabled) if: runner.debug == '1' run: echo "DEBUG_LOG_LEVEL=2" >> $GITHUB_ENV @@ -175,7 +208,11 @@ jobs: if: inputs.release == 'ce' run: | # when we are testing CE, we only need to start the rocketchat container - DEBUG_LOG_LEVEL=${DEBUG_LOG_LEVEL:-0} docker compose -f docker-compose-ci.yml up -d rocketchat --wait + EXTRA_ARGS="" + if [ "$EXTERNAL_MONGO" = "true" ]; then + EXTRA_ARGS="--scale mongo=0" + fi + DEBUG_LOG_LEVEL=${DEBUG_LOG_LEVEL:-0} docker compose -f docker-compose-ci.yml up -d rocketchat $EXTRA_ARGS --wait - name: Start containers for EE if: inputs.release == 'ee' @@ -183,7 +220,11 @@ jobs: ENTERPRISE_LICENSE: ${{ inputs.enterprise-license }} TRANSPORTER: ${{ inputs.transporter }} run: | - DEBUG_LOG_LEVEL=${DEBUG_LOG_LEVEL:-0} docker compose -f docker-compose-ci.yml up -d --wait + EXTRA_ARGS="" + if [ "$EXTERNAL_MONGO" = "true" ]; then + EXTRA_ARGS="--scale mongo=0" + fi + DEBUG_LOG_LEVEL=${DEBUG_LOG_LEVEL:-0} docker compose -f docker-compose-ci.yml up -d $EXTRA_ARGS --wait - uses: ./.github/actions/setup-playwright if: inputs.type == 'ui' diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d590223204a86..bdd84f845dfad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -500,6 +500,10 @@ jobs: secrets: CR_USER: ${{ secrets.CR_USER }} CR_PAT: ${{ secrets.CR_PAT }} + CI_MONGO_URL: ${{ secrets.CI_MONGO_URL }} + NETBIRD_SETUP_KEY: ${{ secrets.NETBIRD_SETUP_KEY }} + NETBIRD_PSK: ${{ secrets.NETBIRD_PSK }} + NETBIRD_TEST_IP: ${{ secrets.NETBIRD_TEST_IP }} test-api-livechat: name: πŸ”¨ Test API Livechat (CE) @@ -516,6 +520,10 @@ jobs: secrets: CR_USER: ${{ secrets.CR_USER }} CR_PAT: ${{ secrets.CR_PAT }} + CI_MONGO_URL: ${{ secrets.CI_MONGO_URL }} + NETBIRD_SETUP_KEY: ${{ secrets.NETBIRD_SETUP_KEY }} + NETBIRD_PSK: ${{ secrets.NETBIRD_PSK }} + NETBIRD_TEST_IP: ${{ secrets.NETBIRD_TEST_IP }} test-ui: name: πŸ”¨ Test UI (CE) @@ -541,6 +549,10 @@ jobs: REPORTER_ROCKETCHAT_API_KEY: ${{ secrets.REPORTER_ROCKETCHAT_API_KEY }} REPORTER_ROCKETCHAT_URL: ${{ secrets.REPORTER_ROCKETCHAT_URL }} REPORTER_JIRA_ROCKETCHAT_API_KEY: ${{ secrets.REPORTER_JIRA_ROCKETCHAT_API_KEY }} + CI_MONGO_URL: ${{ secrets.CI_MONGO_URL }} + NETBIRD_SETUP_KEY: ${{ secrets.NETBIRD_SETUP_KEY }} + NETBIRD_PSK: ${{ secrets.NETBIRD_PSK }} + NETBIRD_TEST_IP: ${{ secrets.NETBIRD_TEST_IP }} test-api-ee: name: πŸ”¨ Test API (EE) @@ -561,6 +573,10 @@ jobs: secrets: CR_USER: ${{ secrets.CR_USER }} CR_PAT: ${{ secrets.CR_PAT }} + CI_MONGO_URL: ${{ secrets.CI_MONGO_URL }} + NETBIRD_SETUP_KEY: ${{ secrets.NETBIRD_SETUP_KEY }} + NETBIRD_PSK: ${{ secrets.NETBIRD_PSK }} + NETBIRD_TEST_IP: ${{ secrets.NETBIRD_TEST_IP }} test-api-livechat-ee: name: πŸ”¨ Test API Livechat (EE) @@ -581,6 +597,10 @@ jobs: secrets: CR_USER: ${{ secrets.CR_USER }} CR_PAT: ${{ secrets.CR_PAT }} + CI_MONGO_URL: ${{ secrets.CI_MONGO_URL }} + NETBIRD_SETUP_KEY: ${{ secrets.NETBIRD_SETUP_KEY }} + NETBIRD_PSK: ${{ secrets.NETBIRD_PSK }} + NETBIRD_TEST_IP: ${{ secrets.NETBIRD_TEST_IP }} test-ui-ee: name: πŸ”¨ Test UI (EE) @@ -609,6 +629,10 @@ jobs: REPORTER_ROCKETCHAT_URL: ${{ secrets.REPORTER_ROCKETCHAT_URL }} CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} REPORTER_JIRA_ROCKETCHAT_API_KEY: ${{ secrets.REPORTER_JIRA_ROCKETCHAT_API_KEY }} + CI_MONGO_URL: ${{ secrets.CI_MONGO_URL }} + NETBIRD_SETUP_KEY: ${{ secrets.NETBIRD_SETUP_KEY }} + NETBIRD_PSK: ${{ secrets.NETBIRD_PSK }} + NETBIRD_TEST_IP: ${{ secrets.NETBIRD_TEST_IP }} test-federation-matrix: name: πŸ”¨ Test Federation Matrix diff --git a/apps/meteor/app/api/server/lib/users.ts b/apps/meteor/app/api/server/lib/users.ts index 00dfc3e8f5292..c11955f3da0ef 100644 --- a/apps/meteor/app/api/server/lib/users.ts +++ b/apps/meteor/app/api/server/lib/users.ts @@ -1,5 +1,5 @@ import type { IUser } from '@rocket.chat/core-typings'; -import { Users, Subscriptions } from '@rocket.chat/models'; +import { Users, Subscriptions, getAllowDiskUse } from '@rocket.chat/models'; import { escapeRegExp } from '@rocket.chat/string-helpers'; import type { Mongo } from 'meteor/mongo'; import type { Filter, FindOptions, RootFilterOperators } from 'mongodb'; @@ -229,7 +229,7 @@ export async function findPaginatedUsersByStatus({ skip: offset, limit: count, projection, - allowDiskUse: true, + ...getAllowDiskUse(), }, ); const [users, total] = await Promise.all([cursor.toArray(), totalCount]); diff --git a/apps/meteor/app/api/server/v1/users.ts b/apps/meteor/app/api/server/v1/users.ts index 50c65abcd8d12..6181770b315b3 100644 --- a/apps/meteor/app/api/server/v1/users.ts +++ b/apps/meteor/app/api/server/v1/users.ts @@ -543,42 +543,28 @@ API.v1.addRoute( ] : []; - const result = await Users.col - .aggregate<{ sortedResults: IUser[]; totalCount: { total: number }[] }>([ - { - $match: nonEmptyQuery, - }, - { - $project: inclusiveFields, - }, - { - $addFields: { - nameInsensitive: { - $toLower: '$name', - }, - }, - }, - { - $facet: { - sortedResults: [ - { - $sort: actualSort, - }, - { - $skip: offset, - }, - ...limit, - ], - totalCount: [{ $group: { _id: null, total: { $sum: 1 } } }], + const baseQuery = [ + { + $match: nonEmptyQuery, + }, + { + $project: inclusiveFields, + }, + { + $addFields: { + nameInsensitive: { + $toLower: '$name', }, }, - ]) - .toArray(); + }, + ]; + + const [users, countResult] = await Promise.all([ + Users.col.aggregate([...baseQuery, { $sort: actualSort }, { $skip: offset }, ...limit]).toArray(), + Users.col.aggregate<{ total: number }>([...baseQuery, { $count: 'total' }]).toArray(), + ]); - const { - sortedResults: users, - totalCount: [{ total } = { total: 0 }], - } = result[0]; + const total = countResult[0]?.total || 0; return API.v1.success({ users, diff --git a/apps/meteor/app/livechat/server/lib/QueueManager.ts b/apps/meteor/app/livechat/server/lib/QueueManager.ts index c012a8a7d8a0b..784a1bdb1725f 100644 --- a/apps/meteor/app/livechat/server/lib/QueueManager.ts +++ b/apps/meteor/app/livechat/server/lib/QueueManager.ts @@ -27,7 +27,7 @@ import { afterInquiryQueued, afterRoomQueued, beforeDelegateAgent, beforeRouteCh import { checkOnlineAgents, getOnlineAgents } from './service-status'; import { getInquirySortMechanismSetting } from './settings'; import { dispatchInquiryPosition } from '../../../../ee/app/livechat-enterprise/server/lib/Helper'; -import { client, shouldRetryTransaction } from '../../../../server/database/utils'; +import { client, shouldRetryTransaction, transactionOptions } from '../../../../server/database/utils'; import { sendNotification } from '../../../lib/server'; import { notifyOnLivechatInquiryChangedById, notifyOnLivechatInquiryChanged } from '../../../lib/server/lib/notifyListener'; import { settings } from '../../../settings/server'; @@ -257,7 +257,7 @@ export class QueueManager { ): Promise<{ room: IOmnichannelRoom; inquiry: ILivechatInquiryRecord }> { const session = client.startSession(); try { - session.startTransaction(); + session.startTransaction(transactionOptions); const room = await createLivechatRoom(insertionRoom, session); logger.debug({ msg: 'Room created for visitor', visitorId: guest._id, roomId: room._id }); const inquiry = await createLivechatInquiry({ diff --git a/apps/meteor/app/livechat/server/lib/closeRoom.ts b/apps/meteor/app/livechat/server/lib/closeRoom.ts index 8ee99231ee609..f5c036e8062de 100644 --- a/apps/meteor/app/livechat/server/lib/closeRoom.ts +++ b/apps/meteor/app/livechat/server/lib/closeRoom.ts @@ -9,7 +9,7 @@ import type { ClientSession } from 'mongodb'; import type { CloseRoomParams, CloseRoomParamsByUser, CloseRoomParamsByVisitor } from './localTypes'; import { livechatLogger as logger } from './logger'; import { parseTranscriptRequest } from './parseTranscriptRequest'; -import { client, shouldRetryTransaction } from '../../../../server/database/utils'; +import { client, shouldRetryTransaction, transactionOptions } from '../../../../server/database/utils'; import { callbacks } from '../../../../server/lib/callbacks'; import { notifyOnLivechatInquiryChanged, @@ -33,7 +33,7 @@ export async function closeRoom(params: CloseRoomParams, attempts = 2): Promise< const session = client.startSession(); try { - session.startTransaction(); + session.startTransaction(transactionOptions); const { room, closedBy, removedInquiry } = await doCloseRoom(params, session); await session.commitTransaction(); diff --git a/apps/meteor/app/settings/server/SettingsRegistry.ts b/apps/meteor/app/settings/server/SettingsRegistry.ts index 3b93ca5c0bfb6..55eabd4f52b69 100644 --- a/apps/meteor/app/settings/server/SettingsRegistry.ts +++ b/apps/meteor/app/settings/server/SettingsRegistry.ts @@ -198,7 +198,17 @@ export class SettingsRegistry { const setting = isOverwritten ? settingFromCodeOverwritten : settingOverwrittenDefault; - await this.model.insertOne(setting); // no need to emit unless we remove the oplog + try { + await this.model.insertOne(setting); // no need to emit unless we remove the oplog + } catch (e: any) { + // Another process inserted the same setting first (e.g. main app + EE + // microservices booting in parallel). The check above is in-memory and + // not atomic, so concurrent boots race on the unique _id index. + // E11000 here is safe to ignore β€” the setting now exists. + if (e?.code !== 11000) { + throw e; + } + } this.store.set(setting); } @@ -224,7 +234,16 @@ export class SettingsRegistry { if (!this.store.has(_id)) { options.ts = new Date(); this.store.set(options as ISetting); - await this.model.insertOne(options as ISetting); + try { + await this.model.insertOne(options as ISetting); + } catch (e: any) { + // Race with another process (main app + EE microservices boot in + // parallel). Same rationale as in `add` above β€” E11000 here means + // another caller wrote the group first; the desired state holds. + if (e?.code !== 11000) { + throw e; + } + } } if (!callback) { diff --git a/apps/meteor/ee/server/api/abac/index.ts b/apps/meteor/ee/server/api/abac/index.ts index 0ed503678f527..8cce4c8a4ae78 100644 --- a/apps/meteor/ee/server/api/abac/index.ts +++ b/apps/meteor/ee/server/api/abac/index.ts @@ -1,7 +1,7 @@ import { Abac } from '@rocket.chat/core-services'; import type { AbacActor } from '@rocket.chat/core-services'; import type { IServerEvents, IUser } from '@rocket.chat/core-typings'; -import { ServerEvents, Users } from '@rocket.chat/models'; +import { ServerEvents, Users, getAllowDiskUse } from '@rocket.chat/models'; import { validateUnauthorizedErrorResponse } from '@rocket.chat/rest-typings/src/v1/Ajv'; import { convertSubObjectsIntoPaths } from '@rocket.chat/tools'; @@ -393,7 +393,7 @@ const abacEndpoints = API.v1 sort: _sort, skip: offset, limit: count, - allowDiskUse: true, + ...getAllowDiskUse(), }, ); diff --git a/apps/meteor/ee/server/api/audit.ts b/apps/meteor/ee/server/api/audit.ts index 77d4ebf10cf7c..b6e0f7e718ada 100644 --- a/apps/meteor/ee/server/api/audit.ts +++ b/apps/meteor/ee/server/api/audit.ts @@ -1,5 +1,5 @@ import type { IUser, IRoom } from '@rocket.chat/core-typings'; -import { Rooms, AuditLog, ServerEvents } from '@rocket.chat/models'; +import { Rooms, AuditLog, ServerEvents, getAllowDiskUse } from '@rocket.chat/models'; import { isServerEventsAuditSettingsProps, ajv, ajvQuery } from '@rocket.chat/rest-typings'; import type { PaginatedRequest, PaginatedResult } from '@rocket.chat/rest-typings'; import { convertSubObjectsIntoPaths } from '@rocket.chat/tools'; @@ -180,7 +180,7 @@ API.v1.get( sort: _sort, skip: offset, limit: count, - allowDiskUse: true, + ...getAllowDiskUse(), }, ); diff --git a/apps/meteor/ee/server/lib/engagementDashboard/channels.ts b/apps/meteor/ee/server/lib/engagementDashboard/channels.ts index a71d7c99b21df..85df2e3589bc3 100644 --- a/apps/meteor/ee/server/lib/engagementDashboard/channels.ts +++ b/apps/meteor/ee/server/lib/engagementDashboard/channels.ts @@ -44,7 +44,7 @@ export const findChannelsWithNumberOfMessages = async ({ startOfLastWeek: convertDateToInt(startOfLastWeek), endOfLastWeek: convertDateToInt(endOfLastWeek), options, - }).toArray(); + }); // The aggregation result may be undefined if there are no matching analytics or corresponding rooms in the period if (!aggregationResult.length) { diff --git a/apps/meteor/ee/server/models/raw/Users.ts b/apps/meteor/ee/server/models/raw/Users.ts index 0912a3c80b03d..a572f11c706b7 100644 --- a/apps/meteor/ee/server/models/raw/Users.ts +++ b/apps/meteor/ee/server/models/raw/Users.ts @@ -1,5 +1,5 @@ import type { RocketChatRecordDeleted, IUser, AvailableAgentsAggregation } from '@rocket.chat/core-typings'; -import { queryStatusAgentOnline, UsersRaw } from '@rocket.chat/models'; +import { queryStatusAgentOnline, UsersRaw, getAllowDiskUse } from '@rocket.chat/models'; import type { Db, Collection, Filter } from 'mongodb'; import { readSecondaryPreferred } from '../../../../server/database/readSecondaryPreferred'; @@ -30,19 +30,22 @@ export class UsersEE extends UsersRaw { { $lookup: { from: 'rocketchat_livechat_department_agents', - let: { userId: '$_id' }, - pipeline: [ - { - $match: { - $expr: { - $and: [{ $eq: ['$$userId', '$agentId'] }, { $eq: ['$departmentId', departmentId] }], - }, - }, - }, - ], + localField: '_id', + foreignField: 'agentId', as: 'department', }, }, + { + $addFields: { + department: { + $filter: { + input: '$department', + as: 'dept', + cond: { $eq: ['$$dept.departmentId', departmentId] }, + }, + }, + }, + }, { $match: { department: { $size: 1 } }, }, @@ -61,10 +64,22 @@ export class UsersEE extends UsersRaw { from: 'rocketchat_subscription', localField: '_id', foreignField: 'u._id', - pipeline: [{ $match: { $and: [{ t: 'l' }, { open: true }, { onHold: { $ne: true } }] } }], as: 'subs', }, }, + { + $addFields: { + subs: { + $filter: { + input: '$subs', + as: 'sub', + cond: { + $and: [{ $eq: ['$$sub.t', 'l'] }, { $eq: ['$$sub.open', true] }, { $ne: ['$$sub.onHold', true] }], + }, + }, + }, + }, + }, { $project: { 'agentId': '$_id', @@ -93,7 +108,7 @@ export class UsersEE extends UsersRaw { ...(customFilter ? [customFilter] : []), { $project: { username: 1 } }, ], - { allowDiskUse: true, readPreference: readSecondaryPreferred() }, + { ...getAllowDiskUse(), readPreference: readSecondaryPreferred() }, ) .toArray(); } diff --git a/apps/meteor/ee/server/patches/verifyContactChannel.ts b/apps/meteor/ee/server/patches/verifyContactChannel.ts index 8e7a2c05cf840..567a5ef4c8b80 100644 --- a/apps/meteor/ee/server/patches/verifyContactChannel.ts +++ b/apps/meteor/ee/server/patches/verifyContactChannel.ts @@ -6,7 +6,7 @@ import { LivechatContacts, LivechatInquiry, LivechatRooms } from '@rocket.chat/m import { QueueManager } from '../../../app/livechat/server/lib/QueueManager'; import { mergeContacts } from '../../../app/livechat/server/lib/contacts/mergeContacts'; import { verifyContactChannel } from '../../../app/livechat/server/lib/contacts/verifyContactChannel'; -import { client, shouldRetryTransaction } from '../../../server/database/utils'; +import { client, shouldRetryTransaction, transactionOptions } from '../../../server/database/utils'; import { contactLogger as logger } from '../../app/livechat-enterprise/server/lib/logger'; type VerifyContactChannelParams = { @@ -26,7 +26,7 @@ async function _verifyContactChannel( const session = client.startSession(); try { - session.startTransaction(); + session.startTransaction(transactionOptions); logger.debug({ msg: 'Start verifying contact channel', contactId, visitorId, roomId }); const updater = LivechatContacts.getUpdater(); diff --git a/apps/meteor/packages/rocketchat-mongo-config/server/index.js b/apps/meteor/packages/rocketchat-mongo-config/server/index.js index 721ae66e77a9d..357c8bb24c74e 100644 --- a/apps/meteor/packages/rocketchat-mongo-config/server/index.js +++ b/apps/meteor/packages/rocketchat-mongo-config/server/index.js @@ -3,6 +3,42 @@ import { PassThrough } from 'stream'; import { Email } from 'meteor/email'; import { Mongo } from 'meteor/mongo'; +import { MongoInternals } from 'meteor/mongo'; + +// DocumentDB only supports one index build at a time per collection. +// Serialize createIndexAsync calls at the MongoConnection level so Meteor +// packages (accounts-base, accounts-password, accounts-oauth) don't race. +// This package loads before accounts-base (position 53 vs 56 in .meteor/packages). +// +// NOTE: Meteor bundles its own copy of the mongodb driver (npm-mongo/node_modules/mongodb) +// separate from the app's node_modules/mongodb. Patching Collection.prototype from +// `import { Collection } from 'mongodb'` only patches the app's copy and does NOT +// affect Meteor's internal calls. That's why we patch MongoConnection here instead. +// The shared queue (via globalThis) is also used by @rocket.chat/models patchIndex.ts +// to serialize BaseRaw.createIndexes() calls through the app's mongodb driver copy. +if (process.env.DOCUMENTDB === 'true') { + const QUEUE_KEY = Symbol.for('rocketchat.documentdb.index.queues'); + if (!globalThis[QUEUE_KEY]) { + globalThis[QUEUE_KEY] = new Map(); + } + const queues = globalThis[QUEUE_KEY]; + + const enqueue = (collectionName, fn) => { + const prev = queues.get(collectionName) || Promise.resolve(); + const next = prev.then(fn, fn); + queues.set(collectionName, next.catch(() => {})); + return next; + }; + + const mongo = MongoInternals.defaultRemoteCollectionDriver().mongo; + const originalCreateIndex = mongo.createIndexAsync.bind(mongo); + + mongo.createIndexAsync = async function (collectionName, index, options) { + return enqueue(collectionName, () => originalCreateIndex(collectionName, index, options)); + }; + mongo.ensureIndexAsync = mongo.createIndexAsync; + mongo.createIndex = mongo.createIndexAsync; +} // we always want Meteor to disable oplog tailing Package['disable-oplog'] = {}; diff --git a/apps/meteor/server/database/utils.ts b/apps/meteor/server/database/utils.ts index e4b95085408cc..da57cab51559d 100644 --- a/apps/meteor/server/database/utils.ts +++ b/apps/meteor/server/database/utils.ts @@ -1,12 +1,22 @@ import type { OffCallbackHandler } from '@rocket.chat/emitter'; import { Emitter } from '@rocket.chat/emitter'; import { MongoInternals } from 'meteor/mongo'; -import type { ClientSession, MongoError } from 'mongodb'; +import { ReadPreference } from 'mongodb'; +import type { ClientSession, MongoError, TransactionOptions } from 'mongodb'; import { SystemLogger } from '../lib/logger/system'; export const { db, client } = MongoInternals.defaultRemoteCollectionDriver().mongo; +// MongoDB and DocumentDB require read preference `primary` inside transactions. +// When the client is configured with e.g. `secondaryPreferred` (common in +// read-heavy deployments and in the CI connection to DocumentDB), transactions +// must explicitly override the read preference or they will fail with +// "Read preference in a transaction must be primary". +export const transactionOptions: TransactionOptions = { + readPreference: ReadPreference.primary, +}; + /** * In MongoDB, errors like UnknownTransactionCommitResult and TransientTransactionError occur primarily in the context of distributed transactions * and are often due to temporary network issues, server failures, or timeouts. Here’s what each error means and some common causes: @@ -71,7 +81,7 @@ export const wrapInSessionTransaction = const dispatch = (session: ClientSession) => ee.emit('success', session); try { - extendedSession.startTransaction(); + extendedSession.startTransaction(transactionOptions); extendedSession.once('ended', dispatch); const result = await curriedCallback(extendedSession).apply(this, args); diff --git a/apps/meteor/server/lib/findUsersOfRoomOrderedByRole.ts b/apps/meteor/server/lib/findUsersOfRoomOrderedByRole.ts index e03544d3a5a03..48390c3f82a8e 100644 --- a/apps/meteor/server/lib/findUsersOfRoomOrderedByRole.ts +++ b/apps/meteor/server/lib/findUsersOfRoomOrderedByRole.ts @@ -1,5 +1,5 @@ import { type IUser, ROOM_ROLE_PRIORITY_MAP, type ISubscription } from '@rocket.chat/core-typings'; -import { Subscriptions, Users } from '@rocket.chat/models'; +import { Subscriptions, Users, getAllowDiskUse } from '@rocket.chat/models'; import { escapeRegExp } from '@rocket.chat/string-helpers'; import type { Document, FilterOperators } from 'mongodb'; @@ -92,18 +92,20 @@ export async function findUsersOfRoomOrderedByRole({ { $lookup: { from: Subscriptions.getCollectionName(), + localField: '_id', + foreignField: 'u._id', as: 'subscription', - let: { userId: '$_id', roomId: rid }, - pipeline: [ - { - $match: { - $expr: { - $and: [{ $eq: ['$rid', '$$roomId'] }, { $eq: ['$u._id', '$$userId'] }], - }, - }, + }, + }, + { + $addFields: { + subscription: { + $filter: { + input: '$subscription', + as: 'sub', + cond: { $eq: ['$$sub.rid', rid] }, }, - { $project: { roles: 1, status: 1, ts: 1 } }, - ], + }, }, }, { @@ -123,9 +125,7 @@ export async function findUsersOfRoomOrderedByRole({ }, }, ], - { - allowDiskUse: true, - }, + getAllowDiskUse(), ); const [members, totalCount] = await Promise.all([membersResult.toArray(), Users.countDocuments(matchUserFilter)]); diff --git a/apps/meteor/server/lib/migrations.ts b/apps/meteor/server/lib/migrations.ts index fd8c1a468bab4..5721f8efc3cba 100644 --- a/apps/meteor/server/lib/migrations.ts +++ b/apps/meteor/server/lib/migrations.ts @@ -19,7 +19,7 @@ const migrations = new Set(); // sets the control record function setControl(control: Pick): Pick { - void Migrations.updateMany( + Migrations.updateMany( { _id: 'control', }, @@ -32,7 +32,17 @@ function setControl(control: Pick): Pick { + // E11000 on a concurrent upsert of the same _id is safe to ignore β€” another + // caller won the race and inserted the document; the $set will apply on the + // next call. Without this catch, the rejected promise from the fire-and-forget + // updateMany becomes an unhandled rejection and crashes the process when + // EXIT_UNHANDLEDPROMISEREJECTION is set (CI). + if (e?.code === 11000) { + return; + } + log.error({ msg: 'Failed to set migration control', err: e }); + }); return control; } diff --git a/apps/meteor/server/services/team/service.ts b/apps/meteor/server/services/team/service.ts index 9c3c7c8b746fa..848dff050b629 100644 --- a/apps/meteor/server/services/team/service.ts +++ b/apps/meteor/server/services/team/service.ts @@ -1122,7 +1122,7 @@ export class TeamService extends ServiceClassInternal implements ITeamService { } const [{ totalCount: [{ count: total }] = [], paginatedResults: data = [] }] = - (await Rooms.findChildrenOfTeam(team._id, mainRoom._id, userId, filter, type, { skip, limit, sort }).toArray()) || []; + (await Rooms.findChildrenOfTeam(team._id, mainRoom._id, userId, filter, type, { skip, limit, sort })) || []; return { total, diff --git a/docker-compose-ci.yml b/docker-compose-ci.yml index e5f71d5937c54..4a700373e19c4 100644 --- a/docker-compose-ci.yml +++ b/docker-compose-ci.yml @@ -16,7 +16,7 @@ services: - TEST_MODE=true - DEBUG=${DEBUG:-} - EXIT_UNHANDLEDPROMISEREJECTION=true - - MONGO_URL=mongodb://mongo:27017/rocketchat?replicaSet=rs0 + - MONGO_URL=${DOCKER_MONGO_URL:-mongodb://mongo:27017/rocketchat?replicaSet=rs0} - 'MONGO_OPLOG_URL=${MONGO_OPLOG_URL:-}' - 'TRANSPORTER=${TRANSPORTER:-}' - MOLECULER_LOG_LEVEL=info @@ -28,9 +28,13 @@ services: - Federation_Service_Enabled=true - 'Federation_Service_Domain=rc.host' - HEAP_USAGE_PERCENT=99 + - 'DOCUMENTDB=${DOCUMENTDB:-}' depends_on: - - traefik - - mongo + traefik: + condition: service_started + mongo: + condition: service_started + required: false labels: traefik.enable: true traefik.http.services.rocketchat.loadbalancer.server.port: 3000 @@ -41,7 +45,10 @@ services: interval: 2s timeout: 5s retries: 5 - start_period: 60s + # Startup against an external DocumentDB cluster is significantly slower + # than the local Mongo container due to network latency on every + # initial setup query (indexes, migrations, settings load). + start_period: 180s test: wget --no-verbose --tries=1 --spider http://127.0.0.1:3000/livez || exit 1 authorization-service: @@ -56,7 +63,7 @@ services: SERVICE: authorization-service image: ghcr.io/${LOWERCASE_REPOSITORY}/authorization-service:${DOCKER_TAG} environment: - - MONGO_URL=mongodb://mongo:27017/rocketchat?replicaSet=rs0 + - MONGO_URL=${DOCKER_MONGO_URL:-mongodb://mongo:27017/rocketchat?replicaSet=rs0} - 'TRANSPORTER=${TRANSPORTER:-}' - MOLECULER_LOG_LEVEL=info extra_hosts: @@ -76,7 +83,7 @@ services: SERVICE: account-service image: ghcr.io/${LOWERCASE_REPOSITORY}/account-service:${DOCKER_TAG} environment: - - MONGO_URL=mongodb://mongo:27017/rocketchat?replicaSet=rs0 + - MONGO_URL=${DOCKER_MONGO_URL:-mongodb://mongo:27017/rocketchat?replicaSet=rs0} - 'TRANSPORTER=${TRANSPORTER:-}' - MOLECULER_LOG_LEVEL=info extra_hosts: @@ -96,7 +103,7 @@ services: SERVICE: presence-service image: ghcr.io/${LOWERCASE_REPOSITORY}/presence-service:${DOCKER_TAG} environment: - - MONGO_URL=mongodb://mongo:27017/rocketchat?replicaSet=rs0 + - MONGO_URL=${DOCKER_MONGO_URL:-mongodb://mongo:27017/rocketchat?replicaSet=rs0} - 'TRANSPORTER=${TRANSPORTER:-}' - MOLECULER_LOG_LEVEL=info extra_hosts: @@ -116,7 +123,7 @@ services: SERVICE: ddp-streamer image: ghcr.io/${LOWERCASE_REPOSITORY}/ddp-streamer-service:${DOCKER_TAG} environment: - - MONGO_URL=mongodb://mongo:27017/rocketchat?replicaSet=rs0 + - MONGO_URL=${DOCKER_MONGO_URL:-mongodb://mongo:27017/rocketchat?replicaSet=rs0} - 'TRANSPORTER=${TRANSPORTER:-}' - MOLECULER_LOG_LEVEL=info extra_hosts: @@ -142,7 +149,7 @@ services: SERVICE: queue-worker image: ghcr.io/${LOWERCASE_REPOSITORY}/queue-worker-service:${DOCKER_TAG} environment: - - MONGO_URL=mongodb://mongo:27017/rocketchat?replicaSet=rs0 + - MONGO_URL=${DOCKER_MONGO_URL:-mongodb://mongo:27017/rocketchat?replicaSet=rs0} - 'TRANSPORTER=${TRANSPORTER:-}' - MOLECULER_LOG_LEVEL=info extra_hosts: @@ -163,7 +170,7 @@ services: image: ghcr.io/${LOWERCASE_REPOSITORY}/omnichannel-transcript-service:${DOCKER_TAG} environment: - TEST_MODE=true - - MONGO_URL=mongodb://mongo:27017/rocketchat?replicaSet=rs0 + - MONGO_URL=${DOCKER_MONGO_URL:-mongodb://mongo:27017/rocketchat?replicaSet=rs0} - 'TRANSPORTER=${TRANSPORTER:-}' - MOLECULER_LOG_LEVEL=info extra_hosts: diff --git a/docs/documentdb-compatibility.md b/docs/documentdb-compatibility.md new file mode 100644 index 0000000000000..e1054e34d850e --- /dev/null +++ b/docs/documentdb-compatibility.md @@ -0,0 +1,92 @@ +# Amazon DocumentDB Compatibility + +This document describes the changes made to ensure compatibility with Amazon DocumentDB, which has several differences from MongoDB in terms of supported operators and features. + +## Environment Variable + +Set the `DOCUMENTDB` environment variable to `true` to enable DocumentDB compatibility mode: + +```bash +DOCUMENTDB=true +``` + +When enabled, this flag adjusts query behavior to avoid unsupported features in Amazon DocumentDB. + +## Changes Overview + +### 1. `allowDiskUse` option removal + +**Problem:** Amazon DocumentDB does not support the `allowDiskUse` option for the `find` command. For aggregation pipelines, DocumentDB uses sort merge by default, making `allowDiskUse` unnecessary. + +**Solution:** The `getAllowDiskUse()` utility function (in `packages/models/src/allowDiskUse.ts`) returns `{ allowDiskUse: true }` when running on MongoDB and `{}` (empty object, effectively omitting the option) when `DOCUMENTDB=true`. Usage is via spread: `{ ...getAllowDiskUse(), ...otherOptions }` or passed directly as the options argument: `aggregate(pipeline, getAllowDiskUse())`. + +**Affected files:** +- `packages/models/src/models/Analytics.ts` +- `packages/models/src/models/LivechatAgentActivity.ts` +- `packages/models/src/models/LivechatContacts.ts` +- `packages/models/src/models/LivechatRooms.ts` +- `packages/models/src/models/Messages.ts` +- `packages/models/src/models/ModerationReports.ts` +- `packages/models/src/models/Rooms.ts` +- `packages/models/src/models/Sessions.ts` +- `apps/meteor/app/api/server/lib/users.ts` +- `apps/meteor/ee/server/api/abac/index.ts` +- `apps/meteor/ee/server/api/audit.ts` +- `apps/meteor/ee/server/models/raw/Users.ts` +- `apps/meteor/server/lib/findUsersOfRoomOrderedByRole.ts` + +**Reference:** [Amazon DocumentDB - allowDiskUse](https://docs.aws.amazon.com/documentdb/latest/developerguide/how-it-works.html) + +### 2. `$lookup` with `let` / pipeline subqueries + +**Problem:** Amazon DocumentDB does not support `$lookup` stages that use `let` and `pipeline` (correlated subqueries). Only the basic `$lookup` form (`localField` / `foreignField`) is supported. + +**Solution:** Replaced pipeline-based `$lookup` stages with the basic `localField` / `foreignField` form, followed by additional pipeline stages (`$unwind`, `$match`, `$project`) to achieve equivalent results. + +**Affected files:** +- `packages/models/src/models/LivechatRooms.ts` +- `packages/models/src/models/Rooms.ts` (findChildrenOfTeam) +- `packages/models/src/models/Subscriptions.ts` (findConnectedUsersExcept) +- `packages/models/src/models/Users.ts` (getNextLeastBusyAgent, getLastAvailableAgentRouted) +- `apps/meteor/ee/server/models/raw/Users.ts` (getUnavailableAgents) +- `apps/meteor/server/lib/findUsersOfRoomOrderedByRole.ts` +- `packages/models/src/models/LivechatDepartment.ts` + +### 3. `$facet` stage replacement + +**Problem:** Amazon DocumentDB has limited support for `$facet`. Certain operators within `$facet` sub-pipelines may not work as expected, and `$facet` can cause performance issues due to the lack of index usage within sub-pipelines. + +**Solution:** Replaced `$facet` stages with parallel aggregation calls (using `Promise.all`) β€” one for the data query and one for the count query. This approach also enables better index utilization. + +**Affected files:** +- `packages/models/src/models/Analytics.ts` +- `packages/models/src/models/Rooms.ts` (findChildrenOfTeam) +- `packages/models/src/models/Sessions.ts` +- `packages/models/src/models/LivechatRooms.ts` (getQueueMetrics) +- `packages/models/src/models/LivechatBusinessHours.ts` (findHoursToScheduleJobs) +- `apps/meteor/app/api/server/lib/users.ts` (users.list endpoint) +- `apps/meteor/ee/server/models/raw/Users.ts` (findAgentsWithDepartments) + +### 4. `$$REMOVE` system variable replacement + +**Problem:** Amazon DocumentDB does not support the `$$REMOVE` system variable, which is used in `$cond` / `$ifNull` expressions to conditionally remove fields from documents. + +**Solution:** Replaced `$$REMOVE` usage with a two-step approach: +1. Set the field to a sentinel value (e.g., `null` or omit it) in the `$project` / `$addFields` stage. +2. Use a subsequent `$unset` or `$project` stage to remove the field when not needed. + +Alternatively, restructured the pipeline to avoid the conditional field removal entirely. + +**Affected files:** +- `packages/models/src/models/Sessions.ts` (listUsers) +- `packages/models/src/models/LivechatRooms.ts` (findAvailableSources) +- `packages/models/src/models/LivechatDepartment.ts` (getBusinessHoursWithDepartmentStatuses) + +### 5. Pipeline-based `$lookup` in `LivechatDepartment.ts` + +**Problem:** A complex `$lookup` with pipeline was used to join and filter department data with business hours. + +**Solution:** Replaced with basic `$lookup` using `localField` / `foreignField`, followed by `$unwind` and `$match` stages to filter the joined data. + +**Affected files:** +- `packages/models/src/models/LivechatDepartment.ts` diff --git a/packages/model-typings/src/models/IAnalyticsModel.ts b/packages/model-typings/src/models/IAnalyticsModel.ts index 1baec42d0c585..c6e91eb147aea 100644 --- a/packages/model-typings/src/models/IAnalyticsModel.ts +++ b/packages/model-typings/src/models/IAnalyticsModel.ts @@ -46,5 +46,5 @@ export interface IAnalyticsModel extends IBaseModel { startOfLastWeek: number; endOfLastWeek: number; options?: any; - }): AggregationCursor<{ channels: IChannelsWithNumberOfMessagesBetweenDate[]; total: number }>; + }): Promise<{ channels: IChannelsWithNumberOfMessagesBetweenDate[]; total: number }[]>; } diff --git a/packages/model-typings/src/models/IRoomsModel.ts b/packages/model-typings/src/models/IRoomsModel.ts index 736ccc811c403..074482f706648 100644 --- a/packages/model-typings/src/models/IRoomsModel.ts +++ b/packages/model-typings/src/models/IRoomsModel.ts @@ -325,7 +325,7 @@ export interface IRoomsModel extends IBaseModel { filter?: string, type?: 'channels' | 'discussions', options?: FindOptions, - ): AggregationCursor<{ totalCount: { count: number }[]; paginatedResults: IRoom[] }>; + ): Promise<{ totalCount: { count: number }[]; paginatedResults: IRoom[] }[]>; resetRoomKeyAndSetE2EEQueueByRoomId( roomId: string, e2eKeyId: string, diff --git a/packages/models/src/allowDiskUse.ts b/packages/models/src/allowDiskUse.ts new file mode 100644 index 0000000000000..055a20fc86180 --- /dev/null +++ b/packages/models/src/allowDiskUse.ts @@ -0,0 +1,17 @@ +/** + * Returns an object with the `allowDiskUse` option for MongoDB aggregation/find operations. + * + * Amazon DocumentDB does not support `allowDiskUse` for `find` commands and uses + * sort merge by default for aggregations when `allowDiskUse` is not specified. + * When the `DOCUMENTDB` environment variable is set to 'true', this function + * returns an empty object so the option is omitted from queries. + * + * @see https://docs.aws.amazon.com/documentdb/latest/developerguide/how-it-works.html + */ +export function getAllowDiskUse(): { allowDiskUse: true } | Record { + if (process.env.DOCUMENTDB === 'true') { + return {}; + } + + return { allowDiskUse: true }; +} diff --git a/packages/models/src/filterIndexes.ts b/packages/models/src/filterIndexes.ts new file mode 100644 index 0000000000000..5f46176908498 --- /dev/null +++ b/packages/models/src/filterIndexes.ts @@ -0,0 +1,74 @@ +import type { IndexDescription } from 'mongodb'; + +/** + * Operators that DocumentDB does not support inside `partialFilterExpression`. + * DocumentDB only allows simple comparison operators ($eq, $gt, $gte, $lt, $lte) + * in partial filter expressions. Indexes using $exists, $type, $regex, $or, etc. + * fail with "Bad query specified" at index creation time. + * + * @see https://docs.aws.amazon.com/documentdb/latest/developerguide/functional-differences.html + */ +const UNSUPPORTED_PARTIAL_FILTER_OPERATORS = ['$exists', '$type', '$regex', '$or', '$and', '$not', '$nor', '$in', '$nin']; + +const containsUnsupportedOperator = (expr: unknown): boolean => { + if (!expr || typeof expr !== 'object') { + return false; + } + + for (const [key, value] of Object.entries(expr)) { + if (UNSUPPORTED_PARTIAL_FILTER_OPERATORS.includes(key)) { + return true; + } + if (typeof value === 'object' && value !== null && containsUnsupportedOperator(value)) { + return true; + } + } + return false; +}; + +const isTextIndex = (index: IndexDescription): boolean => { + const key = index.key as Record | undefined; + if (!key) return false; + return Object.values(key).some((v) => v === 'text'); +}; + +const isUnsupportedPartialIndex = (index: IndexDescription): boolean => { + if (!index.partialFilterExpression) return false; + return containsUnsupportedOperator(index.partialFilterExpression); +}; + +/** + * Filters out indexes that DocumentDB cannot create. + * + * When `DOCUMENTDB=true`, removes: + * - Text indexes (`{ field: 'text' }`) β€” DocumentDB has no native text search + * - Partial indexes with operators outside the supported subset + * (`$exists`, `$type`, `$regex`, etc.) + * + * Trade-off: the affected queries fall back to collection scans on DocumentDB, + * which may be slower. Functionality is preserved. + */ +export function filterIndexesForDocumentDB(indexes: IndexDescription[], collectionName: string): IndexDescription[] { + if (process.env.DOCUMENTDB !== 'true') { + return indexes; + } + + const skipped: string[] = []; + const filtered = indexes.filter((index) => { + if (isTextIndex(index)) { + skipped.push(`text index on ${JSON.stringify(index.key)}`); + return false; + } + if (isUnsupportedPartialIndex(index)) { + skipped.push(`partial index on ${JSON.stringify(index.key)} (unsupported operator in partialFilterExpression)`); + return false; + } + return true; + }); + + if (skipped.length) { + console.warn(`[DocumentDB] Skipping ${skipped.length} unsupported index(es) on '${collectionName}':\n\t${skipped.join('\n\t')}`); + } + + return filtered; +} diff --git a/packages/models/src/index.ts b/packages/models/src/index.ts index 4805f237ef585..005d8c9a1027d 100644 --- a/packages/models/src/index.ts +++ b/packages/models/src/index.ts @@ -1,3 +1,4 @@ +import './patchIndex'; import type { ILivechatDepartmentAgents, ILivechatInquiryRecord, ISubscription, RocketChatRecordDeleted } from '@rocket.chat/core-typings'; import type { IAnalyticsModel, @@ -122,6 +123,7 @@ export * from './helpers'; export { registerModel } from './proxify'; export { type Updater, UpdaterImpl } from './updater'; +export { getAllowDiskUse } from './allowDiskUse'; export const Apps = proxify('IAppsModel'); export const AppsPersistence = proxify('IAppsPersistenceModel'); diff --git a/packages/models/src/models/Analytics.ts b/packages/models/src/models/Analytics.ts index a26cb50500fd3..f7a9679ef30de 100644 --- a/packages/models/src/models/Analytics.ts +++ b/packages/models/src/models/Analytics.ts @@ -4,6 +4,7 @@ import { Random } from '@rocket.chat/random'; import type { AggregationCursor, FindCursor, Db, IndexDescription, FindOptions, UpdateResult, Document, Collection } from 'mongodb'; import { BaseRaw } from './BaseRaw'; +import { getAllowDiskUse } from '../allowDiskUse'; import { readSecondaryPreferred } from '../readSecondaryPreferred'; export class AnalyticsRaw extends BaseRaw implements IAnalyticsModel { @@ -286,46 +287,34 @@ export class AnalyticsRaw extends BaseRaw implements IAnalyticsModel if (options?.count) { sortAndPaginationParams.push({ $limit: options.count }); } - const facet = { - $facet: { - channels: [...sortAndPaginationParams], - total: [{ $count: 'total' }], - }, - }; - const totalUnwind = { $unwind: '$total' }; - const totalProject = { - $project: { - channels: '$channels', - total: '$total.total', - }, - }; - const params: Exclude['aggregate']>[0], undefined> = [ typeAndDateMatch, roomsGroup, lookup, roomsUnwind, project, - facet, - totalUnwind, - totalProject, ]; - return params; + return { baseParams: params, sortAndPaginationParams }; } - findRoomsByTypesWithNumberOfMessagesBetweenDate(params: { + async findRoomsByTypesWithNumberOfMessagesBetweenDate(params: { types: Array; start: number; end: number; startOfLastWeek: number; endOfLastWeek: number; options?: any; - }): AggregationCursor<{ channels: IChannelsWithNumberOfMessagesBetweenDate[]; total: number }> { - const aggregationParams = this.getRoomsWithNumberOfMessagesBetweenDateQuery(params); - return this.col.aggregate<{ channels: IChannelsWithNumberOfMessagesBetweenDate[]; total: number }>(aggregationParams, { - allowDiskUse: true, - readPreference: readSecondaryPreferred(), - }); + }): Promise<{ channels: IChannelsWithNumberOfMessagesBetweenDate[]; total: number }[]> { + const { baseParams, sortAndPaginationParams } = this.getRoomsWithNumberOfMessagesBetweenDateQuery(params); + const aggregateOptions = { ...getAllowDiskUse(), readPreference: readSecondaryPreferred() }; + + const [channels, countResult] = await Promise.all([ + this.col.aggregate([...baseParams, ...sortAndPaginationParams], aggregateOptions).toArray(), + this.col.aggregate<{ total: number }>([...baseParams, { $count: 'total' }], aggregateOptions).toArray(), + ]); + + const total = countResult[0]?.total || 0; + return [{ channels, total }]; } } diff --git a/packages/models/src/models/BaseRaw.ts b/packages/models/src/models/BaseRaw.ts index 3c389e9cad9c1..015b0eaf01ba1 100644 --- a/packages/models/src/models/BaseRaw.ts +++ b/packages/models/src/models/BaseRaw.ts @@ -30,6 +30,7 @@ import type { } from 'mongodb'; import { getCollectionName, UpdaterImpl } from '..'; +import { filterIndexesForDocumentDB } from '../filterIndexes'; import type { Updater } from '../updater'; import { setUpdatedAt } from './setUpdatedAt'; @@ -89,9 +90,15 @@ export abstract class BaseRaw< private pendingIndexes: Promise | undefined; public async createIndexes() { - const indexes = this.modelIndexes(); + const allIndexes = this.modelIndexes(); + + if (allIndexes?.length) { + const indexes = filterIndexesForDocumentDB(allIndexes, this.collectionName); + + if (!indexes.length) { + return; + } - if (indexes?.length) { if (this.pendingIndexes) { await this.pendingIndexes; } diff --git a/packages/models/src/models/LivechatAgentActivity.ts b/packages/models/src/models/LivechatAgentActivity.ts index cb0bc8172b339..f9c425835005b 100644 --- a/packages/models/src/models/LivechatAgentActivity.ts +++ b/packages/models/src/models/LivechatAgentActivity.ts @@ -4,6 +4,7 @@ import { parseISO, format } from 'date-fns'; import type { AggregationCursor, Collection, Document, FindCursor, Db, WithId, IndexDescription, UpdateResult } from 'mongodb'; import { BaseRaw } from './BaseRaw'; +import { getAllowDiskUse } from '../allowDiskUse'; import { readSecondaryPreferred } from '../readSecondaryPreferred'; export class LivechatAgentActivityRaw extends BaseRaw implements ILivechatAgentActivityModel { @@ -233,6 +234,6 @@ export class LivechatAgentActivityRaw extends BaseRaw im if (options.count) { params.push({ $limit: options.count }); } - return this.col.aggregate(params, { allowDiskUse: true, readPreference: readSecondaryPreferred() }); + return this.col.aggregate(params, { ...getAllowDiskUse(), readPreference: readSecondaryPreferred() }); } } diff --git a/packages/models/src/models/LivechatBusinessHours.ts b/packages/models/src/models/LivechatBusinessHours.ts index 39d076cbd1951..10eb9f18a0b10 100644 --- a/packages/models/src/models/LivechatBusinessHours.ts +++ b/packages/models/src/models/LivechatBusinessHours.ts @@ -79,53 +79,42 @@ export class LivechatBusinessHoursRaw extends BaseRaw imp }); } - findHoursToScheduleJobs(): Promise { - return this.col - .aggregate([ - { - $facet: { - start: [ - { $match: { active: true } }, - { $project: { _id: 0, workHours: 1 } }, - { $unwind: { path: '$workHours' } }, - { $match: { 'workHours.open': true } }, - { - $group: { - _id: { day: '$workHours.start.cron.dayOfWeek' }, - times: { $addToSet: '$workHours.start.cron.time' }, - }, - }, - { - $project: { - _id: 0, - day: '$_id.day', - times: 1, - }, - }, - ], - finish: [ - { $match: { active: true } }, - { $project: { _id: 0, workHours: 1 } }, - { $unwind: { path: '$workHours' } }, - { $match: { 'workHours.open': true } }, - { - $group: { - _id: { day: '$workHours.finish.cron.dayOfWeek' }, - times: { $addToSet: '$workHours.finish.cron.time' }, - }, - }, - { - $project: { - _id: 0, - day: '$_id.day', - times: 1, - }, - }, - ], + async findHoursToScheduleJobs(): Promise { + const basePipeline = [ + { $match: { active: true } }, + { $project: { _id: 0, workHours: 1 } }, + { $unwind: { path: '$workHours' } }, + { $match: { 'workHours.open': true } }, + ]; + + const [start, finish] = await Promise.all([ + this.col + .aggregate([ + ...basePipeline, + { + $group: { + _id: { day: '$workHours.start.cron.dayOfWeek' }, + times: { $addToSet: '$workHours.start.cron.time' }, + }, }, - }, - ]) - .toArray() as any; + { $project: { _id: 0, day: '$_id.day', times: 1 } }, + ]) + .toArray(), + this.col + .aggregate([ + ...basePipeline, + { + $group: { + _id: { day: '$workHours.finish.cron.dayOfWeek' }, + times: { $addToSet: '$workHours.finish.cron.time' }, + }, + }, + { $project: { _id: 0, day: '$_id.day', times: 1 } }, + ]) + .toArray(), + ]); + + return [{ start, finish }] as any; } async findActiveBusinessHoursToOpen( diff --git a/packages/models/src/models/LivechatContacts.ts b/packages/models/src/models/LivechatContacts.ts index a48f25bc1df5e..7c8894e9a4ca3 100644 --- a/packages/models/src/models/LivechatContacts.ts +++ b/packages/models/src/models/LivechatContacts.ts @@ -26,6 +26,7 @@ import type { } from 'mongodb'; import { BaseRaw } from './BaseRaw'; +import { getAllowDiskUse } from '../allowDiskUse'; import { readSecondaryPreferred } from '../readSecondaryPreferred'; export class LivechatContactsRaw extends BaseRaw implements ILivechatContactsModel { @@ -181,7 +182,7 @@ export class LivechatContactsRaw extends BaseRaw implements IL return this.findPaginated( { ...match }, { - allowDiskUse: true, + ...getAllowDiskUse(), ...options, }, ); @@ -451,7 +452,7 @@ export class LivechatContactsRaw extends BaseRaw implements IL }, }, ], - { allowDiskUse: true, readPreference: readSecondaryPreferred() }, + { ...getAllowDiskUse(), readPreference: readSecondaryPreferred() }, ); } diff --git a/packages/models/src/models/LivechatDepartment.ts b/packages/models/src/models/LivechatDepartment.ts index 60263c72e2416..11d61020641f2 100644 --- a/packages/models/src/models/LivechatDepartment.ts +++ b/packages/models/src/models/LivechatDepartment.ts @@ -384,7 +384,7 @@ export class LivechatDepartmentRaw extends BaseRaw implemen ], }, then: '$_id', - else: '$$REMOVE', + else: null, }, }, }, @@ -395,12 +395,22 @@ export class LivechatDepartmentRaw extends BaseRaw implemen $or: [{ $eq: ['$enabled', false] }, { $eq: ['$archived', true] }], }, then: '$_id', - else: '$$REMOVE', + else: null, }, }, }, }, }, + { + $project: { + validDepartments: { + $filter: { input: '$validDepartments', cond: { $ne: ['$$this', null] } }, + }, + invalidDepartments: { + $filter: { input: '$invalidDepartments', cond: { $ne: ['$$this', null] } }, + }, + }, + }, ]) .toArray(); } @@ -419,13 +429,17 @@ export class LivechatDepartmentRaw extends BaseRaw implemen localField: 'parentId', foreignField: 'unitId', as: 'monitors', - pipeline: [ - { - $match: { - monitorId, - }, + }, + }, + { + $addFields: { + monitors: { + $filter: { + input: '$monitors', + as: 'mon', + cond: { $eq: ['$$mon.monitorId', monitorId] }, }, - ], + }, }, }, { diff --git a/packages/models/src/models/LivechatRooms.ts b/packages/models/src/models/LivechatRooms.ts index 1e027dc15ed16..228b43a7a47ab 100644 --- a/packages/models/src/models/LivechatRooms.ts +++ b/packages/models/src/models/LivechatRooms.ts @@ -32,6 +32,7 @@ import type { import type { Updater } from '../updater'; import { BaseRaw } from './BaseRaw'; +import { getAllowDiskUse } from '../allowDiskUse'; import { readSecondaryPreferred } from '../readSecondaryPreferred'; /** @@ -104,7 +105,7 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive return this.findOne(query); } - getQueueMetrics({ + async getQueueMetrics({ departmentId, agentId, includeOfflineAgents, @@ -124,23 +125,8 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive const departmentsLookup = { $lookup: { from: 'rocketchat_livechat_department', - let: { - deptId: '$departmentId', - }, - pipeline: [ - { - $match: { - $expr: { - $eq: ['$_id', '$$deptId'], - }, - }, - }, - { - $project: { - name: 1, - }, - }, - ], + localField: 'departmentId', + foreignField: '_id', as: 'departments', }, }; @@ -154,31 +140,25 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive const usersLookup = { $lookup: { from: 'users', - let: { - servedById: '$servedBy._id', - }, - pipeline: [ - { - $match: { - $expr: { - $eq: ['$_id', '$$servedById'], - }, - ...(!includeOfflineAgents && { - status: { $ne: 'offline' }, - statusLivechat: 'available', - }), - ...(agentId && { _id: agentId }), - }, - }, - { - $project: { - _id: 1, - username: 1, - status: 1, + localField: 'servedBy._id', + foreignField: '_id', + as: 'user', + }, + }; + const usersFilter = { + $addFields: { + user: { + $filter: { + input: '$user', + as: 'u', + cond: { + $and: [ + ...(!includeOfflineAgents ? [{ $ne: ['$$u.status', 'offline'] }, { $eq: ['$$u.statusLivechat', 'available'] }] : []), + ...(agentId ? [{ $eq: ['$$u._id', agentId] }] : []), + ], }, }, - ], - as: 'user', + }, }, }; const usersUnwind = { @@ -213,7 +193,7 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive chats: 1, }, }; - const firstParams = [match, departmentsLookup, departmentsUnwind, usersLookup, usersUnwind]; + const firstParams = [match, departmentsLookup, departmentsUnwind, usersLookup, usersFilter, usersUnwind]; const sort: Document = { $sort: options.sort || { chats: -1 } }; const pagination = [sort]; @@ -224,16 +204,16 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive pagination.push({ $limit: options.count }); } - const facet = { - $facet: { - sortedResults: pagination, - totalCount: [{ $group: { _id: null, total: { $sum: 1 } } }], - }, - }; + const baseParams = [...firstParams, usersGroup, project]; + const aggregateOptions = { readPreference: readSecondaryPreferred(), ...getAllowDiskUse() }; - const params = [...firstParams, usersGroup, project, facet]; + const [sortedResults, countResult] = await Promise.all([ + this.col.aggregate([...baseParams, ...pagination], aggregateOptions).toArray(), + this.col.aggregate<{ total: number }>([...baseParams, { $count: 'total' }], aggregateOptions).toArray(), + ]); - return this.col.aggregate(params, { readPreference: readSecondaryPreferred(), allowDiskUse: true }).toArray(); + const totalCount = countResult.length ? [{ total: countResult[0].total }] : []; + return [{ sortedResults, totalCount }]; } async findAllNumberOfAbandonedRooms({ @@ -721,7 +701,7 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive if (options.count) { params.push({ $limit: options.count }); } - return this.col.aggregate(params, { allowDiskUse: true, readPreference: readSecondaryPreferred() }).toArray(); + return this.col.aggregate(params, { ...getAllowDiskUse(), readPreference: readSecondaryPreferred() }).toArray(); } countAllOpenChatsBetweenDate({ start, end, departmentId }: { start: Date; end: Date; departmentId?: string }) { @@ -1554,7 +1534,7 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive if: { $eq: ['$source.type', 'app'], }, - then: '$$REMOVE', + then: null, else: { type: '$source.type' }, }, }, @@ -1565,7 +1545,7 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive if: { $eq: ['$source.type', 'app'], }, - else: '$$REMOVE', + else: null, then: { type: '$source.type', id: '$source.id', @@ -1578,6 +1558,12 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive }, }, }, + { + $addFields: { + types: { $filter: { input: '$types', cond: { $ne: ['$$this', null] } } }, + apps: { $filter: { input: '$apps', cond: { $ne: ['$$this', null] } } }, + }, + }, { $project: { _id: 0, @@ -2170,31 +2156,25 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive ...extraMatchers, }, }, - { $addFields: { roomId: '$_id' } }, { $lookup: { from: 'rocketchat_message', - // mongo doesn't like _id as variable name here :( - let: { roomId: '$roomId' }, - pipeline: [ - { - $match: { - $expr: { - $and: [ - { - $eq: ['$$roomId', '$rid'], - }, - { - // this is similar to do { $exists: false } - $lte: ['$t', null], - }, - ...(extraQuery ? [extraQuery] : []), - ], - }, + localField: '_id', + foreignField: 'rid', + as: 'messages', + }, + }, + { + $addFields: { + messages: { + $filter: { + input: '$messages', + as: 'msg', + cond: { + $and: [{ $lte: ['$$msg.t', null] }, ...(extraQuery ? [extraQuery] : [])], }, }, - ], - as: 'messages', + }, }, }, { @@ -2247,32 +2227,25 @@ export class LivechatRoomsRaw extends BaseRaw implements ILive ...(departmentId && departmentId !== 'undefined' && { departmentId }), }, }, - { $addFields: { roomId: '$_id' } }, { $lookup: { from: 'rocketchat_message', - // mongo doesn't like _id as variable name here :( - let: { roomId: '$roomId' }, - pipeline: [ - { - $match: { - $expr: { - $and: [ - { - $eq: ['$$roomId', '$rid'], - }, - { - // this is similar to do { $exists: false } - $lte: ['$t', null], - }, - ], - }, - }, - }, - ], + localField: '_id', + foreignField: 'rid', as: 'messages', }, }, + { + $addFields: { + messages: { + $filter: { + input: '$messages', + as: 'msg', + cond: { $lte: ['$$msg.t', null] }, + }, + }, + }, + }, { $unwind: { path: '$messages', diff --git a/packages/models/src/models/Messages.ts b/packages/models/src/models/Messages.ts index e16cb35fd41ee..16563806c66aa 100644 --- a/packages/models/src/models/Messages.ts +++ b/packages/models/src/models/Messages.ts @@ -30,6 +30,7 @@ import type { } from 'mongodb'; import { BaseRaw } from './BaseRaw'; +import { getAllowDiskUse } from '../allowDiskUse'; import { readSecondaryPreferred } from '../readSecondaryPreferred'; type DeepWritable = T extends (...args: any) => any @@ -247,7 +248,7 @@ export class MessagesRaw extends BaseRaw implements IMessagesModel { params.push({ $limit: options.count }); } return this.col.aggregate<{ _id: string | null; numberOfTransferredRooms: number }>(params, { - allowDiskUse: true, + ...getAllowDiskUse(), readPreference: readSecondaryPreferred(), }); } @@ -327,7 +328,7 @@ export class MessagesRaw extends BaseRaw implements IMessagesModel { if (options.count) { params.push({ $limit: options.count }); } - return this.col.aggregate(params, { allowDiskUse: true, readPreference: readSecondaryPreferred() }).toArray(); + return this.col.aggregate(params, { ...getAllowDiskUse(), readPreference: readSecondaryPreferred() }).toArray(); } findLivechatClosedMessages(rid: IRoom['_id'], searchTerm?: string, options?: FindOptions): FindPaginated> { diff --git a/packages/models/src/models/ModerationReports.ts b/packages/models/src/models/ModerationReports.ts index d700d78cfa5c8..346624da824db 100644 --- a/packages/models/src/models/ModerationReports.ts +++ b/packages/models/src/models/ModerationReports.ts @@ -10,6 +10,7 @@ import type { FindPaginated, IModerationReportsModel, PaginationParams } from '@ import type { AggregationCursor, Collection, Db, Document, FindCursor, FindOptions, IndexDescription, UpdateResult } from 'mongodb'; import { BaseRaw } from './BaseRaw'; +import { getAllowDiskUse } from '../allowDiskUse'; import { readSecondaryPreferred } from '../readSecondaryPreferred'; export class ModerationReportsRaw extends BaseRaw implements IModerationReportsModel { @@ -136,7 +137,7 @@ export class ModerationReportsRaw extends BaseRaw implements }, ]; - return this.col.aggregate(params, { allowDiskUse: true }); + return this.col.aggregate(params, getAllowDiskUse()); } findUserReports( @@ -193,7 +194,7 @@ export class ModerationReportsRaw extends BaseRaw implements }, ]; - return this.col.aggregate(pipeline, { allowDiskUse: true, readPreference: readSecondaryPreferred() }); + return this.col.aggregate(pipeline, { ...getAllowDiskUse(), readPreference: readSecondaryPreferred() }); } async getTotalUniqueReportedUsers(latest: Date, oldest: Date, selector: string, isMessageReports?: boolean): Promise { diff --git a/packages/models/src/models/Rooms.ts b/packages/models/src/models/Rooms.ts index f2a1d07a7e552..f1e97678b150d 100644 --- a/packages/models/src/models/Rooms.ts +++ b/packages/models/src/models/Rooms.ts @@ -30,6 +30,7 @@ import type { import { Subscriptions } from '../index'; import { BaseRaw } from './BaseRaw'; +import { getAllowDiskUse } from '../allowDiskUse'; import { readSecondaryPreferred } from '../readSecondaryPreferred'; import type { Updater } from '../updater'; @@ -605,7 +606,7 @@ export class RoomsRaw extends BaseRaw implements IRoomsModel { }): AggregationCursor { const aggregationParams = this.getChannelsWithNumberOfMessagesBetweenDateQuery(params); return this.col.aggregate(aggregationParams, { - allowDiskUse: true, + ...getAllowDiskUse(), readPreference: readSecondaryPreferred(), }); } @@ -2212,16 +2213,16 @@ export class RoomsRaw extends BaseRaw implements IRoomsModel { return this.updateMany(query, update); } - findChildrenOfTeam( + async findChildrenOfTeam( teamId: string, teamRoomId: string, userId: string, filter?: string, type?: 'channels' | 'discussions', options?: FindOptions, - ): AggregationCursor<{ totalCount: { count: number }[]; paginatedResults: IRoom[] }> { + ): Promise<{ totalCount: { count: number }[]; paginatedResults: IRoom[] }[]> { const nameFilter = filter ? new RegExp(escapeRegExp(filter), 'i') : undefined; - return this.col.aggregate<{ totalCount: { count: number }[]; paginatedResults: IRoom[] }>([ + const baseQuery = [ { $match: { $and: [ @@ -2238,36 +2239,22 @@ export class RoomsRaw extends BaseRaw implements IRoomsModel { { $lookup: { from: 'rocketchat_subscription', - let: { - roomId: '$_id', - }, - pipeline: [ - { - $match: { - $and: [ - { - $expr: { - $eq: ['$rid', '$$roomId'], - }, - }, - { - $expr: { - $eq: ['$u._id', userId], - }, - }, - { - $expr: { - $ne: ['$t', 'c'], - }, - }, - ], + localField: '_id', + foreignField: 'rid', + as: 'subscription', + }, + }, + { + $addFields: { + subscription: { + $filter: { + input: '$subscription', + as: 'sub', + cond: { + $and: [{ $eq: ['$$sub.u._id', userId] }, { $ne: ['$$sub.t', 'c'] }], }, }, - { - $project: { _id: 1 }, - }, - ], - as: 'subscription', + }, }, }, { @@ -2284,13 +2271,14 @@ export class RoomsRaw extends BaseRaw implements IRoomsModel { }, { $project: { subscription: 0 } }, { $sort: options?.sort || { ts: 1 } }, - { - $facet: { - totalCount: [{ $count: 'count' }], - paginatedResults: [{ $skip: options?.skip || 0 }, { $limit: options?.limit || 50 }], - }, - }, + ]; + + const [paginatedResults, countResult] = await Promise.all([ + this.col.aggregate([...baseQuery, { $skip: options?.skip || 0 }, { $limit: options?.limit || 50 }]).toArray(), + this.col.aggregate<{ count: number }>([...baseQuery, { $count: 'count' }]).toArray(), ]); + + return [{ totalCount: countResult, paginatedResults }]; } findAllByTypesAndDiscussionAndTeam( diff --git a/packages/models/src/models/Sessions.ts b/packages/models/src/models/Sessions.ts index 3763bf38b457a..c2adb92ac5b42 100644 --- a/packages/models/src/models/Sessions.ts +++ b/packages/models/src/models/Sessions.ts @@ -12,7 +12,7 @@ import type { RocketChatRecordDeleted, } from '@rocket.chat/core-typings'; import type { ISessionsModel } from '@rocket.chat/model-typings'; -import type { PaginatedResult, WithItemCount } from '@rocket.chat/rest-typings'; +import type { PaginatedResult } from '@rocket.chat/rest-typings'; import type { AggregationCursor, AnyBulkWriteOperation, @@ -30,6 +30,7 @@ import type { import { getCollectionName } from '../index'; import { BaseRaw } from './BaseRaw'; +import { getAllowDiskUse } from '../allowDiskUse'; import { readSecondaryPreferred } from '../readSecondaryPreferred'; type DestructuredDate = { year: number; month: number; day: number }; @@ -124,7 +125,7 @@ const matchBasedOnDate = (start: DestructuredDate, end: DestructuredDate): Filte const getGroupSessionsByHour = ( _id: { range: string; day: string; month: string; year: string } | string, -): { listGroup: object; countGroup: object } => { +): { listGroup: object; filterNulls: object; countGroup: object } => { const isOpenSession = { $not: ['$session.closedAt'] }; const isAfterLoginAt = { $gte: ['$range', { $hour: '$session.loginAt' }] }; const isBeforeClosedAt = { $lte: ['$range', { $hour: '$session.closedAt' }] }; @@ -139,20 +140,26 @@ const getGroupSessionsByHour = ( $or: [{ $and: [isOpenSession, isAfterLoginAt] }, { $and: [isAfterLoginAt, isBeforeClosedAt] }], }, '$session.userId', - '$$REMOVE', + null, ], }, }, }, }; + const filterNulls = { + $addFields: { + usersList: { $filter: { input: '$usersList', cond: { $ne: ['$$this', null] } } }, + }, + }; + const countGroup = { $addFields: { users: { $size: '$usersList' }, }, }; - return { listGroup, countGroup }; + return { listGroup, filterNulls, countGroup }; }; const getSortByFullDate = (): { year: number; month: number; day: number } => ({ @@ -273,7 +280,7 @@ export const aggregates = { devices: ISession['device'][]; _computedAt: string; } - >(pipeline, { allowDiskUse: true }); + >(pipeline, getAllowDiskUse()); }, async getUniqueUsersOfYesterday( @@ -424,7 +431,7 @@ export const aggregates = { }, }, ], - { allowDiskUse: true }, + getAllowDiskUse(), ) .toArray(); }, @@ -574,7 +581,7 @@ export const aggregates = { }, }, ], - { allowDiskUse: true }, + getAllowDiskUse(), ) .toArray(); }, @@ -678,7 +685,7 @@ export const aggregates = { }, }, ], - { allowDiskUse: true }, + getAllowDiskUse(), ) .toArray(); }, @@ -827,26 +834,14 @@ export class SessionsRaw extends BaseRaw implements ISessionsModel { }, }; - const facetOperator = { - $facet: { - docs: [sortOperator, ...skipOperator, limitOperator, ...customSortOp], - count: [ - { - $count: 'total', - }, - ], - }, - }; + const baseQuery = [matchOperator, sortOperator, groupOperator, projectOperator]; - const queryArray = [matchOperator, sortOperator, groupOperator, projectOperator, facetOperator]; - - const [ - { - docs: sessions, - count: [{ total } = { total: 0 }], - }, - ] = await this.col.aggregate>(queryArray).toArray(); + const [sessions, countResult] = await Promise.all([ + this.col.aggregate([...baseQuery, sortOperator, ...skipOperator, limitOperator, ...customSortOp]).toArray(), + this.col.aggregate<{ total: number }>([...baseQuery, { $count: 'total' }]).toArray(), + ]); + const total = countResult[0]?.total || 0; return { sessions, total, count, offset }; } @@ -953,26 +948,25 @@ export class SessionsRaw extends BaseRaw implements ISessionsModel { }, }; - const facetOperator = { - $facet: { - docs: [sortOperator, ...skipOperator, limitOperator, lookupOperator, unwindOperator, projectOperator, ...customSortOp], - count: [ - { - $count: 'total', - }, - ], - }, - }; - - const queryArray = [matchOperator, sortOperator, groupOperator, facetOperator]; - - const [ - { - docs: sessions, - count: [{ total } = { total: 0 }], - }, - ] = await this.col.aggregate>(queryArray).toArray(); - + const baseQuery = [matchOperator, sortOperator, groupOperator]; + + const [sessions, countResult] = await Promise.all([ + this.col + .aggregate([ + ...baseQuery, + sortOperator, + ...skipOperator, + limitOperator, + lookupOperator, + unwindOperator, + projectOperator, + ...customSortOp, + ]) + .toArray(), + this.col.aggregate<{ total: number }>([...baseQuery, { $count: 'total' }]).toArray(), + ]); + + const total = countResult[0]?.total || 0; return { sessions, total, count, offset }; } @@ -1143,7 +1137,7 @@ export class SessionsRaw extends BaseRaw implements ISessionsModel { .aggregate<{ hour: number; users: number; - }>([match, rangeProject, unwind, groups.listGroup, groups.countGroup, presentationProject, sort]) + }>([match, rangeProject, unwind, groups.listGroup, groups.filterNulls, groups.countGroup, presentationProject, sort]) .toArray(); } @@ -1245,7 +1239,7 @@ export class SessionsRaw extends BaseRaw implements ISessionsModel { month: number; year: number; users: number; - }>([match, rangeProject, unwind, groups.listGroup, groups.countGroup, presentationProject, sort]) + }>([match, rangeProject, unwind, groups.listGroup, groups.filterNulls, groups.countGroup, presentationProject, sort]) .toArray(); } diff --git a/packages/models/src/models/Subscriptions.ts b/packages/models/src/models/Subscriptions.ts index 5a7bbdfc7e51a..4ae7334b10adc 100644 --- a/packages/models/src/models/Subscriptions.ts +++ b/packages/models/src/models/Subscriptions.ts @@ -453,11 +453,9 @@ export class SubscriptionsRaw extends BaseRaw implements ISubscri { $lookup: { from: 'rocketchat_subscription', + localField: '_id', + foreignField: 'rid', as: 'subscription', - let: { - rid: '$_id', - }, - pipeline: [{ $match: { '$expr': { $eq: ['$rid', '$$rid'] }, 'u._id': { $ne: userId } } }], }, }, // Unwind the subscription so we have a separate document for each @@ -466,6 +464,12 @@ export class SubscriptionsRaw extends BaseRaw implements ISubscri path: '$subscription', }, }, + // Filter out the requesting user's own subscriptions + { + $match: { + 'subscription.u._id': { $ne: userId }, + }, + }, // Group the data by user id, keeping track of how many documents each user had { $group: { @@ -475,34 +479,36 @@ export class SubscriptionsRaw extends BaseRaw implements ISubscri }, }, }, - // Load the data for the subscription's user, ignoring those who don't match the search terms + // Load the data for the subscription's user { $lookup: { from: 'users', + localField: '_id', + foreignField: '_id', as: 'user', - let: { id: '$_id' }, - pipeline: [ - { - $match: { - $expr: { $eq: ['$_id', '$$id'] }, - ...extraConditions, - active: true, - username: { - $exists: true, - ...(exceptions.length > 0 && { $nin: exceptions }), - }, - ...(searchTerm && orStatement.length > 0 && { $or: orStatement }), - }, - }, - ], }, }, - // Discard documents that didn't load any user data in the previous step: + // Discard documents that didn't load any user data { $unwind: { path: '$user', }, }, + // Filter users by search terms and conditions + { + $match: { + ...Object.fromEntries(Object.entries(extraConditions).map(([k, v]) => [`user.${k}`, v])), + 'user.active': true, + 'user.username': { + $exists: true, + ...(exceptions.length > 0 && { $nin: exceptions }), + }, + ...(searchTerm && + orStatement.length > 0 && { + $or: orStatement.map((cond) => Object.fromEntries(Object.entries(cond).map(([k, v]) => [`user.${k}`, v]))), + }), + }, + }, // Use group to organize the data at the same time that we pick what to project to the end result { $group: { diff --git a/packages/models/src/models/Users.ts b/packages/models/src/models/Users.ts index 95cc31246380a..1e0a6d9bd36b5 100644 --- a/packages/models/src/models/Users.ts +++ b/packages/models/src/models/Users.ts @@ -245,7 +245,7 @@ export class UsersRaw extends BaseRaw> implements IU return this.findPaginated(query, options); } - findAgentsWithDepartments( + async findAgentsWithDepartments( role: IRole['_id'][] | IRole['_id'], query: Filter, options?: FindOptions, @@ -284,15 +284,17 @@ export class UsersRaw extends BaseRaw> implements IU departments: { $push: '$departments.departmentId' }, }, }, - { - $facet: { - sortedResults: [{ $sort: options?.sort }, { $skip: options?.skip }, options?.limit && { $limit: options.limit }], - totalCount: [{ $group: { _id: null, total: { $sum: 1 } } }], - }, - }, ]; - return this.col.aggregate<{ sortedResults: (T & { departments: string[] })[]; totalCount: { total: number }[] }>(aggregate).toArray(); + const paginationStages = [{ $sort: options?.sort }, { $skip: options?.skip }, ...(options?.limit ? [{ $limit: options.limit }] : [])]; + + const [sortedResults, countResult] = await Promise.all([ + this.col.aggregate([...aggregate, ...paginationStages]).toArray(), + this.col.aggregate<{ total: number }>([...aggregate, { $count: 'total' }]).toArray(), + ]); + + const totalCount = countResult.length ? [{ total: countResult[0].total }] : []; + return [{ sortedResults, totalCount }]; } findOneByUsernameAndRoomIgnoringCase(username: string | RegExp, rid: string, options?: FindOptions) { @@ -567,19 +569,22 @@ export class UsersRaw extends BaseRaw> implements IU { $lookup: { from: 'rocketchat_livechat_department_agents', - let: { userId: '$_id' }, - pipeline: [ - { - $match: { - $expr: { - $and: [{ $eq: ['$$userId', '$agentId'] }, { $eq: ['$departmentId', department] }], - }, - }, - }, - ], + localField: '_id', + foreignField: 'agentId', as: 'department', }, }, + { + $addFields: { + department: { + $filter: { + input: '$department', + as: 'dept', + cond: { $eq: ['$$dept.departmentId', department] }, + }, + }, + }, + }, { $match: { department: { $size: 1 } }, }, @@ -592,22 +597,26 @@ export class UsersRaw extends BaseRaw> implements IU { $lookup: { from: 'rocketchat_subscription', - let: { id: '$_id' }, - pipeline: [ - { - $match: { - $expr: { - $and: [ - { $eq: ['$u._id', '$$id'] }, - { $eq: ['$open', true] }, - { $ne: ['$onHold', true] }, - { ...(department && { $eq: ['$department', department] }) }, - ], - }, + localField: '_id', + foreignField: 'u._id', + as: 'subs', + }, + }, + { + $addFields: { + subs: { + $filter: { + input: '$subs', + as: 'sub', + cond: { + $and: [ + { $eq: ['$$sub.open', true] }, + { $ne: ['$$sub.onHold', true] }, + ...(department ? [{ $eq: ['$$sub.department', department] }] : []), + ], }, }, - ], - as: 'subs', + }, }, }, { @@ -647,19 +656,22 @@ export class UsersRaw extends BaseRaw> implements IU { $lookup: { from: 'rocketchat_livechat_department_agents', - let: { userId: '$_id' }, - pipeline: [ - { - $match: { - $expr: { - $and: [{ $eq: ['$$userId', '$agentId'] }, { $eq: ['$departmentId', department] }], - }, - }, - }, - ], + localField: '_id', + foreignField: 'agentId', as: 'department', }, }, + { + $addFields: { + department: { + $filter: { + input: '$department', + as: 'dept', + cond: { $eq: ['$$dept.departmentId', department] }, + }, + }, + }, + }, { $match: { department: { $size: 1 } }, }, diff --git a/packages/models/src/patchIndex.ts b/packages/models/src/patchIndex.ts new file mode 100644 index 0000000000000..1fb0697194548 --- /dev/null +++ b/packages/models/src/patchIndex.ts @@ -0,0 +1,50 @@ +/** + * Amazon DocumentDB only supports one index build at a time per collection. + * This patch serializes createIndex/createIndexes calls at the native + * MongoDB driver level (app's node_modules/mongodb copy), covering calls + * from Rocket.Chat models (BaseRaw) and EE microservices. + * + * Meteor packages (accounts-base, accounts-password) use a separate bundled + * copy of the mongodb driver and are patched independently in the + * rocketchat:mongo-config Meteor package via MongoConnection. + * + * Both patches share the same queue via a globalThis-keyed Map so builds + * for the same collection are serialized across all code paths. + * + * Safe to import multiple times β€” the patch is applied only once. + */ +import { Collection } from 'mongodb'; + +const PATCHED = Symbol.for('rocketchat.documentdb.index.patch'); +const QUEUE_KEY = Symbol.for('rocketchat.documentdb.index.queues'); + +if (process.env.DOCUMENTDB === 'true' && !(Collection as any)[PATCHED]) { + (Collection as any)[PATCHED] = true; + + const g = globalThis as any; + if (!g[QUEUE_KEY]) { + g[QUEUE_KEY] = new Map>(); + } + const queues: Map> = g[QUEUE_KEY]; + + const enqueue = (collectionName: string, fn: () => Promise): Promise => { + const prev = queues.get(collectionName) ?? Promise.resolve(); + const next = prev.then(fn, fn); + queues.set( + collectionName, + // eslint-disable-next-line @typescript-eslint/no-empty-function + next.catch(() => {}), + ); + return next; + }; + + const originalCreateIndex = Collection.prototype.createIndex; + Collection.prototype.createIndex = function (this: Collection, ...args: Parameters) { + return enqueue(this.collectionName, () => originalCreateIndex.apply(this, args)); + } as typeof Collection.prototype.createIndex; + + const originalCreateIndexes = Collection.prototype.createIndexes; + Collection.prototype.createIndexes = function (this: Collection, ...args: Parameters) { + return enqueue(this.collectionName, () => originalCreateIndexes.apply(this, args)); + } as typeof Collection.prototype.createIndexes; +}