-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_flow_system.py
More file actions
116 lines (100 loc) · 3.23 KB
/
test_flow_system.py
File metadata and controls
116 lines (100 loc) · 3.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python3
"""
Test script for the flow compilation and execution system
"""
import asyncio
import json
from app.Containers.Node.Engine.NodeBootstrap import bootstrap_nodes
from app.Containers.Flow.Actions.CompileFlowAction import CompileFlowAction
from app.Containers.Flow.Actions.ExecuteFlowAction import ExecuteFlowAction
def _build_sample_flow():
    """Return the three-node flow definition exercised by the test script.

    The flow chains an ``InputNode``, a ``TextProcessorNode`` and an
    ``OutputNode``. Each downstream node names its upstream source in
    ``input_mappings``, and ``connections`` lists the same two edges.
    """
    input_node = {
        "id": "input_1",
        "type": "InputNode",
        "parameters": {"input_key": "text"},
    }
    processor_node = {
        "id": "processor_1",
        "type": "TextProcessorNode",
        "parameters": {"operation": "uppercase"},
        "input_mappings": {
            "text": {
                "source_node": "input_1",
                "source_output": "output",
            },
        },
    }
    output_node = {
        "id": "output_1",
        "type": "OutputNode",
        "parameters": {"output_key": "result"},
        "input_mappings": {
            "input": {
                "source_node": "processor_1",
                "source_output": "processed_text",
            },
        },
    }
    return {
        "id": "test_flow",
        "name": "Test Flow",
        "nodes": [input_node, processor_node, output_node],
        "connections": [
            {"source": "input_1", "target": "processor_1"},
            {"source": "processor_1", "target": "output_1"},
        ],
    }


async def test_flow_system():
    """Compile and then execute a sample flow, printing each stage's outcome.

    Steps, in order:
      1. Bootstrap the nodes via ``bootstrap_nodes()``.
      2. Compile the sample flow with ``CompileFlowAction`` and print the
         generated code. If compilation reports failure, print the errors
         and return without executing.
      3. Execute the same flow with ``ExecuteFlowAction`` using the input
         ``{"text": "hello world"}`` and print the execution summary plus
         the per-node results.

    Returns:
        None. All results are reported on stdout.
    """
    print("🔧 Bootstrapping node system...")
    bootstrap_nodes()

    # The same definition object is used for both compilation and execution.
    flow = _build_sample_flow()

    # --- Compilation -------------------------------------------------------
    print("\n📝 Testing flow compilation...")
    compiler = CompileFlowAction()
    compiled = await compiler.run(flow)

    # Guard clause: nothing to execute if the flow did not compile.
    if not compiled.success:
        print("❌ Compilation failed!")
        print("Errors:", compiled.errors)
        return

    separator = "-" * 50
    print("✅ Compilation successful!")
    print("Generated code:")
    print(separator)
    print(compiled.code)
    print(separator)

    # --- Execution ---------------------------------------------------------
    print("\n🚀 Testing flow execution...")
    executor = ExecuteFlowAction()
    sample_inputs = {"text": "hello world"}
    run = await executor.run(flow, sample_inputs)

    print(f"Execution ID: {run.id}")
    print(f"Status: {run.status.value}")
    print(f"Inputs: {run.inputs}")
    print(f"Outputs: {run.outputs}")
    if run.error:
        print(f"Error: {run.error}")

    # --- Per-node breakdown ------------------------------------------------
    print("\nNode Results:")
    for name, outcome in run.node_results.items():
        print(f" {name}: {outcome.success} - {outcome.outputs}")
        if outcome.error:
            print(f" Error: {outcome.error}")

    print("\n🎉 Flow system test completed!")
if __name__ == "__main__":
    # Script entry point: create the coroutine and let asyncio drive it to
    # completion.
    entry_coroutine = test_flow_system()
    asyncio.run(entry_coroutine)