Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions packages/graphai/src/graphai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,15 @@ export class GraphAI {
this.graphId = `${Date.now().toString(36)}-${Math.random().toString(36).substr(2, 9)}`; // URL.createObjectURL(new Blob()).slice(-36);
this.agentFunctionInfoDictionary = agentFunctionInfoDictionary;
this.propFunctions = propFunctions;
this.bypassAgentIds = options.bypassAgentIds ?? [];

// Validate before constructing TaskManager so user-facing ValidationError
// is reported instead of TaskManager's internal guard message.
validateGraphData(graphData, [...Object.keys(agentFunctionInfoDictionary), ...this.bypassAgentIds]);
validateAgent(agentFunctionInfoDictionary);

this.taskManager = options.taskManager ?? new TaskManager(graphData.concurrency ?? defaultConcurrency);
this.agentFilters = options.agentFilters ?? [];
this.bypassAgentIds = options.bypassAgentIds ?? [];
this.config = options.config;
this.graphLoader = options.graphLoader;
this.forceLoop = options.forceLoop ?? false;
Expand All @@ -164,9 +170,6 @@ export class GraphAI {
throw new Error("SOMETHING IS WRONG: onComplete is called without run()");
};

validateGraphData(graphData, [...Object.keys(agentFunctionInfoDictionary), ...this.bypassAgentIds]);
validateAgent(agentFunctionInfoDictionary);

this.graphData = {
...graphData,
nodes: {
Expand Down
100 changes: 57 additions & 43 deletions packages/graphai/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export class ComputedNode extends Node {
private agentFunction?: AgentFunction<any, any, any, any>;
public readonly timeout?: number; // msec
public readonly priority: number;
public readonly label?: string;
public error?: Error;
public transactionId: undefined | number; // To reject callbacks from timed-out transactions
private readonly passThrough?: PassThrough;
Expand Down Expand Up @@ -118,6 +119,10 @@ export class ComputedNode extends Node {
this.timeout = data.timeout;
this.isResult = data.isResult ?? false;
this.priority = data.priority ?? 0;
// Defensive: graph data may originate from YAML/JSON without strict typing.
// Keep label only when it is actually a string so TaskManager's label-keyed
// bookkeeping cannot be silently bypassed by a non-string value.
this.label = typeof data.label === "string" ? data.label : undefined;

assert(["function", "string"].includes(typeof data.agent), "agent must be either string or function");
if (typeof data.agent === "string") {
Expand Down Expand Up @@ -343,55 +348,64 @@ export class ComputedNode extends Node {
const localLog: TransactionLog[] = [];
const context = this.getContext(previousResults, localLog, agentId, config);

// NOTE: We use the existence of graph object in the agent-specific params to determine
// if this is a nested agent or not.
if (hasNestedGraph) {
this.graph.taskManager.prepareForNesting();
context.forNestedGraph = {
graphData: this.nestedGraph
? "nodes" in this.nestedGraph
? this.nestedGraph
: (this.graph.resultOf(this.nestedGraph) as GraphData) // HACK: compiler work-around
: { version: 0, nodes: {} },
agents: this.graph.agentFunctionInfoDictionary,
graphOptions: {
agentFilters: this.graph.agentFilters,
taskManager: this.graph.taskManager,
bypassAgentIds: this.graph.bypassAgentIds,
config,
graphLoader: this.graph.graphLoader,
},
onLogCallback: this.graph.onLogCallback,
callbacks: this.graph.callbacks,
};
}
// The `nestingPrepared` flag tracks whether prepareForNesting has actually
// run. If anything throws between prepareForNesting and the agent call --
// e.g. resultOf() during forNestedGraph construction -- we still need to
// restore. Conversely, if prepareForNesting itself throws, we must NOT
// restore (no bump to undo).
let nestingPrepared = false;
try {
// NOTE: We use the existence of graph object in the agent-specific params to determine
// if this is a nested agent or not.
if (hasNestedGraph) {
this.graph.taskManager.prepareForNesting(this.label, this.graphId);
nestingPrepared = true;
context.forNestedGraph = {
graphData: this.nestedGraph
? "nodes" in this.nestedGraph
? this.nestedGraph
: (this.graph.resultOf(this.nestedGraph) as GraphData) // HACK: compiler work-around
: { version: 0, nodes: {} },
agents: this.graph.agentFunctionInfoDictionary,
graphOptions: {
agentFilters: this.graph.agentFilters,
taskManager: this.graph.taskManager,
bypassAgentIds: this.graph.bypassAgentIds,
config,
graphLoader: this.graph.graphLoader,
},
onLogCallback: this.graph.onLogCallback,
callbacks: this.graph.callbacks,
};
}

this.beforeConsoleLog(context);
const result = await this.agentFilterHandler(context as AgentFunctionContext, agentFunction, agentId);
this.afterConsoleLog(result);
this.beforeConsoleLog(context);
const result = await this.agentFilterHandler(context as AgentFunctionContext, agentFunction, agentId);
this.afterConsoleLog(result);

if (hasNestedGraph) {
this.graph.taskManager.restoreAfterNesting();
}
if (!this.isCurrentTransaction(transactionId)) {
// This condition happens when the agent function returns
// after the timeout (either retried or not).
GraphAILogger.log(`-- transactionId mismatch with ${this.nodeId} (probably timeout)`);
return;
}

if (!this.isCurrentTransaction(transactionId)) {
// This condition happens when the agent function returns
// after the timeout (either retried or not).
GraphAILogger.log(`-- transactionId mismatch with ${this.nodeId} (probably timeout)`);
return;
}
if (this.repeatUntil?.exists) {
const dummyResult = { self: { result: this.getResult(result) } as unknown as ComputedNode };
const repeatResult = resultsOf({ data: this.repeatUntil?.exists }, dummyResult, [], true);
if (isNull(repeatResult?.data)) {
this.retry(NodeState.Failed, Error("Repeat Until"));
return;
}
}

if (this.repeatUntil?.exists) {
const dummyResult = { self: { result: this.getResult(result) } as unknown as ComputedNode };
const repeatResult = resultsOf({ data: this.repeatUntil?.exists }, dummyResult, [], true);
if (isNull(repeatResult?.data)) {
this.retry(NodeState.Failed, Error("Repeat Until"));
return;
// after process
this.afterExecute(result, localLog);
} finally {
if (nestingPrepared) {
this.graph.taskManager.restoreAfterNesting(this.label, this.graphId);
}
}

// after process
this.afterExecute(result, localLog);
} catch (error) {
this.errorProcess(error, transactionId, previousResults);
}
Expand Down
166 changes: 153 additions & 13 deletions packages/graphai/src/task_manager.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,126 @@
import { ComputedNode } from "./node";
import { assert } from "./utils/utils";
import { ConcurrencyConfig } from "./type";
import { assert, isPlainObject } from "./utils/utils";

type TaskEntry = {
node: ComputedNode;
graphId: string;
callback: (node: ComputedNode) => void;
};

const assertPositiveInteger = (value: unknown, field: string): number => {
if (typeof value !== "number" || !Number.isInteger(value) || value < 1) {
throw new Error(`TaskManager: ${field} must be a positive integer (got ${String(value)})`);
}
return value;
};

// Mirrors the strictness of validateConcurrencyConfig() so that direct
// `new TaskManager(...)` calls (e.g. tests, advanced consumers that bypass
// validateGraphData) cannot silently disable label enforcement with a
// malformed shape such as Map / Date / class instances / arrays.
const normalizeConcurrencyConfig = (config: number | ConcurrencyConfig): { global: number; labels: Map<string, number> } => {
if (typeof config === "number") {
return { global: assertPositiveInteger(config, "concurrency"), labels: new Map() };
}
if (!isPlainObject(config)) {
throw new Error("TaskManager: concurrency must be a positive integer or a ConcurrencyConfig object");
}
const global = assertPositiveInteger(config.global, "concurrency.global");
const labels = new Map<string, number>();
if (config.labels !== undefined) {
if (!isPlainObject(config.labels)) {
throw new Error("TaskManager: concurrency.labels must be a plain object");
}
for (const [labelKey, labelValue] of Object.entries(config.labels)) {
labels.set(labelKey, assertPositiveInteger(labelValue, `concurrency.labels.${labelKey}`));
}
}
return { global, labels };
};

// TaskManage object controls the concurrency of ComputedNode execution.
//
// NOTE: A TaskManager instance will be shared between parent graph and its children
// when nested agents are involved.
export class TaskManager {
private concurrency: number;
private labelLimits: Map<string, number>;
private runningByLabel: Map<string, number> = new Map();
// Per-label bypass capacity granted to nested children. Keyed by parentGraphId
// so that only tasks whose graphId differs from the parent (i.e. those running
// inside the nested graph) can consume the extra slot. Unrelated siblings on
// the same graphId as the parent are not affected.
private nestingBypassByLabel: Map<string, Map<string, number>> = new Map();
private taskQueue: Array<TaskEntry> = [];
private runningNodes = new Set<ComputedNode>();

constructor(concurrency: number) {
this.concurrency = concurrency;
constructor(config: number | ConcurrencyConfig) {
const normalized = normalizeConcurrencyConfig(config);
this.concurrency = normalized.global;
this.labelLimits = normalized.labels;
}

// This internal method dequeus a task from the task queue
// and call the associated callback method, if the number of
// running task is lower than the spcified limit.
// Returns true if the task can run right now under both the global limit
// and (if specified) its label-specific limit.
private canRun(task: TaskEntry): boolean {
if (this.runningNodes.size >= this.concurrency) {
return false;
}
const label = task.node.label;
if (label === undefined) {
return true;
}
const limit = this.labelLimits.get(label);
if (limit === undefined) {
return true;
}
const running = this.runningByLabel.get(label) ?? 0;
if (running < limit) {
return true;
}
// Bypass path: if a labeled parent has prepared for nesting and this task
// belongs to a different graph (i.e. the nested graph), grant +1 per such
// outstanding bump. Unrelated siblings on the parent's graphId do NOT get
// this allowance, so the per-label cap is preserved for them.
const bypass = this.nestingBypassByLabel.get(label);
if (!bypass) {
return false;
}
let extra = 0;
for (const [parentGraphId, count] of bypass) {
if (parentGraphId !== task.graphId) {
extra += count;
}
}
return running < limit + extra;
}

// Walk the queue (already sorted by priority desc) and dispatch the first task
// whose label still has capacity. This is the "head-of-line skip" policy.
private dequeueTaskIfPossible() {
if (this.runningNodes.size < this.concurrency) {
const task = this.taskQueue.shift();
if (task) {
if (this.runningNodes.size >= this.concurrency) {
return;
}
for (let i = 0; i < this.taskQueue.length; i++) {
const task = this.taskQueue[i];
if (this.canRun(task)) {
this.taskQueue.splice(i, 1);
this.runningNodes.add(task.node);
const label = task.node.label;
if (label !== undefined) {
this.runningByLabel.set(label, (this.runningByLabel.get(label) ?? 0) + 1);
}
task.callback(task.node);
return;
}
}
}

// Node will call this method to put itself in the execution queue.
// We call the associated callback function when it is dequeued.
public addTask(node: ComputedNode, graphId: string, callback: (node: ComputedNode) => void) {
// Finder tasks in the queue, which has either the same or higher priority.
// Find tasks in the queue, which has either the same or higher priority.
const count = this.taskQueue.filter((task) => {
return task.node.priority >= node.priority;
}).length;
Expand All @@ -57,18 +141,72 @@ export class TaskManager {
public onComplete(node: ComputedNode) {
assert(this.runningNodes.has(node), `TaskManager.onComplete node(${node.nodeId}) is not in list`);
this.runningNodes.delete(node);
this.dequeueTaskIfPossible();
const label = node.label;
if (label !== undefined) {
const running = this.runningByLabel.get(label) ?? 0;
if (running <= 1) {
this.runningByLabel.delete(label);
} else {
this.runningByLabel.set(label, running - 1);
}
}
// A label slot may have just opened, so try to dispatch as many newly-eligible
// tasks as possible (the freed label could allow several queued tasks to run if
// the global limit is not yet reached).
let progressed = true;
while (progressed) {
const before = this.runningNodes.size;
this.dequeueTaskIfPossible();
progressed = this.runningNodes.size > before;
}
}

// Node will call this method before it hands the task manager from the graph
// to a nested agent. We need to make it sure that there is enough room to run
// computed nodes inside the nested graph to avoid a deadlock.
public prepareForNesting() {
//
// When the parent carries a label that has a configured per-label limit,
// a nested child sharing that label would otherwise stay queued forever
// (parent waits for child, child blocked by parent's label slot). To avoid
// this without widening the cap for unrelated siblings, we record a per-
// parent-graphId bypass that canRun() applies only to tasks whose graphId
// differs from the parent's.
public prepareForNesting(label?: string, parentGraphId?: string) {
this.concurrency++;
if (label !== undefined && parentGraphId !== undefined && this.labelLimits.has(label)) {
let perParent = this.nestingBypassByLabel.get(label);
if (!perParent) {
perParent = new Map();
this.nestingBypassByLabel.set(label, perParent);
}
perParent.set(parentGraphId, (perParent.get(parentGraphId) ?? 0) + 1);
}
// Both the global slot bump and the optional label bypass can free capacity
// for already-queued tasks; drain the queue while progress is being made.
let progressed = true;
while (progressed) {
const before = this.runningNodes.size;
this.dequeueTaskIfPossible();
progressed = this.runningNodes.size > before;
}
}

public restoreAfterNesting() {
public restoreAfterNesting(label?: string, parentGraphId?: string) {
this.concurrency--;
if (label !== undefined && parentGraphId !== undefined) {
const perParent = this.nestingBypassByLabel.get(label);
if (perParent) {
const next = (perParent.get(parentGraphId) ?? 0) - 1;
if (next <= 0) {
perParent.delete(parentGraphId);
} else {
perParent.set(parentGraphId, next);
}
if (perParent.size === 0) {
this.nestingBypassByLabel.delete(label);
}
}
}
}

public getStatus(verbose: boolean = false) {
Expand All @@ -86,5 +224,7 @@ export class TaskManager {
public reset() {
this.taskQueue.length = 0;
this.runningNodes.clear();
this.runningByLabel.clear();
this.nestingBypassByLabel.clear();
}
}
8 changes: 7 additions & 1 deletion packages/graphai/src/type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export type ComputedNodeData = {
graphLoader?: GraphDataLoaderOption;
isResult?: boolean;
priority?: number; // The default is 0.
label?: string; // optional concurrency-control bucket key. unspecified -> not subject to per-label limits.
passThrough?: PassThrough; // data that pass trough to result
console?: ConsoleElement;
};
Expand All @@ -81,10 +82,15 @@ export type LoopData = {
while?: string | boolean;
};

export type ConcurrencyConfig = {
global: number;
labels?: Record<string, number>;
};

export type GraphData = {
version?: number; // major version, 0.1, 0.2, ...
nodes: Record<string, NodeData>;
concurrency?: number;
concurrency?: number | ConcurrencyConfig;
loop?: LoopData;
verbose?: boolean;
retry?: number;
Expand Down
Loading
Loading