Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 277 additions & 4 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,87 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e

workflowExecution.Results[valueIndex].Result = newValue
}

for paramIndex, param := range value.Action.Parameters {
if strings.Contains(param.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: param.Value,
})
if err == nil {
workflowExecution.Results[valueIndex].Action.Parameters[paramIndex].Value = newValue
}
}
}

for paramIndex, param := range value.Action.InvalidParameters {
if strings.Contains(param.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: param.Value,
})
if err == nil {
workflowExecution.Results[valueIndex].Action.InvalidParameters[paramIndex].Value = newValue
}
}
}
}

for actionIndex, action := range workflowExecution.Workflow.Actions {
for paramIndex, param := range action.Parameters {
if strings.Contains(param.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: param.Value,
})
if err == nil {
workflowExecution.Workflow.Actions[actionIndex].Parameters[paramIndex].Value = newValue
}
}
}

for paramIndex, param := range action.InvalidParameters {
if strings.Contains(param.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: param.Value,
})
if err == nil {
workflowExecution.Workflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = newValue
}
}
}
}

for triggerIndex, trigger := range workflowExecution.Workflow.Triggers {
for paramIndex, param := range trigger.Parameters {
if strings.Contains(param.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: param.Value,
})
if err == nil {
workflowExecution.Workflow.Triggers[triggerIndex].Parameters[paramIndex].Value = newValue
}
}
}
}

for execVarIndex, execVar := range workflowExecution.Workflow.ExecutionVariables {
if strings.Contains(execVar.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: execVar.Value,
})
if err == nil {
workflowExecution.Workflow.ExecutionVariables[execVarIndex].Value = newValue
}
}
}

for execVarIndex, execVar := range workflowExecution.ExecutionVariables {
if strings.Contains(execVar.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: execVar.Value,
})
if err == nil {
workflowExecution.ExecutionVariables[execVarIndex].Value = newValue
}
}
}

// Fixes missing pieces
Expand Down Expand Up @@ -1109,6 +1190,87 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e

workflowExecution.Results[valueIndex].Result = newValue
}

for paramIndex, param := range value.Action.Parameters {
if strings.Contains(param.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: param.Value,
})
if err == nil {
workflowExecution.Results[valueIndex].Action.Parameters[paramIndex].Value = newValue
}
}
}

for paramIndex, param := range value.Action.InvalidParameters {
if strings.Contains(param.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: param.Value,
})
if err == nil {
workflowExecution.Results[valueIndex].Action.InvalidParameters[paramIndex].Value = newValue
}
}
}
}

for actionIndex, action := range workflowExecution.Workflow.Actions {
for paramIndex, param := range action.Parameters {
if strings.Contains(param.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: param.Value,
})
if err == nil {
workflowExecution.Workflow.Actions[actionIndex].Parameters[paramIndex].Value = newValue
}
}
}

for paramIndex, param := range action.InvalidParameters {
if strings.Contains(param.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: param.Value,
})
if err == nil {
workflowExecution.Workflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = newValue
}
}
}
}

for triggerIndex, trigger := range workflowExecution.Workflow.Triggers {
for paramIndex, param := range trigger.Parameters {
if strings.Contains(param.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: param.Value,
})
if err == nil {
workflowExecution.Workflow.Triggers[triggerIndex].Parameters[paramIndex].Value = newValue
}
}
}
}

for execVarIndex, execVar := range workflowExecution.Workflow.ExecutionVariables {
if strings.Contains(execVar.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: execVar.Value,
})
if err == nil {
workflowExecution.Workflow.ExecutionVariables[execVarIndex].Value = newValue
}
}
}

for execVarIndex, execVar := range workflowExecution.ExecutionVariables {
if strings.Contains(execVar.Value, "Result too large to handle") {
newValue, err := getExecutionFileValue(ctx, *workflowExecution, ActionResult{
Result: execVar.Value,
})
if err == nil {
workflowExecution.ExecutionVariables[execVarIndex].Value = newValue
}
}
}
}

Expand Down Expand Up @@ -1900,9 +2062,28 @@ func GetExecutionVariables(ctx context.Context, executionId string) (string, int
}

func getExecutionFileValue(ctx context.Context, workflowExecution WorkflowExecution, action ActionResult) (string, error) {
fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, action.Action.ID)
fileID := action.Action.ID
if len(fileID) == 0 && len(action.Result) > 0 {
parsedPlaceholder := map[string]interface{}{}
if err := json.Unmarshal([]byte(action.Result), &parsedPlaceholder); err == nil {
if parsedID, ok := parsedPlaceholder["id"].(string); ok && len(parsedID) > 0 {
prefix := fmt.Sprintf("%s_", workflowExecution.ExecutionId)
if strings.HasPrefix(parsedID, prefix) {
fileID = strings.TrimPrefix(parsedID, prefix)
} else {
fileID = parsedID
}
}
}
}

if len(fileID) == 0 {
return "", errors.New("missing file id for large execution value")
}

fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, fileID)

cacheKey := fmt.Sprintf("%s_%s_action_replace", workflowExecution.ExecutionId, action.Action.ID)
cacheKey := fmt.Sprintf("%s_%s_action_replace", workflowExecution.ExecutionId, fileID)
if project.CacheDb {
cache, err := GetCache(ctx, cacheKey)
if err == nil {
Expand Down Expand Up @@ -9741,7 +9922,53 @@ func SetWorkflowRevision(ctx context.Context, workflow Workflow) error {
if project.DbType == "opensearch" {
err = indexEs(ctx, nameKey, workflow.RevisionId, data)
if err != nil {
return err
if strings.Contains(err.Error(), "immense term") {
retried := false
indexWorkflow := workflow

for actionIndex, action := range indexWorkflow.Actions {
for paramIndex, param := range action.Parameters {
if len(param.Value) > 32500 {
log.Printf("[DEBUG][%s] Trimming workflow revision parameter %s in action %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, action.Label, len(param.Value))
indexWorkflow.Actions[actionIndex].Parameters[paramIndex].Value = "Size too large. Removed."
retried = true
}
}

for paramIndex, param := range action.InvalidParameters {
if len(param.Value) > 32500 {
log.Printf("[DEBUG][%s] Trimming workflow revision invalid parameter %s in action %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, action.Label, len(param.Value))
indexWorkflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = "Size too large. Removed."
retried = true
}
}
}

for triggerIndex, trigger := range indexWorkflow.Triggers {
for paramIndex, param := range trigger.Parameters {
if len(param.Value) > 32500 {
log.Printf("[DEBUG][%s] Trimming workflow revision trigger parameter %s in trigger %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, trigger.Label, len(param.Value))
indexWorkflow.Triggers[triggerIndex].Parameters[paramIndex].Value = "Size too large. Removed."
retried = true
}
}
}

if retried {
indexData, marshalErr := json.Marshal(indexWorkflow)
if marshalErr != nil {
log.Printf("[WARNING] Failed marshalling trimmed workflow revision for ES retry: %s", marshalErr)
return marshalErr
}

log.Printf("[DEBUG][%s] Retrying OpenSearch workflow revision save after trimming oversized parameter values", workflow.ID)
err = indexEs(ctx, nameKey, workflow.RevisionId, indexData)
}
}

if err != nil {
return err
}
}
} else {
key := datastore.NameKey(nameKey, workflow.RevisionId, nil)
Expand Down Expand Up @@ -9944,7 +10171,53 @@ func SetWorkflow(ctx context.Context, workflow Workflow, id string, optionalEdit
if project.DbType == "opensearch" {
err = indexEs(ctx, nameKey, id, data)
if err != nil {
return err
if strings.Contains(err.Error(), "immense term") {
retried := false
indexWorkflow := workflow

for actionIndex, action := range indexWorkflow.Actions {
for paramIndex, param := range action.Parameters {
if len(param.Value) > 32500 {
log.Printf("[DEBUG][%s] Trimming workflow parameter %s in action %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, action.Label, len(param.Value))
indexWorkflow.Actions[actionIndex].Parameters[paramIndex].Value = "Size too large. Removed."
retried = true
}
}

for paramIndex, param := range action.InvalidParameters {
if len(param.Value) > 32500 {
log.Printf("[DEBUG][%s] Trimming workflow invalid parameter %s in action %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, action.Label, len(param.Value))
indexWorkflow.Actions[actionIndex].InvalidParameters[paramIndex].Value = "Size too large. Removed."
retried = true
}
}
}

for triggerIndex, trigger := range indexWorkflow.Triggers {
for paramIndex, param := range trigger.Parameters {
if len(param.Value) > 32500 {
log.Printf("[DEBUG][%s] Trimming workflow trigger parameter %s in trigger %s for OpenSearch indexing (size: %d bytes)", workflow.ID, param.Name, trigger.Label, len(param.Value))
indexWorkflow.Triggers[triggerIndex].Parameters[paramIndex].Value = "Size too large. Removed."
retried = true
}
}
}

if retried {
indexData, marshalErr := json.Marshal(indexWorkflow)
if marshalErr != nil {
log.Printf("[WARNING] Failed marshalling trimmed workflow for ES retry: %s", marshalErr)
return marshalErr
}

log.Printf("[DEBUG][%s] Retrying OpenSearch workflow save after trimming oversized parameter values", workflow.ID)
err = indexEs(ctx, nameKey, id, indexData)
}
}

if err != nil {
return err
}
}
} else {
//log.Printf("\n\n[INFO] Adding workflow with ID %s\n\n", id)
Expand Down
Loading
Loading