diff --git a/Sources/ProcessBarMonitor/ProcessSnapshotProvider.swift b/Sources/ProcessBarMonitor/ProcessSnapshotProvider.swift index 8c192b3..1e0ea3f 100644 --- a/Sources/ProcessBarMonitor/ProcessSnapshotProvider.swift +++ b/Sources/ProcessBarMonitor/ProcessSnapshotProvider.swift @@ -26,32 +26,33 @@ actor ProcessSnapshotProvider { } } + struct AppMetadata: Sendable { + let appName: String + let bundleIdentifier: String? + let commandKey: String + } + private let psPath = "/bin/ps" private let psArguments = ["-axo", "pid=,comm=,%cpu=,rss="] private let metadataRefreshInterval: TimeInterval = 30 private let maxMetadataLookupsPerSnapshot = 48 + private let maxConcurrentMetadataLookups = 8 - private struct RawProcess { + private struct RawProcess: Sendable { let pid: Int let command: String let rawCPUPercent: Double let memoryMB: Double } - private struct AppMetadata { - let appName: String - let bundleIdentifier: String? - let commandKey: String - } - - private struct CachedMetadata { + private struct CachedMetadata: Sendable { let metadata: AppMetadata let updatedAt: Date } private var metadataCache: [Int: CachedMetadata] = [:] - func snapshot() throws -> [ProcessStat] { + func snapshot() async throws -> [ProcessStat] { let process = Process() process.executableURL = URL(fileURLWithPath: psPath) process.arguments = psArguments @@ -84,11 +85,85 @@ actor ProcessSnapshotProvider { throw SnapshotError.noProcessesParsed(lineCount: lineCount) } let prioritizedPIDs = prioritizedMetadataPIDs(from: rawProcesses) - refreshMetadataCache(for: rawProcesses, prioritizedPIDs: prioritizedPIDs) + + // Resolve metadata off the main thread with bounded concurrency + let now = Date() + let resolved = await resolveMetadataBatch( + rawProcesses: rawProcesses, + prioritizedPIDs: prioritizedPIDs, + now: now, + maxConcurrent: maxConcurrentMetadataLookups + ) + + for (pid, metadata) in resolved { + metadataCache[pid] = CachedMetadata(metadata: metadata, updatedAt: now) + } + pruneMetadataCache(validPIDs: Set(rawProcesses.map(\.pid))) return aggregate(rawProcesses) } + /// Resolves metadata for prioritized PIDs in parallel via TaskGroup with bounded concurrency. + /// Each resolution runs in a detached task, off the actor's thread. + private func resolveMetadataBatch( + rawProcesses: [RawProcess], + prioritizedPIDs: Set, + now: Date, + maxConcurrent: Int + ) async -> [Int: AppMetadata] { + let needsRefresh = rawProcesses.filter { prioritizedPIDs.contains($0.pid) && !isCacheValid(pid: $0.pid, now: now) } + + guard !needsRefresh.isEmpty else { return [:] } + + var results: [Int: AppMetadata] = [:] + let throttle = Throttle(maxActive: maxConcurrent) + + await withTaskGroup(of: (Int, AppMetadata?).self) { group in + for raw in needsRefresh { + group.addTask { + await throttle.enter() + let metadata = Self.resolveMetadataSync(pid: raw.pid, command: raw.command) + await throttle.exit() + return (raw.pid, metadata) + } + } + + for await (pid, metadata) in group { + if let metadata { + results[pid] = metadata + } + } + } + + return results + } + + private func isCacheValid(pid: Int, now: Date) -> Bool { + if let cached = metadataCache[pid] { + return now.timeIntervalSince(cached.updatedAt) < metadataRefreshInterval + } + return false + } + + /// Performs NSRunningApplication lookup off the main thread via Task.detached. + /// Called from background tasks spawned by the TaskGroup in resolveMetadataBatch. + private static func resolveMetadataSync(pid: Int, command: String) -> AppMetadata { + if let runningApp = NSRunningApplication(processIdentifier: pid_t(pid)) { + let appName = runningApp.localizedName ?? fallbackAppName(for: command) + return AppMetadata( + appName: appName, + bundleIdentifier: runningApp.bundleIdentifier, + commandKey: command + ) + } + + return AppMetadata( + appName: fallbackAppName(for: command), + bundleIdentifier: nil, + commandKey: command + ) + } + private func parsePSOutput(_ raw: String) -> [RawProcess] { raw .split(whereSeparator: { $0.isNewline }) @@ -116,40 +191,10 @@ actor ProcessSnapshotProvider { return Set(topCPU.map(\.pid)).union(topMemory.map(\.pid)) } - private func refreshMetadataCache(for rawProcesses: [RawProcess], prioritizedPIDs: Set) { - let now = Date() - - for raw in rawProcesses where prioritizedPIDs.contains(raw.pid) { - if let cached = metadataCache[raw.pid], now.timeIntervalSince(cached.updatedAt) < metadataRefreshInterval { - continue - } - - let metadata = resolveMetadata(for: raw) - metadataCache[raw.pid] = CachedMetadata(metadata: metadata, updatedAt: now) - } - } - private func pruneMetadataCache(validPIDs: Set) { metadataCache = metadataCache.filter { validPIDs.contains($0.key) } } - private func resolveMetadata(for raw: RawProcess) -> AppMetadata { - if let runningApp = NSRunningApplication(processIdentifier: pid_t(raw.pid)) { - let appName = runningApp.localizedName ?? fallbackAppName(for: raw.command) - return AppMetadata( - appName: appName, - bundleIdentifier: runningApp.bundleIdentifier, - commandKey: raw.command - ) - } - - return AppMetadata( - appName: fallbackAppName(for: raw.command), - bundleIdentifier: nil, - commandKey: raw.command - ) - } - private func aggregate(_ rawProcesses: [RawProcess]) -> [ProcessStat] { struct Aggregate { var pid: Int @@ -166,7 +211,7 @@ actor ProcessSnapshotProvider { for raw in rawProcesses { let metadata = metadataCache[raw.pid]?.metadata ?? AppMetadata( - appName: fallbackAppName(for: raw.command), + appName: Self.fallbackAppName(for: raw.command), bundleIdentifier: nil, commandKey: raw.command ) @@ -221,8 +266,38 @@ actor ProcessSnapshotProvider { return "command:\(metadata.commandKey)" } - private func fallbackAppName(for command: String) -> String { + private static func fallbackAppName(for command: String) -> String { let last = URL(fileURLWithPath: command).lastPathComponent return last.isEmpty ? command : last } } + +/// Actor-based throttle limiting concurrent background operations. +actor Throttle { + private let maxActive: Int + private var activeCount = 0 + private var waitQueue: [CheckedContinuation] = [] + + init(maxActive: Int) { + self.maxActive = maxActive + } + + func enter() async { + if activeCount < maxActive { + activeCount += 1 + } else { + await withCheckedContinuation { continuation in + waitQueue.append(continuation) + } + } + } + + func exit() { + if !waitQueue.isEmpty { + let cont = waitQueue.removeFirst() + cont.resume() + } else { + activeCount -= 1 + } + } +} \ No newline at end of file