diff --git a/db-connector.go b/db-connector.go index 344018f2..a790db56 100755 --- a/db-connector.go +++ b/db-connector.go @@ -1005,6 +1005,87 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e workflowExecution.Results[valueIndex].Result = newValue } + + for paramIndex, param := range value.Action.Parameters { + if strings.Contains(param.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: param.Value, + }) + if err == nil { + workflowExecution.Results[valueIndex].Action.Parameters[paramIndex].Value = newValue + } + } + } + + for paramIndex, param := range value.Action.InvalidParameters { + if strings.Contains(param.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: param.Value, + }) + if err == nil { + workflowExecution.Results[valueIndex].Action.InvalidParameters[paramIndex].Value = newValue + } + } + } + } + + for actionIndex, action := range workflowExecution.Workflow.Actions { + for paramIndex, param := range action.Parameters { + if strings.Contains(param.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: param.Value, + }) + if err == nil { + workflowExecution.Workflow.Actions[actionIndex].Parameters[paramIndex].Value = newValue + } + } + } + + for paramIndex, param := range action.InvalidParameters { + if strings.Contains(param.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: param.Value, + }) + if err == nil { + workflowExecution.Workflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = newValue + } + } + } + } + + for triggerIndex, trigger := range workflowExecution.Workflow.Triggers { + for paramIndex, param := range trigger.Parameters { + if strings.Contains(param.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: param.Value, + }) + if err == nil { + workflowExecution.Workflow.Triggers[triggerIndex].Parameters[paramIndex].Value = newValue + } + } + } + } + + for execVarIndex, execVar := range workflowExecution.Workflow.ExecutionVariables { + if strings.Contains(execVar.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: execVar.Value, + }) + if err == nil { + workflowExecution.Workflow.ExecutionVariables[execVarIndex].Value = newValue + } + } + } + + for execVarIndex, execVar := range workflowExecution.ExecutionVariables { + if strings.Contains(execVar.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: execVar.Value, + }) + if err == nil { + workflowExecution.ExecutionVariables[execVarIndex].Value = newValue + } + } } // Fixes missing pieces @@ -1109,6 +1190,87 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e workflowExecution.Results[valueIndex].Result = newValue } + + for paramIndex, param := range value.Action.Parameters { + if strings.Contains(param.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: param.Value, + }) + if err == nil { + workflowExecution.Results[valueIndex].Action.Parameters[paramIndex].Value = newValue + } + } + } + + for paramIndex, param := range value.Action.InvalidParameters { + if strings.Contains(param.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: param.Value, + }) + if err == nil { + workflowExecution.Results[valueIndex].Action.InvalidParameters[paramIndex].Value = newValue + } + } + } + } + + for actionIndex, action := range workflowExecution.Workflow.Actions { + for paramIndex, param := range action.Parameters { + if strings.Contains(param.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: param.Value, + }) + if err == nil { + workflowExecution.Workflow.Actions[actionIndex].Parameters[paramIndex].Value = newValue + } + } + } + + for paramIndex, param := range action.InvalidParameters { + if strings.Contains(param.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: param.Value, + }) + if err == nil { + workflowExecution.Workflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = newValue + } + } + } + } + + for triggerIndex, trigger := range workflowExecution.Workflow.Triggers { + for paramIndex, param := range trigger.Parameters { + if strings.Contains(param.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: param.Value, + }) + if err == nil { + workflowExecution.Workflow.Triggers[triggerIndex].Parameters[paramIndex].Value = newValue + } + } + } + } + + for execVarIndex, execVar := range workflowExecution.Workflow.ExecutionVariables { + if strings.Contains(execVar.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: execVar.Value, + }) + if err == nil { + workflowExecution.Workflow.ExecutionVariables[execVarIndex].Value = newValue + } + } + } + + for execVarIndex, execVar := range workflowExecution.ExecutionVariables { + if strings.Contains(execVar.Value, "Result too large to handle") { + newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{ + Result: execVar.Value, + }) + if err == nil { + workflowExecution.ExecutionVariables[execVarIndex].Value = newValue + } + } } } @@ -1900,9 +2062,28 @@ func GetExecutionVariables(ctx context.Context, executionId string) (string, int } func getExecutionFileValue(ctx context.Context, workflowExecution WorkflowExecution, action ActionResult) (string, error) { - fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, action.Action.ID) + fileID := action.Action.ID + if len(fileID) == 0 && len(action.Result) > 0 { + parsedPlaceholder := map[string]interface{}{} + if err := json.Unmarshal([]byte(action.Result), &parsedPlaceholder); err == nil { + if parsedID, ok := parsedPlaceholder["id"].(string); ok && len(parsedID) > 0 { + prefix := fmt.Sprintf("%s_", workflowExecution.ExecutionId) + if strings.HasPrefix(parsedID, prefix) { + fileID = strings.TrimPrefix(parsedID, prefix) + } else { + fileID = parsedID + } + } + } + } + + if len(fileID) == 0 { + return "", errors.New("missing file id for large execution value") + } + + fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, fileID) - cacheKey := fmt.Sprintf("%s_%s_action_replace", workflowExecution.ExecutionId, action.Action.ID) + cacheKey := fmt.Sprintf("%s_%s_action_replace", workflowExecution.ExecutionId, fileID) if project.CacheDb { cache, err := GetCache(ctx, cacheKey) if err == nil { @@ -9741,7 +9922,53 @@ func SetWorkflowRevision(ctx context.Context, workflow Workflow) error { if project.DbType == "opensearch" { err = indexEs(ctx, nameKey, workflow.RevisionId, data) if err != nil { - return err + if strings.Contains(err.Error(), "immense term") { + retried := false + indexWorkflow := workflow + + for actionIndex, action := range indexWorkflow.Actions { + for paramIndex, param := range action.Parameters { + if len(param.Value) > 32500 { + log.Printf("[DEBUG][%s] Trimming workflow revision parameter %s in action %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, action.Label, len(param.Value)) + indexWorkflow.Actions[actionIndex].Parameters[paramIndex].Value = "Size too large. Removed." + retried = true + } + } + + for paramIndex, param := range action.InvalidParameters { + if len(param.Value) > 32500 { + log.Printf("[DEBUG][%s] Trimming workflow revision invalid parameter %s in action %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, action.Label, len(param.Value)) + indexWorkflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = "Size too large. Removed." + retried = true + } + } + } + + for triggerIndex, trigger := range indexWorkflow.Triggers { + for paramIndex, param := range trigger.Parameters { + if len(param.Value) > 32500 { + log.Printf("[DEBUG][%s] Trimming workflow revision trigger parameter %s in trigger %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, trigger.Label, len(param.Value)) + indexWorkflow.Triggers[triggerIndex].Parameters[paramIndex].Value = "Size too large. Removed." + retried = true + } + } + } + + if retried { + indexData, marshalErr := json.Marshal(indexWorkflow) + if marshalErr != nil { + log.Printf("[WARNING] Failed marshalling trimmed workflow revision for ES retry: %s", marshalErr) + return marshalErr + } + + log.Printf("[DEBUG][%s] Retrying OpenSearch workflow revision save after trimming oversized parameter values", workflow.ID) + err = indexEs(ctx, nameKey, workflow.RevisionId, indexData) + } + } + + if err != nil { + return err + } } } else { key := datastore.NameKey(nameKey, workflow.RevisionId, nil) @@ -9944,7 +10171,53 @@ func SetWorkflow(ctx context.Context, workflow Workflow, id string, optionalEdit if project.DbType == "opensearch" { err = indexEs(ctx, nameKey, id, data) if err != nil { - return err + if strings.Contains(err.Error(), "immense term") { + retried := false + indexWorkflow := workflow + + for actionIndex, action := range indexWorkflow.Actions { + for paramIndex, param := range action.Parameters { + if len(param.Value) > 32500 { + log.Printf("[DEBUG][%s] Trimming workflow parameter %s in action %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, action.Label, len(param.Value)) + indexWorkflow.Actions[actionIndex].Parameters[paramIndex].Value = "Size too large. Removed." + retried = true + } + } + + for paramIndex, param := range action.InvalidParameters { + if len(param.Value) > 32500 { + log.Printf("[DEBUG][%s] Trimming workflow invalid parameter %s in action %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, action.Label, len(param.Value)) + indexWorkflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = "Size too large. Removed." + retried = true + } + } + } + + for triggerIndex, trigger := range indexWorkflow.Triggers { + for paramIndex, param := range trigger.Parameters { + if len(param.Value) > 32500 { + log.Printf("[DEBUG][%s] Trimming workflow trigger parameter %s in trigger %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, trigger.Label, len(param.Value)) + indexWorkflow.Triggers[triggerIndex].Parameters[paramIndex].Value = "Size too large. Removed." + retried = true + } + } + } + + if retried { + indexData, marshalErr := json.Marshal(indexWorkflow) + if marshalErr != nil { + log.Printf("[WARNING] Failed marshalling trimmed workflow for ES retry: %s", marshalErr) + return marshalErr + } + + log.Printf("[DEBUG][%s] Retrying OpenSearch workflow save after trimming oversized parameter values", workflow.ID) + err = indexEs(ctx, nameKey, id, indexData) + } + } + + if err != nil { + return err + } } } else { //log.Printf("\n\n[INFO] Adding workflow with ID %s\n\n", id) diff --git a/shared.go b/shared.go index 1ad2670e..87e3a8e3 100644 --- a/shared.go +++ b/shared.go @@ -19836,18 +19836,174 @@ func compressExecution(ctx context.Context, workflowExecution WorkflowExecution, } } + for execVarIndex, execVar := range workflowExecution.Workflow.ExecutionVariables { + if len(execVar.Value) > 32500 && !strings.Contains(execVar.Value, "Result too large to handle") { + itemSize := len(execVar.Value) + safeName := execVar.ID + if len(safeName) == 0 { + safeName = execVar.Name + } + + safeName = strings.ReplaceAll(safeName, "/", "_") + safeName = strings.ReplaceAll(safeName, " ", "_") + if len(safeName) == 0 { + safeName = fmt.Sprintf("execution_variable_%d", execVarIndex) + } + + actionId := fmt.Sprintf("workflow_execution_variable_%s", safeName) + fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, actionId) + localPath := fmt.Sprintf("%s/%s", basepath, fullParsedPath) + + log.Printf("[DEBUG][%s] Offloading workflow execution variable %s (%s) (%d bytes) to file %s", workflowExecution.ExecutionId, execVar.Name, execVar.ID, itemSize, localPath) + + replacementJson := fmt.Sprintf(`{ + "success": false, + "reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).", + "size": %d, + "extra": "replace", + "id": "%s_%s" + }`, itemSize, workflowExecution.ExecutionId, actionId) + + if err := ioutil.WriteFile(localPath, []byte(execVar.Value), 0644); err != nil { + dirPath := fmt.Sprintf("%s/large_executions/%s", basepath, workflowExecution.ExecutionOrg) + if mkdirErr := os.MkdirAll(dirPath, 0755); mkdirErr != nil { + log.Printf("[WARNING][%s] Failed creating directory %s: %s (original write error: %s)", workflowExecution.ExecutionId, dirPath, mkdirErr, err) + workflowExecution.Workflow.ExecutionVariables[execVarIndex].Value = "Size too large. Removed." + } else if retryErr := ioutil.WriteFile(localPath, []byte(execVar.Value), 0644); retryErr != nil { + log.Printf("[WARNING][%s] Failed writing workflow execution variable file after creating directory: %s", workflowExecution.ExecutionId, retryErr) + workflowExecution.Workflow.ExecutionVariables[execVarIndex].Value = "Size too large. Removed." + } else { + workflowExecution.Workflow.ExecutionVariables[execVarIndex].Value = replacementJson + } + } else { + workflowExecution.Workflow.ExecutionVariables[execVarIndex].Value = replacementJson + } + } + } + + for execVarIndex, execVar := range workflowExecution.ExecutionVariables { + if len(execVar.Value) > 32500 && !strings.Contains(execVar.Value, "Result too large to handle") { + itemSize := len(execVar.Value) + safeName := execVar.ID + if len(safeName) == 0 { + safeName = execVar.Name + } + + safeName = strings.ReplaceAll(safeName, "/", "_") + safeName = strings.ReplaceAll(safeName, " ", "_") + if len(safeName) == 0 { + safeName = fmt.Sprintf("execution_variable_%d", execVarIndex) + } + + actionId := fmt.Sprintf("execution_variable_%s", safeName) + fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, actionId) + localPath := fmt.Sprintf("%s/%s", basepath, fullParsedPath) + + log.Printf("[DEBUG][%s] Offloading execution variable %s (%s) (%d bytes) to file %s", workflowExecution.ExecutionId, execVar.Name, execVar.ID, itemSize, localPath) + + replacementJson := fmt.Sprintf(`{ + "success": false, + "reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).", + "size": %d, + "extra": "replace", + "id": "%s_%s" + }`, itemSize, workflowExecution.ExecutionId, actionId) + + if err := ioutil.WriteFile(localPath, []byte(execVar.Value), 0644); err != nil { + dirPath := fmt.Sprintf("%s/large_executions/%s", basepath, workflowExecution.ExecutionOrg) + if mkdirErr := os.MkdirAll(dirPath, 0755); mkdirErr != nil { + log.Printf("[WARNING][%s] Failed creating directory %s: %s (original write error: %s)", workflowExecution.ExecutionId, dirPath, mkdirErr, err) + workflowExecution.ExecutionVariables[execVarIndex].Value = "Size too large. Removed." + } else if retryErr := ioutil.WriteFile(localPath, []byte(execVar.Value), 0644); retryErr != nil { + log.Printf("[WARNING][%s] Failed writing execution variable file after creating directory: %s", workflowExecution.ExecutionId, retryErr) + workflowExecution.ExecutionVariables[execVarIndex].Value = "Size too large. Removed." + } else { + workflowExecution.ExecutionVariables[execVarIndex].Value = replacementJson + } + } else { + workflowExecution.ExecutionVariables[execVarIndex].Value = replacementJson + } + } + } + for resultIndex, result := range workflowExecution.Results { for paramIndex, param := range result.Action.Parameters { if len(param.Value) > 32500 { - log.Printf("[DEBUG][%s] Trimming parameter %s in action %s (size: %d bytes)", workflowExecution.ExecutionId, param.Name, result.Action.Label, len(param.Value)) - workflowExecution.Results[resultIndex].Action.Parameters[paramIndex].Value = "Size too large. Removed." + itemSize := len(param.Value) + safeName := strings.ReplaceAll(param.Name, "/", "_") + safeName = strings.ReplaceAll(safeName, " ", "_") + if len(safeName) == 0 { + safeName = fmt.Sprintf("parameter_%d", paramIndex) + } + + actionId := fmt.Sprintf("result_action_parameter_%s_%s", result.Action.ID, safeName) + fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, actionId) + localPath := fmt.Sprintf("%s/%s", basepath, fullParsedPath) + + log.Printf("[DEBUG][%s] Offloading parameter %s in action %s (%d bytes) to file %s", workflowExecution.ExecutionId, param.Name, result.Action.Label, itemSize, localPath) + + replacementJson := fmt.Sprintf(`{ + "success": false, + "reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).", + "size": %d, + "extra": "replace", + "id": "%s_%s" + }`, itemSize, workflowExecution.ExecutionId, actionId) + + if err := ioutil.WriteFile(localPath, []byte(param.Value), 0644); err != nil { + dirPath := fmt.Sprintf("%s/large_executions/%s", basepath, workflowExecution.ExecutionOrg) + if mkdirErr := os.MkdirAll(dirPath, 0755); mkdirErr != nil { + log.Printf("[WARNING][%s] Failed creating directory %s: %s (original write error: %s)", workflowExecution.ExecutionId, dirPath, mkdirErr, err) + workflowExecution.Results[resultIndex].Action.Parameters[paramIndex].Value = "Size too large. Removed." + } else if retryErr := ioutil.WriteFile(localPath, []byte(param.Value), 0644); retryErr != nil { + log.Printf("[WARNING][%s] Failed writing parameter file after creating directory: %s", workflowExecution.ExecutionId, retryErr) + workflowExecution.Results[resultIndex].Action.Parameters[paramIndex].Value = "Size too large. Removed." + } else { + workflowExecution.Results[resultIndex].Action.Parameters[paramIndex].Value = replacementJson + } + } else { + workflowExecution.Results[resultIndex].Action.Parameters[paramIndex].Value = replacementJson + } } } for paramIndex, param := range result.Action.InvalidParameters { if len(param.Value) > 32500 { - log.Printf("[DEBUG][%s] Trimming invalid parameter %s in action %s (size: %d bytes)", workflowExecution.ExecutionId, param.Name, result.Action.Label, len(param.Value)) - workflowExecution.Results[resultIndex].Action.InvalidParameters[paramIndex].Value = "Size too large. Removed." + itemSize := len(param.Value) + safeName := strings.ReplaceAll(param.Name, "/", "_") + safeName = strings.ReplaceAll(safeName, " ", "_") + if len(safeName) == 0 { + safeName = fmt.Sprintf("invalid_parameter_%d", paramIndex) + } + + actionId := fmt.Sprintf("result_action_invalid_parameter_%s_%s", result.Action.ID, safeName) + fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, actionId) + localPath := fmt.Sprintf("%s/%s", basepath, fullParsedPath) + + log.Printf("[DEBUG][%s] Offloading invalid parameter %s in action %s (%d bytes) to file %s", workflowExecution.ExecutionId, param.Name, result.Action.Label, itemSize, localPath) + + replacementJson := fmt.Sprintf(`{ + "success": false, + "reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).", + "size": %d, + "extra": "replace", + "id": "%s_%s" + }`, itemSize, workflowExecution.ExecutionId, actionId) + + if err := ioutil.WriteFile(localPath, []byte(param.Value), 0644); err != nil { + dirPath := fmt.Sprintf("%s/large_executions/%s", basepath, workflowExecution.ExecutionOrg) + if mkdirErr := os.MkdirAll(dirPath, 0755); mkdirErr != nil { + log.Printf("[WARNING][%s] Failed creating directory %s: %s (original write error: %s)", workflowExecution.ExecutionId, dirPath, mkdirErr, err) + workflowExecution.Results[resultIndex].Action.InvalidParameters[paramIndex].Value = "Size too large. Removed." + } else if retryErr := ioutil.WriteFile(localPath, []byte(param.Value), 0644); retryErr != nil { + log.Printf("[WARNING][%s] Failed writing invalid parameter file after creating directory: %s", workflowExecution.ExecutionId, retryErr) + workflowExecution.Results[resultIndex].Action.InvalidParameters[paramIndex].Value = "Size too large. Removed." + } else { + workflowExecution.Results[resultIndex].Action.InvalidParameters[paramIndex].Value = replacementJson + } + } else { + workflowExecution.Results[resultIndex].Action.InvalidParameters[paramIndex].Value = replacementJson + } } } } @@ -19855,15 +20011,81 @@ func compressExecution(ctx context.Context, workflowExecution WorkflowExecution, for actionIndex, action := range workflowExecution.Workflow.Actions { for paramIndex, param := range action.Parameters { if len(param.Value) > 32500 { - log.Printf("[DEBUG][%s] Trimming workflow parameter %s in action %s (size: %d bytes)", workflowExecution.ExecutionId, param.Name, action.Label, len(param.Value)) - workflowExecution.Workflow.Actions[actionIndex].Parameters[paramIndex].Value = "Size too large. Removed." + itemSize := len(param.Value) + safeName := strings.ReplaceAll(param.Name, "/", "_") + safeName = strings.ReplaceAll(safeName, " ", "_") + if len(safeName) == 0 { + safeName = fmt.Sprintf("parameter_%d", paramIndex) + } + + actionId := fmt.Sprintf("workflow_action_parameter_%s_%s", action.ID, safeName) + fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, actionId) + localPath := fmt.Sprintf("%s/%s", basepath, fullParsedPath) + + log.Printf("[DEBUG][%s] Offloading workflow parameter %s in action %s (%d bytes) to file %s", workflowExecution.ExecutionId, param.Name, action.Label, itemSize, localPath) + + replacementJson := fmt.Sprintf(`{ + "success": false, + "reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).", + "size": %d, + "extra": "replace", + "id": "%s_%s" + }`, itemSize, workflowExecution.ExecutionId, actionId) + + if err := ioutil.WriteFile(localPath, []byte(param.Value), 0644); err != nil { + dirPath := fmt.Sprintf("%s/large_executions/%s", basepath, workflowExecution.ExecutionOrg) + if mkdirErr := os.MkdirAll(dirPath, 0755); mkdirErr != nil { + log.Printf("[WARNING][%s] Failed creating directory %s: %s (original write error: %s)", workflowExecution.ExecutionId, dirPath, mkdirErr, err) + workflowExecution.Workflow.Actions[actionIndex].Parameters[paramIndex].Value = "Size too large. Removed." + } else if retryErr := ioutil.WriteFile(localPath, []byte(param.Value), 0644); retryErr != nil { + log.Printf("[WARNING][%s] Failed writing workflow parameter file after creating directory: %s", workflowExecution.ExecutionId, retryErr) + workflowExecution.Workflow.Actions[actionIndex].Parameters[paramIndex].Value = "Size too large. Removed." + } else { + workflowExecution.Workflow.Actions[actionIndex].Parameters[paramIndex].Value = replacementJson + } + } else { + workflowExecution.Workflow.Actions[actionIndex].Parameters[paramIndex].Value = replacementJson + } } } for paramIndex, param := range action.InvalidParameters { if len(param.Value) > 32500 { - log.Printf("[DEBUG][%s] Trimming workflow invalid parameter %s in action %s (size: %d bytes)", workflowExecution.ExecutionId, param.Name, action.Label, len(param.Value)) - workflowExecution.Workflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = "Size too large. Removed." + itemSize := len(param.Value) + safeName := strings.ReplaceAll(param.Name, "/", "_") + safeName = strings.ReplaceAll(safeName, " ", "_") + if len(safeName) == 0 { + safeName = fmt.Sprintf("invalid_parameter_%d", paramIndex) + } + + actionId := fmt.Sprintf("workflow_action_invalid_parameter_%s_%s", action.ID, safeName) + fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, actionId) + localPath := fmt.Sprintf("%s/%s", basepath, fullParsedPath) + + log.Printf("[DEBUG][%s] Offloading workflow invalid parameter %s in action %s (%d bytes) to file %s", workflowExecution.ExecutionId, param.Name, action.Label, itemSize, localPath) + + replacementJson := fmt.Sprintf(`{ + "success": false, + "reason": "Result too large to handle (https://github.com/frikky/shuffle/issues/171).", + "size": %d, + "extra": "replace", + "id": "%s_%s" + }`, itemSize, workflowExecution.ExecutionId, actionId) + + if err := ioutil.WriteFile(localPath, []byte(param.Value), 0644); err != nil { + dirPath := fmt.Sprintf("%s/large_executions/%s", basepath, workflowExecution.ExecutionOrg) + if mkdirErr := os.MkdirAll(dirPath, 0755); mkdirErr != nil { + log.Printf("[WARNING][%s] Failed creating directory %s: %s (original write error: %s)", workflowExecution.ExecutionId, dirPath, mkdirErr, err) + workflowExecution.Workflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = "Size too large. Removed." + } else if retryErr := ioutil.WriteFile(localPath, []byte(param.Value), 0644); retryErr != nil { + log.Printf("[WARNING][%s] Failed writing workflow invalid parameter file after creating directory: %s", workflowExecution.ExecutionId, retryErr) + workflowExecution.Workflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = "Size too large. Removed." + } else { + workflowExecution.Workflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = replacementJson + } + } else { + workflowExecution.Workflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = replacementJson + } } } }