diff --git a/.gitignore b/.gitignore index faf7cc95a3..64af6d0ab9 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,6 @@ *.sw* .run node_modules -ignore \ No newline at end of file +ignore +.kilo* +.claude* diff --git a/client/web/workflow/public/icons/warning.svg b/client/web/workflow/public/icons/warning.svg new file mode 100644 index 0000000000..69c44acc5d --- /dev/null +++ b/client/web/workflow/public/icons/warning.svg @@ -0,0 +1,4 @@ + + + + diff --git a/client/web/workflow/src/components/Configurator/Error.vue b/client/web/workflow/src/components/Configurator/Error.vue index 0a4124c2b9..1c418c0203 100644 --- a/client/web/workflow/src/components/Configurator/Error.vue +++ b/client/web/workflow/src/components/Configurator/Error.vue @@ -15,17 +15,69 @@ + + + + + + + +
+ + + + +
+
+ + +
@@ -60,6 +112,13 @@ import base from './base' import ExpressionEditor from '../ExpressionEditor.vue' +const ALLOWED_TARGETS = ['message', 'title', 'severity'] +const SEVERITY_VALUES = ['error', 'warning', 'info'] + +function makeArg (target, expr = '') { + return { target, type: 'String', expr } +} + export default { components: { ExpressionEditor, @@ -71,53 +130,153 @@ export default { return { expressionEditor: { currentExpression: undefined, + currentField: undefined, }, } }, + computed: { + // Computed getters are pure reads. All four allowed-target args + // are pre-created in `created()` and also re-created by + // `ensureArgs()` before any method that needs write access. This + // keeps the computeds side-effect-free so Vue's reactivity graph + // can't re-enter the getter while it is mutating arguments. + messageArg () { + return this.findArg('message') || makeArg('message') + }, + titleArg () { + return this.findArg('title') || makeArg('title') + }, + severityArg () { + return this.findArg('severity') || makeArg('severity') + }, + + // severityIsLiteral: true when the severity expression is either + // empty or a plain quoted literal we can round-trip through the + // dropdown. When it's a real expression (e.g. `vars.level`), the + // configurator falls back to a raw text input so we never clobber + // the author's work. + severityIsLiteral () { + const raw = (this.severityArg.expr || '').trim() + if (!raw) return true + return /^["'](error|warning|info)["']$/.test(raw) + }, + + severityValue: { + get () { + const raw = (this.severityArg.expr || '').trim() + if (!raw) return 'error' + const m = raw.match(/^["'](error|warning|info)["']$/) + return m ? m[1] : 'error' + }, + set (v) { + if (!SEVERITY_VALUES.includes(v)) v = 'error' + const arg = this.severityArg + this.$set(arg, 'expr', `"${v}"`) + }, + }, + + severityOptions () { + return [ + { value: 'error', text: this.$t('general:error-step.severity.options.error') }, + { value: 'warning', text: this.$t('general:error-step.severity.options.warning') }, + { value: 'info', text: this.$t('general:error-step.severity.options.info') }, + ] + }, + }, + created () { - let args = [{ - target: 'message', - type: 'String', - expr: '', - }] - - if (this.item.config.arguments && this.item.config.arguments.length) { - args = this.item.config.arguments.map(({ target, type, value, expr }) => { - return { + this.ensureArgs() + }, + + methods: { + // ensureArgs normalises item.config.arguments: keeps only allowed + // targets and guarantees one entry per target, even if empty. Safe + // to call from any write path. Does NOT run from computed getters. + ensureArgs () { + const existing = Array.isArray(this.item.config.arguments) ? this.item.config.arguments : [] + const byTarget = {} + existing.forEach(({ target, type, value, expr }) => { + if (!ALLOWED_TARGETS.includes(target)) return + byTarget[target] = { target, - type, + type: type || 'String', expr: expr || (value ? `"${value}"` : ''), } }) - } - this.$set(this.item.config, 'arguments', args) - }, + const args = ALLOWED_TARGETS.map(t => byTarget[t] || makeArg(t)) + this.$set(this.item.config, 'arguments', args) + }, - methods: { - valueChanged (value) { + // findArg is a pure read. Returns the matching arg object or + // undefined when absent. Safe to call from computeds. + findArg (target) { + const args = this.item.config.arguments || [] + return args.find(a => a.target === target) + }, + + // findOrCreateArg is a mutating read used by write paths only. + // It guarantees the arg exists on item.config.arguments before + // returning it. Never call from inside a computed getter — use + // findArg with a fallback instead. + findOrCreateArg (target) { + let arg = this.findArg(target) + if (!arg) { + this.ensureArgs() + arg = this.findArg(target) || makeArg(target) + } + return arg + }, + + // stripLiteralQuotes returns the unquoted content of a simple + // quoted string expression (e.g. "foo" -> foo) so the canvas node + // label preview reads cleanly. Non-literal expressions are passed + // through unchanged. + stripLiteralQuotes (s) { + const t = (s || '').trim() + const m = t.match(/^(["'])((?:[^\\]|\\.)*)\1$/) + return m ? m[2] : t + }, + + onFieldInput () { + const pick = this.titleArg.expr || this.messageArg.expr || '' + const preview = this.stripLiteralQuotes(pick) this.$emit('update-default-value', { - value: `Stop workflow with error: ${value}`, + value: preview ? `Stop workflow with error: ${preview}` : 'Stop workflow with error', force: !this.item.node.value, }) this.$root.$emit('change-detected') }, - openInEditor () { - this.expressionEditor.currentExpression = this.item.config.arguments[0].expr + onSeverityChange () { + this.$root.$emit('change-detected') }, - saveExpression () { - const { currentExpression } = this.expressionEditor - this.$set(this.item.config.arguments[0], 'expr', currentExpression) + resetSeverityToLiteral () { + this.$set(this.severityArg, 'expr', '"error"') this.$root.$emit('change-detected') + }, + openInEditor (field) { + const arg = this.findOrCreateArg(field) + this.expressionEditor.currentField = field + this.expressionEditor.currentExpression = arg.expr || '' + }, + + saveExpression () { + const { currentExpression, currentField } = this.expressionEditor + if (currentField) { + const arg = this.findOrCreateArg(currentField) + this.$set(arg, 'expr', currentExpression) + this.$root.$emit('change-detected') + } this.resetExpression() }, resetExpression () { this.expressionEditor.currentExpression = undefined + this.expressionEditor.currentField = undefined }, }, } diff --git a/client/web/workflow/src/components/WorkflowEditor.vue b/client/web/workflow/src/components/WorkflowEditor.vue index 74d15b1f62..dc0291ca14 100644 --- a/client/web/workflow/src/components/WorkflowEditor.vue +++ b/client/web/workflow/src/components/WorkflowEditor.vue @@ -147,6 +147,17 @@ +
+ + {{ $t('editor:detected-warnings') }} + +
+
-

- {{ issue[0].toUpperCase() + issue.slice(1) }} -

+
+ {{ $t('editor:issues') }} +
+
+

+ {{ issue[0].toUpperCase() + issue.slice(1) }} +

+
+
+ +
+
+ {{ $t('editor:warnings') }} +
+
+

+ {{ warning[0].toUpperCase() + warning.slice(1) }} +

+
@@ -527,6 +562,7 @@ export default { vertices: {}, edges: {}, issues: {}, + warnings: {}, highlights: [], @@ -547,6 +583,7 @@ export default { issuesModal: { show: false, issues: [], + warnings: [], }, dryRun: { @@ -691,6 +728,10 @@ export default { return (this.workflow.issues || []).length }, + hasWarnings () { + return (this.workflow.warnings || []).length + }, + getRunAs () { if (this.runAsUser) { const { userID, name, username, email } = this.runAsUser @@ -972,15 +1013,19 @@ export default { const isSelected = this.selection.includes(cell.mxObjectId) const shadow = isSelected ? 'shadow' : 'shadow-sm' const issue = this.getIcon('issue') + const warningIcon = this.getIcon('warning') const playIcon = this.getIcon('play') const stopIcon = this.getIcon('stop') const opacity = kind === 'trigger' && !vertex.triggers.enabled ? 'opacity: 0.7;' : '' let test = '' let issues = '' + let warnings = '' let id = '' if (this.issues[cell.id]) { issues = `` + } else if (this.warnings[cell.id]) { + warnings = `` } else { id = `${cell.id}` } @@ -1130,6 +1175,7 @@ export default { test + id + issues + + warnings + '' + '' + `
` + @@ -1800,7 +1846,8 @@ export default { const itemType = cell.edge ? 'edge' : item.config.kind if (event.target.id === 'openIssues') { - this.issuesModal.issues = this.issues[cell.id] + this.issuesModal.issues = this.issues[cell.id] || [] + this.issuesModal.warnings = this.warnings[cell.id] || [] this.issuesModal.show = true } else if (event.target.id === 'testWorkflow') { this.dryRun.cellID = cell.id @@ -2459,6 +2506,29 @@ export default { }) } + // Assemble warnings (same shape as issues, non-fatal). Rendered + // with a yellow badge on affected steps so authors can see them + // without the editor refusing to save or run the workflow. + this.warnings = {} + if (this.workflow.warnings) { + this.workflow.warnings.forEach(({ culprit, description }) => { + if (culprit) { + const { step = -1, trigger = -1 } = culprit + let stepID = '' + + if (step >= 0) { + stepID = (this.workflow.steps[step] || {}).stepID + } else if (trigger >= 0) { + stepID = (this.triggers[trigger] || {}).meta?.visual?.id || '' + } + + if (stepID) { + this.warnings[stepID] ? this.warnings[stepID].push(description) : this.warnings[stepID] = [description] + } + } + }) + } + this.deferred = false this.triggersPathsChanged = false diff --git a/client/web/workflow/src/views/Workflow/Editor.vue b/client/web/workflow/src/views/Workflow/Editor.vue index 5a55894a7b..fbfbc2c3f9 100644 --- a/client/web/workflow/src/views/Workflow/Editor.vue +++ b/client/web/workflow/src/views/Workflow/Editor.vue @@ -121,21 +121,61 @@ export default { }, saveWorkflow: throttle(async function (wf) { - try { - this.processingSave = true + // Issue #687: previously this handler would swallow trigger-update + // failures behind a misleading "configure triggers" message and + // could crash on `undefined.workflowID` if the workflow API call + // rejected (e.g. backend unreachable). It now distinguishes + // network failures from validation errors and surfaces the real + // underlying error to the user. + this.processingSave = true + + // isNetworkError requires *positive* evidence of a transport-layer + // failure. A plain JS error thrown from inside a .then() handler + // must not be misreported as "server unreachable", so we only + // match on known axios/fetch signals: + // - axios error code indicates network/timeout/DNS (ECONNREFUSED, + // ETIMEDOUT, ENETUNREACH, ENOTFOUND, EAI_AGAIN) or the + // axios ≥ 1.x generic ERR_NETWORK + // - axios marks the request as sent but with no response + // (err.request set, err.response absent) which is the + // canonical "server did not reply" case + // - the error message explicitly says so + // Anything else falls through as a generic error. + const isNetworkError = e => { + if (!e || typeof e === 'string') return false + if (e.code && /^(E(CONN|TIMED|NETUN|NOTFO|AI_AGAIN)|ERR_NETWORK)/i.test(e.code)) return true + if (e.request && !e.response) return true + if (typeof e.message === 'string' && /Network Error|Failed to fetch/i.test(e.message)) return true + return false + } + + const reportSaveError = e => { + if (isNetworkError(e)) { + this.toastDanger( + this.$t('notification:failed-save-network'), + this.$t('notification:failed-save'), + ) + return + } + // Defer to the shared handler — it knows how to render workflow + // error step rich payloads and falls back to plain toasts. + this.toastErrorHandler(this.$t('notification:failed-save'))(e) + } + try { const isNew = wf.workflowID === '0' const { triggers = [] } = wf // Firstly handle trigger updates // Delete triggers of steps that were deleted - await Promise.all(this.triggers.filter(({ triggerID }) => { - return !triggers.find(t => triggerID === t.triggerID) - }).map(({ triggerID }) => { - return this.$AutomationAPI.triggerDelete({ triggerID }) - }), - ).then(async () => { + try { + await Promise.all(this.triggers.filter(({ triggerID }) => { + return !triggers.find(t => triggerID === t.triggerID) + }).map(({ triggerID }) => { + return this.$AutomationAPI.triggerDelete({ triggerID }) + })) + await Promise.all(triggers.map(t => { // Update triggers that already have an ID if (t.triggerID) { @@ -152,25 +192,46 @@ export default { ownedBy: this.userID, }) } - })).catch(() => { - throw new Error(this.$t('notification:configure-triggers')) - }) - }) + })) + } catch (e) { + // Surface the actual trigger error instead of the generic + // "configure triggers" message that used to mask it. + reportSaveError(e) + return + } // Secondly handle workflow updates - if (isNew) { - wf = await this.$AutomationAPI.workflowCreate(wf) - } else { - wf = await this.$AutomationAPI.workflowUpdate(wf) + let saved + try { + saved = isNew + ? await this.$AutomationAPI.workflowCreate(wf) + : await this.$AutomationAPI.workflowUpdate(wf) + } catch (e) { + reportSaveError(e) + return + } + + if (!saved || !saved.workflowID) { + // Defensive: API returned an unexpected shape. Don't crash on + // saved.workflowID later down the chain. Emit the raw payload + // to the browser console so the malformed response is still + // diagnosable — the toast can't show it safely and silently + // dropping it would make this path impossible to debug. + console.warn('workflow save: unexpected response payload', saved) + this.toastDanger( + this.$t('notification:failed-save-unexpected-response'), + this.$t('notification:failed-save'), + ) + return } // Lastly update all of the bits - await this.fetchTriggers(wf.workflowID) + await this.fetchTriggers(saved.workflowID) this.changeDetected = false window.onbeforeunload = null - this.workflow = new automation.Workflow(wf) + this.workflow = new automation.Workflow(saved) this.toastSuccess(this.$t('notification:update.success')) if (isNew) { @@ -178,10 +239,12 @@ export default { this.$router.push({ name: 'workflow.edit', params: { workflowID: this.workflow.workflowID } }) } } catch (e) { - this.toastErrorHandler(this.$t('notification:failed-save'))(e) + reportSaveError(e) + } finally { + // Guarantee the processing flag is cleared even if an unexpected + // synchronous error escapes the try block above. + this.processingSave = false } - - this.processingSave = false }, 500), deleteWorkflow () { diff --git a/lib/vue/src/mixins/toast.js b/lib/vue/src/mixins/toast.js index 27de040d46..220126a955 100644 --- a/lib/vue/src/mixins/toast.js +++ b/lib/vue/src/mixins/toast.js @@ -32,6 +32,40 @@ export default { return err.message }, + // extractWorkflowErrorMeta looks for the namespaced "workflow.error.*" + // meta keys attached by the backend workflow error step. It probes the + // handful of shapes the error can arrive in (direct reject, axios wrap, + // nested response.data.error) and returns a normalised object, or null + // when the error is not a workflow error step error. + // + // This is purely additive: when it returns null, toastErrorHandler + // falls through to the existing plain-toast behaviour. + extractWorkflowErrorMeta (err) { + if (!err || typeof err !== 'object') return null + + // candidate meta bags, in priority order + const candidates = [ + err.meta, + err.response && err.response.data && err.response.data.error && err.response.data.error.meta, + err.data && err.data.error && err.data.error.meta, + err.error && err.error.meta, + ] + + for (const meta of candidates) { + if (!meta || typeof meta !== 'object') continue + const title = meta['workflow.error.title'] + const severity = meta['workflow.error.severity'] + if (title || severity) { + return { + title: title || '', + severity: severity || 'error', + } + } + } + + return null + }, + toastErrorHandler (opt) { if (typeof opt === 'string') { opt = { title: opt } @@ -40,6 +74,37 @@ export default { const { prefix, title } = opt return (err = {}) => { + // Rich workflow error step path. Guarded by a try/catch so that + // any unexpected toast library or context issue silently degrades + // to the legacy plain-toast rendering below. + try { + const wfErr = this.extractWorkflowErrorMeta(err) + if (wfErr) { + const variant = wfErr.severity === 'warning' + ? 'warning' + : wfErr.severity === 'info' + ? 'info' + : 'danger' + + // The toast body is always the flat error message (which + // is equal to the author-configured `message` argument). + // + // NOTE: message/title originate from workflow-author-controlled + // meta fields. BootstrapVue's $bvToast.toast(content, opts) + // renders both content and title as plain text (no v-html), + // which is what we want here — do not switch to an HTML + // variant without sanitising, as that would expose the + // workflow editor as a stored-XSS surface. + const body = err.message || '' + const toastTitle = wfErr.title || title || '' + + this.toast(body, { title: toastTitle, variant, solid: true }) + return err.message + } + } catch (_) { + // fall through to legacy rendering + } + let toastMsg = '' let toastTitle = title diff --git a/locale/en/corteza-webapp-workflow/editor.yaml b/locale/en/corteza-webapp-workflow/editor.yaml index 1d9b8c20a9..ced9d88c9a 100644 --- a/locale/en/corteza-webapp-workflow/editor.yaml +++ b/locale/en/corteza-webapp-workflow/editor.yaml @@ -7,6 +7,7 @@ delete: Delete deleted: Deleted 'detected-changes': "Changes detected " 'detected-issues': Issues detected +'detected-warnings': Warnings detected disabled: Disabled help: Help handle: (Handle) @@ -15,6 +16,7 @@ id: "ID:" 'initial-scope': Initial scope 'input-ids-or-handles': The initial scope gets injected into the workflow at execution. To load available variables, input the related IDs/Handles below issues: Issues +warnings: Warnings 'load-and-configure': Load and Configure 'modify-initial-scope-if-no-variables-are-loaded': If you do not wish to load any variables, click "Configure" to modify the initial scope before running workflow 'open-webapp-on-prompt-use' : "WARNING: If prompts are used in the workflow, make sure that the related webapp is also opened. Otherwise workflow will timeout" diff --git a/locale/en/corteza-webapp-workflow/general.yaml b/locale/en/corteza-webapp-workflow/general.yaml index 6b25c06f74..83adf6e63c 100644 --- a/locale/en/corteza-webapp-workflow/general.yaml +++ b/locale/en/corteza-webapp-workflow/general.yaml @@ -5,6 +5,21 @@ enable: Enable disable: Disable error: Error error-expression: Error message (expression) +error-step: + message: + label: Message + description: The error text shown to the end user. Required. + title: + label: Title + description: Short headline shown in rich toast notifications. Optional. + severity: + label: Severity + description: Controls the toast variant and icon in the end-user UI. + reset-to-literal: Reset to a literal value (error, warning, info) + options: + error: Error + warning: Warning + info: Info export: Export general: General handle: Handle diff --git a/locale/en/corteza-webapp-workflow/notification.yaml b/locale/en/corteza-webapp-workflow/notification.yaml index a48a25db78..a0c0d9a92f 100644 --- a/locale/en/corteza-webapp-workflow/notification.yaml +++ b/locale/en/corteza-webapp-workflow/notification.yaml @@ -1,4 +1,3 @@ -'configure-triggers': Make sure all trigger steps are properly configured 'confirm-unsaved-changes': You have unsaved changes, are you sure you want to exit? error: " - ERROR: " 'event-type-not-found': Event type not found @@ -9,6 +8,8 @@ error: " - ERROR: " 'failed-fetch-workflows': Failed to fetch workflows 'failed-load-file': Failed to load file 'failed-save': Save failed +'failed-save-network': Unable to reach the server. Check your connection and try again. +'failed-save-unexpected-response': The server returned an unexpected response. The workflow may not have been saved. 'failed-test': Test failed 'fetch-types-failed': Failed to fetch types general: diff --git a/server/automation/service/workflow.go b/server/automation/service/workflow.go index dd89d42996..00f692f2d0 100644 --- a/server/automation/service/workflow.go +++ b/server/automation/service/workflow.go @@ -676,6 +676,11 @@ func (svc *workflow) validateWorkflow(ctx context.Context, wf *types.Workflow) ( g, wf.Issues = Convert(svc, wf) + // Non-fatal static analysis. Produces wf.Warnings so the editor + // can surface design-time problems (e.g. variable conflicts at + // parallel joins) without blocking save or run. + wf.Warnings = analyzeWorkflow(wf) + tt, _, err = store.SearchAutomationTriggers(ctx, svc.store, types.TriggerFilter{ WorkflowID: id.Strings(types.WorkflowSet{wf}.IDs()...), Deleted: filter.StateExcluded, diff --git a/server/automation/service/workflow_analyzer.go b/server/automation/service/workflow_analyzer.go new file mode 100644 index 0000000000..660746090f --- /dev/null +++ b/server/automation/service/workflow_analyzer.go @@ -0,0 +1,245 @@ +package service + +import ( + "fmt" + "sort" + + "github.com/cortezaproject/corteza/server/automation/types" +) + +// analyzeWorkflow performs non-fatal static analysis on a workflow +// definition and returns a WorkflowIssueSet suitable for assignment to +// wf.Warnings. Analysis is intentionally conservative: it only reports +// on patterns that can be decided purely from the workflow graph and +// the declared step metadata, without evaluating expressions. +// +// Current checks: +// +// - Parallel-join variable conflicts: for every join gateway, walk +// every upstream branch back to the matching fork (or the workflow +// entry) and collect the set of variable names each branch can +// write. Any variable name written in more than one branch is +// reported as a potential conflict at the join step. Matches the +// runtime detectConflicts check in wfexec.joinGateway.Exec but +// fires at save time so authors see the warning before they run. +// +// Assumptions (documented so future readers know the limits): +// +// - Only statically declared writes are considered. An Expressions +// step's writes are its argument target names; a Function or +// ExecWorkflow step's writes are its result target names. Dynamic +// writes inside expression bodies (e.g. vars[computed] = ...) are +// not detected. +// - Subworkflow contents are NOT walked. A subworkflow invocation's +// writes into the calling scope are exactly the ExecWorkflow step's +// own declared results, so the step's static declaration already +// captures the full contract. This is how the runtime wires +// subworkflow results back too (see convExecWorkflowStep). +// - The analyzer never fails the workflow. If an unsupported topology +// is encountered (e.g. a broken path with no reachable fork) it +// silently skips that join and moves on. +func analyzeWorkflow(wf *types.Workflow) types.WorkflowIssueSet { + if wf == nil || len(wf.Steps) == 0 || len(wf.Paths) == 0 { + return nil + } + + // step index lookup: stepID -> position in wf.Steps (used as + // culprit index for per-step badge placement in the editor) + stepIdx := make(map[uint64]int, len(wf.Steps)) + stepByID := make(map[uint64]*types.WorkflowStep, len(wf.Steps)) + for i, s := range wf.Steps { + stepIdx[s.ID] = i + stepByID[s.ID] = s + } + + // parents[stepID] = list of parent step IDs feeding into stepID + parents := make(map[uint64][]uint64, len(wf.Steps)) + for _, p := range wf.Paths { + parents[p.ChildID] = append(parents[p.ChildID], p.ParentID) + } + + var warnings types.WorkflowIssueSet + + for _, s := range wf.Steps { + if s.Kind != types.WorkflowStepKindGateway || s.Ref != "join" { + continue + } + + // For each incoming branch, collect the set of variables that + // branch can write. A "branch" is defined as the set of steps + // strictly between a fork (or entry) and the join, following + // parent edges backwards from each direct parent of the join. + // The home join ID is passed in so the walker can stop at any + // *other* join it encounters — otherwise, in nested parallels, + // the outer branch's writes get unioned into every inner + // branch and produce spurious inner-join conflicts. + branchWrites := make([]map[string]bool, 0, len(parents[s.ID])) + for _, parentID := range parents[s.ID] { + writes := collectBranchWrites(parentID, s.ID, stepByID, parents) + if len(writes) > 0 { + branchWrites = append(branchWrites, writes) + } + } + + if len(branchWrites) < 2 { + continue + } + + // For each variable name, count how many branches wrote it. + counts := make(map[string]int) + for _, bw := range branchWrites { + for name := range bw { + counts[name]++ + } + } + + var conflicts []string + for name, c := range counts { + if c >= 2 { + conflicts = append(conflicts, name) + } + } + if len(conflicts) == 0 { + continue + } + + sort.Strings(conflicts) + for _, name := range conflicts { + warnings = warnings.Append( + fmt.Errorf( + "variable %q is written by more than one parallel branch joining here; the value from the first-configured parent wins at runtime and the others are discarded. Rename the conflicting outputs (e.g. %s_a, %s_b) if you need them all.", + name, name, name, + ), + map[string]int{"step": stepIdx[s.ID]}, + ) + } + } + + return warnings +} + +// collectBranchWrites walks parent edges backwards from the given +// starting step, collecting the set of variable names that any step +// on the branch can write. Walking stops when it hits: +// +// - A fork gateway (upstream boundary of the branch). +// - Any join gateway other than `homeJoinID`. In nested parallels, +// an inner branch's upstream chain passes through the inner join, +// then the inner fork, then the outer branch — walking through +// the inner join would pull the outer branch's writes into every +// inner branch and produce spurious inner-join conflicts. Stopping +// at any *other* join cuts that bleed. +// - No more parent edges (workflow entry). +// +// The starting step itself is included. +// +// Cycles are possible if the workflow has iterators (loop-back edges). +// A visited set prevents infinite walks; iterator bodies still get +// their writes collected on the first visit. +func collectBranchWrites( + startID uint64, + homeJoinID uint64, + stepByID map[uint64]*types.WorkflowStep, + parents map[uint64][]uint64, +) map[string]bool { + writes := make(map[string]bool) + visited := make(map[uint64]bool) + + var walk func(id uint64) + walk = func(id uint64) { + if visited[id] { + return + } + visited[id] = true + + s := stepByID[id] + if s == nil { + return + } + + // Stop at a fork — that's the upstream boundary of the branch. + // Do not collect the fork's writes; it doesn't produce any. + if s.Kind == types.WorkflowStepKindGateway && s.Ref == "fork" { + return + } + + // Stop at any join other than the one we're analyzing. The + // inner join's own analysis will handle its incoming branches + // independently; we must not cross it. + if s.Kind == types.WorkflowStepKindGateway && s.Ref == "join" && id != homeJoinID { + return + } + + for name := range stepWrites(s) { + writes[name] = true + } + + for _, pid := range parents[id] { + walk(pid) + } + } + + walk(startID) + return writes +} + +// stepWrites returns the set of variable names a step can write into +// the shared scope based on its declared metadata. Returns an empty +// map for step kinds that do not produce scope writes. +func stepWrites(s *types.WorkflowStep) map[string]bool { + out := make(map[string]bool) + if s == nil { + return out + } + + switch s.Kind { + case types.WorkflowStepKindExpressions: + // Expressions step declares its assignments via Arguments, + // each with an explicit Target name that becomes the scope + // key. This is how convExpressionStep / ExpressionsStep wire + // outputs at runtime. + for _, a := range s.Arguments { + if a != nil && a.Target != "" { + out[a.Target] = true + } + } + + case types.WorkflowStepKindFunction, + types.WorkflowStepKindExecWorkflow: + // Function / subworkflow call: step results are the declared + // outputs. For ExecWorkflow specifically, the runtime evaluates + // s.Results against the subworkflow's return value, so the + // declared targets ARE the contract. + for _, r := range s.Results { + if r != nil && r.Target != "" { + out[r.Target] = true + } + } + + case types.WorkflowStepKindIterator: + // Iterator step results (item, counter, isFirst, ...) are + // loop-local: they're injected into the body scope on each + // iteration and do NOT persist past the iterator's exit path. + // Treating them as branch writes produces false positives + // (two parallel iterators both called "item" are not actually + // in conflict at the downstream join). Steps inside the + // iterator body still get their writes collected through the + // normal parent-edge walk. + + case types.WorkflowStepKindErrHandler: + // Error handler declares author-chosen variable names for + // error / errorMessage / errorStepID via Results targets. + // These become scope variables when a downstream step errors. + for _, r := range s.Results { + if r != nil && r.Target != "" { + out[r.Target] = true + } + } + + default: + // Error, Termination, Break, Continue, Debug, Delay, Prompt, + // Visual and plain Gateway steps do not write to scope. + } + + return out +} diff --git a/server/automation/service/workflow_converter.go b/server/automation/service/workflow_converter.go index 06f3dbd305..2013ae5b84 100644 --- a/server/automation/service/workflow_converter.go +++ b/server/automation/service/workflow_converter.go @@ -44,10 +44,9 @@ func Convert(wfService *workflow, wf *types.Workflow) (*wfexec.Graph, types.Work // Converts workflow definition to wf execution graph func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, types.WorkflowIssueSet) { var ( - g = wfexec.NewGraph() - wfii = types.WorkflowIssueSet{} - IDs = make(map[uint64]int) - lastResStep *types.WorkflowStep + g = wfexec.NewGraph() + wfii = types.WorkflowIssueSet{} + IDs = make(map[uint64]int) ) // Basic step verification @@ -80,10 +79,8 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type for g.Len() < len(ss) { progress := false - lastResStep = nil for _, step := range ss { - lastResStep = step if g.StepByID(step.ID) != nil { // resolved continue @@ -118,17 +115,79 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type } if !progress { - var culprit = make(map[string]int) - if lastResStep != nil { - culprit = map[string]int{"step": IDs[lastResStep.ID]} - } + // nothing resolved for 1 cycle — report every unresolved step + // that is genuinely waiting on a dependency. If a step has no + // unresolved neighbours in either direction, it is failing to + // convert for its own reasons (bad ref, missing required + // arg, unknown function, ...) — verifyStep and the converter + // error path have already added those issues to wfii, so + // adding a "waiting for [] and/or []" message on top would + // just be misleading noise. + for _, step := range def.Steps { + if g.StepByID(step.ID) != nil { + continue + } + + var ( + unresolvedChildren []uint64 + unresolvedParents []uint64 + ) + + for _, path := range def.Paths { + if path.ParentID == step.ID && g.StepByID(path.ChildID) == nil { + unresolvedChildren = append(unresolvedChildren, path.ChildID) + } + if path.ChildID == step.ID && g.StepByID(path.ParentID) == nil { + unresolvedParents = append(unresolvedParents, path.ParentID) + } + } + + if len(unresolvedChildren) == 0 && len(unresolvedParents) == 0 { + // not a dependency problem — let the other issues on + // this step speak for themselves. + continue + } - // nothing resolved for 1 cycle - wfii = wfii.Append(fmt.Errorf("failed to resolve workflow step dependencies"), culprit) + culprit := map[string]int{"step": IDs[step.ID]} + wfii = wfii.Append(fmt.Errorf( + "failed to resolve workflow step dependencies for %s step (ref: %q, ID: %d): waiting for unresolved child step(s) [IDs: %v] and/or unresolved parent step(s) [IDs: %v]", + step.Kind, + step.Ref, + step.ID, + unresolvedChildren, + unresolvedParents, + ), culprit) + } break } } + // Backfill join gateway parents that weren't resolved at the time the + // join gateway itself was created. This is necessary because join gateways + // are now created eagerly (with whatever parents are available) so that + // circular dependencies (e.g. iterator -> join -> iterator) can be broken. + for _, step := range ss { + if step.Kind != types.WorkflowStepKindGateway || step.Ref != "join" { + continue + } + resolved := g.StepByID(step.ID) + if resolved == nil { + continue + } + pa, ok := resolved.(wfexec.PathAdder) + if !ok { + continue + } + for _, path := range def.Paths { + if path.ChildID != step.ID { + continue + } + if parent := g.StepByID(path.ParentID); parent != nil { + pa.AddPath(parent) + } + } + } + for pos, path := range def.Paths { if g.StepByID(path.ChildID) == nil { wfii = wfii.Append(fmt.Errorf("failed to resolve step with ID %d", path.ChildID), map[string]int{"path": pos}) @@ -162,12 +221,17 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type // // if this func returns nil for step and error, assume unresolved dependencies func (svc workflowConverter) workflowStepDefConv(g *wfexec.Graph, def *types.Workflow, s *types.WorkflowStep, in, out []*types.WorkflowPath) (bool, error) { - if err := svc.parseExpressions(s.Arguments...); err != nil { - return false, errors.Internal("failed to parse step arguments expressions for %s: %s", s.Kind, err).Wrap(err) + if err := svc.parseExpressionsAs("argument", s.Arguments...); err != nil { + // parseExpressionsAs already wraps its inner error with a + // descriptive message via fmt.Errorf("%w"), so the returned + // error's .Error() text already includes the full chain. + // Don't add another .Wrap(err) on top — that would double + // the inner error in both the message AND the wrap chain. + return false, errors.Internal("invalid %s step (ref: %q): %s", s.Kind, s.Ref, err) } - if err := svc.parseExpressions(s.Results...); err != nil { - return false, errors.Internal("failed to parse step results expressions for %s: %s", s.Kind, err).Wrap(err) + if err := svc.parseExpressionsAs("result", s.Results...); err != nil { + return false, errors.Internal("invalid %s step (ref: %q): %s", s.Kind, s.Ref, err) } conv, err := func() (wfexec.Step, error) { @@ -238,12 +302,13 @@ func (svc workflowConverter) convGateway(g *wfexec.Graph, s *types.WorkflowStep, var ( ss []wfexec.Step ) + // Collect whatever parents are already resolved. Missing parents will + // be backfilled after the main resolution loop via AddPath. This avoids + // circular dependencies where a parent (e.g. an iterator) needs this + // join gateway to exist in the graph before it itself can resolve. for _, p := range in { if parent := g.StepByID(p.ParentID); parent != nil { ss = append(ss, parent) - } else { - // unresolved parent, come back later. - return nil, nil } } @@ -401,37 +466,94 @@ func (svc workflowConverter) convFunctionStep(g *wfexec.Graph, s *types.Workflow // creates error step // -// Expects ZERO outgoing paths and +// Expects ZERO outgoing paths. +// +// Supported arguments: +// - message (required, string) the error text. Rendered as the flat +// error string and also as the body of the rich toast when title or +// severity are set. +// - title (optional, string) short headline for rich toast rendering +// - severity (optional, string) one of "error" (default), "warning", "info" +// +// title/severity are attached to the resulting error's meta under the +// "workflow.error.*" namespace and surfaced to the frontend via the +// errors package's JSON marshaller. The error's flat message is always +// equal to the evaluated message argument, so older clients that do +// not understand the meta fields still render a sensible string. func (svc workflowConverter) convErrorStep(s *types.WorkflowStep) (wfexec.Step, error) { const ( - argName = "message" + argMessage = "message" + argTitle = "title" + argSeverity = "severity" + + metaTitle = "workflow.error.title" + metaSeverity = "workflow.error.severity" + + severityError = "error" + severityWarning = "warning" + severityInfo = "info" ) var ( args = types.ExprSet(s.Arguments) ) + // resolve a single string argument either from the evaluated scope or + // from a literal value on the unevaluated arg set; returns "" if absent. + resolveStr := func(result *expr.Vars, name string) string { + if result != nil && result.Has(name) { + if str, err := expr.NewString(expr.Must(result.Select(name))); err == nil { + return str.GetValue() + } + } + if a := args.GetByTarget(name); a != nil { + if aux, is := a.Value.(string); is { + return aux + } + } + return "" + } + return wfexec.NewGenericStep(func(ctx context.Context, r *wfexec.ExecRequest) (wfexec.ExecResponse, error) { - var ( - msg string - result, err = args.Eval(ctx, r.Scope) - ) + result, err := args.Eval(ctx, r.Scope) if err != nil { return nil, err } - if result.Has(argName) { - str, _ := expr.NewString(expr.Must(result.Select(argName))) - msg = str.GetValue() - } else { - if aux, is := args.GetByTarget(argName).Value.(string); is { - msg = aux - } else { - msg = "ERROR" - } + var ( + message = resolveStr(result, argMessage) + title = resolveStr(result, argTitle) + severity = resolveStr(result, argSeverity) + ) + + // normalise severity to a known value; default to error + switch severity { + case severityWarning, severityInfo, severityError: + // keep as-is + default: + severity = severityError } - return nil, errors.Automation("%s", msg) + // fall back to a literal "ERROR" so we never emit an empty error + if message == "" { + message = "ERROR" + } + + // Tag the error as a user-authored workflow error step so that + // errors.ServeHTTPWithCode preserves its full payload (message + + // meta) across the wire even under production masking. This is + // a narrow, opt-in bypass — generic KindAutomation errors from + // elsewhere are not affected. + e := errors.Automation("%s", message).Apply(errors.Meta(errors.MetaWorkflowErrorSafe, true)) + + if title != "" { + e = e.Apply(errors.Meta(metaTitle, title)) + } + if title != "" || severity != severityError { + e = e.Apply(errors.Meta(metaSeverity, severity)) + } + + return nil, e }), nil } @@ -598,22 +720,36 @@ func (svc workflowConverter) convExecWorkflowStep(wf *types.Workflow, s *types.W }), nil } -func (svc workflowConverter) parseExpressions(ee ...*types.Expr) (err error) { - for _, e := range ee { +// parseExpressionsAs parses argument/result expressions and enriches any +// failure with a human-readable category (e.g. "argument" or "result"), +// the target name, the failing expression excerpt, and which lifecycle +// stage (parse / type / test parse) produced the error. +// +// Issue #1725: previously failures leaked bare parser output with no +// indication of which step field was at fault. +func (svc workflowConverter) parseExpressionsAs(category string, ee ...*types.Expr) (err error) { + wrap := func(target, stage string, inner error) error { + excerpt := exprExcerpt(ee, target) + if category == "" { + return fmt.Errorf("%s %q (%s): %w", stage, target, excerpt, inner) + } + return fmt.Errorf("%s %q %s (%s): %w", category, target, stage, excerpt, inner) + } + for _, e := range ee { if len(strings.TrimSpace(e.Expr)) > 0 { if err = svc.parser.ParseEvaluators(e); err != nil { - return + return wrap(e.Target, "failed to parse expression", err) } } if err = e.SetType(exprTypeSetter(svc.reg, e)); err != nil { - return err + return wrap(e.Target, "failed to resolve type", err) } - for _, t := range e.Tests { + for ti, t := range e.Tests { if err = svc.parser.ParseEvaluators(t); err != nil { - return + return wrap(e.Target, fmt.Sprintf("failed to parse test #%d expression", ti+1), err) } } } @@ -621,6 +757,32 @@ func (svc workflowConverter) parseExpressions(ee ...*types.Expr) (err error) { return nil } +// exprExcerpt returns a short, single-line snippet of the expression for +// the named target, suitable for embedding in an error message. Long +// expressions are truncated with an ellipsis to keep messages readable. +// +// Truncation is rune-aware so that non-ASCII identifiers and string +// literals are never split mid-codepoint. +func exprExcerpt(ee []*types.Expr, target string) string { + const maxRunes = 80 + for _, e := range ee { + if e.Target != target { + continue + } + s := strings.TrimSpace(e.Expr) + s = strings.ReplaceAll(s, "\n", " ") + if s == "" { + return "" + } + runes := []rune(s) + if len(runes) > maxRunes { + return string(runes[:maxRunes]) + "…" + } + return s + } + return "" +} + func verifyStep(s *types.WorkflowStep, in, out types.WorkflowPathSet) types.WorkflowIssueSet { const ( arguments = "argument" @@ -719,7 +881,14 @@ func verifyStep(s *types.WorkflowStep, in, out types.WorkflowPathSet) types.Work } } - return errors.Internal("unexpected type %q for argument %q on step type %q", msgArg.Type, argName, s.Kind) + accepted := make([]string, 0, len(tt)) + for _, typ := range tt { + accepted = append(accepted, typ.Type()) + } + return errors.Internal( + "%s step argument %q has unexpected type %q (accepted: %s)", + s.Kind, argName, msgArg.Type, strings.Join(accepted, ", "), + ) } } @@ -727,7 +896,17 @@ func verifyStep(s *types.WorkflowStep, in, out types.WorkflowPathSet) types.Work requiredArg = func(argName string, tt ...expr.Type) func() error { return func() error { if msgArg := types.ExprSet(s.Arguments).GetByTarget(argName); msgArg == nil { - return errors.Internal("%s step expects to have '%s' argument", s.Kind, argName) + accepted := make([]string, 0, len(tt)) + for _, typ := range tt { + accepted = append(accepted, typ.Type()) + } + if len(accepted) > 0 { + return errors.Internal( + "%s step is missing required argument %q (expected type: %s)", + s.Kind, argName, strings.Join(accepted, " or "), + ) + } + return errors.Internal("%s step is missing required argument %q", s.Kind, argName) } return checkArg(argName, tt...)() @@ -789,9 +968,46 @@ func verifyStep(s *types.WorkflowStep, in, out types.WorkflowPathSet) types.Work checks = append(checks, gatewayCheck(zero(arguments), zero(results))...) case types.WorkflowStepKindError: + // Error step supports three string arguments: + // message (required) the error text. Doubles as the rich + // toast body and the flat error string. + // title (optional) rich toast headline + // severity (optional) "error"|"warning"|"info" checks = append(checks, requiredArg("message", expr.String{}), - count(0, 1, arguments), + checkArg("title", expr.String{}), + checkArg("severity", expr.String{}), + count(1, 3, arguments), + func() error { + allowed := map[string]bool{"message": true, "title": true, "severity": true} + for _, a := range s.Arguments { + if !allowed[a.Target] { + return errors.Internal("%s step does not accept argument %q (allowed: message, title, severity)", s.Kind, a.Target) + } + } + return nil + }, + // If severity is provided as a literal string expression, + // validate it at save time. Non-literal expressions (e.g. + // vars.level) fall through and are normalised at runtime. + func() error { + sev := types.ExprSet(s.Arguments).GetByTarget("severity") + if sev == nil { + return nil + } + lit, ok := severityLiteralValue(sev.Expr) + if !ok { + return nil + } + switch lit { + case "error", "warning", "info": + return nil + } + return errors.Internal( + "%s step severity must be one of \"error\", \"warning\" or \"info\" (got %q)", + s.Kind, lit, + ) + }, zero(results), last, ) @@ -882,3 +1098,28 @@ func verifyStep(s *types.WorkflowStep, in, out types.WorkflowPathSet) types.Work return ii } + +// severityLiteralValue returns the unquoted content of an expression +// string when it is a simple single- or double-quoted literal (e.g. +// "error" or 'warning'), along with true. For any other expression — +// a variable reference, a function call, a concatenation — it returns +// "", false so the caller can skip save-time validation and leave the +// runtime normalisation to do its job. +func severityLiteralValue(expr string) (string, bool) { + s := strings.TrimSpace(expr) + if len(s) < 2 { + return "", false + } + q := s[0] + if (q != '"' && q != '\'') || s[len(s)-1] != q { + return "", false + } + inner := s[1 : len(s)-1] + // Reject anything that still contains a quote of the same kind — + // that would mean the literal is not the whole expression (e.g. + // "foo" + "bar" — each side is a literal but the whole isn't). + if strings.ContainsRune(inner, rune(q)) { + return "", false + } + return inner, true +} diff --git a/server/automation/types/workflow.go b/server/automation/types/workflow.go index dc1b6fc84c..aee8ea44a0 100644 --- a/server/automation/types/workflow.go +++ b/server/automation/types/workflow.go @@ -36,6 +36,12 @@ type ( // Collection of issues from the last parse Issues WorkflowIssueSet `json:"issues,omitempty"` + // Collection of non-fatal warnings from the last parse. + // Uses the same shape as Issues (description + culprit) so + // the frontend can render them with the same per-step mapping. + // Warnings do not prevent the workflow from running. + Warnings WorkflowIssueSet `json:"warnings,omitempty"` + RunAs uint64 `json:"runAs,string"` OwnedBy uint64 `json:"ownedBy,string"` diff --git a/server/pkg/errors/http.go b/server/pkg/errors/http.go index 0270750fe6..bbd3237aa2 100644 --- a/server/pkg/errors/http.go +++ b/server/pkg/errors/http.go @@ -139,8 +139,22 @@ func writeHttpJSON(ctx context.Context, w io.Writer, err error, mask bool) { return } - if se, is := err.(interface{ Safe() bool }); !is || !se.Safe() || mask { - // trim error details when not debugging or error is not safe or maske + // Preserve the full error (message + meta) when: + // - it is a KindAutomation error AND + // - it carries the explicit workflow.error.safe = true meta flag + // + // The flag is set only by convErrorStep for user-authored workflow + // error steps. This keeps the bypass surgical: generic automation + // errors from elsewhere (e.g. corredor wrapper errors) still go + // through the normal masking path. + if e, isErr := err.(*Error); isErr && e.kind == KindAutomation && e.meta != nil { + if safe, _ := e.meta[MetaWorkflowErrorSafe].(bool); safe { + // preserve full error (skip masking) + } else if se, is := err.(interface{ Safe() bool }); !is || !se.Safe() || mask { + err = errors.New(err.Error()) + } + } else if se, is := err.(interface{ Safe() bool }); !is || !se.Safe() || mask { + // trim error details when not debugging or error is not safe or masked err = errors.New(err.Error()) } diff --git a/server/pkg/errors/http_test.go b/server/pkg/errors/http_test.go index ced4e1797d..13e2122769 100644 --- a/server/pkg/errors/http_test.go +++ b/server/pkg/errors/http_test.go @@ -88,3 +88,55 @@ func Test_writeHttpJSON(t *testing.T) { req.NotContains(buf.String(), "meta") req.NotContains(buf.String(), "stack") } + +// Locks the masking matrix for KindAutomation errors so the +// MetaWorkflowErrorSafe opt-in bypass semantics cannot be +// silently changed by future refactors. +// +// Cells: +// 1. KindAutomation + MetaWorkflowErrorSafe=true + masked → meta preserved +// 2. KindAutomation + no flag + masked → meta stripped +// 3. KindInternal + MetaWorkflowErrorSafe=true + masked → meta stripped +// (flag is a no-op on non-automation kinds) +// 4. KindAutomation + MetaWorkflowErrorSafe=true + unmasked → meta preserved +func Test_writeHttpJSON_workflowErrorSafeBypass(t *testing.T) { + req := require.New(t) + + // 1. automation + safe flag + masked → meta visible + safe := Automation("author message").Apply( + Meta(MetaWorkflowErrorSafe, true), + Meta("workflow.error.title", "Upstream failure"), + ) + buf := bytes.NewBuffer(nil) + writeHttpJSON(context.Background(), buf, safe, true) + req.Contains(buf.String(), "author message", "message must always be visible") + req.Contains(buf.String(), "workflow.error.title", "safe flag must preserve meta under mask") + req.Contains(buf.String(), "Upstream failure") + + // 2. automation + no flag + masked → meta stripped + plain := Automation("runtime error").Apply(Meta("internal", "secret")) + buf.Truncate(0) + writeHttpJSON(context.Background(), buf, plain, true) + req.Contains(buf.String(), "runtime error", "flat message must still be visible") + req.NotContains(buf.String(), "internal", "unflagged automation meta must be stripped under mask") + req.NotContains(buf.String(), "secret") + + // 3. non-automation kind + safe flag + masked → meta stripped + // (the flag is only honoured for KindAutomation — it must not + // leak meta from other error kinds even if the flag is set) + nonAuto := Internal("internal boom").Apply( + Meta(MetaWorkflowErrorSafe, true), + Meta("leak", "pls no"), + ) + buf.Truncate(0) + writeHttpJSON(context.Background(), buf, nonAuto, true) + req.Contains(buf.String(), "internal boom") + req.NotContains(buf.String(), "leak", "safe flag on non-automation must be ignored") + req.NotContains(buf.String(), "pls no") + + // 4. automation + safe flag + unmasked → meta visible (debug path) + buf.Truncate(0) + writeHttpJSON(context.Background(), buf, safe, false) + req.Contains(buf.String(), "author message") + req.Contains(buf.String(), "Upstream failure") +} diff --git a/server/pkg/errors/meta.go b/server/pkg/errors/meta.go index 507a901270..24045b23a9 100644 --- a/server/pkg/errors/meta.go +++ b/server/pkg/errors/meta.go @@ -12,6 +12,15 @@ type ( meta map[interface{}]interface{} ) +// Well-known meta keys recognised by the errors package itself. +// +// MetaWorkflowErrorSafe is set on KindAutomation errors that originate +// from a user-authored workflow error step. When present and truthy, +// ServeHTTPWithCode preserves the full error (message + meta) in the +// HTTP response even in masked/production mode, so that structured +// fields like workflow.error.title/body/severity reach the frontend. +const MetaWorkflowErrorSafe = "workflow.error.safe" + // StringKeys returns max length of (string) keys and slice of all strings func (m meta) StringKeys() (int, []string) { var ( diff --git a/server/pkg/wfexec/gateways.go b/server/pkg/wfexec/gateways.go index 3097640ef7..1ee702c6da 100644 --- a/server/pkg/wfexec/gateways.go +++ b/server/pkg/wfexec/gateways.go @@ -2,7 +2,9 @@ package wfexec import ( "context" + "encoding/json" "fmt" + "strings" "sync" "github.com/cortezaproject/corteza/server/pkg/expr" @@ -27,6 +29,13 @@ func NewGatewayPath(s Step, t pathTester) (gwp *GatewayPath, err error) { return &GatewayPath{to: s, test: t}, nil } +// PathAdder is implemented by gateways that support lazy parent registration. +// This allows resolving circular dependencies during graph construction where +// parent steps aren't yet in the graph when the gateway is created. +type PathAdder interface { + AddPath(Step) +} + // joinGateway handles merging/joining of multiple paths into // a single path forward type ( @@ -39,6 +48,17 @@ type ( } ) +// AddPath adds a parent path to the join gateway after initial creation. +// This allows resolving circular dependencies where parent steps aren't yet +// in the graph at the time the join gateway is constructed. +func (gw *joinGateway) AddPath(s Step) { + gw.l.Lock() + defer gw.l.Unlock() + if !gw.paths.Contains(s) { + gw.paths = append(gw.paths, s) + } +} + // JoinGateway fn initializes join gateway with all paths that are expected to be partial func JoinGateway(ss ...Step) *joinGateway { return &joinGateway{ @@ -56,12 +76,30 @@ func JoinGateway(ss ...Step) *joinGateway { } } -// Exec fn on join gateway can be called multiple times, even multiple times parent the same parent +// Exec fn on join gateway can be called multiple times, even multiple times for the same parent. +// +// It collects scope and results from every parent path and merges them when +// all configured paths have reported in. +// +// Merge rules: +// - Every parent's full scope is merged in (previously only paths[0]'s +// scope was kept, so variables set mid-branch in any other path were +// silently dropped — this is the "variables lost at parallel join" +// class of bug). +// - Every parent's immediate step results are merged on top of the +// combined scope, mirroring the intra-branch rule that a step's +// results overwrite earlier scope values on conflict. +// - Within each tier (scopes, then results), lower path index wins on +// key conflicts. paths[0] is the preferred branch. This is +// implemented by iterating paths in REVERSE order and relying on +// MustMerge's last-writer-wins semantics: the last iterator passed +// to MustMerge wins, so the earliest path ends up applied last and +// takes precedence. // -// Func will collect results from each parent path. +// Final precedence, high to low: // -// Join gateway is ready to continue when all configured paths have been collected -// Results are merged to preserve changes from all parallel paths +// paths[0].results > paths[1].results > ... > paths[N].results > +// paths[0].scope > paths[1].scope > ... > paths[N].scope func (gw *joinGateway) Exec(_ context.Context, r *ExecRequest) (ExecResponse, error) { gw.l.Lock() defer gw.l.Unlock() @@ -82,30 +120,155 @@ func (gw *joinGateway) Exec(_ context.Context, r *ExecRequest) (ExecResponse, er return &partial{}, nil } - // All collected, merge results from all paths into base scope - var merged *expr.Vars - if len(gw.paths) > 0 && gw.scopes[r.SessionID][gw.paths[0]] != nil { - merged = gw.scopes[r.SessionID][gw.paths[0]].MustMerge() - } - - var allResults []expr.Iterator - for _, p := range gw.paths { - if gw.results[r.SessionID][p] != nil { - allResults = append(allResults, gw.results[r.SessionID][p]) + // Detect cross-branch variable conflicts before merging. For each + // key, collect the set of (pathIndex -> serialised value) entries + // across both scopes and results. If a key has more than one + // distinct value across the branches it is a conflict and we emit + // a warning. The merge still proceeds (paths[0] wins) — the + // warnings are advisory, not fatal. + warnings := gw.detectConflicts(r.SessionID) + + // Merge scopes first, in reverse path order so paths[0] wins on conflict. + merged := &expr.Vars{} + for i := len(gw.paths) - 1; i >= 0; i-- { + if s := gw.scopes[r.SessionID][gw.paths[i]]; s != nil { + merged = merged.MustMerge(s) } } - if merged != nil && len(allResults) > 0 { - merged = merged.MustMerge(allResults...) + // Then merge results on top, also in reverse path order. + for i := len(gw.paths) - 1; i >= 0; i-- { + if res := gw.results[r.SessionID][gw.paths[i]]; res != nil { + merged = merged.MustMerge(res) + } } // all inbound paths visited, cleanup scopes and results for the session delete(gw.scopes, r.SessionID) delete(gw.results, r.SessionID) + if len(warnings) > 0 { + return ResponseWithWarnings(merged, warnings), nil + } return merged, nil } +// detectConflicts walks every (scope, results) bag from every parent +// branch and returns a human-readable warning for every key that has +// more than one distinct value across branches. +// +// The per-value fingerprint is produced by JSON-marshaling the underlying +// value with map keys sorted. This gives a stable, deterministic string +// even for map-valued contributors — fmt.Sprintf("%v", ...) prints maps +// in random iteration order and pointers as addresses, which produces +// both false positives and false negatives. encoding/json sorts object +// keys on output (see encoding/json package docs), so two structurally +// equal maps serialise identically. +// +// Values that fail to marshal (cyclic references, unsupported types) +// fall back to "%v" formatting; they're still compared consistently +// within a single run, just not robustly across types. +func (gw *joinGateway) detectConflicts(sessionID uint64) []string { + type contributor struct { + parentID uint64 + pathIdx int + origin string // "scope" or "results" + value string + } + + fingerprint := func(tv expr.TypedValue) string { + if tv == nil { + return "null" + } + if b, err := json.Marshal(tv.Get()); err == nil { + return string(b) + } + return fmt.Sprintf("%v", tv.Get()) + } + + perKey := make(map[string][]contributor) + + collect := func(origin string, bag map[Step]*expr.Vars) { + for i, p := range gw.paths { + v := bag[p] + if v == nil { + continue + } + var parentID uint64 + if p != nil { + parentID = p.ID() + } + _ = v.Each(func(k string, tv expr.TypedValue) error { + perKey[k] = append(perKey[k], contributor{ + parentID: parentID, + pathIdx: i, + origin: origin, + value: fingerprint(tv), + }) + return nil + }) + } + } + + collect("scope", gw.scopes[sessionID]) + collect("results", gw.results[sessionID]) + + var warnings []string + for key, cc := range perKey { + if len(cc) < 2 { + continue + } + // detect whether all contributors share the same value + same := true + for i := 1; i < len(cc); i++ { + if cc[i].value != cc[0].value { + same = false + break + } + } + if same { + continue + } + + // build a list of contributing branches for the warning, + // naming branches by their parent step ID so authors can + // correlate with the canvas instead of an abstract paths[i]. + branchDescs := make([]string, 0, len(cc)) + for _, c := range cc { + branchDescs = append(branchDescs, + fmt.Sprintf("step %d.%s=%s", c.parentID, c.origin, truncateForWarning(c.value)), + ) + } + + // name the winner — paths[0] — by its actual parent step ID + var winner uint64 + if len(gw.paths) > 0 && gw.paths[0] != nil { + winner = gw.paths[0].ID() + } + + warnings = append(warnings, fmt.Sprintf( + "variable %q has conflicting values at parallel join (%s); the value from step %d wins", + key, + strings.Join(branchDescs, ", "), + winner, + )) + } + + return warnings +} + +// truncateForWarning clips long values so a warning listing 8 branches +// doesn't explode into an unreadable wall of text. Rune-aware so that +// non-ASCII values can never be split mid-codepoint. +func truncateForWarning(s string) string { + const maxRunes = 40 + runes := []rune(s) + if len(runes) <= maxRunes { + return s + } + return string(runes[:maxRunes]) + "…" +} + // forkGateway handles forking to multiple paths type forkGateway struct { StepIdentifier diff --git a/server/pkg/wfexec/response.go b/server/pkg/wfexec/response.go index c5a16b34a6..058116dc60 100644 --- a/server/pkg/wfexec/response.go +++ b/server/pkg/wfexec/response.go @@ -52,3 +52,22 @@ type ( func LoopBreak() *loopBreak { return &loopBreak{} } func LoopContinue() *loopContinue { return &loopContinue{} } + +// responseWithWarnings wraps a *expr.Vars scope together with a set of +// non-fatal warnings produced while computing it. The session unwraps +// this and attaches the warnings to the current State so they land in +// the emitted Frame's Warnings slice alongside the scope in Frame.Scope. +// +// Used by the join gateway to surface variable conflicts across +// parallel branches without changing the normal *expr.Vars contract. +type responseWithWarnings struct { + scope *expr.Vars + warnings []string +} + +// ResponseWithWarnings constructs a wrapper response. Callers pass the +// scope they would have returned normally plus a list of human-readable +// warning messages that the session should propagate into the frame. +func ResponseWithWarnings(scope *expr.Vars, warnings []string) *responseWithWarnings { + return &responseWithWarnings{scope: scope, warnings: warnings} +} diff --git a/server/pkg/wfexec/session.go b/server/pkg/wfexec/session.go index 7795b446b1..53e83f3d51 100644 --- a/server/pkg/wfexec/session.go +++ b/server/pkg/wfexec/session.go @@ -88,6 +88,13 @@ type ( Action string `json:"action,omitempty"` Error string `json:"error,omitempty"` + + // Warnings are non-fatal messages produced during step execution. + // Populated from State.warnings by MakeFrame. Used today by the + // join gateway to surface cross-branch variable conflicts but + // intended as a general channel for any step that wants to report + // a soft problem without failing the workflow. + Warnings []string `json:"warnings,omitempty"` } // ExecRequest is passed to Exec() functions and contains all information @@ -721,15 +728,19 @@ func (s *Session) exec(ctx context.Context, log *zap.Logger, st *State) (nxt []* zap.Error(st.err), ) - err = setErrorHandlerResultsToScope(scope, st.results, st.err, st.step.ID()) + err = setErrorHandlerResultsToScope(scope, st.errHandlerResults, st.err, st.step.ID()) if err != nil { return nil, err } // copy error handler & disable it on state to prevent inf. loop - // in case of another error in the error-handling branch + // in case of another error in the error-handling branch. + // errHandlerResults is cleared alongside errHandler so the two + // stay in lockstep — a disabled handler must not leave its name + // mapping behind for a later handler to accidentally reuse. eh := st.errHandler st.errHandler = nil + st.errHandlerResults = nil st.errHandled = true return []*State{st.Next(eh, scope)}, nil } @@ -754,6 +765,22 @@ func (s *Session) exec(ctx context.Context, log *zap.Logger, st *State) (nxt []* } log.Debug("step executed", zap.String("resultType", fmt.Sprintf("%T", result))) + + // Unwrap a responseWithWarnings wrapper. The inner scope is + // treated exactly like a plain *expr.Vars return by the switch + // below; the warnings are attached to state so MakeFrame picks + // them up on the next frame emission, and mirrored into the + // server log so ops can grep for them without the editor. + if rw, isRW := result.(*responseWithWarnings); isRW { + if len(rw.warnings) > 0 { + st.warnings = append(st.warnings, rw.warnings...) + for _, w := range rw.warnings { + log.Warn("workflow step warning", zap.String("warning", w)) + } + } + result = rw.scope + } + switch result := result.(type) { case *expr.Vars: // most common (successful) result @@ -764,9 +791,17 @@ func (s *Session) exec(ctx context.Context, log *zap.Logger, st *State) (nxt []* case *errHandler: st.action = "error handler initialized" // this step sets error handling step on current state - // and continues on the current path + // and continues on the current path. + // + // The error-var name mapping (error / errorMessage / + // errorStepID -> author-chosen variable names) is stored + // in its own field so it survives across later normal + // steps — previously it was merged into st.results and + // got clobbered the moment any downstream step returned + // its own outputs, making errorMessage etc. disappear + // from scope on any subsequent error. st.errHandler = result.handler - st.results = st.results.MustMerge(result.results) + st.errHandlerResults = result.results // find step that's not error handler and // use it for the next step diff --git a/server/pkg/wfexec/state.go b/server/pkg/wfexec/state.go index 4ad6b06135..8045d1d0a4 100644 --- a/server/pkg/wfexec/state.go +++ b/server/pkg/wfexec/state.go @@ -48,12 +48,56 @@ type ( // error handling step errHandler Step + // error handler result variable name mapping. + // + // Populated when an error-handler step runs and holds the + // "error" / "errorMessage" / "errorStepID" -> variable-name + // mapping declared on the handler. Read by the error-injection + // path when a downstream step fails, so that the actual error + // values end up under the author-configured variable names. + // + // Must not be conflated with `results` (previous step outputs): + // previously both lived in `results`, so the first non-error + // step after an error handler clobbered the mapping and made + // `errorMessage` etc. disappear from scope on any later error. + // Carried alongside `errHandler` in State.Next() with the same + // lifecycle — set together, replaced together, cleared together. + // + // Intentional non-propagation to the running scope: these + // variables are NOT merged into the happy-path scope. An + // Expressions step downstream of the error handler that + // references, say, `myErrorMessage` will not see it until an + // error actually fires and the error-injection path writes + // the real value under that name. This is by design — silently + // seeding author-chosen variable names into scope as empty/nil + // would pollute the scope namespace and make debugging harder + // (expression referencing `myErrorMessage` returning nil would + // not be distinguishable from a typo). Authors who want a + // default value should initialise it explicitly in an + // Expressions step before the error handler. + errHandlerResults *expr.Vars + // error handled flag, this gets restarted on every new state! errHandled bool loops []Iterator action string + + // warnings accumulated during the current step's execution. + // Set by the session when a step returns a responseWithWarnings + // wrapper and surfaced via MakeFrame into Frame.Warnings so the + // editor test panel can render them alongside the existing + // action/error info. + // + // Concurrency: each *State is owned by a single step-execution + // goroutine for its entire lifecycle (see Session.exec and the + // goroutine launched around it). Appends to warnings happen in + // exec(); MakeFrame is called synchronously on the same + // goroutine from the subsequent eventHandler invocation. + // Sibling parallel branches run on distinct *State values so + // there is no cross-state sharing. No mutex needed. + warnings []string } ) @@ -83,13 +127,14 @@ func FinalState(ses *Session, scope *expr.Vars) *State { func (s State) Next(current Step, scope *expr.Vars) *State { return &State{ - stateId: nextID(), - owner: s.owner, - sessionId: s.sessionId, - parent: s.step, - errHandler: s.errHandler, - results: s.results, - loops: s.loops, + stateId: nextID(), + owner: s.owner, + sessionId: s.sessionId, + parent: s.step, + errHandler: s.errHandler, + errHandlerResults: s.errHandlerResults, + results: s.results, + loops: s.loops, step: current, scope: scope, @@ -153,6 +198,7 @@ func (s State) MakeFrame() *Frame { StateID: s.stateId, NextSteps: s.next.IDs(), Action: s.action, + Warnings: append([]string(nil), s.warnings...), } var wg sync.WaitGroup