diff --git a/backend/src/api/public/v1/dev-stats/getAffiliations.ts b/backend/src/api/public/v1/dev-stats/getAffiliations.ts new file mode 100644 index 0000000000..1011526e4d --- /dev/null +++ b/backend/src/api/public/v1/dev-stats/getAffiliations.ts @@ -0,0 +1,94 @@ +import type { Request, Response } from 'express' +import { z } from 'zod' + +import { + findMembersByGithubHandles, + findVerifiedEmailsByMemberIds, + optionsQx, + resolveAffiliationsByMemberIds, +} from '@crowd/data-access-layer' +import { getServiceChildLogger } from '@crowd/logging' + +import { ok } from '@/utils/api' +import { validateOrThrow } from '@/utils/validation' + +const log = getServiceChildLogger('dev-stats') + +const MAX_HANDLES = 1000 + +const bodySchema = z.object({ + githubHandles: z + .array(z.string().min(1)) + .min(1) + .max(MAX_HANDLES, `Maximum ${MAX_HANDLES} handles per request`), +}) + +export async function getAffiliations(req: Request, res: Response): Promise { + const { githubHandles } = validateOrThrow(bodySchema, req.body) + const qx = optionsQx(req) + + const t0 = performance.now() + + const lowercasedHandles = githubHandles.map((h) => h.toLowerCase()) + + // Step 1: find verified members by github handles + const memberRows = await findMembersByGithubHandles(qx, lowercasedHandles) + + const t1 = performance.now() + log.info( + { handles: githubHandles.length, found: memberRows.length, ms: Math.round(t1 - t0) }, + 'Step 1: members lookup', + ) + + const foundHandles = new Set(memberRows.map((r) => r.githubHandle.toLowerCase())) + const notFound = githubHandles.filter((h) => !foundHandles.has(h.toLowerCase())) + + if (memberRows.length === 0) { + ok(res, { total_found: 0, contributors: [], notFound }) + return + } + + const memberIds = memberRows.map((r) => r.memberId) + + // Step 2: fetch verified emails + const emailRows = await findVerifiedEmailsByMemberIds(qx, memberIds) + + const t2 = performance.now() + log.info( + { members: memberIds.length, emails: emailRows.length, ms: Math.round(t2 - t1) }, + 'Step 2: emails lookup', + ) + + const emailsByMember = new Map() + for (const row of emailRows) { + const list = emailsByMember.get(row.memberId) ?? [] + list.push(row.email) + emailsByMember.set(row.memberId, list) + } + + // Step 3: resolve affiliations (conflict resolution, gap-filling, selection priority) + const affiliationsByMember = await resolveAffiliationsByMemberIds(qx, memberIds) + + const t3 = performance.now() + log.info({ members: memberIds.length, ms: Math.round(t3 - t2) }, 'Step 3: affiliations resolved') + + // Step 4: build response + const contributors = memberRows.map((member) => ({ + githubHandle: member.githubHandle, + name: member.displayName, + emails: emailsByMember.get(member.memberId) ?? [], + affiliations: affiliationsByMember.get(member.memberId) ?? [], + })) + + log.info( + { + handles: githubHandles.length, + found: contributors.length, + notFound: notFound.length, + totalMs: Math.round(t3 - t0), + }, + 'dev-stats affiliations complete', + ) + + ok(res, { total_found: contributors.length, contributors, notFound }) +} diff --git a/backend/src/api/public/v1/dev-stats/index.ts b/backend/src/api/public/v1/dev-stats/index.ts index 3dc77716a3..6ed9a9a440 100644 --- a/backend/src/api/public/v1/dev-stats/index.ts +++ b/backend/src/api/public/v1/dev-stats/index.ts @@ -4,6 +4,8 @@ import { createRateLimiter } from '@/api/apiRateLimiter' import { requireScopes } from '@/api/public/middlewares/requireScopes' import { SCOPES } from '@/security/scopes' +import { getAffiliations } from './getAffiliations' + const rateLimiter = createRateLimiter({ max: 60, windowMs: 60 * 1000 }) export function devStatsRouter(): Router { @@ -11,9 +13,7 @@ export function devStatsRouter(): Router { router.use(rateLimiter) - router.post('/affiliations', requireScopes([SCOPES.READ_AFFILIATIONS]), (_req, res) => { - res.json({ status: 'ok' }) - }) + router.post('/affiliations', requireScopes([SCOPES.READ_AFFILIATIONS]), getAffiliations) return router } diff --git a/services/libs/data-access-layer/src/devStats/index.ts b/services/libs/data-access-layer/src/devStats/index.ts new file mode 100644 index 0000000000..00efefd57a --- /dev/null +++ b/services/libs/data-access-layer/src/devStats/index.ts @@ -0,0 +1,574 @@ +import { getServiceChildLogger } from '@crowd/logging' +import { MemberIdentityType, PlatformType } from '@crowd/types' + +import { QueryExecutor } from '../queryExecutor' + +const log = getServiceChildLogger('dev-stats:affiliations') + +// ─── Constants ──────────────────────────────────────────────────────────────── + +const BLACKLISTED_TITLES = ['investor', 'mentor', 'board member'] + +// ─── Public interfaces ──────────────────────────────────────────────────────── + +export interface IDevStatsMemberRow { + githubHandle: string + memberId: string + displayName: string | null +} + +export interface IDevStatsAffiliation { + organization: string + startDate: string | null + endDate: string | null +} + +// ─── Internal row type (union of memberOrganizations + manual affiliations) ─── + +interface IDevStatsWorkRow { + id: string + memberId: string + organizationId: string + organizationName: string + title: string | null + dateStart: string | null + dateEnd: string | null + createdAt: Date | string + isPrimaryWorkExperience: boolean + memberCount: number + /** null for memberOrganizations rows; non-null for memberSegmentAffiliations rows */ + segmentId: string | null +} + +// ─── Step 1: member lookup ──────────────────────────────────────────────────── + +export async function findMembersByGithubHandles( + qx: QueryExecutor, + lowercasedHandles: string[], +): Promise { + return qx.select( + ` + SELECT + mi.value AS "githubHandle", + mi."memberId", + m."displayName" + FROM "memberIdentities" mi + JOIN members m ON m.id = mi."memberId" + WHERE mi.platform = $(platform) + AND mi.type = $(type) + AND mi.verified = true + AND lower(mi.value) IN ($(lowercasedHandles:csv)) + AND mi."deletedAt" IS NULL + AND m."deletedAt" IS NULL + `, + { + platform: PlatformType.GITHUB, + type: MemberIdentityType.USERNAME, + lowercasedHandles, + }, + ) +} + +// ─── Step 2: verified emails ────────────────────────────────────────────────── + +export async function findVerifiedEmailsByMemberIds( + qx: QueryExecutor, + memberIds: string[], +): Promise<{ memberId: string; email: string }[]> { + return qx.select( + ` + SELECT "memberId", value AS email + FROM "memberIdentities" + WHERE "memberId" IN ($(memberIds:csv)) + AND type = $(type) + AND verified = true + AND "deletedAt" IS NULL + `, + { + memberIds, + type: MemberIdentityType.EMAIL, + }, + ) +} + +// ─── Step 3a: regular work experiences (bulk) ───────────────────────────────── + +async function findWorkExperiencesBulk( + qx: QueryExecutor, + memberIds: string[], +): Promise { + const rows: IDevStatsWorkRow[] = await qx.select( + ` + WITH aggs AS ( + SELECT + osa."organizationId", + sum(osa."memberCount") AS total_count + FROM "organizationSegmentsAgg" osa + WHERE osa."segmentId" IN ( + SELECT id FROM segments + WHERE "grandparentId" IS NOT NULL + AND "parentId" IS NOT NULL + ) + GROUP BY osa."organizationId" + ) + SELECT + mo.id, + mo."memberId", + mo."organizationId", + o."displayName" AS "organizationName", + mo.title, + mo."dateStart", + mo."dateEnd", + mo."createdAt", + COALESCE(ovr."isPrimaryWorkExperience", false) AS "isPrimaryWorkExperience", + COALESCE(a.total_count, 0) AS "memberCount", + NULL::text AS "segmentId" + FROM "memberOrganizations" mo + JOIN organizations o ON mo."organizationId" = o.id + LEFT JOIN "memberOrganizationAffiliationOverrides" ovr ON ovr."memberOrganizationId" = mo.id + LEFT JOIN aggs a ON a."organizationId" = mo."organizationId" + WHERE mo."memberId" IN ($(memberIds:csv)) + AND mo."deletedAt" IS NULL + AND COALESCE(ovr."allowAffiliation", true) = true + `, + { memberIds }, + ) + + return rows.filter( + (r) => !r.title || !BLACKLISTED_TITLES.some((t) => r.title?.toLowerCase().includes(t)), + ) +} + +// ─── Step 3b: manual affiliations (bulk) ───────────────────────────────────── + +async function findManualAffiliationsBulk( + qx: QueryExecutor, + memberIds: string[], +): Promise { + return qx.select( + ` + SELECT + msa.id, + msa."memberId", + msa."organizationId", + o."displayName" AS "organizationName", + NULL AS title, + msa."dateStart", + msa."dateEnd", + NULL::timestamptz AS "createdAt", + false AS "isPrimaryWorkExperience", + 0 AS "memberCount", + msa."segmentId" + FROM "memberSegmentAffiliations" msa + JOIN organizations o ON msa."organizationId" = o.id + WHERE msa."memberId" IN ($(memberIds:csv)) + `, + { memberIds }, + ) +} + +// ─── Selection priority (mirrors selectPrimaryWorkExperience) ───────────────── + +function longestDateRange(orgs: IDevStatsWorkRow[]): IDevStatsWorkRow { + const withDates = orgs.filter((r) => r.dateStart) + if (withDates.length === 0) return orgs[0] + + return withDates.reduce((best, curr) => { + const bestMs = + new Date(best.dateEnd ?? '9999-12-31').getTime() - new Date(best.dateStart ?? '').getTime() + const currMs = + new Date(curr.dateEnd ?? '9999-12-31').getTime() - new Date(curr.dateStart ?? '').getTime() + return currMs > bestMs ? curr : best + }) +} + +function selectPrimaryWorkExperience(orgs: IDevStatsWorkRow[]): IDevStatsWorkRow { + if (orgs.length === 1) return orgs[0] + + // 1. Manual affiliations (segmentId non-null) always win + const manual = orgs.filter((r) => r.segmentId !== null) + if (manual.length > 0) { + if (manual.length === 1) return manual[0] + return longestDateRange(manual) + } + + // 2. isPrimaryWorkExperience = true — prefer those with a dateStart + const primary = orgs.filter((r) => r.isPrimaryWorkExperience) + if (primary.length > 0) { + const withDates = primary.filter((r) => r.dateStart) + if (withDates.length > 0) return withDates[0] + return primary[0] + } + + // 3. Only one org has a dateStart — pick it + const withDates = orgs.filter((r) => r.dateStart) + if (withDates.length === 1) return withDates[0] + + // 4. Org with strictly more members wins; if tied, fall through + const sorted = [...orgs].sort((a, b) => b.memberCount - a.memberCount) + if (sorted.length >= 2 && sorted[0].memberCount > sorted[1].memberCount) { + return sorted[0] + } + + // 5. Longest date range as final tiebreaker + return longestDateRange(orgs) +} + +// ─── Per-member affiliation resolution ─────────────────────────────────────── + +/** Returns the org used to fill gaps — primary undated wins, then earliest-created undated. */ +function findFallbackOrg(rows: IDevStatsWorkRow[]): IDevStatsWorkRow | null { + const primaryUndated = rows.find((r) => r.isPrimaryWorkExperience && !r.dateStart && !r.dateEnd) + if (primaryUndated) return primaryUndated + + return ( + rows + .filter((r) => !r.dateStart && !r.dateEnd) + .sort((a, b) => new Date(a.createdAt).getTime() - new Date(b.createdAt).getTime()) + .at(0) ?? null + ) +} + +/** + * Collects all date boundaries from the dated rows, capped at today. + * Each dateStart and (dateEnd + 1 day) marks a point where active orgs can change. + */ +function collectBoundaries(datedRows: IDevStatsWorkRow[]): Date[] { + const today = startOfDay(new Date()) + + const ms = new Set([today.getTime()]) + + for (const row of datedRows) { + const start = startOfDay(row.dateStart ?? '') + if (start <= today) ms.add(start.getTime()) + + if (row.dateEnd) { + const afterEnd = startOfDay(row.dateEnd) + afterEnd.setDate(afterEnd.getDate() + 1) + if (afterEnd <= today) ms.add(afterEnd.getTime()) + } + } + + return Array.from(ms) + .sort((a, b) => a - b) + .map((t) => new Date(t)) +} + +function orgsActiveAt(datedRows: IDevStatsWorkRow[], boundaryDate: Date): IDevStatsWorkRow[] { + return datedRows.filter((role) => { + const roleStart = startOfDay(role.dateStart ?? '') + const roleEnd = role.dateEnd ? startOfDay(role.dateEnd) : null + + // org is active if the boundary date falls within its employment period + return boundaryDate >= roleStart && (!roleEnd || boundaryDate <= roleEnd) + }) +} + +function startOfDay(date: Date | string): Date { + const d = new Date(date) + d.setHours(0, 0, 0, 0) + return d +} + +function dayBefore(date: Date): Date { + const d = new Date(date) + d.setDate(d.getDate() - 1) + return d +} + +function closeAffiliationWindow( + memberId: string, + affiliations: IDevStatsAffiliation[], + org: IDevStatsWorkRow, + windowStart: Date, + windowEnd: Date, +): void { + log.debug( + { + memberId, + org: org.organizationName, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + }, + 'closing affiliation window', + ) + affiliations.push({ + organization: org.organizationName, + startDate: windowStart.toISOString(), + endDate: windowEnd.toISOString(), + }) +} + +/** Iterates boundary intervals and builds non-overlapping affiliation windows. */ +function buildTimeline( + memberId: string, + datedRows: IDevStatsWorkRow[], + fallbackOrg: IDevStatsWorkRow | null, + boundaries: Date[], +): IDevStatsAffiliation[] { + const affiliations: IDevStatsAffiliation[] = [] + let currentOrg: IDevStatsWorkRow = null + let currentWindowStart: Date = null + let uncoveredPeriodStart: Date = null + + for (let i = 0; i < boundaries.length - 1; i++) { + const boundaryDate = boundaries[i] + const activeOrgsAtBoundary = orgsActiveAt(datedRows, boundaryDate) + + log.debug( + { + memberId, + boundaryDate: boundaryDate.toISOString(), + orgsAtBoundary: activeOrgsAtBoundary.map((r) => ({ + org: r.organizationName, + dateStart: r.dateStart, + dateEnd: r.dateEnd, + isPrimary: r.isPrimaryWorkExperience, + memberCount: r.memberCount, + isManual: r.segmentId !== null, + })), + }, + 'processing boundary', + ) + + // No orgs active at this boundary — close the current window and start tracking a gap + if (activeOrgsAtBoundary.length === 0) { + if (currentOrg && currentWindowStart) { + closeAffiliationWindow( + memberId, + affiliations, + currentOrg, + currentWindowStart, + dayBefore(boundaryDate), + ) + currentOrg = null + currentWindowStart = null + } + + if (uncoveredPeriodStart === null) { + uncoveredPeriodStart = boundaryDate + log.debug( + { memberId, uncoveredPeriodStart: boundaryDate.toISOString() }, + 'uncovered period started', + ) + } + + continue + } + + // Orgs are active again — close the uncovered period using the fallback org if available + if (uncoveredPeriodStart !== null) { + log.debug( + { + memberId, + fallbackOrg: fallbackOrg?.organizationName ?? null, + uncoveredPeriodStart: uncoveredPeriodStart.toISOString(), + uncoveredPeriodEnd: dayBefore(boundaryDate).toISOString(), + }, + 'closing uncovered period with fallback org', + ) + + if (fallbackOrg) { + closeAffiliationWindow( + memberId, + affiliations, + fallbackOrg, + uncoveredPeriodStart, + dayBefore(boundaryDate), + ) + } + + uncoveredPeriodStart = null + } + + const winningAffiliation = selectPrimaryWorkExperience(activeOrgsAtBoundary) + + // No current window open — start a new one with the winning org + if (!currentOrg) { + log.debug( + { memberId, org: winningAffiliation.organizationName, from: boundaryDate.toISOString() }, + 'opening affiliation window', + ) + currentOrg = winningAffiliation + currentWindowStart = boundaryDate + continue + } + + // Winning org changed — close the current window and open a new one + if (currentOrg.organizationId !== winningAffiliation.organizationId) { + log.debug( + { + memberId, + from: currentOrg.organizationName, + to: winningAffiliation.organizationName, + at: boundaryDate.toISOString(), + }, + 'affiliation changed', + ) + closeAffiliationWindow( + memberId, + affiliations, + currentOrg, + currentWindowStart ?? boundaryDate, + dayBefore(boundaryDate), + ) + currentOrg = winningAffiliation + currentWindowStart = boundaryDate + } + } + + // Close the last open window using the org's actual end date (null = ongoing) + if (currentOrg && currentWindowStart) { + const endDate = currentOrg.dateEnd ? new Date(currentOrg.dateEnd).toISOString() : null + log.debug( + { + memberId, + org: currentOrg.organizationName, + start: currentWindowStart.toISOString(), + endDate, + }, + 'closing final affiliation window', + ) + affiliations.push({ + organization: currentOrg.organizationName, + startDate: currentWindowStart.toISOString(), + endDate, + }) + } + + // Close a trailing uncovered period using the fallback org (ongoing, no end date) + if (uncoveredPeriodStart !== null && fallbackOrg) { + log.debug( + { + memberId, + fallbackOrg: fallbackOrg.organizationName, + uncoveredPeriodStart: uncoveredPeriodStart.toISOString(), + }, + 'closing trailing uncovered period with fallback org', + ) + affiliations.push({ + organization: fallbackOrg.organizationName, + startDate: uncoveredPeriodStart.toISOString(), + endDate: null, + }) + } + + return affiliations +} + +function resolveAffiliationsForMember( + memberId: string, + rows: IDevStatsWorkRow[], +): IDevStatsAffiliation[] { + log.debug( + { + memberId, + totalRows: rows.length, + rows: rows.map((r) => ({ + org: r.organizationName, + dateStart: r.dateStart, + dateEnd: r.dateEnd, + isPrimary: r.isPrimaryWorkExperience, + memberCount: r.memberCount, + isManual: r.segmentId !== null, + })), + }, + 'resolving affiliations', + ) + + // If one undated org is marked primary, drop all other undated orgs to avoid infinite conflicts + const primaryUndated = rows.find((r) => r.isPrimaryWorkExperience && !r.dateStart && !r.dateEnd) + const cleaned = primaryUndated + ? rows.filter((r) => r.dateStart || r.id === primaryUndated.id) + : rows + + if (cleaned.length < rows.length) { + log.debug( + { + memberId, + dropped: rows.length - cleaned.length, + keptPrimaryUndated: primaryUndated?.organizationName, + }, + 'dropped undated orgs (primary undated exists)', + ) + } + + const fallbackOrg = findFallbackOrg(cleaned) + const datedRows = cleaned.filter((r) => r.dateStart) + + log.debug( + { + memberId, + datedRows: datedRows.length, + undatedRows: cleaned.length - datedRows.length, + fallbackOrg: fallbackOrg?.organizationName ?? null, + datedRowsList: datedRows.map((r) => ({ + org: r.organizationName, + dateStart: r.dateStart, + dateEnd: r.dateEnd, + })), + }, + 'prepared rows', + ) + + if (datedRows.length === 0) { + log.debug({ memberId }, 'no dated rows — returning empty affiliations') + return [] + } + + const boundaries = collectBoundaries(datedRows) + log.debug( + { + memberId, + boundaries: boundaries.length, + boundaryDates: boundaries.map((b) => b.toISOString()), + }, + 'collected boundaries', + ) + + const timeline = buildTimeline(memberId, datedRows, fallbackOrg, boundaries) + + log.debug( + { + memberId, + affiliations: timeline.length, + result: timeline.map((a) => ({ + org: a.organization, + startDate: a.startDate, + endDate: a.endDate, + })), + }, + 'timeline built', + ) + + return timeline.sort((a, b) => { + if (!a.startDate) return 1 + if (!b.startDate) return -1 + return new Date(b.startDate).getTime() - new Date(a.startDate).getTime() + }) +} + +// ─── Public bulk resolver ───────────────────────────────────────────────────── + +export async function resolveAffiliationsByMemberIds( + qx: QueryExecutor, + memberIds: string[], +): Promise> { + const [workExperiences, manualAffiliations] = await Promise.all([ + findWorkExperiencesBulk(qx, memberIds), + findManualAffiliationsBulk(qx, memberIds), + ]) + + const byMember = new Map() + for (const row of [...workExperiences, ...manualAffiliations]) { + const list = byMember.get(row.memberId) ?? [] + list.push(row) + byMember.set(row.memberId, list) + } + + const result = new Map() + for (const id of memberIds) { + result.set(id, resolveAffiliationsForMember(id, byMember.get(id) ?? [])) + } + return result +} diff --git a/services/libs/data-access-layer/src/index.ts b/services/libs/data-access-layer/src/index.ts index 639f0547b8..459fa15495 100644 --- a/services/libs/data-access-layer/src/index.ts +++ b/services/libs/data-access-layer/src/index.ts @@ -1,4 +1,5 @@ export * from './activities' +export * from './devStats' export * from './activityRelations' export * from './apiKeys' export * from './dashboards'