Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 53 additions & 29 deletions Soma/ViewModels/OllamaManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,39 @@ final class OllamaManager: ObservableObject {
guard let url = URL(string: "http://127.0.0.1:11434/api/tags") else { return }
var request = URLRequest(url: url)
request.timeoutInterval = 3
URLSession.shared.dataTask(with: request) { data, _, error in
DispatchQueue.main.async {
if let error {
self.tagsError = error.localizedDescription
self.installedModels = []
URLSession.shared.dataTask(with: request) { [weak self] data, _, error in
Task.detached {
let errorMsg = error?.localizedDescription

if let errorMsg {
await MainActor.run {
self?.tagsError = errorMsg
self?.installedModels = []
}
return
}

guard let data else {
self.tagsError = "Ollama returned no model list."
self.installedModels = []
await MainActor.run {
self?.tagsError = "Ollama returned no model list."
self?.installedModels = []
}
return
}

do {
let decoded = try JSONDecoder().decode(OllamaTagsResponse.self, from: data)
self.installedModels = decoded.models.sorted { $0.name.localizedStandardCompare($1.name) == .orderedAscending }
self.tagsError = nil
let sorted = decoded.models.sorted { $0.name.localizedStandardCompare($1.name) == .orderedAscending }
await MainActor.run {
self?.installedModels = sorted
self?.tagsError = nil
}
} catch {
self.tagsError = error.localizedDescription
self.installedModels = []
let decodingError = error.localizedDescription
await MainActor.run {
self?.tagsError = decodingError
self?.installedModels = []
}
}
}
}.resume()
Expand All @@ -111,10 +125,12 @@ final class OllamaManager: ObservableObject {
guard let url = URL(string: "http://127.0.0.1:11434/api/ps") else { return }
var request = URLRequest(url: url)
request.timeoutInterval = 2
URLSession.shared.dataTask(with: request) { data, _, error in
DispatchQueue.main.async {
URLSession.shared.dataTask(with: request) { [weak self] data, _, error in
Task.detached {
if error != nil {
self.updateStatus(isRunning: false, loadedModels: [])
await MainActor.run {
self?.updateStatus(isRunning: false, loadedModels: [])
}
return
}
var loaded: Set<String> = []
Expand All @@ -125,7 +141,10 @@ final class OllamaManager: ObservableObject {
{
loaded = Set(models.compactMap { ($0["name"] as? String)?.lowercased() })
}
self.updateStatus(isRunning: true, loadedModels: loaded)

await MainActor.run {
self?.updateStatus(isRunning: true, loadedModels: loaded)
}
}
}.resume()
}
Expand Down Expand Up @@ -171,22 +190,27 @@ final class OllamaManager: ObservableObject {
var request = URLRequest(url: url)
request.httpMethod = "POST"
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
request.httpBody = try? JSONSerialization.data(withJSONObject: [
"model": trimmedModel,
"prompt": "",
"keep_alive": keepAlive,
"stream": false,
])
isBusy = true
URLSession.shared.dataTask(with: request) { _, _, _ in
DispatchQueue.main.async {
self.isBusy = false
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
self.refreshInstalledModels()
self.checkStatus()
Task.detached {
let body = try? JSONSerialization.data(withJSONObject: [
"model": trimmedModel,
"prompt": "",
"keep_alive": keepAlive,
"stream": false,
])
var request = request
request.httpBody = body

URLSession.shared.dataTask(with: request) { _, _, _ in
DispatchQueue.main.async {
self.isBusy = false
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
self.refreshInstalledModels()
self.checkStatus()
}
}
}
}.resume()
}.resume()
}
}
private func persistModelName(_ model: String, for role: LocalModelRole) {
LocalModelSettingsStore.setModel(model, for: role)
Expand Down
6 changes: 3 additions & 3 deletions Soma/ViewModels/RusToPromptQueueManager+Part1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ extension RusToPromptQueueManager {
saveToDisk()
}
func refreshFreeMemory() {
DispatchQueue.global(qos: .utility).async {
let value = Self.readFreeMemoryGB()
DispatchQueue.main.async {
Task {
let value = await Self.readFreeMemoryGB()
await MainActor.run {
self.freeMemoryGB = value
}
}
Expand Down
153 changes: 61 additions & 92 deletions Soma/ViewModels/RusToPromptQueueManager+Part2.swift
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import Combine
import Foundation

struct RusToPromptQueueRunContext {
let item: RusToPromptQueueItem
let runURL: URL
let casesURL: URL
let controlURL: URL
let snapshot: RusToPromptQueueItemSnapshot
}

extension RusToPromptQueueManager {
func startNextIfPossible(allowBatteryStart: Bool = false) {
refreshPowerSourceValue()
Expand Down Expand Up @@ -42,6 +44,8 @@ extension RusToPromptQueueManager {
self.startItem(at: currentIndex, installedModels: installed, allowBatteryStart: allowBatteryStart)
}
}


func nextStartableQueueIndex() -> Int? {
items.indices
.filter { isStartableQueueItem(items[$0]) }
Expand All @@ -54,22 +58,28 @@ extension RusToPromptQueueManager {
return left.createdAt < right.createdAt
}
}


func isStartableQueueItem(_ item: RusToPromptQueueItem) -> Bool {
item.status == .queued || item.status == .waitingLocalAI
}


func startItem(at index: Int, installedModels: Set<String>, allowBatteryStart: Bool = false) {
guard activeProcess == nil, items.indices.contains(index) else { return }
let installedLower = Set(installedModels.map { $0.lowercased() })
let translators = localCandidates(settings.translatorCandidates, installedLower: installedLower)
let improvers = localCandidates(settings.improverCandidates, installedLower: installedLower)
guard validateQueueModels(index: index, translators: translators, improvers: improvers) else { return }
guard let context = prepareRunContext(index: index, translators: translators, improvers: improvers) else { return }

markRunStarted(index: index, context: context)
if powerSource == .battery && allowBatteryStart {
batteryStartOverrideItemID = context.item.id
}
let process = makeQueueProcess(context: context, translators: translators, improvers: improvers)
attachQueueHandlers(to: process)

do {
try process.run()
activeProcess = process
Expand All @@ -82,9 +92,11 @@ extension RusToPromptQueueManager {
mark(index: index, status: .failed, message: "Could not start queue run: \(error.localizedDescription)")
}
}

func localCandidates(_ candidates: [String], installedLower: Set<String>) -> [String] {
candidates.filter { installedLower.contains($0.lowercased()) && Self.isLocalStageModel($0) }
}

func validateQueueModels(index: Int, translators: [String], improvers: [String]) -> Bool {
guard !translators.isEmpty else {
mark(index: index, status: .blocked, message: "No installed local translator candidates.")
Expand All @@ -96,6 +108,7 @@ extension RusToPromptQueueManager {
}
return true
}

func prepareRunContext(index: Int, translators: [String], improvers: [String]) -> RusToPromptQueueRunContext? {
let item = items[index]
let resumedURL = item.recoveredAfterRestart ? item.outputPath.flatMap { $0.isEmpty ? nil : URL(fileURLWithPath: $0) } : nil
Expand All @@ -114,9 +127,11 @@ extension RusToPromptQueueManager {
}
return RusToPromptQueueRunContext(item: item, runURL: runURL, casesURL: casesURL, controlURL: controlURL, snapshot: queueSnapshot(translators: translators, improvers: improvers))
}

func queueSnapshot(translators: [String], improvers: [String]) -> RusToPromptQueueItemSnapshot {
RusToPromptQueueItemSnapshot(translatorModels: translators, improverModels: improvers, confidenceReferee: settings.confidenceReferee, confidenceModel: settings.confidenceModel, localConfidenceModels: Array(settings.localConfidenceModels.prefix(2)), hybridGeminiModel: settings.hybridGeminiModel, hybridFallbackReferee: settings.hybridFallbackReferee ?? "gemini", confidenceBatchSize: settings.confidenceBatchSize, cooldownSeconds: settings.cooldownSeconds)
}

func markRunStarted(index: Int, context: RusToPromptQueueRunContext) {
items[index].status = .running
items[index].statusMessage = "Running staged benchmark"
Expand All @@ -136,6 +151,7 @@ extension RusToPromptQueueManager {
processOutputBuffer = ""
isRunning = true
}

func makeQueueProcess(context: RusToPromptQueueRunContext, translators: [String], improvers: [String]) -> Process {
let process = Process()
process.currentDirectoryURL = repoRootURL
Expand All @@ -149,6 +165,7 @@ extension RusToPromptQueueManager {
process.environment = environment
return process
}

func queueArguments(context: RusToPromptQueueRunContext, translators: [String], improvers: [String]) -> [String] {
var arguments = [stressScriptURL.path, "--benchmark-mode", "staged", "--cases-file", context.casesURL.path, "--limit", "1", "--translator-models"]
arguments.append(contentsOf: translators)
Expand All @@ -160,6 +177,7 @@ extension RusToPromptQueueManager {
}
return arguments
}

func queueConfidenceArguments(context: RusToPromptQueueRunContext) -> [String] {
let snapshot = context.snapshot
let model = snapshot.confidenceReferee == "hybrid" ? snapshot.hybridGeminiModel : snapshot.confidenceModel
Expand All @@ -171,6 +189,7 @@ extension RusToPromptQueueManager {
}
return arguments
}

func attachQueueHandlers(to process: Process) {
let pipe = Pipe()
process.standardOutput = pipe
Expand All @@ -185,102 +204,52 @@ extension RusToPromptQueueManager {
DispatchQueue.main.async { self?.handleProcessFinished(status: finishedProcess.terminationStatus) }
}
}


func handleProcessFinished(status: Int32) {
defer {
batteryStartOverrideItemID = nil
activeProcess = nil
activeItemID = nil
activeControlFileURL = nil
isRunning = false
currentStage = "Idle"
currentModel = "-"
startNextIfPossible()
}
guard let itemID = activeItemID, let index = items.firstIndex(where: { $0.id == itemID }) else { return }
if status == 0 {
let completionMessage = queueRunCompletionMessage(outputPath: items[index].outputPath)
items[index].status = .completed
items[index].statusMessage = completionMessage
completeModelProgress(itemID: itemID)
appendActivity("Queue run \(itemID): \(completionMessage).")
} else {
let stopped = controlFlagFromActiveFile("stop")
items[index].status = stopped ? .interrupted : .failed
items[index].statusMessage = stopped ? "Interrupted by user" : "Process exited with code \(status)"
markModelProgressTerminal(itemID: itemID, label: items[index].statusMessage, status: stopped ? "interrupted" : "failed")
appendActivity("Queue run \(itemID) ended: \(items[index].statusMessage).")
}
items[index].finishedAt = Date()
items[index].updatedAt = Date()
saveToDisk()
}
func queueRunCompletionMessage(outputPath: String?) -> String {
guard let outputPath, !outputPath.isEmpty else { return "Completed; summary missing" }
let summaryURL = URL(fileURLWithPath: outputPath).appendingPathComponent("summary.json")
guard let data = try? Data(contentsOf: summaryURL),
let object = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else {
return "Completed; summary missing"
}
let runStatus = object["run_status"] as? String
let success = object["success"] as? Bool
if runStatus == "failed" {
return queueRunIssueMessage(prefix: "Completed with failed summary", summary: object)
}
if runStatus == "completed_with_issues" || success == false {
return queueRunIssueMessage(prefix: "Completed with issues", summary: object)
}
return "Completed"
}
func queueRunIssueMessage(prefix: String, summary: [String: Any]) -> String {
guard let issueCounts = summary["issue_counts"] as? [String: Any] else { return prefix }
let issues = issueCounts
.compactMap { key, value -> (String, Int)? in
let count: Int
if let intValue = value as? Int {
count = intValue
} else if let number = value as? NSNumber {
count = number.intValue
} else {
return nil
}
return count > 0 ? (key.replacingOccurrences(of: "_", with: " "), count) : nil
}
.sorted { lhs, rhs in
if lhs.0 == rhs.0 { return lhs.1 > rhs.1 }
return lhs.0 < rhs.0
let itemID = activeItemID
let outputPath = activeItemID.flatMap { id in items.first(where: { $0.id == id })?.outputPath }
let controlURL = activeControlFileURL

Task {
let completionMessage: String?
if status == 0 {
completionMessage = await queueRunCompletionMessage(outputPath: outputPath)
} else {
completionMessage = nil
}
.prefix(3)
.map { "\($0.0) \($0.1)" }
return issues.isEmpty ? prefix : "\(prefix): \(issues.joined(separator: ", "))"
}
func consumeProcessOutput(_ text: String) {
processOutputBuffer += text
let parts = processOutputBuffer.components(separatedBy: .newlines)
guard parts.count > 1 else { return }
processOutputBuffer = parts.last ?? ""
for line in parts.dropLast() {
let trimmed = line.trimmingCharacters(in: .whitespacesAndNewlines)
guard !trimmed.isEmpty else { continue }
if let event = decodeProgressEvent(from: trimmed) {
currentStage = displayStage(for: event)
if let translator = event.translatorModel, let analyzer = event.analyzerModel {
currentModel = "\(translator) -> \(analyzer)"
} else if let translator = event.translatorModel {
currentModel = translator
} else if let confidenceModel = event.confidenceModel {
currentModel = confidenceModel

let stopped = status != 0 ? await controlFlagFromActiveFileAsync("stop", controlURL: controlURL) : false

await MainActor.run {
defer {
batteryStartOverrideItemID = nil
activeProcess = nil
activeItemID = nil
activeControlFileURL = nil
isRunning = false
currentStage = "Idle"
currentModel = "-"
startNextIfPossible()
}
updateModelProgress(for: event)
appendActivity(activityText(for: event))
} else {
appendActivity(trimmed)
guard let itemID = itemID, let index = items.firstIndex(where: { $0.id == itemID }) else { return }

if status == 0 {
let msg = completionMessage ?? "Completed"
items[index].status = .completed
items[index].statusMessage = msg
completeModelProgress(itemID: itemID)
appendActivity("Queue run \(itemID): \(msg).")
} else {
items[index].status = stopped ? .interrupted : .failed
items[index].statusMessage = stopped ? "Interrupted by user" : "Process exited with code \(status)"
markModelProgressTerminal(itemID: itemID, label: items[index].statusMessage, status: stopped ? "interrupted" : "failed")
appendActivity("Queue run \(itemID) ended: \(items[index].statusMessage).")
}
items[index].finishedAt = Date()
items[index].updatedAt = Date()
saveToDisk()
}
}
}
func decodeProgressEvent(from line: String) -> QueueProgressEvent? {
guard line.hasPrefix(progressPrefix) else { return nil }
let payload = String(line.dropFirst(progressPrefix.count))
guard let data = payload.data(using: .utf8) else { return nil }
return try? JSONDecoder().decode(QueueProgressEvent.self, from: data)
}
}
Loading
Loading