diff --git a/ai.go b/ai.go
index 4877f8a0..49e41986 100644
--- a/ai.go
+++ b/ai.go
@@ -44,13 +44,15 @@ import (
var standalone bool
// var model = "gpt-5-mini"
-var model = "gpt-5-mini"
+//var model = "gpt-5-mini"
+var model = "gpt-5.4-nano"
//var model = "gpt-5.2-codex"
var fallbackModel = ""
var assistantId = os.Getenv("OPENAI_ASSISTANT_ID")
var docsVectorStoreID = os.Getenv("OPENAI_DOCS_VS_ID")
var skipAgentWait = os.Getenv("SHUFFLE_SKIP_AGENT_WAIT")
+var agentRunLocation = os.Getenv("SHUFFLE_AGENT_RUN_LOCATION")
var assistantModel = model
var aiMaxTokens = 4096 // Controllable with AI_MAX_TOKENS env
@@ -2494,7 +2496,9 @@ func GetActionAIResponse(ctx context.Context, resp http.ResponseWriter, user Use
appname = appname1.(string)
}
- log.Printf("[INFO] Starting AI Translation with app '%s' and category '%s' for query '%s'", appname, category, inputQuery)
+ if debug {
+ log.Printf("[DEBUG] Starting AI Translation with app '%s' and category '%s' for query '%s'", appname, category, inputQuery)
+ }
if strings.Contains(contentOutput, "success\": false") {
// Maybe look for a Workflow that does what they want?
@@ -7079,6 +7083,7 @@ func abortAgentExecution(ctx context.Context, execution WorkflowExecution, start
agentOutput.Error = reason
agentOutput.CompletedAt = time.Now().UnixMilli()
+ // How do we find the original input?
lastDecisionIsFinish := false
if len(agentOutput.Decisions) > 0 {
last := agentOutput.Decisions[len(agentOutput.Decisions)-1]
@@ -7090,6 +7095,11 @@ func abortAgentExecution(ctx context.Context, execution WorkflowExecution, start
}
}
+ // FIXME: Where do we find original_input?
+ // What if it doesn't exist?
+ if len(agentOutput.OriginalInput) == 0 {
+ }
+
if !lastDecisionIsFinish {
nextIndex := len(agentOutput.Decisions)
b := make([]byte, 6)
@@ -7097,6 +7107,7 @@ func abortAgentExecution(ctx context.Context, execution WorkflowExecution, start
if _, randErr := rand.Read(b); randErr == nil {
finishId = base64.RawURLEncoding.EncodeToString(b)
}
+
syntheticFinish := AgentDecision{
I: nextIndex,
Action: "finish",
@@ -7112,9 +7123,14 @@ func abortAgentExecution(ctx context.Context, execution WorkflowExecution, start
CompletedAt: agentOutput.CompletedAt,
},
}
+
agentOutput.Decisions = append(agentOutput.Decisions, syntheticFinish)
}
+
agentOutput.Output = reason
+ if strings.Contains(reason, "Minimum of one branch") {
+ agentOutput.Output = "The agent did not start due to the workflow not reaching this point."
+ }
marshalledOutput, marshalErr := json.Marshal(agentOutput)
if marshalErr != nil {
@@ -7144,6 +7160,7 @@ func abortAgentExecution(ctx context.Context, execution WorkflowExecution, start
break
}
}
+
if !replaced {
execution.Results = append(execution.Results, abortResult)
}
@@ -7179,26 +7196,34 @@ func sendAITokenLimitAlert(ctx context.Context, execution WorkflowExecution, ful
appRunsLimit := int64(0)
orgStats, statsErr := GetOrgStatistics(ctx, billingOrgId)
if statsErr == nil && orgStats != nil {
- totalAppExecutions = orgStats.MonthlyAppExecutions + orgStats.MonthlyChildAppExecutions
+ stats := handleGetCorrectedStats(orgStats)
+ totalAppExecutions = stats.MonthlyAppExecutions + stats.MonthlyChildAppExecutions
}
if fullOrg != nil {
appRunsLimit = fullOrg.SyncFeatures.AppExecutions.Limit
}
+ appRunsUsagePercentage := float64(totalAppExecutions) / float64(appRunsLimit) * 100
subjectLine := fmt.Sprintf("%d%% of your AI token limit", int64(aiPercentage))
Subject := fmt.Sprintf("[Shuffle]: You've reached %s for your tenant %s", subjectLine, orgName)
AiRecommendation := "Tip: Connect your own AI provider app to use your own keys and bypass the AI token limit entirely."
+
+ if tokenLimit == 0 {
+ tokenLimit = 10000000
+ }
+
substitutions := map[string]interface{}{
- "app_runs_usage": totalAppExecutions,
- "app_runs_limit": appRunsLimit,
- "subject_string": subjectLine,
- "ai_tokens_usage": monthlyTokensUsed,
- "ai_tokens_limit": tokenLimit,
- "org_name": orgName,
- "org_id": billingOrgId,
- "admin_email": orgName,
- "app_runs_usage_percentage": int64(aiPercentage),
- "ai_recommendation": AiRecommendation,
+ "app_runs_usage": totalAppExecutions,
+ "app_runs_limit": appRunsLimit,
+ "subject_string": subjectLine,
+ "ai_tokens_usage": monthlyTokensUsed,
+ "ai_tokens_limit": tokenLimit,
+ "org_name": orgName,
+ "org_id": billingOrgId,
+ "admin_email": orgName,
+ "app_runs_usage_percentage": int64(appRunsUsagePercentage),
+ "ai_tokens_usage_percentage": int64(aiPercentage),
+ "ai_recommendation": AiRecommendation,
}
err := sendMailSendgridV2(
@@ -7502,11 +7527,114 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action,
foundReasoning := ""
enableQuestions := false
+ // This is a part of making sure variables work properly, no matter where
+ // in Shuffle we are
+ parsingBody := map[string]string{}
+ for _, param := range startNode.Parameters {
+ // Could this alleviate the need for the openai App itself??
+ // Would that help?
+ if strings.Contains(param.Value, "$") {
+ parsingBody[param.Name] = param.Value
+ }
+ }
+
+ // Self-request starts here!
+ backendUrl := "https://shuffler.io"
+ if len(os.Getenv("BASE_URL")) > 0 {
+ backendUrl = os.Getenv("BASE_URL")
+ }
+
+ if len(os.Getenv("SHUFFLE_CLOUDRUN_URL")) > 0 {
+ backendUrl = os.Getenv("SHUFFLE_CLOUDRUN_URL")
+ }
+
+ llmStatusCode := 0
+ parsedAgentInput := ""
+ if len(parsingBody) > 0 {
+ marshalledBody, err := json.Marshal(parsingBody)
+ if err == nil && len(marshalledBody) > 0 {
+ repeaterNode := Action{}
+ repeaterNode.AppID = "3e2bdf9d5069fe3f4746c29d68785a6a"
+ repeaterNode.AppName = "Shuffle Tools"
+ repeaterNode.AppVersion = "1.2.0"
+ repeaterNode.Name = "repeat_back_to_me"
+
+ repeaterNode.Parameters = []WorkflowAppActionParameter{
+ WorkflowAppActionParameter{
+ Name: "call",
+ Value: string(marshalledBody),
+ },
+ }
+
+ repeaterNode.SourceWorkflow = execution.Workflow.ID
+ repeaterNode.SourceExecution = execution.ExecutionId
+
+ marshalledAction, err := json.Marshal(repeaterNode)
+ if err != nil {
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Failed marshaling shuffle-tools request during LLM setup: %s", execution.ExecutionId, err)
+ //return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_request_build_failed", fmt.Sprintf("Failed to start AI Agent (6): %s", err.Error()))
+ } else {
+ fullUrl := fmt.Sprintf("%s/api/v1/apps/%s/run?execution_id=%s&authorization=%s&parent_node=%s", backendUrl, repeaterNode.AppID, execution.ExecutionId, execution.Authorization, startNode.ID)
+ client := GetExternalClient(fullUrl)
+
+ client.Timeout = time.Minute * 5
+ req, err := http.NewRequest(
+ "POST",
+ fullUrl,
+ bytes.NewBuffer([]byte(marshalledAction)),
+ )
+
+ if err != nil {
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Failed creating shuffle-tools request during LLM setup: %s", execution.ExecutionId, err)
+ //return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_request_build_failed", fmt.Sprintf("Failed to start AI Agent (7): %s", err.Error()))
+ } else {
+
+ // Just a request tree where any failure = skip
+
+ newresp, err := client.Do(req)
+ if err != nil {
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Failed sending request during LLM setup: %s", execution.ExecutionId, err)
+ } else {
+ defer newresp.Body.Close()
+ body, err := ioutil.ReadAll(newresp.Body)
+ if err != nil {
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Failed reading response during LLM setup: %s", execution.ExecutionId, err)
+ } else {
+ // Check the results of the output
+ toolsResultMapping := SingleResult{}
+ if err := json.Unmarshal(body, &toolsResultMapping); err != nil {
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Failed parsing response during LLM setup: %s", execution.ExecutionId, err)
+ } else {
+ if len(toolsResultMapping.Result) > 0 {
+ mappedResult := map[string]string{}
+ unmarshalErr := json.Unmarshal([]byte(toolsResultMapping.Result), &mappedResult)
+ if unmarshalErr != nil {
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Failed parsing final result during LLM setup: %s", execution.ExecutionId, unmarshalErr)
+ } else {
+ for paramIndex, param := range startNode.Parameters {
+ if val, ok := mappedResult[param.Name]; ok {
+ startNode.Parameters[paramIndex].Value = val
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ } else {
+ log.Printf("[ERROR] Failed to marshal parsing body for shuffle tools translation: %v", err)
+ }
+ }
+
imagesIncluded := []string{}
imageDetail := openai.ImageURLDetailAuto // low, high, original, auto (let the model decide)
for _, param := range startNode.Parameters {
if param.Name == "input" {
userMessage = param.Value
+
+ parsedAgentInput = userMessage
}
if param.Name == "enable_questions" && strings.ToLower(param.Value) == "true" {
@@ -7664,7 +7792,6 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action,
systemMessage += "\n\n"
}
-
if param.Name == "memory" {
// Handle memory injection (may use Singul?)
if debug && len(param.Value) > 0 {
@@ -8149,12 +8276,13 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action,
You are an Action Execution Agent that performs actions in third-party tools. You can use ANY tool and platform to achieve these goals if they are presented by the user. You receive tools (USER CONTEXT), a request (USER REQUEST), and history. Your goal is to execute tasks and **IMMEDIATELY** stop and summarize when done. Attempt to achieve what the users most likely intention is - not just exactly what they ask for. Iterate until the goal is achieved by using the USER CONTEXT tools and actions available to you. Don't be too verbose, and ask as few questions as possible.
### RULES:
-1. Look at user intent, not just words. Do NOT stop until the user intent has been fulfilled. This means rewriting the original user intent for them to make it more clear what they most likely wanted.
-2. Use tools and their actions to achieve the user request.
-3. Do NOT ask unnecessary questions. Make assumptions for the user.
-4. DO NOT LIE. Only say you did something if you actually did.
-5. "action" should be the EXACT name of the function, without paranthesis or parameters.
-6. If future scheduling may be necessary, ignore it and run it right now. Scheduling is a separate process.
+1. ALWAYS output the strict decision JSON OUTPUT FORMAT and nothing else.
+2. Look at user intent, not just words of the USER REQUEST. Do NOT stop until the user intent has been fulfilled. This means understanding the original user intent for them to make it more clear what they most likely wanted.
+3. Use tools and their actions to achieve the user request.
+4. Do NOT ask unnecessary questions. Make assumptions for the user.
+5. DO NOT LIE. Only say you did something if you actually did.
+6. "action" should be the EXACT name of the function, without paranthesis or parameters.
+7. If future scheduling may be necessary, ignore it and run it right now. Scheduling is a separate process.
8. App Actions show up in the python function format. Put the function name in the 'action' field and the parameters in 'fields' array. Don't add empty fields.
9. IF an App Action parameter contains a value, use it and fill it in with relevant values. Ask questions, if important data is missing. Do not add random values to nested JSON bodies unless necessary.
@@ -8401,60 +8529,6 @@ data_filter:
log.Printf("\n\n\n[DEBUG] BODY for AI Agent (first request): %s\n\n\n", string(initialAgentRequestBody))
}
- // Hardcoded for now
- aiNode := Action{}
- aiNode.AppID = "5d19dd82517870c68d40cacad9b5ca91"
- aiNode.AppName = "openai"
- aiNode.Name = "post_generate_a_chat_response"
-
- //aiNode.Environment = "cloud"
-
- // FIXME: Resetting auth as it should auto-pick (if possible)
- aiNode.AuthenticationId = ""
- aiNode.Parameters = []WorkflowAppActionParameter{
- // WorkflowAppActionParameter{
- // Name: "url",
- // Value: "",
- // },
- //WorkflowAppActionParameter{
- // Name: "apikey",
- // Value: "",
- //},
- WorkflowAppActionParameter{
- Name: "body",
- Value: string(initialAgentRequestBody),
- },
- WorkflowAppActionParameter{
- Name: "headers",
- Value: "Content-Type: application/json\nAccept: application/json",
- },
- }
-
- // To ensure we get the context of an execution properly
- // This gives it variables to run IN CONTEXT of the current execution,
- // meaning it has access to current variables
- aiNode.SourceWorkflow = execution.Workflow.ID
- aiNode.SourceExecution = execution.ExecutionId
-
- // App run delay if needed (e.g. for debugging)
- //aiNode.ExecutionDelay = 60
-
- marshalledAction, err := json.Marshal(aiNode)
- if err != nil {
- log.Printf("[ERROR][%s] AI Agent: Failed marshalling action for AI Agent (first agent request): %s", execution.ExecutionId, err)
- return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "marshal_ai_action_failed", fmt.Sprintf("Failed to start AI Agent (6): %s", err.Error()))
- }
-
- // Self-request starts here!
- backendUrl := "https://shuffler.io"
- if len(os.Getenv("BASE_URL")) > 0 {
- backendUrl = os.Getenv("BASE_URL")
- }
-
- if len(os.Getenv("SHUFFLE_CLOUDRUN_URL")) > 0 {
- backendUrl = os.Getenv("SHUFFLE_CLOUDRUN_URL")
- }
-
billingOrgId := execution.Workflow.OrgId
var billingOrg *Org
if len(execution.Workflow.OrgId) > 0 {
@@ -8484,7 +8558,7 @@ data_filter:
orgStats, statsErr := GetOrgStatistics(ctx, billingOrgId)
monthlyTokensUsed := int64(0)
if statsErr == nil && orgStats != nil {
- monthlyTokensUsed = orgStats.MonthlyAgentTokens
+ monthlyTokensUsed = orgStats.MonthlyAgentTokens + orgStats.MonthlyChildOrgAgentTokens
}
tokenLimit := int64(0)
@@ -8516,315 +8590,395 @@ data_filter:
}
}
- fullUrl := fmt.Sprintf("%s/api/v1/apps/%s/run?execution_id=%s&authorization=%s&parent_node=%s", backendUrl, aiNode.AppID, execution.ExecutionId, execution.Authorization, startNode.ID)
- client := GetExternalClient(fullUrl)
- body := []byte{}
- llmStatusCode := 0
-
- if skipAgentWait == "true" && len(llmResponse) > 0 {
- body = llmResponse
- } else {
-
- client.Timeout = time.Minute * 5
+ bodyString := []byte{}
+ decisionString := ""
+ choicesString := ""
+ skipHttpParsing := false
+ resultMapping := ActionResult{}
+ openaiOutput := openai.ChatCompletionResponse{}
- // Test for whether we can ignore response wait time
- // This is to drastically reduce CPU use of Agent requests
- // 1 second = enough to read the body, which is the only major
- // obstacle
- if skipAgentWait == "true" {
- //client.Timeout = time.Second * 1
- client.Timeout = time.Millisecond * 1000
- fullUrl += "&skip_result_wait=true"
- } else {
- // Makes sure we wait as long as possible
- fullUrl += "&timeout=300"
+ if agentRunLocation == "local" {
+ callInfo := AiCallInfo{
+ Caller: "aiAgentRunner",
+ OrgID: execution.Workflow.OrgId,
}
- req, err := http.NewRequest(
- "POST",
- fullUrl,
- bytes.NewBuffer([]byte(marshalledAction)),
+ output, err := RunAiQuery(
+ ctx,
+ callInfo,
+ "",
+ "",
+ completionRequest,
)
- if err != nil {
- log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Failed creating request during LLM setup: %s", execution.ExecutionId, err)
- return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_request_build_failed", fmt.Sprintf("Failed to start AI Agent (7): %s", err.Error()))
+ if err != nil {
+ log.Printf("[ERROR][%s] AI Agent: Failed running AI query for action %s: %s", execution.ExecutionId, startNode.ID, err)
+ return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "run_ai_query_failed", fmt.Sprintf("Failed to start AI Agent (6): %s", err.Error()))
}
- // Generate a one-time-use token so PrepareSingleAction knows this request originated from a legitimate agent execution and is allowed to inject the system AI credentials.
- agentOneTimeToken := uuid.NewV4().String()
- agentTokenCacheKey := fmt.Sprintf("agent_onetime_token_%s", agentOneTimeToken)
- if err := SetCache(ctx, agentTokenCacheKey, []byte("1"), 60); err != nil {
- log.Printf("[WARNING][%s] Failed to set agent one-time token in cache: %s", execution.ExecutionId, err)
+ bodyString = []byte(output)
+ resultMapping.Result = output
+ resultMapping.ExecutionId = execution.ExecutionId
+ resultMapping.Authorization = execution.Authorization
+ // Waiting 3
+ resultMapping.Status = "WAITING"
+ resultMapping.Action = startNode
+ resultMapping.Action.Name = "agent"
+
+ decisionString = output
+ skipHttpParsing = true
+
+ } else {
+ // Hardcoded for now
+ aiNode := Action{}
+ aiNode.AppID = "5d19dd82517870c68d40cacad9b5ca91"
+ aiNode.AppName = "openai"
+ aiNode.Name = "post_generate_a_chat_response"
+
+ //aiNode.Environment = "cloud"
+
+ // FIXME: Resetting auth as it should auto-pick (if possible)
+ aiNode.AuthenticationId = ""
+ aiNode.Parameters = []WorkflowAppActionParameter{
+ // WorkflowAppActionParameter{
+ // Name: "url",
+ // Value: "",
+ // },
+ //WorkflowAppActionParameter{
+ // Name: "apikey",
+ // Value: "",
+ //},
+ WorkflowAppActionParameter{
+ Name: "body",
+ Value: string(initialAgentRequestBody),
+ },
+ WorkflowAppActionParameter{
+ Name: "headers",
+ Value: "Content-Type: application/json\nAccept: application/json",
+ },
}
- req.Header.Set("X-Agent-Token", agentOneTimeToken)
- newresp, err := client.Do(req)
+ // Adding additional non-required params to make sure we get them parsed
- log.Printf("[INFO][%s] Started AI Agent action %s with app '%s'. Waiting for results...", execution.ExecutionId, startNode.ID, chosenAiApp)
- if err != nil {
- if skipAgentWait == "true" && strings.Contains(strings.ToLower(err.Error()), "timeout") {
- // Question when we return here:
- // How do we get back to EXACTLY here when the AI is done?
- // Point being: we need the same data anyway.
- return startNode, nil
- } else {
- log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s error=%s", execution.ExecutionId, execution.Workflow.OrgId, strings.Replace(err.Error(), `"`, `\"`, -1))
- return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_http_failure", fmt.Sprintf("LLM call failed after %ds: %s", int(client.Timeout.Seconds()), err.Error()))
- }
- }
- if skipAgentWait == "true" {
- return startNode, nil
- }
+ // To ensure we get the context of an execution properly
+ // This gives it variables to run IN CONTEXT of the current execution,
+ // meaning it has access to current variables
+ aiNode.SourceWorkflow = execution.Workflow.ID
+ aiNode.SourceExecution = execution.ExecutionId
- // Set timestamp as soon as it's ready
- // https://pkg.go.dev/github.com/sashabaranov/go-openai#ChatCompletionMessage
- for messageIndex, _ := range completionRequest.Messages {
- if len(completionRequest.Messages[messageIndex].Name) == 0 {
- completionRequest.Messages[messageIndex].Name = fmt.Sprintf("%d", time.Now().Unix())
- }
- }
+ // App run delay if needed (e.g. for debugging)
+ //aiNode.ExecutionDelay = 60
- defer newresp.Body.Close()
- body, err = ioutil.ReadAll(newresp.Body)
+ marshalledAction, err := json.Marshal(aiNode)
if err != nil {
- log.Printf("[ERROR][%s] AI Agent: Failed reading response body from LLM: %s", execution.ExecutionId, err)
- return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_body_read_failed", fmt.Sprintf("Failed to read LLM response body: %s", err.Error()))
+ log.Printf("[ERROR][%s] AI Agent: Failed marshalling action for AI Agent (first agent request): %s", execution.ExecutionId, err)
+ return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "marshal_ai_action_failed", fmt.Sprintf("Failed to start AI Agent (6): %s", err.Error()))
}
-
- llmStatusCode = newresp.StatusCode
- }
- // Maps OpenAI -> Result struct so we can handle it
- resultMapping := ActionResult{}
- err = json.Unmarshal(body, &resultMapping)
- if err != nil {
- log.Printf("[ERROR] AI Agent (2): Failed unmarshalling response into decisions. Response from sending AI Agent request to %s: %d - '%s'. Err: %s", fullUrl, llmStatusCode, string(body), err)
- }
+ fullUrl := fmt.Sprintf("%s/api/v1/apps/%s/run?execution_id=%s&authorization=%s&parent_node=%s", backendUrl, aiNode.AppID, execution.ExecutionId, execution.Authorization, startNode.ID)
+ client := GetExternalClient(fullUrl)
+ body := []byte{}
+ if skipAgentWait == "true" && len(llmResponse) > 0 {
+ body = llmResponse
+ } else {
+
+ client.Timeout = time.Minute * 5
- resultMapping.ExecutionId = execution.ExecutionId
- resultMapping.Authorization = execution.Authorization
- // Waiting 3
- resultMapping.Status = "WAITING"
- resultMapping.Action = startNode
- resultMapping.Action.Name = "agent"
+ // Test for whether we can ignore response wait time
+ // This is to drastically reduce CPU use of Agent requests
+ // 1 second = enough to read the body, which is the only major
+ // obstacle
+ if skipAgentWait == "true" {
+ //client.Timeout = time.Second * 1
+ client.Timeout = time.Millisecond * 1000
+ fullUrl += "&skip_result_wait=true"
+ } else {
+ // Makes sure we wait as long as possible
+ fullUrl += "&timeout=300"
+ }
- // This exists for the single reason of tracking errors + parameters
- // ActionResult{} is the type we are using to build the request, while
- // the LLM request ACTUALLY returns SingleResult{}
- additionalResultMapping := SingleResult{}
- err = json.Unmarshal(body, &additionalResultMapping)
+ req, err := http.NewRequest(
+ "POST",
+ fullUrl,
+ bytes.NewBuffer([]byte(marshalledAction)),
+ )
- parsedAgentInput := ""
- if err == nil {
- // Checking for errors in the Single Action run.
- // They usually cause notifications to occur as well.
- if len(additionalResultMapping.Errors) > 0 {
- // Handle this.
- if debug {
- log.Printf("\n\n[ERROR][%s] AI Agent: BODY LEN: %d. Got %d errors from Agent AI subrequest", resultMapping.ExecutionId, len(body), len(additionalResultMapping.Errors))
+ if err != nil {
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Failed creating request during LLM setup: %s", execution.ExecutionId, err)
+ return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_request_build_failed", fmt.Sprintf("Failed to start AI Agent (7): %s", err.Error()))
}
- }
- if len(additionalResultMapping.Parameters) > 0 {
- // FIXME: Check if the result somehow contains the input we sent in.
- // The reason for this is to ensure we can use the return params (somehow)
- //log.Printf("\n\n[WARNING][%s] BODY LEN: %d. Got %d params from Agent AI subrequest", resultMapping.ExecutionId, len(body), len(additionalResultMapping.Parameters))
+ // Generate a one-time-use token so PrepareSingleAction knows this request originated from a legitimate agent execution and is allowed to inject the system AI credentials.
+ agentOneTimeToken := uuid.NewV4().String()
+ agentTokenCacheKey := fmt.Sprintf("agent_onetime_token_%s", agentOneTimeToken)
+ if err := SetCache(ctx, agentTokenCacheKey, []byte("1"), 60); err != nil {
+ log.Printf("[WARNING][%s] Failed to set agent one-time token in cache: %s", execution.ExecutionId, err)
+ }
- for _, param := range additionalResultMapping.Parameters {
- if param.Name != "body" {
- continue
- }
+ req.Header.Set("X-Agent-Token", agentOneTimeToken)
+ newresp, err := client.Do(req)
- if debug {
- log.Printf("[DEBUG][%s] AI Agent: Found body parameter which MAY contain the right user input. LEN: %d", execution.ExecutionId, len(param.Value))
- }
+ log.Printf("[INFO][%s] Started AI Agent action %s with app '%s'. Waiting for results...", execution.ExecutionId, startNode.ID, chosenAiApp)
- if len(param.Value) > 0 {
- parsedAgentInput = param.Value
- break
+ if err != nil {
+ if skipAgentWait == "true" && strings.Contains(strings.ToLower(err.Error()), "timeout") {
+ // Question when we return here:
+ // How do we get back to EXACTLY here when the AI is done?
+ // Point being: we need the same data anyway.
+ return startNode, nil
+ } else {
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s error=%s", execution.ExecutionId, execution.Workflow.OrgId, strings.Replace(err.Error(), `"`, `\"`, -1))
+ return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_http_failure", fmt.Sprintf("LLM call failed after %ds: %s", int(client.Timeout.Seconds()), err.Error()))
}
}
- }
- }
- // Store the completion request in datastore?
- if len(resultMapping.Result) > 0 {
- if strings.Contains(strings.ToLower(resultMapping.Result), "minimum of one branch") {
- branchSkipOutput := AgentOutput{
- Status: "FINISHED",
- Output: resultMapping.Result,
- CompletedAt: time.Now().UnixMilli(),
- }
- marshalledOutput, _ := json.Marshal(branchSkipOutput)
-
- successResult := ActionResult{
- Status: "SUCCESS",
- Result: string(marshalledOutput),
- Action: startNode,
- ExecutionId: execution.ExecutionId,
- Authorization: execution.Authorization,
- StartedAt: time.Now().UnixMilli(),
- CompletedAt: time.Now().UnixMilli(),
+ if skipAgentWait == "true" {
+ return startNode, nil
}
- for i, r := range execution.Results {
- if r.Action.ID == startNode.ID {
- execution.Results[i] = successResult
- break
+ // Set timestamp as soon as it's ready
+ // https://pkg.go.dev/github.com/sashabaranov/go-openai#ChatCompletionMessage
+ for messageIndex, _ := range completionRequest.Messages {
+ if len(completionRequest.Messages[messageIndex].Name) == 0 {
+ completionRequest.Messages[messageIndex].Name = fmt.Sprintf("%d", time.Now().Unix())
}
}
- go sendAgentActionSelfRequest("SUCCESS", execution, successResult)
- return startNode, nil
+ defer newresp.Body.Close()
+ body, err = ioutil.ReadAll(newresp.Body)
+ if err != nil {
+ log.Printf("[ERROR][%s] AI Agent: Failed reading response body from LLM: %s", execution.ExecutionId, err)
+ return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_body_read_failed", fmt.Sprintf("Failed to read LLM response body: %s", err.Error()))
+ }
+
+ llmStatusCode = newresp.StatusCode
}
- // 1. Map it to a Shuffle HTTP Result
- // 2. Find the content: $ai_agent_1.body.choices.#.message.content
- // 3. Map the content into the AgentOutput struct
- //resultMapping.Result = openaiOutput
- outputMap := HTTPOutput{}
- err = json.Unmarshal([]byte(resultMapping.Result), &outputMap)
+ // Maps OpenAI -> Result struct so we can handle it
+ resultMapping := ActionResult{}
+ err = json.Unmarshal(body, &resultMapping)
if err != nil {
- // resultMapping.Result is not a valid HTTPOutput wrapper — this usually means the Shuffle HTTP action itself failed (timeout, or something like connection refused etc) and returned a bare error string instead of its normal JSON response.
- log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s error_type=http_wrapper_parse_error unmarshal_err=%s raw_response=%s", execution.ExecutionId, execution.Workflow.OrgId, err, string(resultMapping.Result))
- return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "llm_response_unmarshal_failed", fmt.Sprintf("LLM HTTP wrapper parse error: %s", err))
+ log.Printf("[ERROR] AI Agent (2): Failed unmarshalling response into decisions. Response from sending AI Agent request to %s: %d - '%s'. Err: %s", fullUrl, llmStatusCode, string(body), err)
}
- if outputMap.Status != 200 {
- log.Printf("[ERROR][%s] AI Agent: Failed to run AI agent with status code %d", execution.ExecutionId, outputMap.Status)
- // Don't log AI_AGENT_LLM_FAILURE here yet - wait to see if we can parse the error details below
- //return startNode, errors.New(fmt.Sprintf("Failed to run AI agent with status code %d", outputMap.Status))
- }
+ resultMapping.ExecutionId = execution.ExecutionId
+ resultMapping.Authorization = execution.Authorization
+ // Waiting 3
+ resultMapping.Status = "WAITING"
+ resultMapping.Action = startNode
+ resultMapping.Action.Name = "agent"
- // Parse the outputMap.Result to OpenAI response
- choicesString := ""
- bodyString := []byte{}
- bodyMap, ok := outputMap.Body.(map[string]interface{})
- if !ok {
- log.Printf("[ERROR][%s] AI Agent: Failed to convert body to MAP in AI Agent response. Raw response: %s", execution.ExecutionId, string(resultMapping.Result))
+ // This exists for the single reason of tracking errors + parameters
+ // ActionResult{} is the type we are using to build the request, while
+ // the LLM request ACTUALLY returns SingleResult{}
+ additionalResultMapping := SingleResult{}
+ err = json.Unmarshal(body, &additionalResultMapping)
- //choicesString = fmt.Sprintf("LLM Response Error: %s", string(resultMapping.Result))
- choicesString = fmt.Sprintf("%s", string(resultMapping.Result))
+ if err == nil {
+ // Checking for errors in the Single Action run.
+ // They usually cause notifications to occur as well.
+ if len(additionalResultMapping.Errors) > 0 {
+ // Handle this.
+ if debug {
+ log.Printf("\n\n[ERROR][%s] AI Agent: BODY LEN: %d. Got %d errors from Agent AI subrequest", resultMapping.ExecutionId, len(body), len(additionalResultMapping.Errors))
+ }
+ }
- // Log LLM failure for body parsing error
- log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=body_parse_error raw_response=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, string(resultMapping.Result))
- } else {
- bodyString, err = json.Marshal(bodyMap)
- if err != nil {
- log.Printf("[ERROR] AI Agent: Failed marshalling body to string in AI Agent response: %s", err)
- return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "llm_body_marshal_failed", fmt.Sprintf("Failed to start AI Agent (3): %s", err.Error()))
+ if len(additionalResultMapping.Parameters) > 0 {
+ // FIXME: Check if the result somehow contains the input we sent in.
+ // The reason for this is to ensure we can use the return params (somehow)
+ //log.Printf("\n\n[WARNING][%s] BODY LEN: %d. Got %d params from Agent AI subrequest", resultMapping.ExecutionId, len(body), len(additionalResultMapping.Parameters))
+
+ for _, param := range additionalResultMapping.Parameters {
+ if param.Name != "body" {
+ continue
+ }
+
+ if debug {
+ log.Printf("[DEBUG][%s] AI Agent: Found body parameter which MAY contain the right user input. LEN: %d", execution.ExecutionId, len(param.Value))
+ }
+
+ if len(param.Value) > 0 {
+ parsedAgentInput = param.Value
+ break
+ }
+ }
}
}
+ }
- openaiOutput := openai.ChatCompletionResponse{}
- err = json.Unmarshal(bodyString, &openaiOutput)
- if err != nil {
- log.Printf("[ERROR][%s] AI Agent (4): Failed unmarshalling response from OpenAI Agent request: %s", execution.ExecutionId, err)
- }
+ // Store the completion request in datastore?
+ if len(resultMapping.Result) > 0 {
- // Edgecase handling for LLM not being available etc
- if len(choicesString) > 0 {
- if debug {
- log.Printf("[ERROR][%s] AI Agent: Found choicesString (1) in AI Agent response error handling: %s", execution.ExecutionId, choicesString)
+ // In case of local AI Query
+ // To bypass, use export SHUFFLE_AGENT_RUN_LOCATION="local"
+ if !skipHttpParsing {
+ if strings.Contains(strings.ToLower(resultMapping.Result), "minimum of one branch") {
+ branchSkipOutput := AgentOutput{
+ Status: "FINISHED",
+ Output: resultMapping.Result,
+ CompletedAt: time.Now().UnixMilli(),
+ }
+ marshalledOutput, _ := json.Marshal(branchSkipOutput)
+
+ successResult := ActionResult{
+ Status: "SUCCESS",
+ Result: string(marshalledOutput),
+ Action: startNode,
+ ExecutionId: execution.ExecutionId,
+ Authorization: execution.Authorization,
+ StartedAt: time.Now().UnixMilli(),
+ CompletedAt: time.Now().UnixMilli(),
+ }
+
+ for i, r := range execution.Results {
+ if r.Action.ID == startNode.ID {
+ execution.Results[i] = successResult
+ break
+ }
+ }
+
+ go sendAgentActionSelfRequest("SUCCESS", execution, successResult)
+ return startNode, nil
}
- } else if len(openaiOutput.Choices) == 0 {
- log.Printf("[ERROR][%s] AI Agent: No choices found in AI agent response (1). Status: %d. Raw: %s", execution.ExecutionId, outputMap.Status, bodyString)
+ // 1. Map it to a Shuffle HTTP Result
+ // 2. Find the content: $ai_agent_1.body.choices.#.message.content
+ // 3. Map the content into the AgentOutput struct
+ //resultMapping.Result = openaiOutput
+ outputMap := HTTPOutput{}
+ err = json.Unmarshal([]byte(resultMapping.Result), &outputMap)
+ if err != nil {
+ // resultMapping.Result is not a valid HTTPOutput wrapper — this usually means the Shuffle HTTP action itself failed (timeout, or something like connection refused etc) and returned a bare error string instead of its normal JSON response.
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s error_type=http_wrapper_parse_error unmarshal_err=%s raw_response=%s", execution.ExecutionId, execution.Workflow.OrgId, err, string(resultMapping.Result))
+ return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "llm_response_unmarshal_failed", fmt.Sprintf("LLM HTTP wrapper parse error: %s", err))
+ }
- // This is specific to OpenAI, but may work for others
- newOutput := openai.ErrorResponse{}
- err = json.Unmarshal(bodyString, &newOutput)
- if err == nil && len(newOutput.Error.Message) > 0 {
- // choicesString = fmt.Sprintf("LLM Error: %s", newOutput.Error.Message)
+ if outputMap.Status != 200 {
+ log.Printf("[ERROR][%s] AI Agent: Failed to run AI agent with status code %d", execution.ExecutionId, outputMap.Status)
+ // Don't log AI_AGENT_LLM_FAILURE here yet - wait to see if we can parse the error details below
+ //return startNode, errors.New(fmt.Sprintf("Failed to run AI agent with status code %d", outputMap.Status))
+ }
- // resultMapping.Status = "FAILURE"
- // LLM returned a proper error (401 invalid key, 429 rate limit, 500 server error, etc.)
- if outputMap.Status == 429 {
- rateLimitKey := "openai_rate_limit_log"
- if _, cacheErr := GetCache(ctx, rateLimitKey); cacheErr != nil {
- log.Printf("[ERROR][%s] AI_OPENAI_RATE_LIMIT: org=%s error_message=%s", execution.ExecutionId, execution.Workflow.OrgId, newOutput.Error.Message)
- _ = SetCache(ctx, rateLimitKey, []byte("1"), 30)
- }
- } else {
- log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=%s error_message=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, newOutput.Error.Type, newOutput.Error.Message)
- }
- return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_http_error", fmt.Sprintf("LLM error (HTTP %d %s): %s", outputMap.Status, newOutput.Error.Type, newOutput.Error.Message))
+ // Parse the outputMap.Result to OpenAI response
+ choicesString := ""
+ bodyMap, ok := outputMap.Body.(map[string]interface{})
+ if !ok {
+ log.Printf("[ERROR][%s] AI Agent: Failed to convert body to MAP in AI Agent response. Raw response: %s", execution.ExecutionId, string(resultMapping.Result))
+
+ //choicesString = fmt.Sprintf("LLM Response Error: %s", string(resultMapping.Result))
+ choicesString = fmt.Sprintf("%s", string(resultMapping.Result))
+
+ // Log LLM failure for body parsing error
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=body_parse_error raw_response=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, string(resultMapping.Result))
} else {
- log.Printf("[ERROR][%s] AI Agent: No choices, nor error found in AI agent response. Status: %d. Raw: %s", execution.ExecutionId, outputMap.Status, bodyString)
- resultMapping.Status = "FAILURE"
+ bodyString, err = json.Marshal(bodyMap)
+ if err != nil {
+ log.Printf("[ERROR] AI Agent: Failed marshalling body to string in AI Agent response: %s", err)
+ return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "llm_body_marshal_failed", fmt.Sprintf("Failed to start AI Agent (3): %s", err.Error()))
+ }
+ }
- // Log LLM failure for unknown error format
- log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=unknown_format raw_response=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, string(bodyString))
+ err = json.Unmarshal(bodyString, &openaiOutput)
+ if err != nil {
+ log.Printf("[ERROR][%s] AI Agent (4): Failed unmarshalling response from OpenAI Agent request: %s", execution.ExecutionId, err)
}
- } else {
- choicesString = openaiOutput.Choices[0].Message.Content
- //if debug {
- // log.Printf("[DEBUG] Found choices string (2) in AI Agent response - len: %d: %s", len(choicesString), choicesString)
- //}
- if openaiOutput.Usage.TotalTokens > 0 && len(execution.Workflow.OrgId) > 0 {
- cachedTokens := 0
- if openaiOutput.Usage.PromptTokensDetails != nil {
- cachedTokens = openaiOutput.Usage.PromptTokensDetails.CachedTokens
+ // Edgecase handling for LLM not being available etc
+ if len(choicesString) > 0 {
+ if debug {
+ log.Printf("[ERROR][%s] AI Agent: Found choicesString (1) in AI Agent response error handling: %s", execution.ExecutionId, choicesString)
}
- reasoningTokens := 0
- if openaiOutput.Usage.CompletionTokensDetails != nil {
- reasoningTokens = openaiOutput.Usage.CompletionTokensDetails.ReasoningTokens
- }
+ } else if len(openaiOutput.Choices) == 0 {
+ log.Printf("[ERROR][%s] AI Agent: No choices found in AI agent response (1). Status: %d. Raw: %s", execution.ExecutionId, outputMap.Status, bodyString)
- inputTokens := int(openaiOutput.Usage.PromptTokens)
- outputTokens := int(openaiOutput.Usage.CompletionTokens)
- totalTokens := int(openaiOutput.Usage.TotalTokens)
+ // This is specific to OpenAI, but may work for others
+ newOutput := openai.ErrorResponse{}
+ err = json.Unmarshal(bodyString, &newOutput)
+ if err == nil && len(newOutput.Error.Message) > 0 {
+ // choicesString = fmt.Sprintf("LLM Error: %s", newOutput.Error.Message)
- subOrgId := execution.Workflow.OrgId
- go func() {
- time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
- IncrementCacheDump(ctx, billingOrgId, "agent_tokens", totalTokens)
- if inputTokens > 0 {
- IncrementCache(ctx, billingOrgId, "agent_input_tokens", inputTokens)
+ // resultMapping.Status = "FAILURE"
+ // LLM returned a proper error (401 invalid key, 429 rate limit, 500 server error, etc.)
+ if outputMap.Status == 429 {
+ rateLimitKey := "openai_rate_limit_log"
+ if _, cacheErr := GetCache(ctx, rateLimitKey); cacheErr != nil {
+ log.Printf("[ERROR][%s] AI_OPENAI_RATE_LIMIT: org=%s error_message=%s", execution.ExecutionId, execution.Workflow.OrgId, newOutput.Error.Message)
+ _ = SetCache(ctx, rateLimitKey, []byte("1"), 30)
+ }
+ } else {
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=%s error_message=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, newOutput.Error.Type, newOutput.Error.Message)
+ }
+ return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_http_error", fmt.Sprintf("LLM error (HTTP %d %s): %s", outputMap.Status, newOutput.Error.Type, newOutput.Error.Message))
+ } else {
+ log.Printf("[ERROR][%s] AI Agent: No choices, nor error found in AI agent response. Status: %d. Raw: %s", execution.ExecutionId, outputMap.Status, bodyString)
+ resultMapping.Status = "FAILURE"
+
+ // Log LLM failure for unknown error format
+ log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=unknown_format raw_response=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, string(bodyString))
+ }
+ } else {
+ choicesString = openaiOutput.Choices[0].Message.Content
+ //if debug {
+ // log.Printf("[DEBUG] Found choices string (2) in AI Agent response - len: %d: %s", len(choicesString), choicesString)
+ //}
+
+ if openaiOutput.Usage.TotalTokens > 0 && len(execution.Workflow.OrgId) > 0 {
+ cachedTokens := 0
+ if openaiOutput.Usage.PromptTokensDetails != nil {
+ cachedTokens = openaiOutput.Usage.PromptTokensDetails.CachedTokens
}
- if outputTokens > 0 {
- IncrementCache(ctx, billingOrgId, "agent_output_tokens", outputTokens)
+
+ reasoningTokens := 0
+ if openaiOutput.Usage.CompletionTokensDetails != nil {
+ reasoningTokens = openaiOutput.Usage.CompletionTokensDetails.ReasoningTokens
}
- if billingOrgId != subOrgId {
- IncrementCache(ctx, subOrgId, "agent_tokens", totalTokens)
+ inputTokens := int(openaiOutput.Usage.PromptTokens)
+ outputTokens := int(openaiOutput.Usage.CompletionTokens)
+ totalTokens := int(openaiOutput.Usage.TotalTokens)
+
+ go func() {
+ time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
+ IncrementCacheDump(ctx, billingOrgId, "agent_tokens", totalTokens)
if inputTokens > 0 {
- IncrementCache(ctx, subOrgId, "agent_input_tokens", inputTokens)
+ IncrementCache(ctx, billingOrgId, "agent_input_tokens", inputTokens)
}
if outputTokens > 0 {
- IncrementCache(ctx, subOrgId, "agent_output_tokens", outputTokens)
+ IncrementCache(ctx, billingOrgId, "agent_output_tokens", outputTokens)
}
- }
- }()
- log.Printf("[AUDIT][%s] Incremented AI Agent usage for billing_org=%s exec_org=%s total=%d input=%d output=%d cached=%d reasoning=%d", execution.ExecutionId, billingOrgId, execution.Workflow.OrgId, totalTokens, inputTokens, outputTokens, cachedTokens, reasoningTokens)
- }
+ }()
+ log.Printf("[AUDIT][%s] Incremented AI Agent usage for billing_org=%s exec_org=%s total=%d input=%d output=%d cached=%d reasoning=%d", execution.ExecutionId, billingOrgId, execution.Workflow.OrgId, totalTokens, inputTokens, outputTokens, cachedTokens, reasoningTokens)
+ }
- // Handles reasoning models for Refusal control edgecases
- // Not always sure why this is happening
- if len(choicesString) == 0 && len(openaiOutput.Choices[0].Message.Refusal) > 0 {
- choicesString = openaiOutput.Choices[0].Message.Refusal
+ // Handles reasoning models for Refusal control edgecases
+ // Not always sure why this is happening
+ if len(choicesString) == 0 && len(openaiOutput.Choices[0].Message.Refusal) > 0 {
+ choicesString = openaiOutput.Choices[0].Message.Refusal
- if strings.HasPrefix(choicesString, "JSON") {
- choicesString = strings.Replace(choicesString, "JSON", "", 1)
- }
+ if strings.HasPrefix(choicesString, "JSON") {
+ choicesString = strings.Replace(choicesString, "JSON", "", 1)
+ }
- if strings.HasPrefix(choicesString, "json") {
- choicesString = strings.Replace(choicesString, "json", "", 1)
+ if strings.HasPrefix(choicesString, "json") {
+ choicesString = strings.Replace(choicesString, "json", "", 1)
+ }
}
+
+ choicesString = strings.TrimSpace(choicesString)
+ //log.Printf("\n\n\nCONTENT: %#v\n\n\n", choicesString)
}
+ }
- choicesString = strings.TrimSpace(choicesString)
- //log.Printf("\n\n\nCONTENT: %#v\n\n\n", choicesString)
+ if len(decisionString) > 0 && len(choicesString) == 0 {
+ choicesString = decisionString
}
// Found random JSON issues with [{} and similar, due to LLM instability.
- mappedDecisions := []AgentDecision{}
- decisionString := FixContentOutput(choicesString)
+ decisionString = FixContentOutput(choicesString)
// Find the first one and remove anything until that point
conditionText := "conditions must be correct"
@@ -8839,13 +8993,14 @@ data_filter:
}
}
- // LLM is occasionally appending freeform text like (e.g. "Summary: ...") after the closing bracket. Truncate everything past the last ']' so the JSON
+ // LLM is occasionally appending freeform text like (e.g. "Summary: ...") after the closing bracket. Truncate everything past the last ']' so the JSON
// parser doesn't dont break due to that.
if lastBracket := strings.LastIndex(decisionString, "]"); lastBracket != -1 {
decisionString = decisionString[:lastBracket+1]
}
errorMessage := ""
+ mappedDecisions := []AgentDecision{}
err = json.Unmarshal([]byte(decisionString), &mappedDecisions)
if err != nil {
if !strings.Contains(decisionString, conditionText) {
@@ -8910,6 +9065,7 @@ data_filter:
agentOutput = oldAgentOutput
agentOutput.Status = "RUNNING"
agentOutput.LLMCallCount += 1
+
// Accumulate token usage
if openaiOutput.Usage.TotalTokens > 0 {
agentOutput.TotalTokens += int64(openaiOutput.Usage.TotalTokens)
@@ -9389,6 +9545,7 @@ data_filter:
return startNode, err
}
+ client := GetExternalClient(streamUrl)
_, _, streamErr := DoRequestWithRetry(client, streamReq, 3)
if streamErr != nil {
log.Printf("[ERROR] AI Agent: Failed sending request for stream during SKIPPED user input: %s", streamErr)
@@ -9792,7 +9949,6 @@ func RunAiQuery(ctx context.Context, info AiCallInfo, systemMessage, userMessage
aiRequestUrl := os.Getenv("AI_API_URL")
aiApiVersion := os.Getenv("AI_API_VERSION")
orgId := os.Getenv("AI_API_ORG")
-
if len(apiKey) == 0 {
apiKey = os.Getenv("OPENAI_API_KEY")
}
@@ -9952,12 +10108,6 @@ func RunAiQuery(ctx context.Context, info AiCallInfo, systemMessage, userMessage
}
}
- if debug {
- for _, message := range chatCompletion.Messages {
- log.Printf("[DEBUG] Role: '%s' => Content: %s\n\n", message.Role, message.Content)
- }
- }
-
maxRetries := 3
sleepTimer := time.Duration(2)
contentOutput := ""
diff --git a/blobs.go b/blobs.go
index 41262756..73719f18 100644
--- a/blobs.go
+++ b/blobs.go
@@ -1224,6 +1224,7 @@ func GetDefaultWorkflowByType(workflow Workflow, orgId string, categoryAction Ca
prepareAgentRun := uuid.NewV4().String()
aiAgentRun := uuid.NewV4().String()
addAgentResponse := uuid.NewV4().String()
+ assignedAppsId := uuid.NewV4().String()
// Test-workflow used to figure it out
// /workflows/25cd78f7-06a0-4f5e-8026-f1c25c74bf74
@@ -1259,6 +1260,29 @@ func GetDefaultWorkflowByType(workflow Workflow, orgId string, categoryAction Ca
},
},
},
+ Action{
+ Name: "get_datastore_value",
+ AppID: "Shuffle Tools",
+ AppName: "Shuffle Tools",
+ ID: assignedAppsId,
+ AppVersion: "1.2.0",
+ Environment: actionEnv,
+ Label: "Get_assigned_apps",
+ Parameters: []WorkflowAppActionParameter{
+ WorkflowAppActionParameter{
+ Name: "key",
+ Value: "config",
+ Multiline: false,
+ Required: true,
+ },
+ WorkflowAppActionParameter{
+ Name: "category",
+ Value: "shuffle-security_agent_tools",
+ Multiline: false,
+ Required: false,
+ },
+ },
+ },
Action{
Name: "execute_python",
AppID: "Shuffle Tools",
@@ -1307,7 +1331,7 @@ func GetDefaultWorkflowByType(workflow Workflow, orgId string, categoryAction Ca
Name: "app_name",
Multiline: false,
Required: true,
- Value: "Shuffle AI",
+ Value: "$handle_ai_agent_run.message.assigned_apps",
},
WorkflowAppActionParameter{
Name: "input",
@@ -1320,11 +1344,13 @@ func GetDefaultWorkflowByType(workflow Workflow, orgId string, categoryAction Ca
**Full context:**
$exec`,
},
+
+ /*
WorkflowAppActionParameter{
Name: "action",
Multiline: false,
Required: false,
- Value: "Nothing",
+ Value: "",
},
WorkflowAppActionParameter{
Name: "memory",
@@ -1338,6 +1364,7 @@ $exec`,
Required: false,
Value: "",
},
+ */
},
},
Action{
@@ -1380,12 +1407,24 @@ $exec`,
},
},
},
+ Branch{
+ SourceID: startActionId,
+ DestinationID: assignedAppsId,
+ ID: uuid.NewV4().String(),
+ Conditions: []Condition{},
+ },
Branch{
SourceID: startActionId,
DestinationID: prepareAgentRun,
ID: uuid.NewV4().String(),
Conditions: []Condition{},
},
+ Branch{
+ SourceID: assignedAppsId,
+ DestinationID: prepareAgentRun,
+ ID: uuid.NewV4().String(),
+ Conditions: []Condition{},
+ },
Branch{
SourceID: relevantPeopleId,
DestinationID: prepareAgentRun,
@@ -1426,6 +1465,21 @@ $exec`,
Value: "",
},
},
+ //Condition{
+ // Source: WorkflowAppActionParameter{
+ // Name: "source",
+ // Value: "$handle_ai_agent_run.success",
+ // },
+ // Condition: WorkflowAppActionParameter{
+ // Name: "condition",
+ // Value: "does not equal",
+ // Configuration: true, // Opposite
+ // },
+ // Destination: WorkflowAppActionParameter{
+ // Name: "destination",
+ // Value: "false",
+ // },
+ //},
},
},
Branch{
@@ -4149,16 +4203,6 @@ func handleRelevantPeopleAgentPrepareCode() string {
cur_exec = self.get_key("$exec.shuffle_datastore.key", category="$exec.shuffle_datastore.category")
if cur_exec["success"] and cur_exec["value"]:
cur_exec = cur_exec["value"]
-
- if "finding_uid" in cur_exec and len(cur_exec["finding_uid"]) > 0:
- pass
- else:
- print(json.dumps({
- "success": False,
- "reason": "No finding_uid in the incident. Not updating."
- }))
- exit()
-
else:
print(json.dumps({
"success": False,
@@ -4170,6 +4214,7 @@ activities = []
if "activity" in cur_exec:
activities = cur_exec["activity"]
+agent_name = ""
ai_agent_activities = []
activities_changed = False
for activityIndex in range(len(activities)):
@@ -4186,6 +4231,7 @@ for activityIndex in range(len(activities)):
if "content" not in activity or ("@aiagent" not in activity["content"].lower()):
continue
+ agent_name = "default"
ai_agent_activities.append(activity)
db_updated = False
@@ -4211,6 +4257,24 @@ if activities_changed or len(assignee) > 0:
ret = self.set_key("$exec.shuffle_datastore.key", json.dumps(parsed_activity), category="$exec.shuffle_datastore.category")
if ret["success"]:
db_updated = True
+
+available_apps = ""
+assigned_apps = json.loads(r"""$get_assigned_apps""")
+if "success" in assigned_apps and assigned_apps["success"] == True and "value" in assigned_apps and len(str(assigned_apps["value"])) > 0:
+ try:
+ assigned_apps["value"] = json.loads(assigned_apps["value"])
+ except:
+ pass
+
+ for agent_config in assigned_apps["value"]:
+ if agent_config["agent"] != agent_name:
+ continue
+
+ for tool in agent_config["tools"]:
+ available_apps += tool["name"]+","
+
+ if len(available_apps) > 0:
+ available_apps = available_apps[:-1]
# Print the details of the key after it's been updated
# To get the value, use self.get_key(key)["value"]
@@ -4219,12 +4283,13 @@ if len(ai_agent_activities) > 0:
"assignee": assignee,
"updated": db_updated,
"agent": ai_agent_activities[0],
+ "assigned_apps": available_apps,
}))
else:
- print(json.dumps({
- "updated": db_updated,
- "agent": "",
- }))`
+ print(json.dumps({
+ "updated": db_updated,
+ "agent": "",
+ }))`
}
func handleRelevantPeopleAgentResponseCode() string {
diff --git a/cloudSync.go b/cloudSync.go
index d50eec9f..ca5a2d24 100755
--- a/cloudSync.go
+++ b/cloudSync.go
@@ -3656,6 +3656,8 @@ ExecStart=$INSTALL_PATH \
--screenlock_check=$SCREENLOCK_CHECK \
--log_forwarding=$LOG_FORWARDING \
--response_actions=$RESPONSE_ACTIONS
+StandardOutput=append:/var/log/orborus.log
+StandardError=append:/var/log/orborus.log
Restart=always
RestartSec=5
@@ -3702,6 +3704,13 @@ install_macos() {
RunAtLoad
+
+ StandardOutPath
+ /tmp/orborus.log
+
+ StandardErrorPath
+ /tmp/orborus.log
+
EOF
diff --git a/codegen.go b/codegen.go
index 885dadcf..49562413 100755
--- a/codegen.go
+++ b/codegen.go
@@ -3936,7 +3936,7 @@ func HandlePut(swagger *openapi3.Swagger, api WorkflowApp, extraParameters []Wor
}
func GetAppRequirements() string {
- return "requests==2.32.3\nurllib3==2.3.0\nliquidpy==0.8.2\nMarkupSafe==3.0.2\nflask[async]==3.1.0\npython-dateutil==2.9.0.post0\nPyJWT==2.10.1\ncryptography==44.0.2\nshufflepy==0.2.2\nshuffle-sdk==0.0.39"
+ return "requests==2.32.3\nurllib3==2.3.0\nliquidpy==0.8.2\nMarkupSafe==3.0.2\nflask[async]==3.1.0\npython-dateutil==2.9.0.post0\nPyJWT==2.10.1\ncryptography==44.0.2\nshufflepy==0.2.2\nshuffle-sdk==0.0.40"
}
// Removes JSON values from the input
@@ -4645,26 +4645,29 @@ func GetAppNameSplit(version DockerRequestCheck) (string, string, string, error)
return appname, baseAppname, appVersion, nil
}
-func handleDatastoreAutomationWebhook(ctx context.Context, marshalledBody []byte, cacheData CacheKeyData, automation DatastoreAutomation, url, runType string) error {
+func handleDatastoreAutomationRequest(ctx context.Context, marshalledBody []byte, cacheData CacheKeyData, automation DatastoreAutomation, url, runType string) error {
var err error
- // Dedup here with cache
- cacheName := fmt.Sprintf("automation_%s_%s_%s", runType, cacheData.Category, cacheData.Key)
- _, err = GetCache(ctx, cacheName)
- if err == nil {
- if debug {
- log.Printf("[DEBUG] Found existing cache for %s - skipping execution to prevent duplicates", cacheName)
- }
+ // Makes sure we wait 2500ms. This is to avoid infinite loops primarily.
+ // Problem: There's a difference between user updates and automation updates.
+ // Trying without cache.
- return nil
- }
+ //cacheName := fmt.Sprintf("automation_%s_%s_%s", runType, cacheData.Category, cacheData.Key)
+ //_, err = GetCache(ctx, cacheName)
+ //if err == nil {
+ // if debug {
+ // log.Printf("[DEBUG] Found existing '%s' cache for '%s' - skipping execution to prevent duplicates", runType, cacheName)
+ // }
- // Makes sure we wait 2500ms
- SetCache(ctx, cacheName, []byte("1"), 2500, true)
+ // return nil
+ //}
+ //SetCache(ctx, cacheName, []byte("1"), 2500, true)
if runType == "run_workflow" {
} else if runType == "webhook" {
+
+ // For now HTTP app hmm
webhookUrl := ""
for _, option := range automation.Options {
if option.Key == "webhook_url" {
@@ -4766,6 +4769,10 @@ func handleDatastoreAutomationWebhook(ctx context.Context, marshalledBody []byte
Timeout: time.Second * 5,
}
+ //if debug {
+ // log.Printf("[DEBUG] OUTPUT BODY: %s", string(marshalledBody))
+ //}
+
req, err := http.NewRequest(
"POST",
parsedUrl,
@@ -5190,6 +5197,19 @@ func handleRunDatastoreAutomation(ctx context.Context, cacheData CacheKeyData, a
cacheData.WorkflowId = option.Value
workflowIds := strings.Split(option.Value, ",")
+
+ formattedBodyStruct := ExecutionRequest{
+ ExecutionSource: fmt.Sprintf("datastore_%s_%s", cacheData.Category, cacheData.Key),
+ ExecutionArgument: string(marshalledBody),
+ }
+
+ marshalledFormattedBody, err := json.Marshal(formattedBodyStruct)
+ if err != nil {
+ log.Printf("[ERROR] Failed in marshalling data in 'run_workflow' datastore automation for workflow %s")
+ } else {
+ marshalledBody = marshalledFormattedBody
+ }
+
handled := []string{}
for _, workflowId := range workflowIds {
workflowId = strings.TrimSpace(workflowId)
@@ -5198,19 +5218,8 @@ func handleRunDatastoreAutomation(ctx context.Context, cacheData CacheKeyData, a
}
handled = append(handled, workflowId)
- formattedBodyStruct := ExecutionRequest{
- ExecutionSource: fmt.Sprintf("datastore_%s_%s", cacheData.Category, cacheData.Key),
- ExecutionArgument: string(marshalledBody),
- }
-
- marshalledFormattedBody, err := json.Marshal(formattedBodyStruct)
- if err != nil {
- log.Printf("[ERROR] Failed in marshalling data in 'run_workflow' datastore automation for workflow %s", workflowId)
- } else {
- marshalledBody = marshalledFormattedBody
- }
- go handleDatastoreAutomationWebhook(ctx, marshalledBody, cacheData, automation, fmt.Sprintf("/api/v1/workflows/%s/execute", workflowId), "run_workflow")
+ go handleDatastoreAutomationRequest(ctx, marshalledBody, cacheData, automation, fmt.Sprintf("/api/v1/workflows/%s/execute", workflowId), "run_workflow")
}
break
@@ -5222,7 +5231,7 @@ func handleRunDatastoreAutomation(ctx context.Context, cacheData CacheKeyData, a
return errors.New("No options provided for 'run_workflow' automation")
}
- return handleDatastoreAutomationWebhook(ctx, marshalledBody, cacheData, automation, "/api/v1/apps/HTTP/run", "webhook")
+ return handleDatastoreAutomationRequest(ctx, marshalledBody, cacheData, automation, "/api/v1/apps/HTTP/run", "webhook")
// Send the webhook using the HTTP app with a POST request
diff --git a/correlations.go b/correlations.go
index aba740e1..30d10da1 100644
--- a/correlations.go
+++ b/correlations.go
@@ -246,10 +246,6 @@ func crossCorrelateNGrams(ctx context.Context, orgId, category, datastoreKey, va
tmpValue := parsedValue
tmpValue = strings.TrimPrefix(tmpValue, "file_")
if isValidUUID(tmpValue) {
- if debug {
- log.Printf("[DEBUG] Skipping value that is a valid UUID: %s", parsedValue)
- }
-
continue
}
diff --git a/db-connector.go b/db-connector.go
index 66322dce..cebd7f93 100755
--- a/db-connector.go
+++ b/db-connector.go
@@ -1346,6 +1346,46 @@ func IncrementCacheDump(ctx context.Context, orgId, dataType string, amount ...i
}
}
+ if len(tmpOrgDetail.ManagerOrgs) > 0 && (dataType == "agent_tokens") {
+ for _, managerOrg := range tmpOrgDetail.ManagerOrgs {
+ if len(managerOrg.Id) == 36 {
+ IncrementCache(ctx, managerOrg.Id, "childorg_agent_tokens", int(dbDumpInterval))
+ }
+ }
+ }
+
+ if len(tmpOrgDetail.ManagerOrgs) > 0 && (dataType == "agent_input_tokens") {
+ for _, managerOrg := range tmpOrgDetail.ManagerOrgs {
+ if len(managerOrg.Id) == 36 {
+ IncrementCache(ctx, managerOrg.Id, "childorg_agent_input_tokens", int(dbDumpInterval))
+ }
+ }
+ }
+
+ if len(tmpOrgDetail.ManagerOrgs) > 0 && (dataType == "agent_output_tokens") {
+ for _, managerOrg := range tmpOrgDetail.ManagerOrgs {
+ if len(managerOrg.Id) == 36 {
+ IncrementCache(ctx, managerOrg.Id, "childorg_agent_output_tokens", int(dbDumpInterval))
+ }
+ }
+ }
+
+ if len(tmpOrgDetail.ManagerOrgs) > 0 && (dataType == "send_sms") {
+ for _, managerOrg := range tmpOrgDetail.ManagerOrgs {
+ if len(managerOrg.Id) == 36 {
+ IncrementCache(ctx, managerOrg.Id, "childorg_send_sms", int(dbDumpInterval))
+ }
+ }
+ }
+
+ if len(tmpOrgDetail.ManagerOrgs) > 0 && (dataType == "send_mail") {
+ for _, managerOrg := range tmpOrgDetail.ManagerOrgs {
+ if len(managerOrg.Id) == 36 {
+ IncrementCache(ctx, managerOrg.Id, "childorg_send_mail", int(dbDumpInterval))
+ }
+ }
+ }
+
concurrentTxn := false
errMsg := ""
@@ -11581,6 +11621,7 @@ func GetOrgNotifications(ctx context.Context, orgId string) ([]Notification, err
"sort": map[string]interface{}{
"updated_at": map[string]interface{}{
"order": "desc",
+ "unmapped_type": "long",
},
},
"query": map[string]interface{}{
@@ -14926,7 +14967,7 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto
if len(cacheData.Category) == 0 || len(cacheData.OrgId) == 0 {
if debug {
- log.Printf("[DEBUG] No category/orgid. Continue")
+ log.Printf("[DEBUG] ERROR: No category/orgid for key '%s'. Continue", cacheData.Key)
}
continue
}
diff --git a/health.go b/health.go
index 457b36a9..94650429 100644
--- a/health.go
+++ b/health.go
@@ -2527,7 +2527,7 @@ func GetStaticWorkflowHealth(ctx context.Context, workflow Workflow) (Workflow,
for _, action := range workflow.Actions {
if action.AppID == "integration" || action.AppID == "shuffle_agent" {
actionName := "Singul"
- if action.AppID == "shuffle_agent" {
+ if action.AppID == "shuffle_agent" || strings.Contains(strings.ToLower(action.AppID), "agent") {
actionName = "Shuffle Agent"
}
@@ -2543,6 +2543,10 @@ func GetStaticWorkflowHealth(ctx context.Context, workflow Workflow) (Workflow,
for _, field := range action.Parameters {
if (field.Name == "app_name" || field.Name == "appname") && (field.Value == "" || field.Value == "noapp") {
+ if actionName == "Shuffle Agent" {
+ continue
+ }
+
parsedError := fmt.Sprintf("%s action %s requires an app to use", actionName, action.Label)
if !ArrayContains(workflow.Errors, parsedError) {
workflow.Errors = append(workflow.Errors, parsedError)
@@ -4763,6 +4767,22 @@ func extractAgentOutputFromResults(execution WorkflowExecution) (AgentOutput, bo
err := json.Unmarshal([]byte(execution.Result), &agentOutput)
if err == nil && len(agentOutput.Decisions) > 0 {
+ // when the output field is empty, fall back to extracting it from the last 'finish'
+ // decision's fields array, which is always present in the snapshot.
+ if agentOutput.Output == "" {
+ for i := len(agentOutput.Decisions) - 1; i >= 0; i-- {
+ d := agentOutput.Decisions[i]
+ if d.Action == "finish" {
+ for _, f := range d.Fields {
+ if f.Key == "output" && f.Value != "" {
+ agentOutput.Output = f.Value
+ break
+ }
+ }
+ break
+ }
+ }
+ }
return agentOutput, true
}
@@ -4770,6 +4790,20 @@ func extractAgentOutputFromResults(execution WorkflowExecution) (AgentOutput, bo
var inner string
if err := json.Unmarshal([]byte(execution.Result), &inner); err == nil {
if err := json.Unmarshal([]byte(inner), &agentOutput); err == nil && len(agentOutput.Decisions) > 0 {
+ if agentOutput.Output == "" {
+ for i := len(agentOutput.Decisions) - 1; i >= 0; i-- {
+ d := agentOutput.Decisions[i]
+ if d.Action == "finish" {
+ for _, f := range d.Fields {
+ if f.Key == "output" && f.Value != "" {
+ agentOutput.Output = f.Value
+ break
+ }
+ }
+ break
+ }
+ }
+ }
return agentOutput, true
}
}
@@ -4817,34 +4851,36 @@ func RunOpsAgent(apiKey string, orgId string, cloudRunUrl string) (AgentHealth,
// Extract agent-level output (decisions, LLM success) from action results.
if agentOutput, found := extractAgentOutputFromResults(execution); found {
agentHealth.AgentStatus = agentOutput.Status
- agentTemp, err := strconv.Atoi(strings.TrimSpace(agentOutput.Output))
- if err != nil {
- log.Printf("[ERROR] Agent Health check failed due to atoi conversion failure: %s", err)
- agentHealth.Error.Run = fmt.Sprintf("Agent Health check failed due to atoi conversion failure: %s", err)
- agentHealth.LLMCallSuccess = false
+ agentHealth.AgentDecisionCount = len(agentOutput.Decisions)
+ if debug {
+ log.Printf("[DEBUG] Health check for Agent made %d decisions", len(agentOutput.Decisions))
}
- realTemp, apiErr := getRealTempC()
- if apiErr != nil {
- log.Printf("[ERROR] Agent Health check failed due to weather api call failure: %s", apiErr)
- agentHealth.Error.Run = fmt.Sprintf("Agent Health check failed due to weather api call failure: %s", apiErr)
+ agentTemp, err := strconv.Atoi(strings.TrimSpace(agentOutput.Output))
+ if err != nil {
+ log.Printf("[ERROR] Agent Health check failed due to atoi conversion failure (output=%q): %s", agentOutput.Output, err)
+ agentHealth.Error.Run = fmt.Sprintf("Agent Health check failed due to atoi conversion failure (output=%q): %s", agentOutput.Output, err)
agentHealth.LLMCallSuccess = false
} else {
- realTempStr := strconv.Itoa(realTemp)
- agentTempStr := strconv.Itoa(agentTemp)
- // check if this real tmp value exists in the agentTemp
- if strings.Contains(agentTempStr, realTempStr) {
- agentHealth.LLMCallSuccess = true
- log.Printf("[INFO] Agent Health check - LLM Call was successful. Expected: %d, Got: %d", realTemp, agentTemp)
- } else {
+ realTemp, apiErr := getRealTempC()
+ if apiErr != nil {
+ log.Printf("[ERROR] Agent Health check failed due to weather api call failure: %s", apiErr)
+ agentHealth.Error.Run = fmt.Sprintf("Agent Health check failed due to weather api call failure: %s", apiErr)
agentHealth.LLMCallSuccess = false
- log.Printf("[ERROR] Agent Health check - LLM Call was not successful. Expected: %d, Got: %d", realTemp, agentTemp)
- agentHealth.Error.Run = fmt.Sprintf("Agent Health check - LLM Call was not successful. Expected: %d, Got: %d", realTemp, agentTemp)
+ } else {
+ realTempStr := strconv.Itoa(realTemp)
+ agentTempStr := strconv.Itoa(agentTemp)
+ // check if the real temp value exists in the agent temp string
+ if strings.Contains(agentTempStr, realTempStr) {
+ agentHealth.LLMCallSuccess = true
+ log.Printf("[INFO] Agent Health check - LLM Call was successful. Expected: %d, Got: %d", realTemp, agentTemp)
+ } else {
+ agentHealth.LLMCallSuccess = false
+ log.Printf("[ERROR] Agent Health check - LLM Call was not successful. Expected: %d, Got: %d", realTemp, agentTemp)
+ agentHealth.Error.Run = fmt.Sprintf("Agent Health check - LLM Call was not successful. Expected: %d, Got: %d", realTemp, agentTemp)
+ }
}
}
-
- agentHealth.AgentDecisionCount = len(agentOutput.Decisions)
- log.Printf("[DEBUG] Health check for Agent made %d decisions, LLM call successful", len(agentOutput.Decisions))
}
}
@@ -4894,4 +4930,4 @@ func getRealTempC() (int, error) {
}
return val, nil
-}
\ No newline at end of file
+}
diff --git a/notifications.go b/notifications.go
index 8274e10c..d4ca7d1d 100755
--- a/notifications.go
+++ b/notifications.go
@@ -271,8 +271,8 @@ func HandleGetNotifications(resp http.ResponseWriter, request *http.Request) {
newNotifications = append(newNotifications, notification)
}
- sort.Slice(notifications[:], func(i, j int) bool {
- return notifications[i].UpdatedAt > notifications[j].UpdatedAt
+ sort.Slice(newNotifications[:], func(i, j int) bool {
+ return newNotifications[i].UpdatedAt > newNotifications[j].UpdatedAt
})
notificationResponse := NotificationResponse{
@@ -313,6 +313,18 @@ func sendToNotificationWorkflow(ctx context.Context, notification Notification,
return nil
}
+ workflow, err := GetWorkflow(ctx, workflowId)
+ if err != nil {
+ log.Printf("[ERROR] Failed getting workflow %s: %s", workflowId, err)
+ } else if workflow.ExecutingOrg.Id != "" && workflow.ExecutingOrg.Id != authOrg.Id {
+ executionOrg, err := GetOrg(ctx, workflow.ExecutingOrg.Id)
+ if err != nil {
+ log.Printf("[ERROR] Failed getting execution org %s: %s", workflow.ExecutingOrg.Id, err)
+ } else {
+ authOrg = *executionOrg
+ }
+ }
+
//log.Printf("[DEBUG] Sending notification to workflow with id: %#v", workflowId)
cachedNotifications := NotificationCached{}
@@ -1040,13 +1052,13 @@ func CreateOrgNotification(ctx context.Context, title, description, referenceUrl
}
workflow, err := GetWorkflow(ctx, org.Defaults.NotificationWorkflow)
- if err != nil {
- log.Printf("[WARNING] Failed getting workflow with ID %s: %s", org.Defaults.NotificationWorkflow, err)
+ if err != nil || len(workflow.ID) == 0 {
+ log.Printf("[WARNING] Failed getting notification workflow with ID '%s': %s", org.Defaults.NotificationWorkflow, err)
return err
}
if workflow.OrgId != mainNotification.OrgId {
- log.Printf("[WARNING] Can't access workflow %s with org %s (%s): %#v", workflow.ID, mainNotification.OrgName, mainNotification.OrgId, workflow.Org)
+ log.Printf("[WARNING] Can't access workflow '%s' (notification create) with org %s (%s): %#v", workflow.ID, mainNotification.OrgName, mainNotification.OrgId, workflow.Org)
// Get parent org if it exists and check too
if len(org.ManagerOrgs) > 0 {
diff --git a/shared.go b/shared.go
index a34c7279..1efa6631 100644
--- a/shared.go
+++ b/shared.go
@@ -25,6 +25,7 @@ import (
"sort"
"unicode"
+ openai "github.com/sashabaranov/go-openai"
"github.com/go-git/go-billy/v5"
"github.com/go-git/go-billy/v5/memfs"
"google.golang.org/api/cloudfunctions/v1"
@@ -14019,6 +14020,10 @@ func HandleEditOrg(resp http.ResponseWriter, request *http.Request) {
org.SyncFeatures.AppExecutions.Limit = 300000
org.SyncFeatures.MultiEnv.Limit = 250
org.SyncFeatures.MultiTenant.Limit = 1000
+ org.SyncFeatures.SendSms.Limit = 10000
+ org.SyncFeatures.SendMail.Limit = 10000
+ org.SyncFeatures.SendSms.Active = true
+ org.SyncFeatures.SendMail.Active = true
log.Printf("[INFO] Set limits to 300000 app runs / 250 envs / 1000 tenants for org %s (enterprise/business)", org.Id)
} else if newLeadinfo.POV {
org.SyncFeatures.AppExecutions.Limit = 10000
@@ -18389,6 +18394,36 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut
foundError += fmt.Sprintf(quickUnmarshal.Reason)
}
+ // Tries to map it in from the openai request
+ if len(oldAgentOutput.OriginalInput) == 0 {
+ for _, param := range actionResult.Action.Parameters {
+ if param.Name != "body" {
+ continue
+ }
+
+ // Marshal into openai conversation request
+ openaiReq := openai.ChatCompletionRequest{}
+ unmarshalledErr := json.Unmarshal([]byte(param.Value), &openaiReq)
+ if unmarshalledErr != nil {
+ log.Printf("[ERROR] Failed unmarshalling body into openai request: %s", unmarshalledErr)
+ break
+ }
+
+ if len(openaiReq.Messages) > 0 {
+ for _, userMessage := range openaiReq.Messages {
+ if !strings.HasPrefix(userMessage.Content, "USER REQUEST:") {
+ continue
+ }
+
+ oldAgentOutput.OriginalInput = userMessage.Content
+ break
+ }
+ }
+
+ break
+ }
+ }
+
go abortAgentExecution(ctx, *foundParentExec, startNode, oldAgentOutput, "llm_received_failure", foundError)
}
}
@@ -22394,10 +22429,12 @@ func PrepareSingleAction(ctx context.Context, parentRequest *http.Request, user
if len(decision) > 0 {
decisionId = decision[0]
}
+
} else if strings.ToLower(appId) == "integration" || strings.ToLower(appId) == "singul" {
log.Printf("[INFO] Running single action for 'integration' app => Singul")
// Related to sensor groups for Orborus
+
} else if strings.ToLower(appId) == "sensors" && action.Name == "run_action" {
if len(user.ActiveOrg.Id) == 0 {
return workflowExecution, errors.New("No org ID supplied for sensor execution")
@@ -26275,6 +26312,10 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
var execution ExecutionRequest
err = json.Unmarshal(body, &execution)
if err != nil {
+ if debug {
+ log.Printf("[DEBUG] JSON parsing problem in run workflow: %s", err)
+ }
+
if len(string(body)) < 100 {
log.Printf("[WARNING] Failed execution POST unmarshaling - continuing anyway: '%s'. Err: %s", string(body), err)
} else {
@@ -26285,15 +26326,23 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
// Ensuring it works even if startpoint isn't defined
if execution.Start == "" && len(body) > 0 && len(execution.ExecutionSource) == 0 && len(execution.ExecutionArgument) == 0 {
// Check if "execution_argument" in body
+ if debug {
+ log.Printf("[DEBUG] Fallback to full body usage for exec arg")
+ }
+
execution.ExecutionArgument = string(body)
}
// FIXME - this should have "execution_argument" from executeWorkflow frontend
//log.Printf("EXEC: %s", execution)
- if len(execution.ExecutionArgument) > 0 {
+ if len(execution.ExecutionArgument) > 0 && len(workflowExecution.ExecutionArgument) == 0 {
workflowExecution.ExecutionArgument = execution.ExecutionArgument
}
+ //if debug {
+ // log.Printf("\n\n\n\n\n[DEBUG] INPUT BODY: %s \n\n\n\n\n", string(body))
+ //}
+
if len(execution.ExecutionSource) > 0 {
workflowExecution.ExecutionSource = execution.ExecutionSource
diff --git a/stats.go b/stats.go
index 8a65532c..7a84a62c 100755
--- a/stats.go
+++ b/stats.go
@@ -850,7 +850,9 @@ func HandleGetStatistics(resp http.ResponseWriter, request *http.Request) {
}
}
- newjson, err := json.Marshal(info)
+ stats := handleGetCorrectedStats(info)
+
+ newjson, err := json.Marshal(stats)
if err != nil {
log.Printf("[ERROR] Failed marshal in get org stats: %s", err)
resp.WriteHeader(500)
@@ -862,6 +864,33 @@ func HandleGetStatistics(resp http.ResponseWriter, request *http.Request) {
resp.Write(newjson)
}
+// Make sure that we are not calling SetOrgStatistics function after calling this function. This will increase the app runs count in db on every call to this function.
+func handleGetCorrectedStats(info *ExecutionInfo) *ExecutionInfo {
+
+ // 1 Million Input Tokens = 250 app runs
+ // 1 Million Output Tokens = 1500 app runs
+ // 1 SMS = 3 app runs
+ // 1 Email = 2 app runs
+
+ // Loop through the daily statistics and add the app runs from tokens, SMS and email on top of existing counts
+ for i := range info.DailyStatistics {
+ info.DailyStatistics[i].AppExecutions += info.DailyStatistics[i].AgentInputTokens*250/1_000_000 + info.DailyStatistics[i].AgentOutputTokens*1500/1_000_000 + info.DailyStatistics[i].DailySMSUsage*3 + info.DailyStatistics[i].DailyEmailUsage*2
+ info.DailyStatistics[i].ChildAppExecutions += info.DailyStatistics[i].ChildOrgAgentInputTokens*250/1_000_000 + info.DailyStatistics[i].ChildOrgAgentOutputTokens*1500/1_000_000 + info.DailyStatistics[i].DailyChildOrgSMSUsage*3 + info.DailyStatistics[i].DailyChildOrgEmailUsage*2
+ }
+
+ // Add the monthly app runs from SMS, Email, Input Tokens and Output Tokens on top of existing counts
+ info.MonthlyAppExecutions += info.MonthlySMSUsage*3 + info.MonthlyEmailUsage*2 + info.MonthlyAgentInputTokens*250/1_000_000 + info.MonthlyAgentOutputTokens*1500/1_000_000
+ info.MonthlyChildAppExecutions += info.MonthlyChildOrgSMSUsage*3 + info.MonthlyChildOrgEmailUsage*2 + info.MonthlyChildOrgAgentInputTokens*250/1_000_000 + info.MonthlyChildOrgAgentOutputTokens*1500/1_000_000
+
+ info.DailyAppExecutions += info.DailyAgentInputTokens*250/1_000_000 + info.DailyAgentOutputTokens*1500/1_000_000 + info.DailySMSUsage*3 + info.DailyEmailUsage*2
+ info.DailyChildAppExecutions += info.DailyChildOrgAgentInputTokens*250/1_000_000 + info.DailyChildOrgAgentOutputTokens*1500/1_000_000 + info.DailyChildOrgSMSUsage*3 + info.DailyChildOrgEmailUsage*2
+
+ info.TotalAppExecutions += info.TotalAgentInputTokens*250/1_000_000 + info.TotalAgentOutputTokens*1500/1_000_000 + info.TotalSMSUsage*3 + info.TotalEmailUsage*2
+ info.TotalChildAppExecutions += info.TotalChildOrgAgentInputTokens*250/1_000_000 + info.TotalChildOrgAgentOutputTokens*1500/1_000_000 + info.TotalChildOrgSMSUsage*3 + info.TotalChildOrgEmailUsage*2
+
+ return info
+}
+
func HandleAppendStatistics(resp http.ResponseWriter, request *http.Request) {
// Send in a thing to increment
cors := HandleCors(resp, request)
@@ -1327,6 +1356,19 @@ func handleDailyCacheUpdate(executionInfo *ExecutionInfo) *ExecutionInfo {
CloudExecutions: executionInfo.DailyCloudExecutions,
OnpremExecutions: executionInfo.DailyOnpremExecutions,
AIUsage: executionInfo.DailyAIUsage,
+ AgentExecutions: executionInfo.DailyAgentExecutions,
+ AgentTokens: executionInfo.DailyAgentTokens,
+ AgentInputTokens: executionInfo.DailyAgentInputTokens,
+ AgentOutputTokens: executionInfo.DailyAgentOutputTokens,
+ ChildOrgAiUsage: executionInfo.DailyChildOrgAiUsage,
+ ChildOrgAgentExecutions: executionInfo.DailyChildOrgAgentExecutions,
+ ChildOrgAgentTokens: executionInfo.DailyChildOrgAgentTokens,
+ ChildOrgAgentInputTokens: executionInfo.DailyChildOrgAgentInputTokens,
+ ChildOrgAgentOutputTokens: executionInfo.DailyChildOrgAgentOutputTokens,
+ DailySMSUsage: executionInfo.DailySMSUsage,
+ DailyChildOrgSMSUsage: executionInfo.DailyChildOrgSMSUsage,
+ DailyEmailUsage: executionInfo.DailyEmailUsage,
+ DailyChildOrgEmailUsage: executionInfo.DailyChildOrgEmailUsage,
ApiUsage: executionInfo.DailyApiUsage,
@@ -1362,6 +1404,20 @@ func handleDailyCacheUpdate(executionInfo *ExecutionInfo) *ExecutionInfo {
executionInfo.DailyOnpremExecutions = 0
executionInfo.DailyApiUsage = 0
executionInfo.DailyAIUsage = 0
+ executionInfo.DailyChildOrgAiUsage = 0
+ executionInfo.DailyAgentExecutions = 0
+ executionInfo.DailyAgentTokens = 0
+ executionInfo.DailyAgentInputTokens = 0
+ executionInfo.DailyAgentOutputTokens = 0
+ executionInfo.DailyChildOrgAiUsage = 0
+ executionInfo.DailyChildOrgAgentExecutions = 0
+ executionInfo.DailyChildOrgAgentTokens = 0
+ executionInfo.DailyChildOrgAgentInputTokens = 0
+ executionInfo.DailyChildOrgAgentOutputTokens = 0
+ executionInfo.DailySMSUsage = 0
+ executionInfo.DailyChildOrgSMSUsage = 0
+ executionInfo.DailyEmailUsage = 0
+ executionInfo.DailyChildOrgEmailUsage = 0
// Weekly
executionInfo.WeeklyAppExecutions = 0
@@ -1404,6 +1460,15 @@ func handleDailyCacheUpdate(executionInfo *ExecutionInfo) *ExecutionInfo {
executionInfo.MonthlyAgentTokens = 0
executionInfo.MonthlyAgentInputTokens = 0
executionInfo.MonthlyAgentOutputTokens = 0
+ executionInfo.MonthlyChildOrgAiUsage = 0
+ executionInfo.MonthlyChildOrgAgentExecutions = 0
+ executionInfo.MonthlyChildOrgAgentTokens = 0
+ executionInfo.MonthlyChildOrgAgentInputTokens = 0
+ executionInfo.MonthlyChildOrgAgentOutputTokens = 0
+ executionInfo.MonthlySMSUsage = 0
+ executionInfo.MonthlyChildOrgSMSUsage = 0
+ executionInfo.MonthlyEmailUsage = 0
+ executionInfo.MonthlyChildOrgEmailUsage = 0
executionInfo.LastMonthlyResetMonth = currentMonth
executionInfo.LastUsageAlertThreshold = 0
@@ -1555,14 +1620,42 @@ func HandleIncrement(dataType string, orgStatistics *ExecutionInfo, increment ui
orgStatistics.TotalAgentTokens += int64(increment)
orgStatistics.MonthlyAgentTokens += int64(increment)
orgStatistics.DailyAgentTokens += int64(increment)
+ } else if dataType == "childorg_agent_tokens" {
+ orgStatistics.TotalChildOrgAgentTokens += int64(increment)
+ orgStatistics.MonthlyChildOrgAgentTokens += int64(increment)
+ orgStatistics.DailyChildOrgAgentTokens += int64(increment)
} else if dataType == "agent_input_tokens" {
orgStatistics.TotalAgentInputTokens += int64(increment)
orgStatistics.MonthlyAgentInputTokens += int64(increment)
orgStatistics.DailyAgentInputTokens += int64(increment)
+ } else if dataType == "childorg_agent_input_tokens" {
+ orgStatistics.TotalChildOrgAgentInputTokens += int64(increment)
+ orgStatistics.MonthlyChildOrgAgentInputTokens += int64(increment)
+ orgStatistics.DailyChildOrgAgentInputTokens += int64(increment)
} else if dataType == "agent_output_tokens" {
orgStatistics.TotalAgentOutputTokens += int64(increment)
orgStatistics.MonthlyAgentOutputTokens += int64(increment)
orgStatistics.DailyAgentOutputTokens += int64(increment)
+ } else if dataType == "childorg_agent_output_tokens" {
+ orgStatistics.TotalChildOrgAgentOutputTokens += int64(increment)
+ orgStatistics.MonthlyChildOrgAgentOutputTokens += int64(increment)
+ orgStatistics.DailyChildOrgAgentOutputTokens += int64(increment)
+ } else if dataType == "send_sms" {
+ orgStatistics.TotalSMSUsage += int64(increment)
+ orgStatistics.MonthlySMSUsage += int64(increment)
+ orgStatistics.DailySMSUsage += int64(increment)
+ } else if dataType == "childorg_send_sms" {
+ orgStatistics.TotalChildOrgSMSUsage += int64(increment)
+ orgStatistics.MonthlyChildOrgSMSUsage += int64(increment)
+ orgStatistics.DailyChildOrgSMSUsage += int64(increment)
+ } else if dataType == "send_mail" {
+ orgStatistics.TotalEmailUsage += int64(increment)
+ orgStatistics.MonthlyEmailUsage += int64(increment)
+ orgStatistics.DailyEmailUsage += int64(increment)
+ } else if dataType == "childorg_send_mail" {
+ orgStatistics.TotalChildOrgEmailUsage += int64(increment)
+ orgStatistics.MonthlyChildOrgEmailUsage += int64(increment)
+ orgStatistics.DailyChildOrgEmailUsage += int64(increment)
} else {
//log.Printf("\n\n[ERROR] Unknown data type in stats increment for org %s: %s. Appending to custom list.\n\n", orgStatistics.OrgId, dataType)
appendCustom = true
@@ -1687,17 +1780,24 @@ func HandleIncrement(dataType string, orgStatistics *ExecutionInfo, increment ui
AppRunsPercentage := float64(totalAppExecutions) / float64(org.SyncFeatures.AppExecutions.Limit) * 100
appRunsUsagePercentageStr := fmt.Sprintf("%d%% of your app runs limit", int64(AppRunsPercentage))
Subject := fmt.Sprintf("[Shuffle]: You've reached %s for your tenant %s", appRunsUsagePercentageStr, org.Name)
+ aiTokensUsage := orgStatistics.MonthlyAgentTokens + orgStatistics.MonthlyChildOrgAgentTokens
+ aiTokensUsagePercentage := float64(aiTokensUsage) / float64(org.SyncFeatures.AgentTokens.Limit) * 100
+ aiTokensLimit := org.SyncFeatures.AgentTokens.Limit
+ if aiTokensLimit == 0 {
+ aiTokensLimit = 10000000
+ }
substitutions := map[string]interface{}{
- "app_runs_usage": totalAppExecutions,
- "app_runs_limit": org.SyncFeatures.AppExecutions.Limit,
- "subject_string": appRunsUsagePercentageStr,
- "ai_tokens_usage": orgStatistics.MonthlyAgentTokens,
- "ai_tokens_limit": org.SyncFeatures.AgentTokens.Limit,
- "org_name": org.Name,
- "org_id": org.Id,
- "admin_email": org.Name,
- "app_runs_usage_percentage": int64(AppRunsPercentage),
+ "app_runs_usage": totalAppExecutions,
+ "app_runs_limit": org.SyncFeatures.AppExecutions.Limit,
+ "subject_string": appRunsUsagePercentageStr,
+ "ai_tokens_usage": aiTokensUsage,
+ "ai_tokens_limit": aiTokensLimit,
+ "org_name": org.Name,
+ "org_id": org.Id,
+ "admin_email": org.Name,
+ "app_runs_usage_percentage": int64(AppRunsPercentage),
+ "ai_tokens_usage_percentage": int64(aiTokensUsagePercentage),
}
err = sendMailSendgridV2(
diff --git a/streaming.go b/streaming.go
index b5a7f4bc..1e6ce989 100644
--- a/streaming.go
+++ b/streaming.go
@@ -17,8 +17,15 @@ var streamPresenceColors = []string{
"#818CF8", "#FB923C",
}
-func presenceColor(userID string, slotIndex int) string {
- return streamPresenceColors[slotIndex%len(streamPresenceColors)]
+func presenceColor(userID string) string {
+ var hash int
+ for _, c := range userID {
+ hash = hash*31 + int(c)
+ }
+ if hash < 0 {
+ hash = -hash
+ }
+ return streamPresenceColors[hash%len(streamPresenceColors)]
}
// streamPresenceInterval: presence update every 100 poll iterations (~10s at 100ms/poll)
@@ -377,7 +384,7 @@ func HandleStreamWorkflow(resp http.ResponseWriter, request *http.Request) {
UserID: user.Id,
Username: user.Username,
LastSeen: now,
- Color: presenceColor(user.Id, len(activeUsers)),
+ Color: presenceColor(user.Id),
})
}
presence.Users = activeUsers
diff --git a/structs.go b/structs.go
index c8117e8c..bbb8c776 100755
--- a/structs.go
+++ b/structs.go
@@ -408,6 +408,20 @@ type DailyStatistics struct {
CloudExecutions int64 `json:"cloud_executions" datastore:"cloud_executions"`
OnpremExecutions int64 `json:"onprem_executions" datastore:"onprem_executions"`
AIUsage int64 `json:"ai_executions" datastore:"ai_executions"`
+ AgentExecutions int64 `json:"agent_executions" datastore:"agent_executions"`
+ AgentTokens int64 `json:"agent_tokens" datastore:"agent_tokens"`
+ AgentInputTokens int64 `json:"agent_input_tokens" datastore:"agent_input_tokens"`
+ AgentOutputTokens int64 `json:"agent_output_tokens" datastore:"agent_output_tokens"`
+ ChildOrgAiUsage int64 `json:"child_org_ai_usage" datastore:"child_org_ai_usage"`
+ ChildOrgAgentExecutions int64 `json:"child_org_agent_executions" datastore:"child_org_agent_executions"`
+ ChildOrgAgentTokens int64 `json:"child_org_agent_tokens" datastore:"child_org_agent_tokens"`
+ ChildOrgAgentInputTokens int64 `json:"child_org_agent_input_tokens" datastore:"child_org_agent_input_tokens"`
+ ChildOrgAgentOutputTokens int64 `json:"child_org_agent_output_tokens" datastore:"child_org_agent_output_tokens"`
+
+ DailySMSUsage int64 `json:"daily_sms_usage" datastore:"daily_sms_usage"`
+ DailyChildOrgSMSUsage int64 `json:"daily_child_org_sms_usage" datastore:"daily_child_org_sms_usage"`
+ DailyEmailUsage int64 `json:"daily_email_usage" datastore:"daily_email_usage"`
+ DailyChildOrgEmailUsage int64 `json:"daily_child_org_email_usage" datastore:"daily_child_org_email_usage"`
ApiUsage int64 `json:"api_usage" datastore:"api_usage"`
AppUsage []AppUsage `json:"app_usage" datastore:"app_usage"`
@@ -442,7 +456,16 @@ type ExecutionInfo struct {
TotalAgentTokens int64 `json:"total_agent_tokens" datastore:"total_agent_tokens"`
TotalAgentInputTokens int64 `json:"total_agent_input_tokens" datastore:"total_agent_input_tokens"`
TotalAgentOutputTokens int64 `json:"total_agent_output_tokens" datastore:"total_agent_output_tokens"`
+ TotalChildOrgAiUsage int64 `json:"total_child_org_ai_usage" datastore:"total_child_org_ai_usage"`
+ TotalChildOrgAgentExecutions int64 `json:"total_child_org_agent_executions" datastore:"total_child_org_agent_executions"`
+ TotalChildOrgAgentTokens int64 `json:"total_child_org_agent_tokens" datastore:"total_child_org_agent_tokens"`
+ TotalChildOrgAgentInputTokens int64 `json:"total_child_org_agent_input_tokens" datastore:"total_child_org_agent_input_tokens"`
+ TotalChildOrgAgentOutputTokens int64 `json:"total_child_org_agent_output_tokens" datastore:"total_child_org_agent_output_tokens"`
TotalChildWorkflowExecutions int64 `json:"total_child_workflow_executions" datastore:"total_child_workflow_executions"`
+ TotalSMSUsage int64 `json:"total_sms_usage" datastore:"total_sms_usage"`
+ TotalChildOrgSMSUsage int64 `json:"total_child_org_sms_usage" datastore:"total_child_org_sms_usage"`
+ TotalEmailUsage int64 `json:"total_email_usage" datastore:"total_email_usage"`
+ TotalChildOrgEmailUsage int64 `json:"total_child_org_email_usage" datastore:"total_child_org_email_usage"`
MonthlyApiUsage int64 `json:"monthly_api_usage,omitempty" datastore:"monthly_api_usage"`
MonthlyChildAppExecutions int64 `json:"monthly_child_app_executions,omitempty" datastore:"monthly_child_app_executions"`
@@ -461,6 +484,15 @@ type ExecutionInfo struct {
MonthlyAgentTokens int64 `json:"monthly_agent_tokens,omitempty" datastore:"monthly_agent_tokens"`
MonthlyAgentInputTokens int64 `json:"monthly_agent_input_tokens,omitempty" datastore:"monthly_agent_input_tokens"`
MonthlyAgentOutputTokens int64 `json:"monthly_agent_output_tokens,omitempty" datastore:"monthly_agent_output_tokens"`
+ MonthlyChildOrgAiUsage int64 `json:"monthly_child_org_ai_usage,omitempty" datastore:"monthly_child_org_ai_usage"`
+ MonthlyChildOrgAgentExecutions int64 `json:"monthly_child_org_agent_executions,omitempty" datastore:"monthly_child_org_agent_executions"`
+ MonthlyChildOrgAgentTokens int64 `json:"monthly_child_org_agent_tokens,omitempty" datastore:"monthly_child_org_agent_tokens"`
+ MonthlyChildOrgAgentInputTokens int64 `json:"monthly_child_org_agent_input_tokens,omitempty" datastore:"monthly_child_org_agent_input_tokens"`
+ MonthlyChildOrgAgentOutputTokens int64 `json:"monthly_child_org_agent_output_tokens,omitempty" datastore:"monthly_child_org_agent_output_tokens"`
+ MonthlySMSUsage int64 `json:"monthly_sms_usage,omitempty" datastore:"monthly_sms_usage"`
+ MonthlyChildOrgSMSUsage int64 `json:"monthly_child_org_sms_usage,omitempty" datastore:"monthly_child_org_sms_usage"`
+ MonthlyEmailUsage int64 `json:"monthly_email_usage,omitempty" datastore:"monthly_email_usage"`
+ MonthlyChildOrgEmailUsage int64 `json:"monthly_child_org_email_usage,omitempty" datastore:"monthly_child_org_email_usage"`
WeeklyAppExecutions int64 `json:"weekly_app_executions,omitempty" datastore:"weekly_app_executions"`
WeeklyChildAppExecutions int64 `json:"weekly_child_app_executions,omitempty" datastore:"weekly_child_app_executions"`
@@ -491,6 +523,15 @@ type ExecutionInfo struct {
DailyAgentTokens int64 `json:"daily_agent_tokens" datastore:"daily_agent_tokens"`
DailyAgentInputTokens int64 `json:"daily_agent_input_tokens" datastore:"daily_agent_input_tokens"`
DailyAgentOutputTokens int64 `json:"daily_agent_output_tokens" datastore:"daily_agent_output_tokens"`
+ DailyChildOrgAiUsage int64 `json:"daily_child_org_ai_usage" datastore:"daily_child_org_ai_usage"`
+ DailyChildOrgAgentExecutions int64 `json:"daily_child_org_agent_executions" datastore:"daily_child_org_agent_executions"`
+ DailyChildOrgAgentTokens int64 `json:"daily_child_org_agent_tokens" datastore:"daily_child_org_agent_tokens"`
+ DailyChildOrgAgentInputTokens int64 `json:"daily_child_org_agent_input_tokens" datastore:"daily_child_org_agent_input_tokens"`
+ DailyChildOrgAgentOutputTokens int64 `json:"daily_child_org_agent_output_tokens" datastore:"daily_child_org_agent_output_tokens"`
+ DailySMSUsage int64 `json:"daily_sms_usage" datastore:"daily_sms_usage"`
+ DailyChildOrgSMSUsage int64 `json:"daily_child_org_sms_usage" datastore:"daily_child_org_sms_usage"`
+ DailyEmailUsage int64 `json:"daily_email_usage" datastore:"daily_email_usage"`
+ DailyChildOrgEmailUsage int64 `json:"daily_child_org_email_usage" datastore:"daily_child_org_email_usage"`
HourlyAppExecutions int64 `json:"hourly_app_executions,omitempty" datastore:"hourly_app_executions"`
HourlyChildAppExecutions int64 `json:"hourly_child_app_executions,omitempty" datastore:"hourly_child_app_executions"`