From 6d3358dacba071bda1ecb58152c7d4147696d7be Mon Sep 17 00:00:00 2001 From: Dan Plyukhin Date: Tue, 23 Jun 2026 16:52:25 -0400 Subject: [PATCH 1/4] Visit payloads inside system Nexus envelopes When the payload visitor encounters a ScheduleNexusOperationCommandAttributes targeting the system Nexus endpoint (__temporal_system) and the WorkflowService, its Input is a proto-binary-serialized request whose own fields carry the user payloads rather than an opaque single payload. Decode the envelope, visit the inner payloads recursively (so external storage and codecs apply to the inner payloads, not the envelope), and re-serialize -- leaving the envelope itself un-offloaded and un-codec'd. Ordinary Nexus operations continue to visit Input as a single opaque payload. The (service, operation) -> input proto type mapping comes from a new, dependency-free `nexussystem` registry package generated by nex-gen from a checked-in WIT (nexussystem/wit), wired into the build via the `nexus-system-registry` Make target. The envelope decode/visit/encode logic lives in a hand-written proxy/system_nexus.go helper that the generated interceptor calls; only binary/protobuf envelopes are accepted. --- Makefile | 29 ++- cmd/proxygenerator/interceptor.go | 6 + nexussystem/registry.go | 25 ++ .../wit/deps/nexus-temporal-types/model.wit | 156 +++++++++++++ nexussystem/wit/workflow-service.wit | 119 ++++++++++ proxy/interceptor.go | 10 +- proxy/system_nexus.go | 129 +++++++++++ proxy/system_nexus_test.go | 216 ++++++++++++++++++ 8 files changed, 685 insertions(+), 5 deletions(-) create mode 100644 nexussystem/registry.go create mode 100644 nexussystem/wit/deps/nexus-temporal-types/model.wit create mode 100644 nexussystem/wit/workflow-service.wit create mode 100644 proxy/system_nexus.go create mode 100644 proxy/system_nexus_test.go diff --git a/Makefile b/Makefile index 4ec09ae0..1307664c 100644 --- a/Makefile +++ b/Makefile @@ -97,10 +97,37 @@ grpc-mock: go run ./cmd/mockgen-fix WorkflowService workflowservicemock/v1/service_grpc.pb.mock.go .PHONY: proxy -proxy: gen-proto-desc +proxy: gen-proto-desc nexus-system-registry printf $(COLOR) "Generate proxy code..." (cd ./cmd/proxygenerator && go mod tidy && go run ./) +##### Nexus system operation registry ##### +# The registry maps (service, operation) -> input proto message for system +# Nexus envelopes. It is generated by nex-gen from a checked-in WIT and consumed +# by the proxy payload visitor. nex-gen is invoked via the NEX_GEN_BIN env var +# (path to the built binary) or, if unset, the `nex-gen` binary on PATH. +NEXUS_SYSTEM_DIR := nexussystem +NEXUS_SYSTEM_WIT := $(NEXUS_SYSTEM_DIR)/wit/workflow-service.wit +NEXUS_SYSTEM_WIT_DEPS := $(NEXUS_SYSTEM_DIR)/wit/deps +NEX_GEN ?= $(if $(NEX_GEN_BIN),$(NEX_GEN_BIN),nex-gen) + +.PHONY: nexus-system-registry +nexus-system-registry: gen-proto-desc + printf $(COLOR) "Generate Nexus system operation registry..." + rm -rf $(NEXUS_SYSTEM_DIR)/gen + $(NEX_GEN) generate \ + --lang go \ + --input $(NEXUS_SYSTEM_WIT) \ + --input $(NEXUS_SYSTEM_WIT_DEPS) \ + --descriptors $(PROTO_OUT)/descriptor_set.pb \ + --output $(NEXUS_SYSTEM_DIR)/gen + # Keep only the dependency-free registry; rewrite its package to the + # top-level nexussystem package and check it in as registry.go. + sed 's/^package registry$$/package nexussystem/' \ + $(NEXUS_SYSTEM_DIR)/gen/registry/registry.go > $(NEXUS_SYSTEM_DIR)/registry.go + rm -rf $(NEXUS_SYSTEM_DIR)/gen + gofmt -w $(NEXUS_SYSTEM_DIR)/registry.go + goimports: printf $(COLOR) "Run goimports..." goimports -w $(PROTO_OUT) diff --git a/cmd/proxygenerator/interceptor.go b/cmd/proxygenerator/interceptor.go index 787b8944..86b08dec 100644 --- a/cmd/proxygenerator/interceptor.go +++ b/cmd/proxygenerator/interceptor.go @@ -550,6 +550,12 @@ func visitPayloads( if err := visitPayload(ctx, options, o, concState, &result); err != nil { return err } o.Outcome = &workflowservice.PollNexusOperationExecutionResponse_Result{Result: result} } + {{else if and (eq $type "*command.ScheduleNexusOperationCommandAttributes") (eq . "Input")}} + // System Nexus envelopes carry a proto-message request in Input whose + // own fields hold the user payloads; visitScheduleNexusOperationInput + // transparently descends into them (and falls back to visiting Input as + // a single opaque payload for ordinary Nexus operations). + if err := visitScheduleNexusOperationInput(ctx, options, concState, o); err != nil { return err } {{else}} if o.{{.}} != nil { if err := visitPayload(ctx, options, o, concState, &o.{{.}}); err != nil { return err } diff --git a/nexussystem/registry.go b/nexussystem/registry.go new file mode 100644 index 00000000..147037a6 --- /dev/null +++ b/nexussystem/registry.go @@ -0,0 +1,25 @@ +// Code generated by nex-gen. DO NOT EDIT. +package nexussystem + +import ( + "google.golang.org/protobuf/proto" + + workflowservice "go.temporal.io/api/workflowservice/v1" +) + +// Endpoint is the Nexus endpoint shared by all registered system operations. +const Endpoint = "__temporal_system" + +// InputMessage returns a freshly-allocated input proto message for the given +// Nexus system service and operation. The second return value reports whether +// the (service, operation) pair is registered. +func InputMessage(service string, operation string) (proto.Message, bool) { + switch service { + case "temporal.api.workflowservice.v1.WorkflowService": + switch operation { + case "SignalWithStartWorkflowExecution": + return &workflowservice.SignalWithStartWorkflowExecutionRequest{}, true + } + } + return nil, false +} diff --git a/nexussystem/wit/deps/nexus-temporal-types/model.wit b/nexussystem/wit/deps/nexus-temporal-types/model.wit new file mode 100644 index 00000000..c5741c8c --- /dev/null +++ b/nexussystem/wit/deps/nexus-temporal-types/model.wit @@ -0,0 +1,156 @@ +package nexus:temporal-types@1.0.0; + +interface model { + /// String-shaped placeholder for semantic types that generators reinterpret. + type placeholder = string; + + /// @nexus.proto "temporal.api.common.v1.Payload" typescript-package="@temporalio/proto" + /// @nexus.type python="typing.Any" typescript="common.Payload" typescript-package="@temporalio/common" go="any" + type payload = placeholder; + + /// @nexus.proto "temporal.api.common.v1.Payloads" + /// typescript-package="@temporalio/proto" + type payloads = list; + + /// Callable result annotation for workflow functions. + /// @nexus.type + /// python="collections.abc.Awaitable[WorkflowResult]" + /// typescript="Promise" + /// go="any" + type workflow-result = placeholder; + + /// Receiver/context argument for workflow callable method forms. + /// @nexus.type python="typing.Any" typescript="any" + type callable-prefix = placeholder; + + /// @nexus.function-args + /// varargs=true + /// param="args" + /// typescript-drop-prefix=true + workflow-call: async func(callable-prefix: callable-prefix, args: payloads) -> workflow-result; + + /// Callable result annotation for signal functions. + /// @nexus.type python="None | collections.abc.Awaitable[None]" typescript="void" go="any" + type signal-result = placeholder; + + /// @nexus.function-args + /// varargs=true + /// param="signal-args" + /// typescript-drop-prefix=true + signal-call: func(callable-prefix: callable-prefix, signal-args: payloads) -> signal-result; + + /// @nexus.proto "temporal.api.common.v1.WorkflowType" typescript-package="@temporalio/proto" + /// @nexus.type python="str" typescript="string" go="string" + type workflow-type = placeholder; + + /// @nexus.function + /// primary=true + /// signature="workflow-call" + /// args-field="input" + /// result-type-parameter="WorkflowResult" + /// alternate-type="workflow-type" + /// @nexus.add-rpc-compatible-with "workflow-type" + type workflow-function = placeholder; + + /// @nexus.function + /// signature="signal-call" + /// args-field="signal-input" + /// alternate-type="string" + /// python-converter="signal_function_to_proto" + /// typescript-converter="signalFunctionToProto" + /// @nexus.add-rpc-compatible-with "string" + /// @nexus.typescript-with-arguments + /// signature="signal-call" + /// args-field="signal-input" + /// alternate-type="string" + /// value-type="workflow.SignalDefinition" + /// args-type="Value extends workflow.SignalDefinition ? Args : never" + /// name-expr="value.name" + /// typescript-package="@temporalio/workflow" + type signal-function = placeholder; + + /// @nexus.proto "temporal.api.common.v1.RetryPolicy" typescript-package="@temporalio/proto" + /// @nexus.type + /// python="temporalio.common.RetryPolicy" + /// typescript="common.RetryPolicy" + /// typescript-package="@temporalio/common" + /// go="go.temporal.io/sdk/temporal.RetryPolicy" + type retry-policy = placeholder; + + /// @nexus.proto "temporal.api.taskqueue.v1.TaskQueue" typescript-package="@temporalio/proto" + /// @nexus.type python="str" typescript="string" go="string" + type task-queue = placeholder; + + /// @nexus.proto "temporal.api.common.v1.Memo" typescript-package="@temporalio/proto" + /// @nexus.type python="collections.abc.Mapping[str, typing.Any]" typescript="Record" go="map[string]any" + type memo = placeholder; + + /// @nexus.proto "temporal.api.common.v1.SearchAttributes" typescript-package="@temporalio/proto" + /// @nexus.type + /// python="temporalio.common.TypedSearchAttributes" + /// typescript="common.TypedSearchAttributes" + /// typescript-package="@temporalio/common" + type search-attributes = placeholder; + + /// @nexus.proto "temporal.api.common.v1.Priority" typescript-package="@temporalio/proto" + /// @nexus.type + /// python="temporalio.common.Priority" + /// typescript="common.Priority" + /// typescript-package="@temporalio/common" + /// go="go.temporal.io/sdk/temporal.Priority" + type priority = placeholder; + + /// @nexus.proto "temporal.api.workflow.v1.VersioningOverride" typescript-package="@temporalio/proto" + /// @nexus.type + /// python="temporalio.common.VersioningOverride" + /// typescript="common.VersioningOverride" + /// typescript-package="@temporalio/common" + /// go="go.temporal.io/sdk/client.VersioningOverride" + type versioning-override = placeholder; + + /// @nexus.proto "google.protobuf.Duration" typescript-package="@temporalio/proto" + /// @nexus.type + /// python="datetime.timedelta" + /// typescript="common.Duration" + /// typescript-package="@temporalio/common" + /// go="time.Duration" + type duration = placeholder; + + /// @nexus.proto "temporal.api.enums.v1.WorkflowIdReusePolicy" typescript-package="@temporalio/proto" + /// @nexus.type + /// python="temporalio.common.WorkflowIDReusePolicy" + /// typescript="common.WorkflowIdReusePolicy" + /// typescript-package="@temporalio/common" + /// go="go.temporal.io/api/enums/v1:enums.WorkflowIdReusePolicy" + enum workflow-id-reuse-policy { + allow-duplicate, + allow-duplicate-failed-only, + reject-duplicate, + terminate-if-running, + } + + /// @nexus.proto "temporal.api.enums.v1.WorkflowIdConflictPolicy" typescript-package="@temporalio/proto" + /// @nexus.type + /// python="temporalio.common.WorkflowIDConflictPolicy" + /// typescript="common.WorkflowIdConflictPolicy" + /// typescript-package="@temporalio/common" + /// go="go.temporal.io/api/enums/v1:enums.WorkflowIdConflictPolicy" + enum workflow-id-conflict-policy { + fail, + use-existing, + terminate-existing, + } + + /// @nexus.proto "temporal.api.sdk.v1.UserMetadata" typescript-package="@temporalio/proto" + /// @nexus.flatten-in-api + record user-metadata { + /// @nexus.doc "Single-line fixed summary for the workflow execution that may appear in UI and CLI. This can be in single-line Temporal Markdown format." + /// @nexus.proto-field "summary" + /// @nexus.flattened-type python="str" typescript="string" + static-summary: option, + /// @nexus.doc "General fixed details for the workflow execution that may appear in UI and CLI. This can be in Temporal Markdown format and can span multiple lines. This value is fixed on the workflow execution and cannot be updated." + /// @nexus.proto-field "details" + /// @nexus.flattened-type python="str" typescript="string" + static-details: option, + } +} diff --git a/nexussystem/wit/workflow-service.wit b/nexussystem/wit/workflow-service.wit new file mode 100644 index 00000000..bae8d430 --- /dev/null +++ b/nexussystem/wit/workflow-service.wit @@ -0,0 +1,119 @@ +package temporal:nexus@1.0.0; + +world system { + export workflow-service; +} + +/// @nexus.endpoint "__temporal_system" +/// @nexus.service-name "temporal.api.workflowservice.v1.WorkflowService" +/// @nexus.delay-load-temporalio-workflow +/// @nexus.experimental +interface workflow-service { + use nexus:temporal-types/model@1.0.0.{ + duration, + memo, + payloads, + placeholder, + priority, + retry-policy, + search-attributes, + signal-function, + task-queue, + user-metadata, + versioning-override, + workflow-function, + workflow-id-conflict-policy, + workflow-id-reuse-policy, + }; + + /// @nexus.doc "Request fields for signaling a workflow, starting it first if needed." + /// @nexus.experimental + /// @nexus.proto "temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest" typescript-package="@temporalio/proto" + record signal-with-start-workflow-request { + /// @nexus.doc + /// python="Workflow type name or callable identifying the workflow to start." + /// typescript="Workflow type name or workflow function identifying the workflow to start." + /// @nexus.proto-field "workflow_type" + workflow: workflow-function, + /// @nexus.doc "Unique identifier for the workflow execution." + /// @nexus.proto-field "workflow_id" + id: string, + /// @nexus.doc "Task queue to run the workflow on." + task-queue: task-queue, + /// @nexus.doc + /// python="Signal name or callable to send with the start request." + /// typescript="Signal name or signal definition to send with the start request." + /// @nexus.proto-field "signal_name" + signal: signal-function, + /// @nexus.doc "Total workflow execution timeout, including retries and continue-as-new." + /// @nexus.proto-field "workflow_execution_timeout" + execution-timeout: option, + /// @nexus.doc "Timeout of a single workflow run." + /// @nexus.proto-field "workflow_run_timeout" + run-timeout: option, + /// @nexus.doc "Timeout of a single workflow task." + /// @nexus.proto-field "workflow_task_timeout" + task-timeout: option, + /// @nexus.omit + identity: placeholder, + /// @nexus.doc "Request ID used to deduplicate workflow start requests." + request-id: option, + /// @nexus.doc "Behavior when a closed workflow with the same ID exists. Default is allow-duplicate." + /// @nexus.proto-field "workflow_id_reuse_policy" + /// @nexus.default "allow-duplicate" + id-reuse-policy: workflow-id-reuse-policy, + /// @nexus.doc "Behavior when a workflow is currently running with the same ID. Set to use-existing for idempotent deduplication on workflow ID. Cannot be set if id-reuse-policy is terminate-if-running." + /// @nexus.proto-field "workflow_id_conflict_policy" + id-conflict-policy: option, + /// @nexus.doc "Retry policy for the workflow." + retry-policy: option, + /// @nexus.doc "Cron schedule for recurring workflow executions. See https://docs.temporal.io/cron-job." + cron-schedule: option, + /// @nexus.doc "Memo for the workflow." + memo: option, + /// @nexus.doc "Typed search attributes for the workflow." + search-attributes: option, + /// @nexus.doc "Priority of the workflow execution." + priority: option, + /// @nexus.doc "Override for workflow versioning behavior." + versioning-override: option, + /// @nexus.doc "Amount of time to wait before starting the workflow. This does not work with cron-schedule." + /// @nexus.proto-field "workflow_start_delay" + start-delay: option, + user-metadata: option, + /// @nexus.source python="workflow_namespace" typescript="workflowNamespace" go="WorkflowNamespace" + namespace: string, + /// @nexus.omit + control: placeholder, + /// @nexus.omit + header: placeholder, + /// @nexus.omit + links: placeholder, + /// @nexus.omit + time-skipping-config: placeholder, + } + + /// @nexus.experimental + /// @nexus.proto "temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse" typescript-package="@temporalio/proto" + record signal-with-start-workflow-response { + run-id: option, + started: option, + /// @nexus.omit + signal-link: placeholder, + } + + /// @nexus.doc + /// "Signal a workflow, starting it first if needed." + /// returns="A workflow handle to the started workflow." + /// @nexus.output-transform + /// python-type="temporalio.workflow.ExternalWorkflowHandle[WorkflowResult]" + /// python="temporalio.workflow.get_external_workflow_handle(request.id, run_id=result.run_id)" + /// typescript-type="workflow.ExternalWorkflowHandle" + /// typescript="workflow.getExternalWorkflowHandle(request.id, result.runId ?? undefined)" + /// typescript-package="@temporalio/workflow" + /// @nexus.operation name="SignalWithStartWorkflowExecution" + /// @nexus.experimental + signal-with-start-workflow: func( + request: signal-with-start-workflow-request, + ) -> signal-with-start-workflow-response; +} diff --git a/proxy/interceptor.go b/proxy/interceptor.go index b4519197..141c3ba6 100644 --- a/proxy/interceptor.go +++ b/proxy/interceptor.go @@ -988,10 +988,12 @@ func visitPayloads( } } - if o.Input != nil { - if err := visitPayload(ctx, options, o, concState, &o.Input); err != nil { - return err - } + // System Nexus envelopes carry a proto-message request in Input whose + // own fields hold the user payloads; visitScheduleNexusOperationInput + // transparently descends into them (and falls back to visiting Input as + // a single opaque payload for ordinary Nexus operations). + if err := visitScheduleNexusOperationInput(ctx, options, concState, o); err != nil { + return err } ctx.Context = prevCtx diff --git a/proxy/system_nexus.go b/proxy/system_nexus.go new file mode 100644 index 00000000..9c8d69a5 --- /dev/null +++ b/proxy/system_nexus.go @@ -0,0 +1,129 @@ +package proxy + +import ( + "fmt" + + "go.temporal.io/api/command/v1" + "go.temporal.io/api/nexussystem" + "google.golang.org/protobuf/proto" +) + +// systemNexusServiceName is the Nexus service used for system Nexus envelopes +// routed to the workflow service. Together with nexussystem.Endpoint it +// identifies a ScheduleNexusOperationCommandAttributes whose Input is a system +// Nexus envelope. +const systemNexusServiceName = "temporal.api.workflowservice.v1.WorkflowService" + +// Payload encoding metadata used to identify the proto-binary envelope. +const ( + payloadMetadataEncodingKey = "encoding" + payloadEncodingProtoBinary = "binary/protobuf" +) + +// isSystemNexusEnvelope reports whether the given schedule-nexus-operation +// command targets the system Nexus endpoint and service, and therefore carries +// a proto-message envelope in its Input rather than an opaque user payload. +func isSystemNexusEnvelope(attrs *command.ScheduleNexusOperationCommandAttributes) bool { + if attrs == nil { + return false + } + return attrs.GetEndpoint() == nexussystem.Endpoint && + attrs.GetService() == systemNexusServiceName +} + +// visitScheduleNexusOperationInput visits the payloads referenced by a +// ScheduleNexusOperationCommandAttributes.Input field. +// +// For ordinary Nexus operations the Input is an opaque single payload and is +// visited directly. For system Nexus envelopes (endpoint __temporal_system, +// service WorkflowService) the Input is instead a proto-binary-serialized +// request message whose own fields contain the user payloads. In that case the +// envelope is deserialized, its inner payloads are visited recursively (so +// external storage and codecs apply to the inner payloads, not the envelope), +// and the message is re-serialized back into Input. The envelope itself is +// never offloaded or codec-encoded. +func visitScheduleNexusOperationInput( + ctx *VisitPayloadsContext, + options *VisitPayloadsOptions, + concState *payloadConcurrencyState, + attrs *command.ScheduleNexusOperationCommandAttributes, +) error { + if attrs.Input == nil { + return nil + } + + if !isSystemNexusEnvelope(attrs) { + // Ordinary Nexus operation: visit the Input as a single opaque payload. + return visitPayload(ctx, options, attrs, concState, &attrs.Input) + } + + return visitSystemNexusEnvelope(ctx, options, concState, attrs) +} + +// visitSystemNexusEnvelope decodes the system Nexus envelope in attrs.Input, +// visits the payloads inside the decoded request message, and re-encodes it. +func visitSystemNexusEnvelope( + ctx *VisitPayloadsContext, + options *VisitPayloadsOptions, + concState *payloadConcurrencyState, + attrs *command.ScheduleNexusOperationCommandAttributes, +) error { + msg, ok := nexussystem.InputMessage(attrs.GetService(), attrs.GetOperation()) + if !ok { + return fmt.Errorf( + "unknown system nexus operation %q for service %q", + attrs.GetOperation(), attrs.GetService(), + ) + } + + input := attrs.Input + if encoding := string(input.GetMetadata()[payloadMetadataEncodingKey]); encoding != payloadEncodingProtoBinary { + return fmt.Errorf( + "system nexus envelope for operation %q must be encoded as %q, got %q", + attrs.GetOperation(), payloadEncodingProtoBinary, encoding, + ) + } + + if err := proto.Unmarshal(input.GetData(), msg); err != nil { + return fmt.Errorf( + "failed to unmarshal system nexus envelope for operation %q: %w", + attrs.GetOperation(), err, + ) + } + + // Visit the payloads contained within the decoded request message. We pass + // the decoded message both as the parent and as the object to traverse so + // that the generated visitor descends into its payload-bearing fields. + // + // In concurrent mode the visitor may spawn goroutines that write into the + // decoded message's payload fields. We give those goroutines a sub-state + // that shares the parent semaphore (to respect the global concurrency + // limit) but has its own WaitGroup, so we can wait for exactly the + // envelope's inner-payload goroutines before re-serializing -- mirroring how + // the well-known Any visitor isolates child traversals. + envState := concState + if concState != nil { + envState = &payloadConcurrencyState{sem: concState.sem} + } + + if err := visitPayloads(ctx, options, msg, envState, msg); err != nil { + return err + } + + if envState != nil { + envState.wg.Wait() + if errPtr := envState.firstErr.Load(); errPtr != nil { + return *errPtr + } + } + + data, err := proto.Marshal(msg) + if err != nil { + return fmt.Errorf( + "failed to marshal system nexus envelope for operation %q: %w", + attrs.GetOperation(), err, + ) + } + input.Data = data + return nil +} diff --git a/proxy/system_nexus_test.go b/proxy/system_nexus_test.go new file mode 100644 index 00000000..3599fdd3 --- /dev/null +++ b/proxy/system_nexus_test.go @@ -0,0 +1,216 @@ +package proxy + +import ( + "context" + "sort" + "sync" + "testing" + + "github.com/stretchr/testify/require" + command "go.temporal.io/api/command/v1" + common "go.temporal.io/api/common/v1" + sdk "go.temporal.io/api/sdk/v1" + workflowservice "go.temporal.io/api/workflowservice/v1" + "google.golang.org/protobuf/proto" +) + +const ( + systemNexusEndpoint = "__temporal_system" + systemNexusService = "temporal.api.workflowservice.v1.WorkflowService" + systemNexusSignalOp = "SignalWithStartWorkflowExecution" + protoBinaryEncoding = "binary/protobuf" + jsonProtoEncoding = "json/protobuf" + collectingVisitorTag = "visited-" +) + +// signalWithStartEnvelope builds a SignalWithStartWorkflowExecutionRequest with +// a payload in every payload-bearing field, marshals it proto-binary, and wraps +// it in a *common.Payload with the given encoding metadata. +func signalWithStartEnvelope(t *testing.T, encoding string) *common.Payload { + t.Helper() + req := &workflowservice.SignalWithStartWorkflowExecutionRequest{ + Namespace: "default", + WorkflowId: "wf-id", + SignalName: "my-signal", + Input: &common.Payloads{Payloads: []*common.Payload{ + {Data: []byte("workflow-input")}, + }}, + SignalInput: &common.Payloads{Payloads: []*common.Payload{ + {Data: []byte("signal-input")}, + }}, + Memo: &common.Memo{Fields: map[string]*common.Payload{ + "memo-key": {Data: []byte("memo-value")}, + }}, + Header: &common.Header{Fields: map[string]*common.Payload{ + "header-key": {Data: []byte("header-value")}, + }}, + SearchAttributes: &common.SearchAttributes{IndexedFields: map[string]*common.Payload{ + "sa-key": {Data: []byte("sa-value")}, + }}, + UserMetadata: &sdk.UserMetadata{ + Summary: &common.Payload{Data: []byte("summary-value")}, + Details: &common.Payload{Data: []byte("details-value")}, + }, + } + data, err := proto.Marshal(req) + require.NoError(t, err) + return &common.Payload{ + Metadata: map[string][]byte{"encoding": []byte(encoding)}, + Data: data, + } +} + +func scheduleSystemNexusCommand(input *common.Payload) *command.ScheduleNexusOperationCommandAttributes { + return &command.ScheduleNexusOperationCommandAttributes{ + Endpoint: systemNexusEndpoint, + Service: systemNexusService, + Operation: systemNexusSignalOp, + Input: input, + } +} + +// collectVisitor records the data of every payload it sees and rewrites each to +// "visited-" so callers can confirm write-back. +func collectVisitor(seen *[]string, mu *sync.Mutex) func(*VisitPayloadsContext, []*common.Payload) ([]*common.Payload, error) { + return func(_ *VisitPayloadsContext, p []*common.Payload) ([]*common.Payload, error) { + out := make([]*common.Payload, len(p)) + for i, pl := range p { + mu.Lock() + *seen = append(*seen, string(pl.Data)) + mu.Unlock() + out[i] = &common.Payload{Data: append([]byte(collectingVisitorTag), pl.Data...)} + } + return out, nil + } +} + +func decodeEnvelope(t *testing.T, input *common.Payload) *workflowservice.SignalWithStartWorkflowExecutionRequest { + t.Helper() + require.Equal(t, []byte(protoBinaryEncoding), input.GetMetadata()["encoding"]) + var req workflowservice.SignalWithStartWorkflowExecutionRequest + require.NoError(t, proto.Unmarshal(input.GetData(), &req)) + return &req +} + +func TestSystemNexusEnvelopeVisitsInnerPayloads(t *testing.T) { + attrs := scheduleSystemNexusCommand(signalWithStartEnvelope(t, protoBinaryEncoding)) + + var seen []string + var mu sync.Mutex + err := VisitPayloads(context.Background(), &command.Command{ + Attributes: &command.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: attrs, + }, + }, VisitPayloadsOptions{ + Visitor: collectVisitor(&seen, &mu), + }) + require.NoError(t, err) + + sort.Strings(seen) + require.Equal(t, []string{ + "details-value", + "header-value", + "memo-value", + "sa-value", + "signal-input", + "summary-value", + "workflow-input", + }, seen) + + // The envelope itself must remain a proto-binary payload and round-trip, + // with the inner payloads rewritten by the visitor. + req := decodeEnvelope(t, attrs.Input) + require.Equal(t, []byte("visited-workflow-input"), req.Input.Payloads[0].Data) + require.Equal(t, []byte("visited-signal-input"), req.SignalInput.Payloads[0].Data) + require.Equal(t, []byte("visited-memo-value"), req.Memo.Fields["memo-key"].Data) + require.Equal(t, []byte("visited-header-value"), req.Header.Fields["header-key"].Data) + require.Equal(t, []byte("visited-sa-value"), req.SearchAttributes.IndexedFields["sa-key"].Data) + require.Equal(t, []byte("visited-summary-value"), req.UserMetadata.Summary.Data) + require.Equal(t, []byte("visited-details-value"), req.UserMetadata.Details.Data) +} + +func TestSystemNexusEnvelopeVisitsInnerPayloadsConcurrent(t *testing.T) { + attrs := scheduleSystemNexusCommand(signalWithStartEnvelope(t, protoBinaryEncoding)) + + var seen []string + var mu sync.Mutex + err := VisitPayloads(context.Background(), &command.Command{ + Attributes: &command.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: attrs, + }, + }, VisitPayloadsOptions{ + ConcurrencyLimit: 4, + Visitor: collectVisitor(&seen, &mu), + }) + require.NoError(t, err) + + require.Len(t, seen, 7) + req := decodeEnvelope(t, attrs.Input) + require.Equal(t, []byte("visited-workflow-input"), req.Input.Payloads[0].Data) + require.Equal(t, []byte("visited-details-value"), req.UserMetadata.Details.Data) +} + +func TestSystemNexusEnvelopeRejectsNonProtoBinaryEncoding(t *testing.T) { + attrs := scheduleSystemNexusCommand(signalWithStartEnvelope(t, jsonProtoEncoding)) + + err := VisitPayloads(context.Background(), &command.Command{ + Attributes: &command.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: attrs, + }, + }, VisitPayloadsOptions{ + Visitor: func(_ *VisitPayloadsContext, p []*common.Payload) ([]*common.Payload, error) { + return p, nil + }, + }) + require.Error(t, err) + require.ErrorContains(t, err, "binary/protobuf") +} + +func TestSystemNexusEnvelopeRejectsUnknownOperation(t *testing.T) { + attrs := scheduleSystemNexusCommand(signalWithStartEnvelope(t, protoBinaryEncoding)) + attrs.Operation = "NoSuchOperation" + + err := VisitPayloads(context.Background(), &command.Command{ + Attributes: &command.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: attrs, + }, + }, VisitPayloadsOptions{ + Visitor: func(_ *VisitPayloadsContext, p []*common.Payload) ([]*common.Payload, error) { + return p, nil + }, + }) + require.Error(t, err) + require.ErrorContains(t, err, "unknown system nexus operation") +} + +// TestNonSystemNexusInputVisitedAsSinglePayload confirms that an ordinary (non +// system) Nexus operation continues to have its Input visited as one opaque +// payload rather than being decoded as an envelope. +func TestNonSystemNexusInputVisitedAsSinglePayload(t *testing.T) { + attrs := &command.ScheduleNexusOperationCommandAttributes{ + Endpoint: "my-endpoint", + Service: "my.Service", + Operation: "DoThing", + Input: &common.Payload{Data: []byte("user-payload")}, + } + + var seen []string + var single bool + err := VisitPayloads(context.Background(), &command.Command{ + Attributes: &command.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: attrs, + }, + }, VisitPayloadsOptions{ + Visitor: func(ctx *VisitPayloadsContext, p []*common.Payload) ([]*common.Payload, error) { + single = ctx.SinglePayloadRequired + for _, pl := range p { + seen = append(seen, string(pl.Data)) + } + return p, nil + }, + }) + require.NoError(t, err) + require.True(t, single, "ordinary Nexus Input should be visited as a single payload") + require.Equal(t, []string{"user-payload"}, seen) + require.Equal(t, []byte("user-payload"), attrs.Input.Data) +} From c9ba01236500b402fd29ba2e771236431f183837 Mon Sep 17 00:00:00 2001 From: Dan Plyukhin Date: Wed, 24 Jun 2026 16:02:51 -0400 Subject: [PATCH 2/4] Decode system Nexus envelopes via type metadata; drop registry Simplify system Nexus envelope handling per review: - Identify a system Nexus envelope solely by the reserved __temporal_system endpoint (all operations on that endpoint are system Nexus operations), instead of also matching the service name. - Decode the envelope's proto message using the payload's "messageType" metadata via protoregistry.GlobalTypes, removing the need for the generated (service, operation) -> input-type registry. The nexussystem package, its checked-in WIT, and the nexus-system-registry Make target are deleted. - Inline the system-vs-ordinary Input dispatch directly into the generated visitPayloads (the ScheduleNexusOperationCommandAttributes case) rather than a separate helper function; visitSystemNexusEnvelope remains as the envelope decode/visit/encode helper. Envelopes still must be binary/protobuf; missing/unknown message types error. --- Makefile | 29 +--- cmd/proxygenerator/interceptor.go | 16 +- nexussystem/registry.go | 25 --- .../wit/deps/nexus-temporal-types/model.wit | 156 ------------------ nexussystem/wit/workflow-service.wit | 119 ------------- proxy/interceptor.go | 20 ++- proxy/system_nexus.go | 90 +++++----- proxy/system_nexus_test.go | 44 ++++- 8 files changed, 101 insertions(+), 398 deletions(-) delete mode 100644 nexussystem/registry.go delete mode 100644 nexussystem/wit/deps/nexus-temporal-types/model.wit delete mode 100644 nexussystem/wit/workflow-service.wit diff --git a/Makefile b/Makefile index 1307664c..4ec09ae0 100644 --- a/Makefile +++ b/Makefile @@ -97,37 +97,10 @@ grpc-mock: go run ./cmd/mockgen-fix WorkflowService workflowservicemock/v1/service_grpc.pb.mock.go .PHONY: proxy -proxy: gen-proto-desc nexus-system-registry +proxy: gen-proto-desc printf $(COLOR) "Generate proxy code..." (cd ./cmd/proxygenerator && go mod tidy && go run ./) -##### Nexus system operation registry ##### -# The registry maps (service, operation) -> input proto message for system -# Nexus envelopes. It is generated by nex-gen from a checked-in WIT and consumed -# by the proxy payload visitor. nex-gen is invoked via the NEX_GEN_BIN env var -# (path to the built binary) or, if unset, the `nex-gen` binary on PATH. -NEXUS_SYSTEM_DIR := nexussystem -NEXUS_SYSTEM_WIT := $(NEXUS_SYSTEM_DIR)/wit/workflow-service.wit -NEXUS_SYSTEM_WIT_DEPS := $(NEXUS_SYSTEM_DIR)/wit/deps -NEX_GEN ?= $(if $(NEX_GEN_BIN),$(NEX_GEN_BIN),nex-gen) - -.PHONY: nexus-system-registry -nexus-system-registry: gen-proto-desc - printf $(COLOR) "Generate Nexus system operation registry..." - rm -rf $(NEXUS_SYSTEM_DIR)/gen - $(NEX_GEN) generate \ - --lang go \ - --input $(NEXUS_SYSTEM_WIT) \ - --input $(NEXUS_SYSTEM_WIT_DEPS) \ - --descriptors $(PROTO_OUT)/descriptor_set.pb \ - --output $(NEXUS_SYSTEM_DIR)/gen - # Keep only the dependency-free registry; rewrite its package to the - # top-level nexussystem package and check it in as registry.go. - sed 's/^package registry$$/package nexussystem/' \ - $(NEXUS_SYSTEM_DIR)/gen/registry/registry.go > $(NEXUS_SYSTEM_DIR)/registry.go - rm -rf $(NEXUS_SYSTEM_DIR)/gen - gofmt -w $(NEXUS_SYSTEM_DIR)/registry.go - goimports: printf $(COLOR) "Run goimports..." goimports -w $(PROTO_OUT) diff --git a/cmd/proxygenerator/interceptor.go b/cmd/proxygenerator/interceptor.go index 86b08dec..7a75768b 100644 --- a/cmd/proxygenerator/interceptor.go +++ b/cmd/proxygenerator/interceptor.go @@ -551,11 +551,17 @@ func visitPayloads( o.Outcome = &workflowservice.PollNexusOperationExecutionResponse_Result{Result: result} } {{else if and (eq $type "*command.ScheduleNexusOperationCommandAttributes") (eq . "Input")}} - // System Nexus envelopes carry a proto-message request in Input whose - // own fields hold the user payloads; visitScheduleNexusOperationInput - // transparently descends into them (and falls back to visiting Input as - // a single opaque payload for ordinary Nexus operations). - if err := visitScheduleNexusOperationInput(ctx, options, concState, o); err != nil { return err } + if o.Input != nil { + if isSystemNexusEnvelope(o) { + // System Nexus envelopes carry a proto-message request in Input + // whose own fields hold the user payloads; descend into them + // without offloading or codec-encoding the envelope itself. + if err := visitSystemNexusEnvelope(ctx, options, concState, o); err != nil { return err } + } else { + // Ordinary Nexus operation: visit Input as a single opaque payload. + if err := visitPayload(ctx, options, o, concState, &o.Input); err != nil { return err } + } + } {{else}} if o.{{.}} != nil { if err := visitPayload(ctx, options, o, concState, &o.{{.}}); err != nil { return err } diff --git a/nexussystem/registry.go b/nexussystem/registry.go deleted file mode 100644 index 147037a6..00000000 --- a/nexussystem/registry.go +++ /dev/null @@ -1,25 +0,0 @@ -// Code generated by nex-gen. DO NOT EDIT. -package nexussystem - -import ( - "google.golang.org/protobuf/proto" - - workflowservice "go.temporal.io/api/workflowservice/v1" -) - -// Endpoint is the Nexus endpoint shared by all registered system operations. -const Endpoint = "__temporal_system" - -// InputMessage returns a freshly-allocated input proto message for the given -// Nexus system service and operation. The second return value reports whether -// the (service, operation) pair is registered. -func InputMessage(service string, operation string) (proto.Message, bool) { - switch service { - case "temporal.api.workflowservice.v1.WorkflowService": - switch operation { - case "SignalWithStartWorkflowExecution": - return &workflowservice.SignalWithStartWorkflowExecutionRequest{}, true - } - } - return nil, false -} diff --git a/nexussystem/wit/deps/nexus-temporal-types/model.wit b/nexussystem/wit/deps/nexus-temporal-types/model.wit deleted file mode 100644 index c5741c8c..00000000 --- a/nexussystem/wit/deps/nexus-temporal-types/model.wit +++ /dev/null @@ -1,156 +0,0 @@ -package nexus:temporal-types@1.0.0; - -interface model { - /// String-shaped placeholder for semantic types that generators reinterpret. - type placeholder = string; - - /// @nexus.proto "temporal.api.common.v1.Payload" typescript-package="@temporalio/proto" - /// @nexus.type python="typing.Any" typescript="common.Payload" typescript-package="@temporalio/common" go="any" - type payload = placeholder; - - /// @nexus.proto "temporal.api.common.v1.Payloads" - /// typescript-package="@temporalio/proto" - type payloads = list; - - /// Callable result annotation for workflow functions. - /// @nexus.type - /// python="collections.abc.Awaitable[WorkflowResult]" - /// typescript="Promise" - /// go="any" - type workflow-result = placeholder; - - /// Receiver/context argument for workflow callable method forms. - /// @nexus.type python="typing.Any" typescript="any" - type callable-prefix = placeholder; - - /// @nexus.function-args - /// varargs=true - /// param="args" - /// typescript-drop-prefix=true - workflow-call: async func(callable-prefix: callable-prefix, args: payloads) -> workflow-result; - - /// Callable result annotation for signal functions. - /// @nexus.type python="None | collections.abc.Awaitable[None]" typescript="void" go="any" - type signal-result = placeholder; - - /// @nexus.function-args - /// varargs=true - /// param="signal-args" - /// typescript-drop-prefix=true - signal-call: func(callable-prefix: callable-prefix, signal-args: payloads) -> signal-result; - - /// @nexus.proto "temporal.api.common.v1.WorkflowType" typescript-package="@temporalio/proto" - /// @nexus.type python="str" typescript="string" go="string" - type workflow-type = placeholder; - - /// @nexus.function - /// primary=true - /// signature="workflow-call" - /// args-field="input" - /// result-type-parameter="WorkflowResult" - /// alternate-type="workflow-type" - /// @nexus.add-rpc-compatible-with "workflow-type" - type workflow-function = placeholder; - - /// @nexus.function - /// signature="signal-call" - /// args-field="signal-input" - /// alternate-type="string" - /// python-converter="signal_function_to_proto" - /// typescript-converter="signalFunctionToProto" - /// @nexus.add-rpc-compatible-with "string" - /// @nexus.typescript-with-arguments - /// signature="signal-call" - /// args-field="signal-input" - /// alternate-type="string" - /// value-type="workflow.SignalDefinition" - /// args-type="Value extends workflow.SignalDefinition ? Args : never" - /// name-expr="value.name" - /// typescript-package="@temporalio/workflow" - type signal-function = placeholder; - - /// @nexus.proto "temporal.api.common.v1.RetryPolicy" typescript-package="@temporalio/proto" - /// @nexus.type - /// python="temporalio.common.RetryPolicy" - /// typescript="common.RetryPolicy" - /// typescript-package="@temporalio/common" - /// go="go.temporal.io/sdk/temporal.RetryPolicy" - type retry-policy = placeholder; - - /// @nexus.proto "temporal.api.taskqueue.v1.TaskQueue" typescript-package="@temporalio/proto" - /// @nexus.type python="str" typescript="string" go="string" - type task-queue = placeholder; - - /// @nexus.proto "temporal.api.common.v1.Memo" typescript-package="@temporalio/proto" - /// @nexus.type python="collections.abc.Mapping[str, typing.Any]" typescript="Record" go="map[string]any" - type memo = placeholder; - - /// @nexus.proto "temporal.api.common.v1.SearchAttributes" typescript-package="@temporalio/proto" - /// @nexus.type - /// python="temporalio.common.TypedSearchAttributes" - /// typescript="common.TypedSearchAttributes" - /// typescript-package="@temporalio/common" - type search-attributes = placeholder; - - /// @nexus.proto "temporal.api.common.v1.Priority" typescript-package="@temporalio/proto" - /// @nexus.type - /// python="temporalio.common.Priority" - /// typescript="common.Priority" - /// typescript-package="@temporalio/common" - /// go="go.temporal.io/sdk/temporal.Priority" - type priority = placeholder; - - /// @nexus.proto "temporal.api.workflow.v1.VersioningOverride" typescript-package="@temporalio/proto" - /// @nexus.type - /// python="temporalio.common.VersioningOverride" - /// typescript="common.VersioningOverride" - /// typescript-package="@temporalio/common" - /// go="go.temporal.io/sdk/client.VersioningOverride" - type versioning-override = placeholder; - - /// @nexus.proto "google.protobuf.Duration" typescript-package="@temporalio/proto" - /// @nexus.type - /// python="datetime.timedelta" - /// typescript="common.Duration" - /// typescript-package="@temporalio/common" - /// go="time.Duration" - type duration = placeholder; - - /// @nexus.proto "temporal.api.enums.v1.WorkflowIdReusePolicy" typescript-package="@temporalio/proto" - /// @nexus.type - /// python="temporalio.common.WorkflowIDReusePolicy" - /// typescript="common.WorkflowIdReusePolicy" - /// typescript-package="@temporalio/common" - /// go="go.temporal.io/api/enums/v1:enums.WorkflowIdReusePolicy" - enum workflow-id-reuse-policy { - allow-duplicate, - allow-duplicate-failed-only, - reject-duplicate, - terminate-if-running, - } - - /// @nexus.proto "temporal.api.enums.v1.WorkflowIdConflictPolicy" typescript-package="@temporalio/proto" - /// @nexus.type - /// python="temporalio.common.WorkflowIDConflictPolicy" - /// typescript="common.WorkflowIdConflictPolicy" - /// typescript-package="@temporalio/common" - /// go="go.temporal.io/api/enums/v1:enums.WorkflowIdConflictPolicy" - enum workflow-id-conflict-policy { - fail, - use-existing, - terminate-existing, - } - - /// @nexus.proto "temporal.api.sdk.v1.UserMetadata" typescript-package="@temporalio/proto" - /// @nexus.flatten-in-api - record user-metadata { - /// @nexus.doc "Single-line fixed summary for the workflow execution that may appear in UI and CLI. This can be in single-line Temporal Markdown format." - /// @nexus.proto-field "summary" - /// @nexus.flattened-type python="str" typescript="string" - static-summary: option, - /// @nexus.doc "General fixed details for the workflow execution that may appear in UI and CLI. This can be in Temporal Markdown format and can span multiple lines. This value is fixed on the workflow execution and cannot be updated." - /// @nexus.proto-field "details" - /// @nexus.flattened-type python="str" typescript="string" - static-details: option, - } -} diff --git a/nexussystem/wit/workflow-service.wit b/nexussystem/wit/workflow-service.wit deleted file mode 100644 index bae8d430..00000000 --- a/nexussystem/wit/workflow-service.wit +++ /dev/null @@ -1,119 +0,0 @@ -package temporal:nexus@1.0.0; - -world system { - export workflow-service; -} - -/// @nexus.endpoint "__temporal_system" -/// @nexus.service-name "temporal.api.workflowservice.v1.WorkflowService" -/// @nexus.delay-load-temporalio-workflow -/// @nexus.experimental -interface workflow-service { - use nexus:temporal-types/model@1.0.0.{ - duration, - memo, - payloads, - placeholder, - priority, - retry-policy, - search-attributes, - signal-function, - task-queue, - user-metadata, - versioning-override, - workflow-function, - workflow-id-conflict-policy, - workflow-id-reuse-policy, - }; - - /// @nexus.doc "Request fields for signaling a workflow, starting it first if needed." - /// @nexus.experimental - /// @nexus.proto "temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest" typescript-package="@temporalio/proto" - record signal-with-start-workflow-request { - /// @nexus.doc - /// python="Workflow type name or callable identifying the workflow to start." - /// typescript="Workflow type name or workflow function identifying the workflow to start." - /// @nexus.proto-field "workflow_type" - workflow: workflow-function, - /// @nexus.doc "Unique identifier for the workflow execution." - /// @nexus.proto-field "workflow_id" - id: string, - /// @nexus.doc "Task queue to run the workflow on." - task-queue: task-queue, - /// @nexus.doc - /// python="Signal name or callable to send with the start request." - /// typescript="Signal name or signal definition to send with the start request." - /// @nexus.proto-field "signal_name" - signal: signal-function, - /// @nexus.doc "Total workflow execution timeout, including retries and continue-as-new." - /// @nexus.proto-field "workflow_execution_timeout" - execution-timeout: option, - /// @nexus.doc "Timeout of a single workflow run." - /// @nexus.proto-field "workflow_run_timeout" - run-timeout: option, - /// @nexus.doc "Timeout of a single workflow task." - /// @nexus.proto-field "workflow_task_timeout" - task-timeout: option, - /// @nexus.omit - identity: placeholder, - /// @nexus.doc "Request ID used to deduplicate workflow start requests." - request-id: option, - /// @nexus.doc "Behavior when a closed workflow with the same ID exists. Default is allow-duplicate." - /// @nexus.proto-field "workflow_id_reuse_policy" - /// @nexus.default "allow-duplicate" - id-reuse-policy: workflow-id-reuse-policy, - /// @nexus.doc "Behavior when a workflow is currently running with the same ID. Set to use-existing for idempotent deduplication on workflow ID. Cannot be set if id-reuse-policy is terminate-if-running." - /// @nexus.proto-field "workflow_id_conflict_policy" - id-conflict-policy: option, - /// @nexus.doc "Retry policy for the workflow." - retry-policy: option, - /// @nexus.doc "Cron schedule for recurring workflow executions. See https://docs.temporal.io/cron-job." - cron-schedule: option, - /// @nexus.doc "Memo for the workflow." - memo: option, - /// @nexus.doc "Typed search attributes for the workflow." - search-attributes: option, - /// @nexus.doc "Priority of the workflow execution." - priority: option, - /// @nexus.doc "Override for workflow versioning behavior." - versioning-override: option, - /// @nexus.doc "Amount of time to wait before starting the workflow. This does not work with cron-schedule." - /// @nexus.proto-field "workflow_start_delay" - start-delay: option, - user-metadata: option, - /// @nexus.source python="workflow_namespace" typescript="workflowNamespace" go="WorkflowNamespace" - namespace: string, - /// @nexus.omit - control: placeholder, - /// @nexus.omit - header: placeholder, - /// @nexus.omit - links: placeholder, - /// @nexus.omit - time-skipping-config: placeholder, - } - - /// @nexus.experimental - /// @nexus.proto "temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse" typescript-package="@temporalio/proto" - record signal-with-start-workflow-response { - run-id: option, - started: option, - /// @nexus.omit - signal-link: placeholder, - } - - /// @nexus.doc - /// "Signal a workflow, starting it first if needed." - /// returns="A workflow handle to the started workflow." - /// @nexus.output-transform - /// python-type="temporalio.workflow.ExternalWorkflowHandle[WorkflowResult]" - /// python="temporalio.workflow.get_external_workflow_handle(request.id, run_id=result.run_id)" - /// typescript-type="workflow.ExternalWorkflowHandle" - /// typescript="workflow.getExternalWorkflowHandle(request.id, result.runId ?? undefined)" - /// typescript-package="@temporalio/workflow" - /// @nexus.operation name="SignalWithStartWorkflowExecution" - /// @nexus.experimental - signal-with-start-workflow: func( - request: signal-with-start-workflow-request, - ) -> signal-with-start-workflow-response; -} diff --git a/proxy/interceptor.go b/proxy/interceptor.go index 141c3ba6..e3bd5860 100644 --- a/proxy/interceptor.go +++ b/proxy/interceptor.go @@ -988,12 +988,20 @@ func visitPayloads( } } - // System Nexus envelopes carry a proto-message request in Input whose - // own fields hold the user payloads; visitScheduleNexusOperationInput - // transparently descends into them (and falls back to visiting Input as - // a single opaque payload for ordinary Nexus operations). - if err := visitScheduleNexusOperationInput(ctx, options, concState, o); err != nil { - return err + if o.Input != nil { + if isSystemNexusEnvelope(o) { + // System Nexus envelopes carry a proto-message request in Input + // whose own fields hold the user payloads; descend into them + // without offloading or codec-encoding the envelope itself. + if err := visitSystemNexusEnvelope(ctx, options, concState, o); err != nil { + return err + } + } else { + // Ordinary Nexus operation: visit Input as a single opaque payload. + if err := visitPayload(ctx, options, o, concState, &o.Input); err != nil { + return err + } + } } ctx.Context = prevCtx diff --git a/proxy/system_nexus.go b/proxy/system_nexus.go index 9c8d69a5..51d79bfc 100644 --- a/proxy/system_nexus.go +++ b/proxy/system_nexus.go @@ -4,85 +4,71 @@ import ( "fmt" "go.temporal.io/api/command/v1" - "go.temporal.io/api/nexussystem" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" ) -// systemNexusServiceName is the Nexus service used for system Nexus envelopes -// routed to the workflow service. Together with nexussystem.Endpoint it -// identifies a ScheduleNexusOperationCommandAttributes whose Input is a system -// Nexus envelope. -const systemNexusServiceName = "temporal.api.workflowservice.v1.WorkflowService" +// systemNexusEndpoint is the reserved Nexus endpoint used for system Nexus +// envelopes. Every operation routed to this endpoint carries a proto-message +// request in its ScheduleNexusOperationCommandAttributes.Input rather than an +// opaque user payload. +const systemNexusEndpoint = "__temporal_system" -// Payload encoding metadata used to identify the proto-binary envelope. +// Payload encoding/type metadata used to decode the proto-binary envelope. const ( - payloadMetadataEncodingKey = "encoding" - payloadEncodingProtoBinary = "binary/protobuf" + payloadMetadataEncodingKey = "encoding" + payloadMetadataMessageTypeKey = "messageType" + payloadEncodingProtoBinary = "binary/protobuf" ) // isSystemNexusEnvelope reports whether the given schedule-nexus-operation -// command targets the system Nexus endpoint and service, and therefore carries -// a proto-message envelope in its Input rather than an opaque user payload. +// command targets the system Nexus endpoint, and therefore carries a +// proto-message envelope in its Input rather than an opaque user payload. All +// operations on the system endpoint are system Nexus operations. func isSystemNexusEnvelope(attrs *command.ScheduleNexusOperationCommandAttributes) bool { - if attrs == nil { - return false - } - return attrs.GetEndpoint() == nexussystem.Endpoint && - attrs.GetService() == systemNexusServiceName + return attrs != nil && attrs.GetEndpoint() == systemNexusEndpoint } -// visitScheduleNexusOperationInput visits the payloads referenced by a -// ScheduleNexusOperationCommandAttributes.Input field. +// visitSystemNexusEnvelope decodes the system Nexus envelope in attrs.Input, +// visits the payloads inside the decoded request message, and re-encodes it. // -// For ordinary Nexus operations the Input is an opaque single payload and is -// visited directly. For system Nexus envelopes (endpoint __temporal_system, -// service WorkflowService) the Input is instead a proto-binary-serialized -// request message whose own fields contain the user payloads. In that case the -// envelope is deserialized, its inner payloads are visited recursively (so -// external storage and codecs apply to the inner payloads, not the envelope), -// and the message is re-serialized back into Input. The envelope itself is -// never offloaded or codec-encoded. -func visitScheduleNexusOperationInput( +// The envelope's proto message type is taken from the payload's "messageType" +// metadata, so no operation registry is required. The envelope must be encoded +// as binary/protobuf. The inner payloads (and only those) are passed to the +// visitor, so external storage and codecs apply to them and not to the envelope +// itself, which is never offloaded or codec-encoded. +func visitSystemNexusEnvelope( ctx *VisitPayloadsContext, options *VisitPayloadsOptions, concState *payloadConcurrencyState, attrs *command.ScheduleNexusOperationCommandAttributes, ) error { - if attrs.Input == nil { - return nil - } + input := attrs.Input - if !isSystemNexusEnvelope(attrs) { - // Ordinary Nexus operation: visit the Input as a single opaque payload. - return visitPayload(ctx, options, attrs, concState, &attrs.Input) + if encoding := string(input.GetMetadata()[payloadMetadataEncodingKey]); encoding != payloadEncodingProtoBinary { + return fmt.Errorf( + "system nexus envelope for operation %q must be encoded as %q, got %q", + attrs.GetOperation(), payloadEncodingProtoBinary, encoding, + ) } - return visitSystemNexusEnvelope(ctx, options, concState, attrs) -} - -// visitSystemNexusEnvelope decodes the system Nexus envelope in attrs.Input, -// visits the payloads inside the decoded request message, and re-encodes it. -func visitSystemNexusEnvelope( - ctx *VisitPayloadsContext, - options *VisitPayloadsOptions, - concState *payloadConcurrencyState, - attrs *command.ScheduleNexusOperationCommandAttributes, -) error { - msg, ok := nexussystem.InputMessage(attrs.GetService(), attrs.GetOperation()) - if !ok { + messageType := string(input.GetMetadata()[payloadMetadataMessageTypeKey]) + if messageType == "" { return fmt.Errorf( - "unknown system nexus operation %q for service %q", - attrs.GetOperation(), attrs.GetService(), + "system nexus envelope for operation %q is missing the %q metadata", + attrs.GetOperation(), payloadMetadataMessageTypeKey, ) } - input := attrs.Input - if encoding := string(input.GetMetadata()[payloadMetadataEncodingKey]); encoding != payloadEncodingProtoBinary { + mt, err := protoregistry.GlobalTypes.FindMessageByName(protoreflect.FullName(messageType)) + if err != nil { return fmt.Errorf( - "system nexus envelope for operation %q must be encoded as %q, got %q", - attrs.GetOperation(), payloadEncodingProtoBinary, encoding, + "system nexus envelope for operation %q references unknown message type %q: %w", + attrs.GetOperation(), messageType, err, ) } + msg := mt.New().Interface() if err := proto.Unmarshal(input.GetData(), msg); err != nil { return fmt.Errorf( diff --git a/proxy/system_nexus_test.go b/proxy/system_nexus_test.go index 3599fdd3..3f9f90a4 100644 --- a/proxy/system_nexus_test.go +++ b/proxy/system_nexus_test.go @@ -15,18 +15,25 @@ import ( ) const ( - systemNexusEndpoint = "__temporal_system" systemNexusService = "temporal.api.workflowservice.v1.WorkflowService" systemNexusSignalOp = "SignalWithStartWorkflowExecution" protoBinaryEncoding = "binary/protobuf" jsonProtoEncoding = "json/protobuf" collectingVisitorTag = "visited-" + signalWithStartType = "temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest" ) // signalWithStartEnvelope builds a SignalWithStartWorkflowExecutionRequest with // a payload in every payload-bearing field, marshals it proto-binary, and wraps -// it in a *common.Payload with the given encoding metadata. +// it in a *common.Payload with the given encoding and messageType metadata. func signalWithStartEnvelope(t *testing.T, encoding string) *common.Payload { + t.Helper() + return signalWithStartEnvelopeWithType(t, encoding, signalWithStartType) +} + +// signalWithStartEnvelopeWithType is like signalWithStartEnvelope but allows +// overriding the messageType metadata (e.g. to an unknown type). +func signalWithStartEnvelopeWithType(t *testing.T, encoding, messageType string) *common.Payload { t.Helper() req := &workflowservice.SignalWithStartWorkflowExecutionRequest{ Namespace: "default", @@ -54,8 +61,12 @@ func signalWithStartEnvelope(t *testing.T, encoding string) *common.Payload { } data, err := proto.Marshal(req) require.NoError(t, err) + metadata := map[string][]byte{"encoding": []byte(encoding)} + if messageType != "" { + metadata["messageType"] = []byte(messageType) + } return &common.Payload{ - Metadata: map[string][]byte{"encoding": []byte(encoding)}, + Metadata: metadata, Data: data, } } @@ -166,9 +177,28 @@ func TestSystemNexusEnvelopeRejectsNonProtoBinaryEncoding(t *testing.T) { require.ErrorContains(t, err, "binary/protobuf") } -func TestSystemNexusEnvelopeRejectsUnknownOperation(t *testing.T) { - attrs := scheduleSystemNexusCommand(signalWithStartEnvelope(t, protoBinaryEncoding)) - attrs.Operation = "NoSuchOperation" +func TestSystemNexusEnvelopeRejectsUnknownMessageType(t *testing.T) { + attrs := scheduleSystemNexusCommand( + signalWithStartEnvelopeWithType(t, protoBinaryEncoding, "temporal.api.workflowservice.v1.NoSuchMessage"), + ) + + err := VisitPayloads(context.Background(), &command.Command{ + Attributes: &command.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: attrs, + }, + }, VisitPayloadsOptions{ + Visitor: func(_ *VisitPayloadsContext, p []*common.Payload) ([]*common.Payload, error) { + return p, nil + }, + }) + require.Error(t, err) + require.ErrorContains(t, err, "unknown message type") +} + +func TestSystemNexusEnvelopeRejectsMissingMessageType(t *testing.T) { + attrs := scheduleSystemNexusCommand( + signalWithStartEnvelopeWithType(t, protoBinaryEncoding, ""), + ) err := VisitPayloads(context.Background(), &command.Command{ Attributes: &command.Command_ScheduleNexusOperationCommandAttributes{ @@ -180,7 +210,7 @@ func TestSystemNexusEnvelopeRejectsUnknownOperation(t *testing.T) { }, }) require.Error(t, err) - require.ErrorContains(t, err, "unknown system nexus operation") + require.ErrorContains(t, err, "missing") } // TestNonSystemNexusInputVisitedAsSinglePayload confirms that an ordinary (non From 17fa2742c31be1845f645b8795ea8081148a7e29 Mon Sep 17 00:00:00 2001 From: Dan Plyukhin Date: Wed, 24 Jun 2026 16:50:11 -0400 Subject: [PATCH 3/4] Inline __temporal_nexus --- cmd/proxygenerator/interceptor.go | 6 +----- proxy/interceptor.go | 6 +----- proxy/system_nexus.go | 14 -------------- proxy/system_nexus_test.go | 2 +- 4 files changed, 3 insertions(+), 25 deletions(-) diff --git a/cmd/proxygenerator/interceptor.go b/cmd/proxygenerator/interceptor.go index 7a75768b..08cdaa57 100644 --- a/cmd/proxygenerator/interceptor.go +++ b/cmd/proxygenerator/interceptor.go @@ -552,13 +552,9 @@ func visitPayloads( } {{else if and (eq $type "*command.ScheduleNexusOperationCommandAttributes") (eq . "Input")}} if o.Input != nil { - if isSystemNexusEnvelope(o) { - // System Nexus envelopes carry a proto-message request in Input - // whose own fields hold the user payloads; descend into them - // without offloading or codec-encoding the envelope itself. + if o.GetEndpoint() == "__temporal_system" { if err := visitSystemNexusEnvelope(ctx, options, concState, o); err != nil { return err } } else { - // Ordinary Nexus operation: visit Input as a single opaque payload. if err := visitPayload(ctx, options, o, concState, &o.Input); err != nil { return err } } } diff --git a/proxy/interceptor.go b/proxy/interceptor.go index e3bd5860..e4e3ac40 100644 --- a/proxy/interceptor.go +++ b/proxy/interceptor.go @@ -989,15 +989,11 @@ func visitPayloads( } if o.Input != nil { - if isSystemNexusEnvelope(o) { - // System Nexus envelopes carry a proto-message request in Input - // whose own fields hold the user payloads; descend into them - // without offloading or codec-encoding the envelope itself. + if o.GetEndpoint() == "__temporal_system" { if err := visitSystemNexusEnvelope(ctx, options, concState, o); err != nil { return err } } else { - // Ordinary Nexus operation: visit Input as a single opaque payload. if err := visitPayload(ctx, options, o, concState, &o.Input); err != nil { return err } diff --git a/proxy/system_nexus.go b/proxy/system_nexus.go index 51d79bfc..2d08c7d5 100644 --- a/proxy/system_nexus.go +++ b/proxy/system_nexus.go @@ -9,12 +9,6 @@ import ( "google.golang.org/protobuf/reflect/protoregistry" ) -// systemNexusEndpoint is the reserved Nexus endpoint used for system Nexus -// envelopes. Every operation routed to this endpoint carries a proto-message -// request in its ScheduleNexusOperationCommandAttributes.Input rather than an -// opaque user payload. -const systemNexusEndpoint = "__temporal_system" - // Payload encoding/type metadata used to decode the proto-binary envelope. const ( payloadMetadataEncodingKey = "encoding" @@ -22,14 +16,6 @@ const ( payloadEncodingProtoBinary = "binary/protobuf" ) -// isSystemNexusEnvelope reports whether the given schedule-nexus-operation -// command targets the system Nexus endpoint, and therefore carries a -// proto-message envelope in its Input rather than an opaque user payload. All -// operations on the system endpoint are system Nexus operations. -func isSystemNexusEnvelope(attrs *command.ScheduleNexusOperationCommandAttributes) bool { - return attrs != nil && attrs.GetEndpoint() == systemNexusEndpoint -} - // visitSystemNexusEnvelope decodes the system Nexus envelope in attrs.Input, // visits the payloads inside the decoded request message, and re-encodes it. // diff --git a/proxy/system_nexus_test.go b/proxy/system_nexus_test.go index 3f9f90a4..360b61f3 100644 --- a/proxy/system_nexus_test.go +++ b/proxy/system_nexus_test.go @@ -73,7 +73,7 @@ func signalWithStartEnvelopeWithType(t *testing.T, encoding, messageType string) func scheduleSystemNexusCommand(input *common.Payload) *command.ScheduleNexusOperationCommandAttributes { return &command.ScheduleNexusOperationCommandAttributes{ - Endpoint: systemNexusEndpoint, + Endpoint: "__temporal_nexus", Service: systemNexusService, Operation: systemNexusSignalOp, Input: input, From 105ddddc549bb14d3e6caf80ec91def52c470aa5 Mon Sep 17 00:00:00 2001 From: Dan Plyukhin Date: Thu, 25 Jun 2026 15:12:24 -0400 Subject: [PATCH 4/4] Update system nexus endpoint name in tests. --- proxy/system_nexus_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/system_nexus_test.go b/proxy/system_nexus_test.go index 360b61f3..a6384581 100644 --- a/proxy/system_nexus_test.go +++ b/proxy/system_nexus_test.go @@ -73,7 +73,7 @@ func signalWithStartEnvelopeWithType(t *testing.T, encoding, messageType string) func scheduleSystemNexusCommand(input *common.Payload) *command.ScheduleNexusOperationCommandAttributes { return &command.ScheduleNexusOperationCommandAttributes{ - Endpoint: "__temporal_nexus", + Endpoint: "__temporal_system", Service: systemNexusService, Operation: systemNexusSignalOp, Input: input,