Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 117 additions & 42 deletions Sources/ProcessBarMonitor/ProcessSnapshotProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,33 @@ actor ProcessSnapshotProvider {
}
}

struct AppMetadata: Sendable {
let appName: String
let bundleIdentifier: String?
let commandKey: String
}

private let psPath = "/bin/ps"
private let psArguments = ["-axo", "pid=,comm=,%cpu=,rss="]
private let metadataRefreshInterval: TimeInterval = 30
private let maxMetadataLookupsPerSnapshot = 48
private let maxConcurrentMetadataLookups = 8

private struct RawProcess {
private struct RawProcess: Sendable {
let pid: Int
let command: String
let rawCPUPercent: Double
let memoryMB: Double
}

private struct AppMetadata {
let appName: String
let bundleIdentifier: String?
let commandKey: String
}

private struct CachedMetadata {
private struct CachedMetadata: Sendable {
let metadata: AppMetadata
let updatedAt: Date
}

private var metadataCache: [Int: CachedMetadata] = [:]

func snapshot() throws -> [ProcessStat] {
func snapshot() async throws -> [ProcessStat] {
let process = Process()
process.executableURL = URL(fileURLWithPath: psPath)
process.arguments = psArguments
Expand Down Expand Up @@ -84,11 +85,85 @@ actor ProcessSnapshotProvider {
throw SnapshotError.noProcessesParsed(lineCount: lineCount)
}
let prioritizedPIDs = prioritizedMetadataPIDs(from: rawProcesses)
refreshMetadataCache(for: rawProcesses, prioritizedPIDs: prioritizedPIDs)

// Resolve metadata off the main thread with bounded concurrency
let now = Date()
let resolved = await resolveMetadataBatch(
rawProcesses: rawProcesses,
prioritizedPIDs: prioritizedPIDs,
now: now,
maxConcurrent: maxConcurrentMetadataLookups
)

for (pid, metadata) in resolved {
metadataCache[pid] = CachedMetadata(metadata: metadata, updatedAt: now)
}

pruneMetadataCache(validPIDs: Set(rawProcesses.map(\.pid)))
return aggregate(rawProcesses)
}

/// Resolves metadata for prioritized PIDs in parallel via TaskGroup with bounded concurrency.
/// Each resolution runs in a detached task, off the actor's thread.
private func resolveMetadataBatch(
rawProcesses: [RawProcess],
prioritizedPIDs: Set<Int>,
now: Date,
maxConcurrent: Int
) async -> [Int: AppMetadata] {
let needsRefresh = rawProcesses.filter { prioritizedPIDs.contains($0.pid) && !isCacheValid(pid: $0.pid, now: now) }

guard !needsRefresh.isEmpty else { return [:] }

var results: [Int: AppMetadata] = [:]
let throttle = Throttle(maxActive: maxConcurrent)

await withTaskGroup(of: (Int, AppMetadata?).self) { group in
for raw in needsRefresh {
group.addTask {
await throttle.enter()
let metadata = Self.resolveMetadataSync(pid: raw.pid, command: raw.command)
await throttle.exit()
return (raw.pid, metadata)
}
}

for await (pid, metadata) in group {
if let metadata {
results[pid] = metadata
}
}
}

return results
}

private func isCacheValid(pid: Int, now: Date) -> Bool {
if let cached = metadataCache[pid] {
return now.timeIntervalSince(cached.updatedAt) < metadataRefreshInterval
}
return false
}

/// Performs NSRunningApplication lookup off the main thread via Task.detached.
/// Called from background tasks spawned by the TaskGroup in resolveMetadataBatch.
private static func resolveMetadataSync(pid: Int, command: String) -> AppMetadata {
if let runningApp = NSRunningApplication(processIdentifier: pid_t(pid)) {
let appName = runningApp.localizedName ?? fallbackAppName(for: command)
return AppMetadata(
appName: appName,
bundleIdentifier: runningApp.bundleIdentifier,
commandKey: command
)
}

return AppMetadata(
appName: fallbackAppName(for: command),
bundleIdentifier: nil,
commandKey: command
)
}

private func parsePSOutput(_ raw: String) -> [RawProcess] {
raw
.split(whereSeparator: { $0.isNewline })
Expand Down Expand Up @@ -116,40 +191,10 @@ actor ProcessSnapshotProvider {
return Set(topCPU.map(\.pid)).union(topMemory.map(\.pid))
}

private func refreshMetadataCache(for rawProcesses: [RawProcess], prioritizedPIDs: Set<Int>) {
let now = Date()

for raw in rawProcesses where prioritizedPIDs.contains(raw.pid) {
if let cached = metadataCache[raw.pid], now.timeIntervalSince(cached.updatedAt) < metadataRefreshInterval {
continue
}

let metadata = resolveMetadata(for: raw)
metadataCache[raw.pid] = CachedMetadata(metadata: metadata, updatedAt: now)
}
}

private func pruneMetadataCache(validPIDs: Set<Int>) {
metadataCache = metadataCache.filter { validPIDs.contains($0.key) }
}

private func resolveMetadata(for raw: RawProcess) -> AppMetadata {
if let runningApp = NSRunningApplication(processIdentifier: pid_t(raw.pid)) {
let appName = runningApp.localizedName ?? fallbackAppName(for: raw.command)
return AppMetadata(
appName: appName,
bundleIdentifier: runningApp.bundleIdentifier,
commandKey: raw.command
)
}

return AppMetadata(
appName: fallbackAppName(for: raw.command),
bundleIdentifier: nil,
commandKey: raw.command
)
}

private func aggregate(_ rawProcesses: [RawProcess]) -> [ProcessStat] {
struct Aggregate {
var pid: Int
Expand All @@ -166,7 +211,7 @@ actor ProcessSnapshotProvider {

for raw in rawProcesses {
let metadata = metadataCache[raw.pid]?.metadata ?? AppMetadata(
appName: fallbackAppName(for: raw.command),
appName: Self.fallbackAppName(for: raw.command),
bundleIdentifier: nil,
commandKey: raw.command
)
Expand Down Expand Up @@ -221,8 +266,38 @@ actor ProcessSnapshotProvider {
return "command:\(metadata.commandKey)"
}

private func fallbackAppName(for command: String) -> String {
private static func fallbackAppName(for command: String) -> String {
let last = URL(fileURLWithPath: command).lastPathComponent
return last.isEmpty ? command : last
}
}

/// Actor-based throttle limiting concurrent background operations.
actor Throttle {
private let maxActive: Int
private var activeCount = 0
private var waitQueue: [CheckedContinuation<Void, Never>] = []

init(maxActive: Int) {
self.maxActive = maxActive
}

func enter() async {
if activeCount < maxActive {
activeCount += 1
} else {
await withCheckedContinuation { continuation in
waitQueue.append(continuation)
}
}
}

func exit() {
if !waitQueue.isEmpty {
let cont = waitQueue.removeFirst()
cont.resume()
} else {
activeCount -= 1
}
}
}