diff --git a/Soma/ViewModels/OllamaManager.swift b/Soma/ViewModels/OllamaManager.swift index b35b082..ba57aa2 100644 --- a/Soma/ViewModels/OllamaManager.swift +++ b/Soma/ViewModels/OllamaManager.swift @@ -84,25 +84,39 @@ final class OllamaManager: ObservableObject { guard let url = URL(string: "http://127.0.0.1:11434/api/tags") else { return } var request = URLRequest(url: url) request.timeoutInterval = 3 - URLSession.shared.dataTask(with: request) { data, _, error in - DispatchQueue.main.async { - if let error { - self.tagsError = error.localizedDescription - self.installedModels = [] + URLSession.shared.dataTask(with: request) { [weak self] data, _, error in + Task.detached { + let errorMsg = error?.localizedDescription + + if let errorMsg { + await MainActor.run { + self?.tagsError = errorMsg + self?.installedModels = [] + } return } + guard let data else { - self.tagsError = "Ollama returned no model list." - self.installedModels = [] + await MainActor.run { + self?.tagsError = "Ollama returned no model list." + self?.installedModels = [] + } return } + do { let decoded = try JSONDecoder().decode(OllamaTagsResponse.self, from: data) - self.installedModels = decoded.models.sorted { $0.name.localizedStandardCompare($1.name) == .orderedAscending } - self.tagsError = nil + let sorted = decoded.models.sorted { $0.name.localizedStandardCompare($1.name) == .orderedAscending } + await MainActor.run { + self?.installedModels = sorted + self?.tagsError = nil + } } catch { - self.tagsError = error.localizedDescription - self.installedModels = [] + let decodingError = error.localizedDescription + await MainActor.run { + self?.tagsError = decodingError + self?.installedModels = [] + } } } }.resume() @@ -111,10 +125,12 @@ final class OllamaManager: ObservableObject { guard let url = URL(string: "http://127.0.0.1:11434/api/ps") else { return } var request = URLRequest(url: url) request.timeoutInterval = 2 - URLSession.shared.dataTask(with: request) { data, _, error in - DispatchQueue.main.async { + URLSession.shared.dataTask(with: request) { [weak self] data, _, error in + Task.detached { if error != nil { - self.updateStatus(isRunning: false, loadedModels: []) + await MainActor.run { + self?.updateStatus(isRunning: false, loadedModels: []) + } return } var loaded: Set = [] @@ -125,7 +141,10 @@ final class OllamaManager: ObservableObject { { loaded = Set(models.compactMap { ($0["name"] as? String)?.lowercased() }) } - self.updateStatus(isRunning: true, loadedModels: loaded) + + await MainActor.run { + self?.updateStatus(isRunning: true, loadedModels: loaded) + } } }.resume() } @@ -171,22 +190,27 @@ final class OllamaManager: ObservableObject { var request = URLRequest(url: url) request.httpMethod = "POST" request.setValue("application/json", forHTTPHeaderField: "Content-Type") - request.httpBody = try? JSONSerialization.data(withJSONObject: [ - "model": trimmedModel, - "prompt": "", - "keep_alive": keepAlive, - "stream": false, - ]) isBusy = true - URLSession.shared.dataTask(with: request) { _, _, _ in - DispatchQueue.main.async { - self.isBusy = false - DispatchQueue.main.asyncAfter(deadline: .now() + 1) { - self.refreshInstalledModels() - self.checkStatus() + Task.detached { + let body = try? JSONSerialization.data(withJSONObject: [ + "model": trimmedModel, + "prompt": "", + "keep_alive": keepAlive, + "stream": false, + ]) + var request = request + request.httpBody = body + + URLSession.shared.dataTask(with: request) { _, _, _ in + DispatchQueue.main.async { + self.isBusy = false + DispatchQueue.main.asyncAfter(deadline: .now() + 1) { + self.refreshInstalledModels() + self.checkStatus() + } } - } - }.resume() + }.resume() + } } private func persistModelName(_ model: String, for role: LocalModelRole) { LocalModelSettingsStore.setModel(model, for: role) diff --git a/Soma/ViewModels/RusToPromptQueueManager+Part1.swift b/Soma/ViewModels/RusToPromptQueueManager+Part1.swift index 0f9a7fd..4579450 100644 --- a/Soma/ViewModels/RusToPromptQueueManager+Part1.swift +++ b/Soma/ViewModels/RusToPromptQueueManager+Part1.swift @@ -164,9 +164,9 @@ extension RusToPromptQueueManager { saveToDisk() } func refreshFreeMemory() { - DispatchQueue.global(qos: .utility).async { - let value = Self.readFreeMemoryGB() - DispatchQueue.main.async { + Task { + let value = await Self.readFreeMemoryGB() + await MainActor.run { self.freeMemoryGB = value } } diff --git a/Soma/ViewModels/RusToPromptQueueManager+Part2.swift b/Soma/ViewModels/RusToPromptQueueManager+Part2.swift index b783d48..eae064f 100644 --- a/Soma/ViewModels/RusToPromptQueueManager+Part2.swift +++ b/Soma/ViewModels/RusToPromptQueueManager+Part2.swift @@ -1,5 +1,6 @@ import Combine import Foundation + struct RusToPromptQueueRunContext { let item: RusToPromptQueueItem let runURL: URL @@ -7,6 +8,7 @@ struct RusToPromptQueueRunContext { let controlURL: URL let snapshot: RusToPromptQueueItemSnapshot } + extension RusToPromptQueueManager { func startNextIfPossible(allowBatteryStart: Bool = false) { refreshPowerSourceValue() @@ -42,6 +44,8 @@ extension RusToPromptQueueManager { self.startItem(at: currentIndex, installedModels: installed, allowBatteryStart: allowBatteryStart) } } + + func nextStartableQueueIndex() -> Int? { items.indices .filter { isStartableQueueItem(items[$0]) } @@ -54,9 +58,13 @@ extension RusToPromptQueueManager { return left.createdAt < right.createdAt } } + + func isStartableQueueItem(_ item: RusToPromptQueueItem) -> Bool { item.status == .queued || item.status == .waitingLocalAI } + + func startItem(at index: Int, installedModels: Set, allowBatteryStart: Bool = false) { guard activeProcess == nil, items.indices.contains(index) else { return } let installedLower = Set(installedModels.map { $0.lowercased() }) @@ -64,12 +72,14 @@ extension RusToPromptQueueManager { let improvers = localCandidates(settings.improverCandidates, installedLower: installedLower) guard validateQueueModels(index: index, translators: translators, improvers: improvers) else { return } guard let context = prepareRunContext(index: index, translators: translators, improvers: improvers) else { return } + markRunStarted(index: index, context: context) if powerSource == .battery && allowBatteryStart { batteryStartOverrideItemID = context.item.id } let process = makeQueueProcess(context: context, translators: translators, improvers: improvers) attachQueueHandlers(to: process) + do { try process.run() activeProcess = process @@ -82,9 +92,11 @@ extension RusToPromptQueueManager { mark(index: index, status: .failed, message: "Could not start queue run: \(error.localizedDescription)") } } + func localCandidates(_ candidates: [String], installedLower: Set) -> [String] { candidates.filter { installedLower.contains($0.lowercased()) && Self.isLocalStageModel($0) } } + func validateQueueModels(index: Int, translators: [String], improvers: [String]) -> Bool { guard !translators.isEmpty else { mark(index: index, status: .blocked, message: "No installed local translator candidates.") @@ -96,6 +108,7 @@ extension RusToPromptQueueManager { } return true } + func prepareRunContext(index: Int, translators: [String], improvers: [String]) -> RusToPromptQueueRunContext? { let item = items[index] let resumedURL = item.recoveredAfterRestart ? item.outputPath.flatMap { $0.isEmpty ? nil : URL(fileURLWithPath: $0) } : nil @@ -114,9 +127,11 @@ extension RusToPromptQueueManager { } return RusToPromptQueueRunContext(item: item, runURL: runURL, casesURL: casesURL, controlURL: controlURL, snapshot: queueSnapshot(translators: translators, improvers: improvers)) } + func queueSnapshot(translators: [String], improvers: [String]) -> RusToPromptQueueItemSnapshot { RusToPromptQueueItemSnapshot(translatorModels: translators, improverModels: improvers, confidenceReferee: settings.confidenceReferee, confidenceModel: settings.confidenceModel, localConfidenceModels: Array(settings.localConfidenceModels.prefix(2)), hybridGeminiModel: settings.hybridGeminiModel, hybridFallbackReferee: settings.hybridFallbackReferee ?? "gemini", confidenceBatchSize: settings.confidenceBatchSize, cooldownSeconds: settings.cooldownSeconds) } + func markRunStarted(index: Int, context: RusToPromptQueueRunContext) { items[index].status = .running items[index].statusMessage = "Running staged benchmark" @@ -136,6 +151,7 @@ extension RusToPromptQueueManager { processOutputBuffer = "" isRunning = true } + func makeQueueProcess(context: RusToPromptQueueRunContext, translators: [String], improvers: [String]) -> Process { let process = Process() process.currentDirectoryURL = repoRootURL @@ -149,6 +165,7 @@ extension RusToPromptQueueManager { process.environment = environment return process } + func queueArguments(context: RusToPromptQueueRunContext, translators: [String], improvers: [String]) -> [String] { var arguments = [stressScriptURL.path, "--benchmark-mode", "staged", "--cases-file", context.casesURL.path, "--limit", "1", "--translator-models"] arguments.append(contentsOf: translators) @@ -160,6 +177,7 @@ extension RusToPromptQueueManager { } return arguments } + func queueConfidenceArguments(context: RusToPromptQueueRunContext) -> [String] { let snapshot = context.snapshot let model = snapshot.confidenceReferee == "hybrid" ? snapshot.hybridGeminiModel : snapshot.confidenceModel @@ -171,6 +189,7 @@ extension RusToPromptQueueManager { } return arguments } + func attachQueueHandlers(to process: Process) { let pipe = Pipe() process.standardOutput = pipe @@ -185,102 +204,52 @@ extension RusToPromptQueueManager { DispatchQueue.main.async { self?.handleProcessFinished(status: finishedProcess.terminationStatus) } } } + + func handleProcessFinished(status: Int32) { - defer { - batteryStartOverrideItemID = nil - activeProcess = nil - activeItemID = nil - activeControlFileURL = nil - isRunning = false - currentStage = "Idle" - currentModel = "-" - startNextIfPossible() - } - guard let itemID = activeItemID, let index = items.firstIndex(where: { $0.id == itemID }) else { return } - if status == 0 { - let completionMessage = queueRunCompletionMessage(outputPath: items[index].outputPath) - items[index].status = .completed - items[index].statusMessage = completionMessage - completeModelProgress(itemID: itemID) - appendActivity("Queue run \(itemID): \(completionMessage).") - } else { - let stopped = controlFlagFromActiveFile("stop") - items[index].status = stopped ? .interrupted : .failed - items[index].statusMessage = stopped ? "Interrupted by user" : "Process exited with code \(status)" - markModelProgressTerminal(itemID: itemID, label: items[index].statusMessage, status: stopped ? "interrupted" : "failed") - appendActivity("Queue run \(itemID) ended: \(items[index].statusMessage).") - } - items[index].finishedAt = Date() - items[index].updatedAt = Date() - saveToDisk() - } - func queueRunCompletionMessage(outputPath: String?) -> String { - guard let outputPath, !outputPath.isEmpty else { return "Completed; summary missing" } - let summaryURL = URL(fileURLWithPath: outputPath).appendingPathComponent("summary.json") - guard let data = try? Data(contentsOf: summaryURL), - let object = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else { - return "Completed; summary missing" - } - let runStatus = object["run_status"] as? String - let success = object["success"] as? Bool - if runStatus == "failed" { - return queueRunIssueMessage(prefix: "Completed with failed summary", summary: object) - } - if runStatus == "completed_with_issues" || success == false { - return queueRunIssueMessage(prefix: "Completed with issues", summary: object) - } - return "Completed" - } - func queueRunIssueMessage(prefix: String, summary: [String: Any]) -> String { - guard let issueCounts = summary["issue_counts"] as? [String: Any] else { return prefix } - let issues = issueCounts - .compactMap { key, value -> (String, Int)? in - let count: Int - if let intValue = value as? Int { - count = intValue - } else if let number = value as? NSNumber { - count = number.intValue - } else { - return nil - } - return count > 0 ? (key.replacingOccurrences(of: "_", with: " "), count) : nil - } - .sorted { lhs, rhs in - if lhs.0 == rhs.0 { return lhs.1 > rhs.1 } - return lhs.0 < rhs.0 + let itemID = activeItemID + let outputPath = activeItemID.flatMap { id in items.first(where: { $0.id == id })?.outputPath } + let controlURL = activeControlFileURL + + Task { + let completionMessage: String? + if status == 0 { + completionMessage = await queueRunCompletionMessage(outputPath: outputPath) + } else { + completionMessage = nil } - .prefix(3) - .map { "\($0.0) \($0.1)" } - return issues.isEmpty ? prefix : "\(prefix): \(issues.joined(separator: ", "))" - } - func consumeProcessOutput(_ text: String) { - processOutputBuffer += text - let parts = processOutputBuffer.components(separatedBy: .newlines) - guard parts.count > 1 else { return } - processOutputBuffer = parts.last ?? "" - for line in parts.dropLast() { - let trimmed = line.trimmingCharacters(in: .whitespacesAndNewlines) - guard !trimmed.isEmpty else { continue } - if let event = decodeProgressEvent(from: trimmed) { - currentStage = displayStage(for: event) - if let translator = event.translatorModel, let analyzer = event.analyzerModel { - currentModel = "\(translator) -> \(analyzer)" - } else if let translator = event.translatorModel { - currentModel = translator - } else if let confidenceModel = event.confidenceModel { - currentModel = confidenceModel + + let stopped = status != 0 ? await controlFlagFromActiveFileAsync("stop", controlURL: controlURL) : false + + await MainActor.run { + defer { + batteryStartOverrideItemID = nil + activeProcess = nil + activeItemID = nil + activeControlFileURL = nil + isRunning = false + currentStage = "Idle" + currentModel = "-" + startNextIfPossible() } - updateModelProgress(for: event) - appendActivity(activityText(for: event)) - } else { - appendActivity(trimmed) + guard let itemID = itemID, let index = items.firstIndex(where: { $0.id == itemID }) else { return } + + if status == 0 { + let msg = completionMessage ?? "Completed" + items[index].status = .completed + items[index].statusMessage = msg + completeModelProgress(itemID: itemID) + appendActivity("Queue run \(itemID): \(msg).") + } else { + items[index].status = stopped ? .interrupted : .failed + items[index].statusMessage = stopped ? "Interrupted by user" : "Process exited with code \(status)" + markModelProgressTerminal(itemID: itemID, label: items[index].statusMessage, status: stopped ? "interrupted" : "failed") + appendActivity("Queue run \(itemID) ended: \(items[index].statusMessage).") + } + items[index].finishedAt = Date() + items[index].updatedAt = Date() + saveToDisk() } } } - func decodeProgressEvent(from line: String) -> QueueProgressEvent? { - guard line.hasPrefix(progressPrefix) else { return nil } - let payload = String(line.dropFirst(progressPrefix.count)) - guard let data = payload.data(using: .utf8) else { return nil } - return try? JSONDecoder().decode(QueueProgressEvent.self, from: data) - } } diff --git a/Soma/ViewModels/RusToPromptQueueManager+Part3.swift b/Soma/ViewModels/RusToPromptQueueManager+Part3.swift index 3c3b74f..4ff8c20 100644 --- a/Soma/ViewModels/RusToPromptQueueManager+Part3.swift +++ b/Soma/ViewModels/RusToPromptQueueManager+Part3.swift @@ -1,54 +1,12 @@ import Combine import Foundation + extension RusToPromptQueueManager { - func activityText(for event: QueueProgressEvent) -> String { - let caseID = event.caseID ?? "case" - switch event.event { - case "stage_start": - return "\(caseID) \(displayStage(for: event)) started · \(currentModel)" - case "stage_complete": - return "\(caseID) \(displayStage(for: event)) finished · \(event.status ?? "unknown")" - case "translation_gate": - return "\(caseID) translation \(event.status ?? "checked") · \(event.reason ?? "")" - case "best_translation_selected": - let confidence = event.confidence.map { String(format: " · conf %.2f", $0) } ?? "" - return "\(caseID) selected \(event.translatorModel ?? "no translation")\(confidence)" - case "cooldown_start": - return "\(caseID) cooldown started · \(event.reason ?? "")" - case "cooldown_pause": - return "\(caseID) cooldown paused" - case "cooldown_complete": - return "\(caseID) cooldown finished" - case "result_write": - return "\(caseID) result saved" - case "resume_skip": - return "\(caseID) resumed; skipped completed \(event.translatorModel ?? "") \(event.analyzerModel ?? "")" - default: - return "\(caseID) \(displayStage(for: event)) · \(event.status ?? "")" - } - } - func displayStage(for event: QueueProgressEvent) -> String { - switch event.stage { - case "queued": return "Queued" - case "translating": return "Translating" - case "translation_confidence", "translation_confidence_batch": return "Translation Check" - case "translation_selection": return "Selecting Translation" - case "translation_rejected": return "Translation Rejected" - case "analyzing": return "Improving" - case "improve_confidence_batch": return "Improve Confidence" - case "overall_confidence_batch": return "Overall Confidence" - case "cooldown": return "Cooldown" - case "writing_result": return "Saving" - case "matrix_resume", "translation_resume", "improver_resume": return "Resuming" - case "done": return "Done" - case "failed": return "Failed" - default: - return (event.stage ?? "Working").replacingOccurrences(of: "_", with: " ").capitalized - } - } func modelProgressKey(itemID: String, role: String, model: String) -> String { "\(itemID)|\(role)|\(model)" } + + func resetModelProgress(itemID: String, snapshot: RusToPromptQueueItemSnapshot) { let prefix = "\(itemID)|" modelProgress = modelProgress.filter { !$0.key.hasPrefix(prefix) } @@ -80,9 +38,13 @@ extension RusToPromptQueueManager { ) } } + + func queueModelProgress(itemID: String, role: String, model: String) -> QueueModelProgressState? { modelProgress[modelProgressKey(itemID: itemID, role: role, model: model)] } + + func updateModelProgress(for event: QueueProgressEvent) { guard let itemID = activeItemID else { return } let targets = progressTargets(for: event) @@ -108,6 +70,8 @@ extension RusToPromptQueueManager { } } } + + func completeModelProgress(itemID: String) { let prefix = "\(itemID)|" let now = Date() @@ -120,6 +84,8 @@ extension RusToPromptQueueManager { modelProgress[key] = state } } + + func markModelProgressTerminal(itemID: String, label: String, status: String) { let prefix = "\(itemID)|" let now = Date() @@ -132,12 +98,16 @@ extension RusToPromptQueueManager { modelProgress[key] = state } } + + func progressTargets(for event: QueueProgressEvent) -> [(role: String, model: String)] { if let refs = event.confidenceModelRefs, !refs.isEmpty { return refs.flatMap { progressTargets(stage: event.stage, translator: $0.translatorModel, analyzer: $0.analyzerModel) } } return progressTargets(stage: event.stage, translator: event.translatorModel, analyzer: event.analyzerModel) } + + func progressTargets(stage: String?, translator: String?, analyzer: String?) -> [(role: String, model: String)] { let normalizedStage = stage ?? "" if normalizedStage == "analyzing" @@ -176,173 +146,57 @@ extension RusToPromptQueueManager { } return [] } - func progressLabel(for event: QueueProgressEvent, role: String) -> String { - let title: String + + + func activityText(for event: QueueProgressEvent) -> String { + let caseID = event.caseID ?? "case" switch event.event { case "stage_start": - title = event.stage == "analyzing" ? "Improving" : displayStage(for: event) + return "\(caseID) \(displayStage(for: event)) started · \(currentModel)" case "stage_complete": - title = event.stage == "analyzing" ? "Improved" : completedStageLabel(for: event) - case "confidence_batch_start", "confidence_batch_complete": - if let index = event.confidenceJudgeIndex, let total = event.confidenceJudgeTotal, total > 0 { - title = "Local judge \(index)/\(total)" - break - } - if event.confidenceModel == "hybrid" { - title = "Aggregated" - break - } - title = displayStage(for: event) + return "\(caseID) \(displayStage(for: event)) finished · \(event.status ?? "unknown")" case "translation_gate": - title = event.status == "accepted" ? "Gate accepted" : "Gate review" + return "\(caseID) translation \(event.status ?? "checked") · \(event.reason ?? "")" case "best_translation_selected": - title = "Selected" + let confidence = event.confidence.map { String(format: " · conf %.2f", $0) } ?? "" + return "\(caseID) selected \(event.translatorModel ?? "no translation")\(confidence)" + case "cooldown_start": + return "\(caseID) cooldown started · \(event.reason ?? "")" + case "cooldown_pause": + return "\(caseID) cooldown paused" + case "cooldown_complete": + return "\(caseID) cooldown finished" case "result_write": - title = event.status == "translation_only" ? "Checkpoint" : "Saved" - case "result_update": - title = "Saved" + return "\(caseID) result saved" case "resume_skip": - title = "Resumed" - case "cooldown_start", "cooldown_pause", "cooldown_complete": - title = "Cooldown" + return "\(caseID) resumed; skipped completed \(event.translatorModel ?? "") \(event.analyzerModel ?? "")" default: - title = displayStage(for: event) - } - if let step = progressStep(for: event, role: role) { - return "\(step.index)/\(step.total) · \(title)" + return "\(caseID) \(displayStage(for: event)) · \(event.status ?? "")" } - return title } - func completedStageLabel(for event: QueueProgressEvent) -> String { + + + func displayStage(for event: QueueProgressEvent) -> String { switch event.stage { - case "translating": return "Translated" - case "translation_confidence", "translation_confidence_batch": return "Checked" - default: return displayStage(for: event) - } - } - func progressStatus(for event: QueueProgressEvent) -> String { - if event.status == "failed" || event.stage == "failed" { - return "failed" - } - if event.status == "rejected" || event.stage == "translation_rejected" { - return "rejected" - } - if event.event == "cooldown_start" || event.event == "cooldown_pause" { - return "cooldown" - } - if event.event == "cooldown_complete" { - return "waiting" - } - if event.event == "stage_complete" || event.event == "confidence_batch_complete" { - return "done" - } - if event.event == "confidence_batch_start", event.status == "cached" { - return "done" - } - if event.event == "result_update" { - return "completed" - } - if event.event == "result_write", event.status == "translation_only" { - return activeSnapshot()?.confidenceReferee == "off" ? "completed" : "waiting" - } - if event.event == "result_write" { - return "completed" - } - if event.event == "resume_skip" { - return "completed" - } - if event.event == "translation_gate", event.status == "accepted" { - return "done" - } - if event.event == "best_translation_selected" { - return "done" - } - return "running" - } - func progressStep(for event: QueueProgressEvent, role: String) -> (index: Int, total: Int)? { - let total = stageTotal(for: role, snapshot: activeSnapshot(), event: event) - guard total > 0 else { return nil } - let stage = event.stage ?? "" - if role == "Translate" { - if stage == "translating" || event.event?.hasPrefix("cooldown_") == true { - return (1, total) - } - if stage == "translation_confidence_batch" { - if let judge = event.confidenceJudgeIndex, event.confidenceJudgeTotal != nil { - return (min(total, 1 + judge), total) - } - return (min(total, 2), total) - } - if stage == "translation_confidence" { - return (max(1, total - 1), total) - } - if stage == "writing_result" { - return (event.event == "result_write" && event.status == "translation_only" ? 1 : total, total) - } - if stage == "translation_selection" { - return (total, total) - } - } else { - if stage == "analyzing" || event.event?.hasPrefix("cooldown_") == true { - return (1, total) - } - if stage == "improve_confidence_batch" { - if let judge = event.confidenceJudgeIndex, event.confidenceJudgeTotal != nil { - return (min(total, 1 + judge), total) - } - return (min(total, 2), total) - } - if stage == "overall_confidence_batch" { - if let judge = event.confidenceJudgeIndex, let judgeTotal = event.confidenceJudgeTotal { - return (min(total, 1 + judgeTotal + judge), total) - } - return (min(total, 3), total) - } - if stage == "writing_result" { - return (total, total) - } - } - return nil - } - func stageTotal(for role: String, snapshot: RusToPromptQueueItemSnapshot?, event: QueueProgressEvent? = nil) -> Int { - let referee = snapshot?.confidenceReferee ?? settings.confidenceReferee - if referee == "off" { - return 2 - } - let eventJudgeTotal = event?.confidenceJudgeTotal - let snapshotJudgeTotal = snapshot?.localConfidenceModels.count ?? settings.localConfidenceModels.count - let judgeTotal = max(1, min(2, eventJudgeTotal ?? snapshotJudgeTotal)) - if referee == "hybrid" { - return role == "Translate" ? 3 + judgeTotal : 2 + (2 * judgeTotal) - } - return 4 - } - func activeSnapshot() -> RusToPromptQueueItemSnapshot? { - guard let activeItemID else { return nil } - return items.first(where: { $0.id == activeItemID })?.snapshot - } - func progressDetail(for event: QueueProgressEvent) -> String { - var parts = [displayStage(for: event)] - if let confidenceModel = event.confidenceModel { - parts.append(confidenceModel) - } - if let index = event.confidenceJudgeIndex, let total = event.confidenceJudgeTotal { - parts.append("judge \(index) of \(total)") - } - if let batchIndex = event.batchIndex, let batchTotal = event.batchTotal { - parts.append("batch \(batchIndex) of \(batchTotal)") - } - if let confidence = event.confidence { - parts.append(String(format: "confidence %.2f", confidence)) - } - if let status = event.status { - parts.append(status) - } - if let reason = event.reason, !reason.isEmpty { - parts.append(reason) + case "queued": return "Queued" + case "translating": return "Translating" + case "translation_confidence", "translation_confidence_batch": return "Translation Check" + case "translation_selection": return "Selecting Translation" + case "translation_rejected": return "Translation Rejected" + case "analyzing": return "Improving" + case "improve_confidence_batch": return "Improve Confidence" + case "overall_confidence_batch": return "Overall Confidence" + case "cooldown": return "Cooldown" + case "writing_result": return "Saving" + case "matrix_resume", "translation_resume", "improver_resume": return "Resuming" + case "done": return "Done" + case "failed": return "Failed" + default: + return (event.stage ?? "Working").replacingOccurrences(of: "_", with: " ").capitalized } - return parts.joined(separator: " · ") } + + func mark(index: Int, status: RusToPromptQueueItemStatus, message: String) { guard items.indices.contains(index) else { return } items[index].status = status @@ -353,156 +207,4 @@ extension RusToPromptQueueManager { } saveToDisk() } - func loadFromDisk() { - do { - try FileManager.default.createDirectory(at: appSupportURL, withIntermediateDirectories: true) - guard FileManager.default.fileExists(atPath: queueFileURL.path) else { return } - let data = try Data(contentsOf: queueFileURL) - let decoded = try JSONDecoder().decode(RusToPromptQueueDiskState.self, from: data) - settings = decoded.settings - items = decoded.items - isPaused = decoded.isPaused ?? false - isPowerPaused = decoded.isPowerPaused ?? false - if isPowerPaused { - isPaused = true - } - } catch { - appendActivity("Queue state could not be loaded: \(error.localizedDescription)") - } - } - func saveToDisk() { - do { - try FileManager.default.createDirectory(at: appSupportURL, withIntermediateDirectories: true) - let state = RusToPromptQueueDiskState(settings: settings, items: items, isPaused: isPaused, isPowerPaused: isPowerPaused) - let encoder = JSONEncoder() - encoder.outputFormatting = [.prettyPrinted, .sortedKeys] - let data = try encoder.encode(state) - try data.write(to: queueFileURL, options: [.atomic]) - } catch { - appendActivity("Queue state could not be saved: \(error.localizedDescription)") - } - } - func recoverRunningItems() { - var changed = false - for index in items.indices where items[index].status == .running { - items[index].status = .queued - items[index].statusMessage = isPowerPaused ? "Paused on battery; connect power to continue" : (isPaused ? "Paused after restart; resume to continue" : "Recovered after restart") - items[index].recoveredAfterRestart = true - items[index].updatedAt = Date() - changed = true - } - if changed { - appendActivity("Recovered running queue items after app restart.") - } - } - func startTimer() { - timer = Timer.scheduledTimer(withTimeInterval: 5, repeats: true) { [weak self] _ in - DispatchQueue.main.async { [weak self] in - self?.refreshFreeMemory() - self?.refreshPowerSource() - self?.startNextIfPossible() - } - } - refreshFreeMemory() - refreshPowerSource() - } - func appendActivity(_ line: String) { - let timestamp = Self.activityFormatter.string(from: Date()) - recentActivity.insert("\(timestamp) \(line)", at: 0) - if recentActivity.count > 80 { - recentActivity.removeLast(recentActivity.count - 80) - } - } - func writeControl(_ payload: [String: Bool]) { - guard let activeControlFileURL else { return } - do { - let data = try JSONSerialization.data(withJSONObject: payload, options: [.sortedKeys]) - try data.write(to: activeControlFileURL, options: [.atomic]) - } catch { - appendActivity("Could not write control file: \(error.localizedDescription)") - } - } - func controlFlagFromActiveFile(_ key: String) -> Bool { - guard let activeControlFileURL, - let data = try? Data(contentsOf: activeControlFileURL), - let decoded = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else { - return false - } - return (decoded[key] as? Bool) == true - } - func fetchInstalledModels(completion: @escaping (Set, Bool) -> Void) { - guard let url = URL(string: "http://127.0.0.1:11434/api/tags") else { - completion([], false) - return - } - var request = URLRequest(url: url) - request.timeoutInterval = 3 - URLSession.shared.dataTask(with: request) { data, _, error in - DispatchQueue.main.async { - guard error == nil, let data else { - completion([], false) - return - } - let decoded = try? JSONDecoder().decode(QueueOllamaTagsResponse.self, from: data) - completion(Set(decoded?.models.map(\.name) ?? []), decoded != nil) - } - }.resume() - } - func cleanLocalModels(_ models: [String]) -> [String] { - var seen = Set() - var cleaned: [String] = [] - for model in models { - let trimmed = model.trimmingCharacters(in: .whitespacesAndNewlines) - let key = trimmed.lowercased() - guard !trimmed.isEmpty, Self.isLocalStageModel(trimmed), !seen.contains(key) else { continue } - cleaned.append(trimmed) - seen.insert(key) - } - return cleaned - } - func normalizePrompt(_ prompt: String) -> String { - prompt - .lowercased() - .components(separatedBy: .whitespacesAndNewlines) - .filter { !$0.isEmpty } - .joined(separator: " ") - } - var stressScriptURL: URL { - repoRootURL.appendingPathComponent("Scripts").appendingPathComponent("rus_to_prompt_stress.py") - } - func pythonPath() -> String { - if FileManager.default.fileExists(atPath: "/opt/homebrew/bin/python3") { - return "/opt/homebrew/bin/python3" - } - return "/usr/bin/python3" - } - nonisolated static func codexExecutablePath() -> String { - ["/opt/homebrew/bin/codex", "/usr/local/bin/codex", "/usr/bin/codex"].first { - FileManager.default.fileExists(atPath: $0) - } ?? "codex" - } - nonisolated static func geminiExecutablePath() -> String { - ["/opt/homebrew/bin/gemini", "/usr/local/bin/gemini", "/usr/bin/gemini"].first { - FileManager.default.fileExists(atPath: $0) - } ?? "gemini" - } - nonisolated static func searchPath(existing: String?) -> String { - var parts = ["/opt/homebrew/bin", "/usr/local/bin", "/usr/bin", "/bin", "/usr/sbin", "/sbin"] - let homeLocal = FileManager.default.homeDirectoryForCurrentUser.appendingPathComponent(".local/bin").path - parts.append(homeLocal) - if let existing, !existing.isEmpty { - parts.append(existing) - } - return parts.joined(separator: ":") - } - nonisolated static func timestampID() -> String { - let formatter = DateFormatter() - formatter.dateFormat = "yyyyMMdd-HHmmss" - return formatter.string(from: Date()) - } - nonisolated static let activityFormatter: DateFormatter = { - let formatter = DateFormatter() - formatter.dateFormat = "HH:mm:ss" - return formatter - }() } diff --git a/Soma/ViewModels/RusToPromptQueueManager+Part4.swift b/Soma/ViewModels/RusToPromptQueueManager+Part4.swift index 8094148..1948a4f 100644 --- a/Soma/ViewModels/RusToPromptQueueManager+Part4.swift +++ b/Soma/ViewModels/RusToPromptQueueManager+Part4.swift @@ -1,33 +1,41 @@ import Combine import Foundation extension RusToPromptQueueManager { - nonisolated static func readFreeMemoryGB() -> Double? { - let process = Process() - process.executableURL = URL(fileURLWithPath: "/usr/bin/vm_stat") - let pipe = Pipe() - process.standardOutput = pipe - do { - try process.run() - let data = pipe.fileHandleForReading.readDataToEndOfFile() - process.waitUntilExit() - guard let text = String(data: data, encoding: .utf8) else { return nil } - let pageSize = 16_384.0 - let keys = ["Pages free", "Pages inactive", "Pages speculative"] - var pages = 0.0 - for line in text.components(separatedBy: .newlines) { - for key in keys where line.hasPrefix(key) { - let digits = line - .replacingOccurrences(of: ".", with: "") - .components(separatedBy: CharacterSet.decimalDigits.inverted) - .filter { !$0.isEmpty } - if let value = digits.first.flatMap(Double.init) { - pages += value + nonisolated static func readFreeMemoryGB() async -> Double? { + await withCheckedContinuation { continuation in + DispatchQueue.global(qos: .userInitiated).async { + let process = Process() + process.executableURL = URL(fileURLWithPath: "/usr/bin/vm_stat") + let pipe = Pipe() + process.standardOutput = pipe + do { + try process.run() + let data = pipe.fileHandleForReading.readDataToEndOfFile() + process.waitUntilExit() + guard let text = String(data: data, encoding: .utf8) else { + continuation.resume(returning: nil) + return } + let pageSize = 16_384.0 + let keys = ["Pages free", "Pages inactive", "Pages speculative"] + var pages = 0.0 + for line in text.components(separatedBy: .newlines) { + for key in keys where line.hasPrefix(key) { + let digits = line + .replacingOccurrences(of: ".", with: "") + .components(separatedBy: CharacterSet.decimalDigits.inverted) + .filter { !$0.isEmpty } + if let value = digits.first.flatMap(Double.init) { + pages += value + } + } + } + let result = pages > 0 ? (pages * pageSize / 1_073_741_824.0) : nil + continuation.resume(returning: result) + } catch { + continuation.resume(returning: nil) } } - return pages > 0 ? (pages * pageSize / 1_073_741_824.0) : nil - } catch { - return nil } } } diff --git a/Soma/ViewModels/RusToPromptQueueManager+Part5.swift b/Soma/ViewModels/RusToPromptQueueManager+Part5.swift new file mode 100644 index 0000000..0ec0fe1 --- /dev/null +++ b/Soma/ViewModels/RusToPromptQueueManager+Part5.swift @@ -0,0 +1,184 @@ +import Combine +import Foundation + +extension RusToPromptQueueManager { + func progressLabel(for event: QueueProgressEvent, role: String) -> String { + let title: String + switch event.event { + case "stage_start": + title = event.stage == "analyzing" ? "Improving" : displayStage(for: event) + case "stage_complete": + title = event.stage == "analyzing" ? "Improved" : completedStageLabel(for: event) + case "confidence_batch_start", "confidence_batch_complete": + if let index = event.confidenceJudgeIndex, let total = event.confidenceJudgeTotal, total > 0 { + title = "Local judge \(index)/\(total)" + break + } + if event.confidenceModel == "hybrid" { + title = "Aggregated" + break + } + title = displayStage(for: event) + case "translation_gate": + title = event.status == "accepted" ? "Gate accepted" : "Gate review" + case "best_translation_selected": + title = "Selected" + case "result_write": + title = event.status == "translation_only" ? "Checkpoint" : "Saved" + case "result_update": + title = "Saved" + case "resume_skip": + title = "Resumed" + case "cooldown_start", "cooldown_pause", "cooldown_complete": + title = "Cooldown" + default: + title = displayStage(for: event) + } + if let step = progressStep(for: event, role: role) { + return "\(step.index)/\(step.total) · \(title)" + } + return title + } + + + func completedStageLabel(for event: QueueProgressEvent) -> String { + switch event.stage { + case "translating": return "Translated" + case "translation_confidence", "translation_confidence_batch": return "Checked" + default: return displayStage(for: event) + } + } + + + func progressStatus(for event: QueueProgressEvent) -> String { + if event.status == "failed" || event.stage == "failed" { + return "failed" + } + if event.status == "rejected" || event.stage == "translation_rejected" { + return "rejected" + } + if event.event == "cooldown_start" || event.event == "cooldown_pause" { + return "cooldown" + } + if event.event == "cooldown_complete" { + return "waiting" + } + if event.event == "stage_complete" || event.event == "confidence_batch_complete" { + return "done" + } + if event.event == "confidence_batch_start", event.status == "cached" { + return "done" + } + if event.event == "result_update" { + return "completed" + } + if event.event == "result_write", event.status == "translation_only" { + return activeSnapshot()?.confidenceReferee == "off" ? "completed" : "waiting" + } + if event.event == "result_write" { + return "completed" + } + if event.event == "resume_skip" { + return "completed" + } + if event.event == "translation_gate", event.status == "accepted" { + return "done" + } + if event.event == "best_translation_selected" { + return "done" + } + return "running" + } + + + func progressStep(for event: QueueProgressEvent, role: String) -> (index: Int, total: Int)? { + let total = stageTotal(for: role, snapshot: activeSnapshot(), event: event) + guard total > 0 else { return nil } + let stage = event.stage ?? "" + if role == "Translate" { + if stage == "translating" || event.event?.hasPrefix("cooldown_") == true { + return (1, total) + } + if stage == "translation_confidence_batch" { + if let judge = event.confidenceJudgeIndex, event.confidenceJudgeTotal != nil { + return (min(total, 1 + judge), total) + } + return (min(total, 2), total) + } + if stage == "translation_confidence" { + return (max(1, total - 1), total) + } + if stage == "writing_result" { + return (event.event == "result_write" && event.status == "translation_only" ? 1 : total, total) + } + if stage == "translation_selection" { + return (total, total) + } + } else { + if stage == "analyzing" || event.event?.hasPrefix("cooldown_") == true { + return (1, total) + } + if stage == "improve_confidence_batch" { + if let judge = event.confidenceJudgeIndex, event.confidenceJudgeTotal != nil { + return (min(total, 1 + judge), total) + } + return (min(total, 2), total) + } + if stage == "overall_confidence_batch" { + if let judge = event.confidenceJudgeIndex, let judgeTotal = event.confidenceJudgeTotal { + return (min(total, 1 + judgeTotal + judge), total) + } + return (min(total, 3), total) + } + if stage == "writing_result" { + return (total, total) + } + } + return nil + } + + + func stageTotal(for role: String, snapshot: RusToPromptQueueItemSnapshot?, event: QueueProgressEvent? = nil) -> Int { + let referee = snapshot?.confidenceReferee ?? settings.confidenceReferee + if referee == "off" { + return 2 + } + let eventJudgeTotal = event?.confidenceJudgeTotal + let snapshotJudgeTotal = snapshot?.localConfidenceModels.count ?? settings.localConfidenceModels.count + let judgeTotal = max(1, min(2, eventJudgeTotal ?? snapshotJudgeTotal)) + if referee == "hybrid" { + return role == "Translate" ? 3 + judgeTotal : 2 + (2 * judgeTotal) + } + return 4 + } + + + func activeSnapshot() -> RusToPromptQueueItemSnapshot? { + guard let activeItemID else { return nil } + return items.first(where: { $0.id == activeItemID })?.snapshot + } + + + func progressDetail(for event: QueueProgressEvent) -> String { + var parts = [displayStage(for: event)] + if let confidenceModel = event.confidenceModel { + parts.append(confidenceModel) + } + if let index = event.confidenceJudgeIndex, let total = event.confidenceJudgeTotal { + parts.append("judge \(index) of \(total)") + } + if let batchIndex = event.batchIndex, let batchTotal = event.batchTotal { + parts.append("batch \(batchIndex) of \(batchTotal)") + } + if let confidence = event.confidence { + parts.append(String(format: "confidence %.2f", confidence)) + } + if let status = event.status { + parts.append(status) + } + if let reason = event.reason, !reason.isEmpty { + parts.append(reason) + } + return parts.joined(separator: " · ") + } +} diff --git a/Soma/ViewModels/RusToPromptQueueManager+Part6.swift b/Soma/ViewModels/RusToPromptQueueManager+Part6.swift new file mode 100644 index 0000000..feaf627 --- /dev/null +++ b/Soma/ViewModels/RusToPromptQueueManager+Part6.swift @@ -0,0 +1,163 @@ +import Combine +import Foundation + +extension RusToPromptQueueManager { + func loadFromDisk() { + do { + try FileManager.default.createDirectory(at: appSupportURL, withIntermediateDirectories: true) + guard FileManager.default.fileExists(atPath: queueFileURL.path) else { return } + let data = try Data(contentsOf: queueFileURL) + let decoded = try JSONDecoder().decode(RusToPromptQueueDiskState.self, from: data) + settings = decoded.settings + items = decoded.items + isPaused = decoded.isPaused ?? false + isPowerPaused = decoded.isPowerPaused ?? false + if isPowerPaused { + isPaused = true + } + } catch { + appendActivity("Queue state could not be loaded: \(error.localizedDescription)") + } + } + + + func saveToDisk() { + let state = RusToPromptQueueDiskState(settings: settings, items: items, isPaused: isPaused, isPowerPaused: isPowerPaused) + let queueFileURL = self.queueFileURL + let appSupportURL = self.appSupportURL + + Task.detached { + do { + try FileManager.default.createDirectory(at: appSupportURL, withIntermediateDirectories: true) + let encoder = JSONEncoder() + encoder.outputFormatting = [.prettyPrinted, .sortedKeys] + let data = try encoder.encode(state) + try data.write(to: queueFileURL, options: [.atomic]) + } catch { + await MainActor.run { + self.appendActivity("Queue state could not be saved: \(error.localizedDescription)") + } + } + } + } + + + func recoverRunningItems() { + var changed = false + for index in items.indices where items[index].status == .running { + items[index].status = .queued + items[index].statusMessage = isPowerPaused ? "Paused on battery; connect power to continue" : (isPaused ? "Paused after restart; resume to continue" : "Recovered after restart") + items[index].recoveredAfterRestart = true + items[index].updatedAt = Date() + changed = true + } + if changed { + appendActivity("Recovered running queue items after app restart.") + } + } + + + func startTimer() { + timer = Timer.scheduledTimer(withTimeInterval: 5, repeats: true) { [weak self] _ in + DispatchQueue.main.async { [weak self] in + self?.refreshFreeMemory() + self?.refreshPowerSource() + self?.startNextIfPossible() + } + } + refreshFreeMemory() + refreshPowerSource() + } + + + func appendActivity(_ line: String) { + let timestamp = Self.activityFormatter.string(from: Date()) + recentActivity.insert("\(timestamp) \(line)", at: 0) + if recentActivity.count > 80 { + recentActivity.removeLast(recentActivity.count - 80) + } + } + + + func writeControl(_ payload: [String: Bool]) { + guard let activeControlFileURL else { return } + Task.detached { + do { + let data = try JSONSerialization.data(withJSONObject: payload, options: [.sortedKeys]) + try data.write(to: activeControlFileURL, options: [.atomic]) + } catch { + await MainActor.run { + self.appendActivity("Could not write control file: \(error.localizedDescription)") + } + } + } + } + + + func controlFlagFromActiveFile(_ key: String) -> Bool { + guard let activeControlFileURL, + let data = try? Data(contentsOf: activeControlFileURL), + let decoded = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else { + return false + } + return (decoded[key] as? Bool) == true + } + + nonisolated func controlFlagFromActiveFileAsync(_ key: String, controlURL: URL?) async -> Bool { + guard let url = controlURL else { return false } + return await Task.detached { + guard let data = try? Data(contentsOf: url), + let decoded = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else { + return false + } + return (decoded[key] as? Bool) == true + }.value + } + + + func fetchInstalledModels(completion: @escaping (Set, Bool) -> Void) { + guard let url = URL(string: "http://127.0.0.1:11434/api/tags") else { + completion([], false) + return + } + var request = URLRequest(url: url) + request.timeoutInterval = 3 + URLSession.shared.dataTask(with: request) { data, _, error in + Task.detached { + guard error == nil, let data else { + await MainActor.run { completion([], false) } + return + } + let decoded = try? JSONDecoder().decode(QueueOllamaTagsResponse.self, from: data) + let models = Set(decoded?.models.map(\.name) ?? []) + let hasDecoded = decoded != nil + await MainActor.run { + completion(models, hasDecoded) + } + } + }.resume() + } + + + func cleanLocalModels(_ models: [String]) -> [String] { + var seen = Set() + var cleaned: [String] = [] + for model in models { + let trimmed = model.trimmingCharacters(in: .whitespacesAndNewlines) + let key = trimmed.lowercased() + guard !trimmed.isEmpty, Self.isLocalStageModel(trimmed), !seen.contains(key) else { continue } + cleaned.append(trimmed) + seen.insert(key) + } + return cleaned + } + + + func normalizePrompt(_ prompt: String) -> String { + prompt + .lowercased() + .components(separatedBy: .whitespacesAndNewlines) + .filter { !$0.isEmpty } + .joined(separator: " ") + } +} diff --git a/Soma/ViewModels/RusToPromptQueueManager+Part7.swift b/Soma/ViewModels/RusToPromptQueueManager+Part7.swift new file mode 100644 index 0000000..1825e79 --- /dev/null +++ b/Soma/ViewModels/RusToPromptQueueManager+Part7.swift @@ -0,0 +1,55 @@ +import Combine +import Foundation + +extension RusToPromptQueueManager { + var stressScriptURL: URL { + repoRootURL.appendingPathComponent("Scripts").appendingPathComponent("rus_to_prompt_stress.py") + } + + + func pythonPath() -> String { + if FileManager.default.fileExists(atPath: "/opt/homebrew/bin/python3") { + return "/opt/homebrew/bin/python3" + } + return "/usr/bin/python3" + } + + + nonisolated static func codexExecutablePath() -> String { + ["/opt/homebrew/bin/codex", "/usr/local/bin/codex", "/usr/bin/codex"].first { + FileManager.default.fileExists(atPath: $0) + } ?? "codex" + } + + + nonisolated static func geminiExecutablePath() -> String { + ["/opt/homebrew/bin/gemini", "/usr/local/bin/gemini", "/usr/bin/gemini"].first { + FileManager.default.fileExists(atPath: $0) + } ?? "gemini" + } + + + nonisolated static func searchPath(existing: String?) -> String { + var parts = ["/opt/homebrew/bin", "/usr/local/bin", "/usr/bin", "/bin", "/usr/sbin", "/sbin"] + let homeLocal = FileManager.default.homeDirectoryForCurrentUser.appendingPathComponent(".local/bin").path + parts.append(homeLocal) + if let existing, !existing.isEmpty { + parts.append(existing) + } + return parts.joined(separator: ":") + } + + + nonisolated static func timestampID() -> String { + let formatter = DateFormatter() + formatter.dateFormat = "yyyyMMdd-HHmmss" + return formatter.string(from: Date()) + } + + + nonisolated static let activityFormatter: DateFormatter = { + let formatter = DateFormatter() + formatter.dateFormat = "HH:mm:ss" + return formatter + }() +} diff --git a/Soma/ViewModels/RusToPromptQueueManager+Part8.swift b/Soma/ViewModels/RusToPromptQueueManager+Part8.swift new file mode 100644 index 0000000..42f7438 --- /dev/null +++ b/Soma/ViewModels/RusToPromptQueueManager+Part8.swift @@ -0,0 +1,107 @@ +import Combine +import Foundation + +extension RusToPromptQueueManager { + nonisolated func queueRunCompletionMessage(outputPath: String?) async -> String { + guard let outputPath, !outputPath.isEmpty else { return "Completed; summary missing" } + let summaryURL = URL(fileURLWithPath: outputPath).appendingPathComponent("summary.json") + return await Task.detached { + guard let data = try? Data(contentsOf: summaryURL), + let object = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else { + return "Completed; summary missing" + } + let runStatus = object["run_status"] as? String + let success = object["success"] as? Bool + if runStatus == "failed" { + return self.queueRunIssueMessage(prefix: "Completed with failed summary", summary: object) + } + if runStatus == "completed_with_issues" || success == false { + return self.queueRunIssueMessage(prefix: "Completed with issues", summary: object) + } + return "Completed" + }.value + } + + + func queueRunIssueMessage(prefix: String, summary: [String: Any]) -> String { + guard let issueCounts = summary["issue_counts"] as? [String: Any] else { return prefix } + let issues = issueCounts + .compactMap { key, value -> (String, Int)? in + let count: Int + if let intValue = value as? Int { + count = intValue + } else if let number = value as? NSNumber { + count = number.intValue + } else { + return nil + } + return count > 0 ? (key.replacingOccurrences(of: "_", with: " "), count) : nil + } + .sorted { lhs, rhs in + if lhs.0 == rhs.0 { return lhs.1 > rhs.1 } + return lhs.0 < rhs.0 + } + .prefix(3) + .map { "\($0.0) \($0.1)" } + return issues.isEmpty ? prefix : "\(prefix): \(issues.joined(separator: ", "))" + } + + + func consumeProcessOutput(_ text: String) { + processOutputBuffer += text + let parts = processOutputBuffer.components(separatedBy: .newlines) + guard parts.count > 1 else { return } + processOutputBuffer = parts.last ?? "" + + let linesToProcess = Array(parts.dropLast()) + let prefix = progressPrefix + + Task.detached { + var events: [(QueueProgressEvent?, String)] = [] + let decoder = JSONDecoder() + for line in linesToProcess { + let trimmed = line.trimmingCharacters(in: .whitespacesAndNewlines) + guard !trimmed.isEmpty else { continue } + + if trimmed.hasPrefix(prefix) { + let payload = String(trimmed.dropFirst(prefix.count)) + if let data = payload.data(using: .utf8), + let event = try? decoder.decode(QueueProgressEvent.self, from: data) { + events.append((event, trimmed)) + } else { + events.append((nil, trimmed)) + } + } else { + events.append((nil, trimmed)) + } + } + + await MainActor.run { + for (eventOpt, trimmed) in events { + if let event = eventOpt { + self.currentStage = self.displayStage(for: event) + if let translator = event.translatorModel, let analyzer = event.analyzerModel { + self.currentModel = "\(translator) -> \(analyzer)" + } else if let translator = event.translatorModel { + self.currentModel = translator + } else if let confidenceModel = event.confidenceModel { + self.currentModel = confidenceModel + } + self.updateModelProgress(for: event) + self.appendActivity(self.activityText(for: event)) + } else { + self.appendActivity(trimmed) + } + } + } + } + } + + + nonisolated func decodeProgressEvent(from line: String) -> QueueProgressEvent? { + guard line.hasPrefix(progressPrefix) else { return nil } + let payload = String(line.dropFirst(progressPrefix.count)) + guard let data = payload.data(using: .utf8) else { return nil } + return try? JSONDecoder().decode(QueueProgressEvent.self, from: data) + } +}