From d81c29952ed8a7679716368af6a378ff2cdf8897 Mon Sep 17 00:00:00 2001 From: Paul Flynn Date: Fri, 10 Apr 2026 10:53:34 -0400 Subject: [PATCH 1/6] feat(service): add Connect RPC trace context interceptors Add client and server Connect interceptors for OTel trace propagation: - ConnectServerTraceInterceptor: extracts traceparent/tracestate from incoming requests; wired as the first handler interceptor globally - ConnectClientTraceInterceptor: injects trace context into outbound IPC requests; wired as the first IPC client interceptor Remove redundant per-handler trace extraction from authorization service handlers. This also fixes an ordering bug where spans were started before trace context was extracted, causing them to be root spans instead of children of the incoming trace. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Paul Flynn --- service/authorization/authorization.go | 6 - service/authorization/v2/authorization.go | 18 --- service/internal/server/server.go | 6 + service/internal/server/server_test.go | 16 +-- service/tracing/connect_interceptor.go | 38 ++++++ service/tracing/connect_interceptor_test.go | 144 ++++++++++++++++++++ 6 files changed, 196 insertions(+), 32 deletions(-) create mode 100644 service/tracing/connect_interceptor.go create mode 100644 service/tracing/connect_interceptor_test.go diff --git a/service/authorization/authorization.go b/service/authorization/authorization.go index 17c6b24070..edaed4530e 100644 --- a/service/authorization/authorization.go +++ b/service/authorization/authorization.go @@ -33,8 +33,6 @@ import ( "github.com/opentdf/platform/service/pkg/config" "github.com/opentdf/platform/service/pkg/db" "github.com/opentdf/platform/service/pkg/serviceregistry" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -154,10 +152,6 @@ func (as AuthorizationService) IsReady(ctx context.Context) error { } func (as *AuthorizationService) GetDecisionsByToken(ctx context.Context, req *connect.Request[authorization.GetDecisionsByTokenRequest]) (*connect.Response[authorization.GetDecisionsByTokenResponse], error) { - // Extract trace context from the incoming request - propagator := otel.GetTextMapPropagator() - ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header())) - ctx, span := as.Start(ctx, "GetDecisionsByToken") defer span.End() diff --git a/service/authorization/v2/authorization.go b/service/authorization/v2/authorization.go index 4fb8e18b55..761e3ff682 100644 --- a/service/authorization/v2/authorization.go +++ b/service/authorization/v2/authorization.go @@ -19,8 +19,6 @@ import ( ctxAuth "github.com/opentdf/platform/service/pkg/auth" "github.com/opentdf/platform/service/pkg/cache" "github.com/opentdf/platform/service/pkg/serviceregistry" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -141,10 +139,6 @@ func (as *Service) GetEntitlements(ctx context.Context, req *connect.Request[aut ctx, span := as.Start(ctx, "GetEntitlements") defer span.End() - // Extract trace context from the incoming request - propagator := otel.GetTextMapPropagator() - ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header())) - entityIdentifier := req.Msg.GetEntityIdentifier() withComprehensiveHierarchy := req.Msg.GetWithComprehensiveHierarchy() @@ -172,10 +166,6 @@ func (as *Service) GetDecision(ctx context.Context, req *connect.Request[authzV2 ctx, span := as.Start(ctx, "GetDecision") defer span.End() - // Extract trace context from the incoming request - propagator := otel.GetTextMapPropagator() - ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header())) - pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk, as.cache, as.config.AllowDirectEntitlements) if err != nil { return nil, statusifyError(ctx, as.logger, errors.Join(ErrFailedToInitPDP, err)) @@ -222,10 +212,6 @@ func (as *Service) GetDecisionMultiResource(ctx context.Context, req *connect.Re ctx, span := as.Start(ctx, "GetDecisionMultiResource") defer span.End() - // Extract trace context from the incoming request - propagator := otel.GetTextMapPropagator() - ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header())) - pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk, as.cache, as.config.AllowDirectEntitlements) if err != nil { return nil, statusifyError(ctx, as.logger, errors.Join(ErrFailedToInitPDP, err)) @@ -275,10 +261,6 @@ func (as *Service) GetDecisionBulk(ctx context.Context, req *connect.Request[aut ctx, span := as.Start(ctx, "GetDecisionBulk") defer span.End() - // Extract trace context from the incoming request - propagator := otel.GetTextMapPropagator() - ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header())) - pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk, as.cache, as.config.AllowDirectEntitlements) if err != nil { return nil, statusifyError(ctx, as.logger, errors.Join(ErrFailedToInitPDP, err)) diff --git a/service/internal/server/server.go b/service/internal/server/server.go index 0332e91e5d..84d942b1d7 100644 --- a/service/internal/server/server.go +++ b/service/internal/server/server.go @@ -523,6 +523,9 @@ func pprofHandler(h http.Handler) http.Handler { func newConnectRPC(c Config, authInt connect.Interceptor, ints []connect.Interceptor, logger *logger.Logger) (*ConnectRPC, error) { interceptors := make([]connect.HandlerOption, 0) + // Extract OTel trace context from incoming Connect requests before all other interceptors + interceptors = append(interceptors, connect.WithInterceptors(tracing.ConnectServerTraceInterceptor())) + if c.Auth.Enabled { if authInt == nil { return nil, errors.New("authentication enabled but no interceptor provided") @@ -597,6 +600,9 @@ func (s OpenTDFServer) Stop() { func (s inProcessServer) Conn() *sdk.ConnectRPCConnection { var clientInterceptors []connect.Interceptor + // Propagate OTel trace context on outbound IPC Connect RPCs + clientInterceptors = append(clientInterceptors, tracing.ConnectClientTraceInterceptor()) + // Add audit interceptor clientInterceptors = append(clientInterceptors, sdkAudit.MetadataAddingConnectInterceptor()) diff --git a/service/internal/server/server_test.go b/service/internal/server/server_test.go index e34bd29519..12884d1302 100644 --- a/service/internal/server/server_test.go +++ b/service/internal/server/server_test.go @@ -554,32 +554,32 @@ func TestNewConnectRPC(t *testing.T) { authEnabled: true, authInt: noopInterceptor(), extraInts: []connect.Interceptor{noopInterceptor(), noopInterceptor()}, - wantIntLen: 3, - wantDescription: "1 auth + 1 extras + 1 validation/audit", + wantIntLen: 4, + wantDescription: "1 trace + 1 auth + 1 extras + 1 validation/audit", }, { name: "auth enabled no extras", authEnabled: true, authInt: noopInterceptor(), extraInts: nil, - wantIntLen: 2, - wantDescription: "1 auth + 1 validation/audit", + wantIntLen: 3, + wantDescription: "1 trace + 1 auth + 1 validation/audit", }, { name: "auth disabled no extras", authEnabled: false, authInt: nil, extraInts: nil, - wantIntLen: 1, - wantDescription: "1 validation/audit only", + wantIntLen: 2, + wantDescription: "1 trace + 1 validation/audit only", }, { name: "auth disabled with extras", authEnabled: false, authInt: nil, extraInts: []connect.Interceptor{noopInterceptor()}, - wantIntLen: 2, - wantDescription: "1 extras + 1 validation/audit", + wantIntLen: 3, + wantDescription: "1 trace + 1 extras + 1 validation/audit", }, { name: "auth enabled but nil authInt returns error", diff --git a/service/tracing/connect_interceptor.go b/service/tracing/connect_interceptor.go new file mode 100644 index 0000000000..4b5ebea823 --- /dev/null +++ b/service/tracing/connect_interceptor.go @@ -0,0 +1,38 @@ +package tracing + +import ( + "context" + + "connectrpc.com/connect" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" +) + +// ConnectClientTraceInterceptor returns a Connect unary interceptor that injects +// OpenTelemetry trace context (traceparent/tracestate) into outbound HTTP +// request headers, enabling distributed trace propagation across Connect RPCs. +func ConnectClientTraceInterceptor() connect.UnaryInterceptorFunc { + return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { + return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + if req.Spec().IsClient { + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header())) + } + return next(ctx, req) + } + }) +} + +// ConnectServerTraceInterceptor returns a Connect unary interceptor that +// extracts OpenTelemetry trace context (traceparent/tracestate) from incoming +// HTTP request headers into the Go context, enabling distributed trace +// continuity for Connect RPC handlers. +func ConnectServerTraceInterceptor() connect.UnaryInterceptorFunc { + return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { + return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + if !req.Spec().IsClient { + ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(req.Header())) + } + return next(ctx, req) + } + }) +} diff --git a/service/tracing/connect_interceptor_test.go b/service/tracing/connect_interceptor_test.go new file mode 100644 index 0000000000..3891656149 --- /dev/null +++ b/service/tracing/connect_interceptor_test.go @@ -0,0 +1,144 @@ +package tracing_test + +import ( + "context" + "net/http" + "net/http/httptest" + "sync" + "testing" + + "connectrpc.com/connect" + "github.com/opentdf/platform/service/tracing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" + "google.golang.org/protobuf/types/known/emptypb" +) + +// TestTraceContextPropagation_EndToEnd verifies that the client interceptor +// injects traceparent/tracestate headers and the server interceptor extracts them, +// resulting in both sides sharing the same trace ID. +func TestTraceContextPropagation_EndToEnd(t *testing.T) { + // 1. Set up an in-memory OTel tracer so we can inspect spans + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSyncer(exporter), + sdktrace.WithSampler(sdktrace.AlwaysSample()), + ) + defer func() { _ = tp.Shutdown(context.Background()) }() + + // Save and restore globals + prevTP := otel.GetTracerProvider() + prevProp := otel.GetTextMapPropagator() + defer func() { + otel.SetTracerProvider(prevTP) + otel.SetTextMapPropagator(prevProp) + }() + + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + + // 2. Record the trace ID seen on the server side + var ( + mu sync.Mutex + serverTraceID trace.TraceID + serverSpanID trace.SpanID + ) + + // 3. Create a Connect handler with the server-side trace interceptor + mux := http.NewServeMux() + handler := connect.NewUnaryHandler( + "/test.v1.TestService/Ping", + func(ctx context.Context, _ *connect.Request[emptypb.Empty]) (*connect.Response[emptypb.Empty], error) { + // The server interceptor should have extracted trace context into ctx + sc := trace.SpanContextFromContext(ctx) + mu.Lock() + serverTraceID = sc.TraceID() + serverSpanID = sc.SpanID() + mu.Unlock() + return connect.NewResponse(&emptypb.Empty{}), nil + }, + connect.WithInterceptors(tracing.ConnectServerTraceInterceptor()), + ) + mux.Handle("/test.v1.TestService/", handler) + + srv := httptest.NewServer(mux) + defer srv.Close() + + // 4. Create a Connect client with the client-side trace interceptor + client := connect.NewClient[emptypb.Empty, emptypb.Empty]( + srv.Client(), + srv.URL+"/test.v1.TestService/Ping", + connect.WithInterceptors(tracing.ConnectClientTraceInterceptor()), + ) + + // 5. Start a client-side span to establish a trace context + tracer := tp.Tracer("test") + ctx, span := tracer.Start(context.Background(), "client-call") + clientTraceID := span.SpanContext().TraceID() + clientSpanID := span.SpanContext().SpanID() + + // 6. Make the Connect RPC call + _, err := client.CallUnary(ctx, connect.NewRequest(&emptypb.Empty{})) + span.End() + require.NoError(t, err) + + // 7. Verify trace context was propagated + mu.Lock() + defer mu.Unlock() + + assert.True(t, clientTraceID.IsValid(), "client trace ID should be valid") + assert.True(t, serverTraceID.IsValid(), "server trace ID should be valid") + assert.Equal(t, clientTraceID, serverTraceID, + "server must see the same trace ID as the client — trace context was propagated") + assert.Equal(t, clientSpanID, serverSpanID, + "server must see the client's span ID as the remote parent") + + t.Logf("client trace ID: %s span ID: %s", clientTraceID, clientSpanID) + t.Logf("server trace ID: %s span ID: %s", serverTraceID, serverSpanID) +} + +// TestTraceContextPropagation_NoTraceContext verifies that the interceptors +// are safe when no trace context exists (no-op propagator behavior). +func TestTraceContextPropagation_NoTraceContext(t *testing.T) { + // Use a no-op propagator — simulates a deployment without OTel configured + prevProp := otel.GetTextMapPropagator() + defer otel.SetTextMapPropagator(prevProp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator()) + + var serverTraceID trace.TraceID + + mux := http.NewServeMux() + handler := connect.NewUnaryHandler( + "/test.v1.TestService/Ping", + func(ctx context.Context, _ *connect.Request[emptypb.Empty]) (*connect.Response[emptypb.Empty], error) { + serverTraceID = trace.SpanContextFromContext(ctx).TraceID() + return connect.NewResponse(&emptypb.Empty{}), nil + }, + connect.WithInterceptors(tracing.ConnectServerTraceInterceptor()), + ) + mux.Handle("/test.v1.TestService/", handler) + + srv := httptest.NewServer(mux) + defer srv.Close() + + client := connect.NewClient[emptypb.Empty, emptypb.Empty]( + srv.Client(), + srv.URL+"/test.v1.TestService/Ping", + connect.WithInterceptors(tracing.ConnectClientTraceInterceptor()), + ) + + _, err := client.CallUnary(context.Background(), connect.NewRequest(&emptypb.Empty{})) + require.NoError(t, err) + + // With no propagator, server should not see any trace context + assert.False(t, serverTraceID.IsValid(), + "server should not see a trace ID when no propagator is configured") +} From 6b8c37b5c966b610f0683f7fe000111abf3ed24b Mon Sep 17 00:00:00 2001 From: Paul Flynn Date: Fri, 10 Apr 2026 11:33:06 -0400 Subject: [PATCH 2/6] fix(service): implement full connect.Interceptor for streaming support, fix no-op test Address review feedback: - Implement full connect.Interceptor interface (WrapUnary, WrapStreamingClient, WrapStreamingHandler) instead of UnaryInterceptorFunc, so trace context propagates for streaming RPCs too - Fix NoTraceContext test to start a real span on the client side, proving the no-op propagator specifically blocks propagation rather than relying on the absence of a span Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Paul Flynn --- service/tracing/connect_interceptor.go | 78 ++++++++---- service/tracing/connect_interceptor_test.go | 134 ++++++++++++++++---- 2 files changed, 162 insertions(+), 50 deletions(-) diff --git a/service/tracing/connect_interceptor.go b/service/tracing/connect_interceptor.go index 4b5ebea823..059a2aeae9 100644 --- a/service/tracing/connect_interceptor.go +++ b/service/tracing/connect_interceptor.go @@ -8,31 +8,63 @@ import ( "go.opentelemetry.io/otel/propagation" ) -// ConnectClientTraceInterceptor returns a Connect unary interceptor that injects +// ConnectClientTraceInterceptor returns a Connect interceptor that injects // OpenTelemetry trace context (traceparent/tracestate) into outbound HTTP // request headers, enabling distributed trace propagation across Connect RPCs. -func ConnectClientTraceInterceptor() connect.UnaryInterceptorFunc { - return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { - return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { - if req.Spec().IsClient { - otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header())) - } - return next(ctx, req) +// Handles both unary and streaming calls. +func ConnectClientTraceInterceptor() connect.Interceptor { + return &connectClientTraceInterceptor{} +} + +type connectClientTraceInterceptor struct{} + +func (i *connectClientTraceInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { + return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + if req.Spec().IsClient { + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header())) } - }) -} - -// ConnectServerTraceInterceptor returns a Connect unary interceptor that -// extracts OpenTelemetry trace context (traceparent/tracestate) from incoming -// HTTP request headers into the Go context, enabling distributed trace -// continuity for Connect RPC handlers. -func ConnectServerTraceInterceptor() connect.UnaryInterceptorFunc { - return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { - return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { - if !req.Spec().IsClient { - ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(req.Header())) - } - return next(ctx, req) + return next(ctx, req) + } +} + +func (i *connectClientTraceInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { + return func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn { + conn := next(ctx, spec) + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(conn.RequestHeader())) + return conn + } +} + +func (i *connectClientTraceInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { + return next +} + +// ConnectServerTraceInterceptor returns a Connect interceptor that extracts +// OpenTelemetry trace context (traceparent/tracestate) from incoming HTTP +// request headers into the Go context, enabling distributed trace continuity +// for Connect RPC handlers. Handles both unary and streaming calls. +func ConnectServerTraceInterceptor() connect.Interceptor { + return &connectServerTraceInterceptor{} +} + +type connectServerTraceInterceptor struct{} + +func (i *connectServerTraceInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { + return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + if !req.Spec().IsClient { + ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(req.Header())) } - }) + return next(ctx, req) + } +} + +func (i *connectServerTraceInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { + return next +} + +func (i *connectServerTraceInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { + return func(ctx context.Context, conn connect.StreamingHandlerConn) error { + ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(conn.RequestHeader())) + return next(ctx, conn) + } } diff --git a/service/tracing/connect_interceptor_test.go b/service/tracing/connect_interceptor_test.go index 3891656149..62d7dc57f0 100644 --- a/service/tracing/connect_interceptor_test.go +++ b/service/tracing/connect_interceptor_test.go @@ -19,25 +19,24 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -// TestTraceContextPropagation_EndToEnd verifies that the client interceptor -// injects traceparent/tracestate headers and the server interceptor extracts them, -// resulting in both sides sharing the same trace ID. -func TestTraceContextPropagation_EndToEnd(t *testing.T) { - // 1. Set up an in-memory OTel tracer so we can inspect spans +// setupOTel configures an in-memory tracer provider and W3C trace propagator, +// returning the provider and a cleanup function that restores prior globals. +func setupOTel(t *testing.T) *sdktrace.TracerProvider { + t.Helper() + exporter := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( sdktrace.WithSyncer(exporter), sdktrace.WithSampler(sdktrace.AlwaysSample()), ) - defer func() { _ = tp.Shutdown(context.Background()) }() - // Save and restore globals prevTP := otel.GetTracerProvider() prevProp := otel.GetTextMapPropagator() - defer func() { + t.Cleanup(func() { + _ = tp.Shutdown(context.Background()) otel.SetTracerProvider(prevTP) otel.SetTextMapPropagator(prevProp) - }() + }) otel.SetTracerProvider(tp) otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( @@ -45,19 +44,25 @@ func TestTraceContextPropagation_EndToEnd(t *testing.T) { propagation.Baggage{}, )) - // 2. Record the trace ID seen on the server side + return tp +} + +// TestTraceContextPropagation_Unary verifies that the client interceptor +// injects traceparent/tracestate headers and the server interceptor extracts them, +// resulting in both sides sharing the same trace ID for unary RPCs. +func TestTraceContextPropagation_Unary(t *testing.T) { + tp := setupOTel(t) + var ( mu sync.Mutex serverTraceID trace.TraceID serverSpanID trace.SpanID ) - // 3. Create a Connect handler with the server-side trace interceptor mux := http.NewServeMux() handler := connect.NewUnaryHandler( "/test.v1.TestService/Ping", func(ctx context.Context, _ *connect.Request[emptypb.Empty]) (*connect.Response[emptypb.Empty], error) { - // The server interceptor should have extracted trace context into ctx sc := trace.SpanContextFromContext(ctx) mu.Lock() serverTraceID = sc.TraceID() @@ -72,45 +77,113 @@ func TestTraceContextPropagation_EndToEnd(t *testing.T) { srv := httptest.NewServer(mux) defer srv.Close() - // 4. Create a Connect client with the client-side trace interceptor client := connect.NewClient[emptypb.Empty, emptypb.Empty]( srv.Client(), srv.URL+"/test.v1.TestService/Ping", connect.WithInterceptors(tracing.ConnectClientTraceInterceptor()), ) - // 5. Start a client-side span to establish a trace context - tracer := tp.Tracer("test") - ctx, span := tracer.Start(context.Background(), "client-call") + ctx, span := tp.Tracer("test").Start(context.Background(), "client-call") clientTraceID := span.SpanContext().TraceID() clientSpanID := span.SpanContext().SpanID() - // 6. Make the Connect RPC call _, err := client.CallUnary(ctx, connect.NewRequest(&emptypb.Empty{})) span.End() require.NoError(t, err) - // 7. Verify trace context was propagated mu.Lock() defer mu.Unlock() assert.True(t, clientTraceID.IsValid(), "client trace ID should be valid") assert.True(t, serverTraceID.IsValid(), "server trace ID should be valid") assert.Equal(t, clientTraceID, serverTraceID, - "server must see the same trace ID as the client — trace context was propagated") + "server must see the same trace ID as the client") assert.Equal(t, clientSpanID, serverSpanID, "server must see the client's span ID as the remote parent") - t.Logf("client trace ID: %s span ID: %s", clientTraceID, clientSpanID) - t.Logf("server trace ID: %s span ID: %s", serverTraceID, serverSpanID) + t.Logf("client trace: %s/%s", clientTraceID, clientSpanID) + t.Logf("server trace: %s/%s", serverTraceID, serverSpanID) } -// TestTraceContextPropagation_NoTraceContext verifies that the interceptors -// are safe when no trace context exists (no-op propagator behavior). +// TestTraceContextPropagation_ServerStream verifies trace context propagation +// for server-streaming RPCs, exercising WrapStreamingClient on the client side +// and WrapStreamingHandler on the server side. +func TestTraceContextPropagation_ServerStream(t *testing.T) { + tp := setupOTel(t) + + var ( + mu sync.Mutex + serverTraceID trace.TraceID + serverSpanID trace.SpanID + ) + + mux := http.NewServeMux() + handler := connect.NewServerStreamHandler( + "/test.v1.TestService/StreamPing", + func(ctx context.Context, _ *connect.Request[emptypb.Empty], stream *connect.ServerStream[emptypb.Empty]) error { + sc := trace.SpanContextFromContext(ctx) + mu.Lock() + serverTraceID = sc.TraceID() + serverSpanID = sc.SpanID() + mu.Unlock() + return stream.Send(&emptypb.Empty{}) + }, + connect.WithInterceptors(tracing.ConnectServerTraceInterceptor()), + ) + mux.Handle("/test.v1.TestService/", handler) + + srv := httptest.NewServer(mux) + defer srv.Close() + + client := connect.NewClient[emptypb.Empty, emptypb.Empty]( + srv.Client(), + srv.URL+"/test.v1.TestService/StreamPing", + connect.WithInterceptors(tracing.ConnectClientTraceInterceptor()), + ) + + ctx, span := tp.Tracer("test").Start(context.Background(), "client-stream-call") + clientTraceID := span.SpanContext().TraceID() + clientSpanID := span.SpanContext().SpanID() + + stream, err := client.CallServerStream(ctx, connect.NewRequest(&emptypb.Empty{})) + require.NoError(t, err) + // Drain the stream + for stream.Receive() { + } + require.NoError(t, stream.Err()) + require.NoError(t, stream.Close()) + span.End() + + mu.Lock() + defer mu.Unlock() + + assert.True(t, clientTraceID.IsValid(), "client trace ID should be valid") + assert.True(t, serverTraceID.IsValid(), "server trace ID should be valid") + assert.Equal(t, clientTraceID, serverTraceID, + "server must see the same trace ID as the client (streaming)") + assert.Equal(t, clientSpanID, serverSpanID, + "server must see the client's span ID as the remote parent (streaming)") + + t.Logf("client trace: %s/%s", clientTraceID, clientSpanID) + t.Logf("server trace: %s/%s", serverTraceID, serverSpanID) +} + +// TestTraceContextPropagation_NoTraceContext verifies that a no-op propagator +// prevents trace context from reaching the server, even when the client has +// an active span. This proves the interceptor respects the propagator config. func TestTraceContextPropagation_NoTraceContext(t *testing.T) { - // Use a no-op propagator — simulates a deployment without OTel configured + // Set up a real tracer so the client has a valid span + tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) + defer func() { _ = tp.Shutdown(context.Background()) }() + + prevTP := otel.GetTracerProvider() prevProp := otel.GetTextMapPropagator() - defer otel.SetTextMapPropagator(prevProp) + defer func() { + otel.SetTracerProvider(prevTP) + otel.SetTextMapPropagator(prevProp) + }() + otel.SetTracerProvider(tp) + // No-op propagator — simulates a deployment without OTel propagation configured otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator()) var serverTraceID trace.TraceID @@ -135,10 +208,17 @@ func TestTraceContextPropagation_NoTraceContext(t *testing.T) { connect.WithInterceptors(tracing.ConnectClientTraceInterceptor()), ) - _, err := client.CallUnary(context.Background(), connect.NewRequest(&emptypb.Empty{})) + // Start a real span — the client has a valid trace context locally + ctx, span := tp.Tracer("test").Start(context.Background(), "client-call") + clientTraceID := span.SpanContext().TraceID() + require.True(t, clientTraceID.IsValid(), "client must have a valid trace ID for this test") + + _, err := client.CallUnary(ctx, connect.NewRequest(&emptypb.Empty{})) + span.End() require.NoError(t, err) - // With no propagator, server should not see any trace context + // The no-op propagator should prevent the trace context from being injected, + // so the server never sees it despite the client having an active span. assert.False(t, serverTraceID.IsValid(), "server should not see a trace ID when no propagator is configured") } From 9641f7a7c95df08592d9b2a37116b41cb639f830 Mon Sep 17 00:00:00 2001 From: Paul Flynn Date: Fri, 10 Apr 2026 11:45:21 -0400 Subject: [PATCH 3/6] fix(service): add trace interceptor to remote ERS connection The remote entity resolution connection (setupERSConnection) was built without any interceptors, so outbound ERS calls from GetDecision had no trace context propagation. Add ConnectClientTraceInterceptor to close this gap. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Paul Flynn --- service/pkg/server/start.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/service/pkg/server/start.go b/service/pkg/server/start.go index f8bfacd092..f30ceb69cc 100644 --- a/service/pkg/server/start.go +++ b/service/pkg/server/start.go @@ -388,6 +388,9 @@ func setupERSConnection(cfg *config.Config, oidcconfig *auth.OIDCConfiguration, } ersConnectRPCConn.Endpoint = cfg.SDKConfig.EntityResolutionConnection.Endpoint + // Propagate OTel trace context on outbound ERS Connect RPCs + ersConnectRPCConn.Options = append(ersConnectRPCConn.Options, connect.WithInterceptors(tracing.ConnectClientTraceInterceptor())) + logger.Info("added with custom ers connection", slog.String("ers_connection_endpoint", ersConnectRPCConn.Endpoint)) return ersConnectRPCConn, nil } From b7e3bf27bfe00066a8b8747b766c4b3d25c61213 Mon Sep 17 00:00:00 2001 From: Paul Flynn Date: Fri, 10 Apr 2026 12:49:57 -0400 Subject: [PATCH 4/6] refactor(service): replace custom trace interceptors with otelconnect Replace the hand-rolled propagation-only interceptors with connectrpc.com/otelconnect, which provides per-RPC spans, metrics (duration, message size, in-flight count), and trace propagation. Server interceptor uses WithTrustRemote so incoming spans are children of the remote trace, and WithoutServerPeerAttributes to reduce cardinality. Both use WithoutTraceEvents to keep spans lean. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Paul Flynn --- service/go.mod | 1 + service/go.sum | 2 + service/internal/server/server.go | 16 +++- service/pkg/server/start.go | 8 +- service/tracing/connect_interceptor.go | 85 ++++++--------------- service/tracing/connect_interceptor_test.go | 62 +++++++-------- 6 files changed, 75 insertions(+), 99 deletions(-) diff --git a/service/go.mod b/service/go.mod index 33cea5edb4..ecb9177807 100644 --- a/service/go.mod +++ b/service/go.mod @@ -9,6 +9,7 @@ require ( connectrpc.com/connect v1.19.1 connectrpc.com/grpchealth v1.4.0 connectrpc.com/grpcreflect v1.3.0 + connectrpc.com/otelconnect v0.9.0 connectrpc.com/validate v0.6.0 github.com/Masterminds/squirrel v1.5.4 github.com/Nerzal/gocloak/v13 v13.9.0 diff --git a/service/go.sum b/service/go.sum index 8892b27b38..25e0741d3b 100644 --- a/service/go.sum +++ b/service/go.sum @@ -10,6 +10,8 @@ connectrpc.com/grpchealth v1.4.0 h1:MJC96JLelARPgZTiRF9KRfY/2N9OcoQvF2EWX07v2IE= connectrpc.com/grpchealth v1.4.0/go.mod h1:WhW6m1EzTmq3Ky1FE8EfkIpSDc6TfUx2M2KqZO3ts/Q= connectrpc.com/grpcreflect v1.3.0 h1:Y4V+ACf8/vOb1XOc251Qun7jMB75gCUNw6llvB9csXc= connectrpc.com/grpcreflect v1.3.0/go.mod h1:nfloOtCS8VUQOQ1+GTdFzVg2CJo4ZGaat8JIovCtDYs= +connectrpc.com/otelconnect v0.9.0 h1:NggB3pzRC3pukQWaYbRHJulxuXvmCKCKkQ9hbrHAWoA= +connectrpc.com/otelconnect v0.9.0/go.mod h1:AEkVLjCPXra+ObGFCOClcJkNjS7zPaQSqvO0lCyjfZc= connectrpc.com/validate v0.6.0 h1:DcrgDKt2ZScrUs/d/mh9itD2yeEa0UbBBa+i0mwzx+4= connectrpc.com/validate v0.6.0/go.mod h1:ihrpI+8gVbLH1fvVWJL1I3j0CfWnF8P/90LsmluRiZs= dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= diff --git a/service/internal/server/server.go b/service/internal/server/server.go index 84d942b1d7..8d16b784f7 100644 --- a/service/internal/server/server.go +++ b/service/internal/server/server.go @@ -523,8 +523,12 @@ func pprofHandler(h http.Handler) http.Handler { func newConnectRPC(c Config, authInt connect.Interceptor, ints []connect.Interceptor, logger *logger.Logger) (*ConnectRPC, error) { interceptors := make([]connect.HandlerOption, 0) - // Extract OTel trace context from incoming Connect requests before all other interceptors - interceptors = append(interceptors, connect.WithInterceptors(tracing.ConnectServerTraceInterceptor())) + // OTel tracing and metrics for incoming Connect requests, before all other interceptors + serverTraceInt, err := tracing.ConnectServerTraceInterceptor() + if err != nil { + return nil, fmt.Errorf("failed to create server trace interceptor: %w", err) + } + interceptors = append(interceptors, connect.WithInterceptors(serverTraceInt)) if c.Auth.Enabled { if authInt == nil { @@ -600,8 +604,12 @@ func (s OpenTDFServer) Stop() { func (s inProcessServer) Conn() *sdk.ConnectRPCConnection { var clientInterceptors []connect.Interceptor - // Propagate OTel trace context on outbound IPC Connect RPCs - clientInterceptors = append(clientInterceptors, tracing.ConnectClientTraceInterceptor()) + // OTel tracing and metrics for outbound IPC Connect RPCs + if clientTraceInt, err := tracing.ConnectClientTraceInterceptor(); err != nil { + s.logger.Error("failed to create IPC client trace interceptor", slog.String("error", err.Error())) + } else { + clientInterceptors = append(clientInterceptors, clientTraceInt) + } // Add audit interceptor clientInterceptors = append(clientInterceptors, sdkAudit.MetadataAddingConnectInterceptor()) diff --git a/service/pkg/server/start.go b/service/pkg/server/start.go index f30ceb69cc..a0dea672c6 100644 --- a/service/pkg/server/start.go +++ b/service/pkg/server/start.go @@ -388,8 +388,12 @@ func setupERSConnection(cfg *config.Config, oidcconfig *auth.OIDCConfiguration, } ersConnectRPCConn.Endpoint = cfg.SDKConfig.EntityResolutionConnection.Endpoint - // Propagate OTel trace context on outbound ERS Connect RPCs - ersConnectRPCConn.Options = append(ersConnectRPCConn.Options, connect.WithInterceptors(tracing.ConnectClientTraceInterceptor())) + // OTel tracing and metrics for outbound ERS Connect RPCs + ersTraceInt, err := tracing.ConnectClientTraceInterceptor() + if err != nil { + return nil, fmt.Errorf("failed to create ERS trace interceptor: %w", err) + } + ersConnectRPCConn.Options = append(ersConnectRPCConn.Options, connect.WithInterceptors(ersTraceInt)) logger.Info("added with custom ers connection", slog.String("ers_connection_endpoint", ersConnectRPCConn.Endpoint)) return ersConnectRPCConn, nil diff --git a/service/tracing/connect_interceptor.go b/service/tracing/connect_interceptor.go index 059a2aeae9..6168965990 100644 --- a/service/tracing/connect_interceptor.go +++ b/service/tracing/connect_interceptor.go @@ -1,70 +1,29 @@ package tracing import ( - "context" - "connectrpc.com/connect" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/propagation" + "connectrpc.com/otelconnect" ) -// ConnectClientTraceInterceptor returns a Connect interceptor that injects -// OpenTelemetry trace context (traceparent/tracestate) into outbound HTTP -// request headers, enabling distributed trace propagation across Connect RPCs. -// Handles both unary and streaming calls. -func ConnectClientTraceInterceptor() connect.Interceptor { - return &connectClientTraceInterceptor{} -} - -type connectClientTraceInterceptor struct{} - -func (i *connectClientTraceInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { - return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { - if req.Spec().IsClient { - otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header())) - } - return next(ctx, req) - } -} - -func (i *connectClientTraceInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { - return func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn { - conn := next(ctx, spec) - otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(conn.RequestHeader())) - return conn - } -} - -func (i *connectClientTraceInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { - return next -} - -// ConnectServerTraceInterceptor returns a Connect interceptor that extracts -// OpenTelemetry trace context (traceparent/tracestate) from incoming HTTP -// request headers into the Go context, enabling distributed trace continuity -// for Connect RPC handlers. Handles both unary and streaming calls. -func ConnectServerTraceInterceptor() connect.Interceptor { - return &connectServerTraceInterceptor{} -} - -type connectServerTraceInterceptor struct{} - -func (i *connectServerTraceInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { - return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { - if !req.Spec().IsClient { - ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(req.Header())) - } - return next(ctx, req) - } -} - -func (i *connectServerTraceInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { - return next -} - -func (i *connectServerTraceInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { - return func(ctx context.Context, conn connect.StreamingHandlerConn) error { - ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(conn.RequestHeader())) - return next(ctx, conn) - } +// ConnectClientTraceInterceptor returns a Connect interceptor backed by +// otelconnect that injects OpenTelemetry trace context into outbound requests +// and creates per-RPC spans and metrics. +func ConnectClientTraceInterceptor() (connect.Interceptor, error) { + return otelconnect.NewInterceptor( + otelconnect.WithoutTraceEvents(), + ) +} + +// ConnectServerTraceInterceptor returns a Connect interceptor backed by +// otelconnect that extracts OpenTelemetry trace context from incoming requests +// and creates per-RPC spans and metrics. +// +// WithTrustRemote makes server spans children of the incoming trace rather +// than linked root spans. WithoutServerPeerAttributes reduces cardinality. +func ConnectServerTraceInterceptor() (connect.Interceptor, error) { + return otelconnect.NewInterceptor( + otelconnect.WithTrustRemote(), + otelconnect.WithoutServerPeerAttributes(), + otelconnect.WithoutTraceEvents(), + ) } diff --git a/service/tracing/connect_interceptor_test.go b/service/tracing/connect_interceptor_test.go index 62d7dc57f0..27d433ceaf 100644 --- a/service/tracing/connect_interceptor_test.go +++ b/service/tracing/connect_interceptor_test.go @@ -53,10 +53,14 @@ func setupOTel(t *testing.T) *sdktrace.TracerProvider { func TestTraceContextPropagation_Unary(t *testing.T) { tp := setupOTel(t) + serverInt, err := tracing.ConnectServerTraceInterceptor() + require.NoError(t, err) + clientInt, err := tracing.ConnectClientTraceInterceptor() + require.NoError(t, err) + var ( mu sync.Mutex serverTraceID trace.TraceID - serverSpanID trace.SpanID ) mux := http.NewServeMux() @@ -66,11 +70,10 @@ func TestTraceContextPropagation_Unary(t *testing.T) { sc := trace.SpanContextFromContext(ctx) mu.Lock() serverTraceID = sc.TraceID() - serverSpanID = sc.SpanID() mu.Unlock() return connect.NewResponse(&emptypb.Empty{}), nil }, - connect.WithInterceptors(tracing.ConnectServerTraceInterceptor()), + connect.WithInterceptors(serverInt), ) mux.Handle("/test.v1.TestService/", handler) @@ -80,14 +83,13 @@ func TestTraceContextPropagation_Unary(t *testing.T) { client := connect.NewClient[emptypb.Empty, emptypb.Empty]( srv.Client(), srv.URL+"/test.v1.TestService/Ping", - connect.WithInterceptors(tracing.ConnectClientTraceInterceptor()), + connect.WithInterceptors(clientInt), ) ctx, span := tp.Tracer("test").Start(context.Background(), "client-call") clientTraceID := span.SpanContext().TraceID() - clientSpanID := span.SpanContext().SpanID() - _, err := client.CallUnary(ctx, connect.NewRequest(&emptypb.Empty{})) + _, err = client.CallUnary(ctx, connect.NewRequest(&emptypb.Empty{})) span.End() require.NoError(t, err) @@ -98,11 +100,9 @@ func TestTraceContextPropagation_Unary(t *testing.T) { assert.True(t, serverTraceID.IsValid(), "server trace ID should be valid") assert.Equal(t, clientTraceID, serverTraceID, "server must see the same trace ID as the client") - assert.Equal(t, clientSpanID, serverSpanID, - "server must see the client's span ID as the remote parent") - t.Logf("client trace: %s/%s", clientTraceID, clientSpanID) - t.Logf("server trace: %s/%s", serverTraceID, serverSpanID) + t.Logf("client trace: %s", clientTraceID) + t.Logf("server trace: %s", serverTraceID) } // TestTraceContextPropagation_ServerStream verifies trace context propagation @@ -111,10 +111,14 @@ func TestTraceContextPropagation_Unary(t *testing.T) { func TestTraceContextPropagation_ServerStream(t *testing.T) { tp := setupOTel(t) + serverInt, err := tracing.ConnectServerTraceInterceptor() + require.NoError(t, err) + clientInt, err := tracing.ConnectClientTraceInterceptor() + require.NoError(t, err) + var ( mu sync.Mutex serverTraceID trace.TraceID - serverSpanID trace.SpanID ) mux := http.NewServeMux() @@ -124,11 +128,10 @@ func TestTraceContextPropagation_ServerStream(t *testing.T) { sc := trace.SpanContextFromContext(ctx) mu.Lock() serverTraceID = sc.TraceID() - serverSpanID = sc.SpanID() mu.Unlock() return stream.Send(&emptypb.Empty{}) }, - connect.WithInterceptors(tracing.ConnectServerTraceInterceptor()), + connect.WithInterceptors(serverInt), ) mux.Handle("/test.v1.TestService/", handler) @@ -138,16 +141,14 @@ func TestTraceContextPropagation_ServerStream(t *testing.T) { client := connect.NewClient[emptypb.Empty, emptypb.Empty]( srv.Client(), srv.URL+"/test.v1.TestService/StreamPing", - connect.WithInterceptors(tracing.ConnectClientTraceInterceptor()), + connect.WithInterceptors(clientInt), ) ctx, span := tp.Tracer("test").Start(context.Background(), "client-stream-call") clientTraceID := span.SpanContext().TraceID() - clientSpanID := span.SpanContext().SpanID() stream, err := client.CallServerStream(ctx, connect.NewRequest(&emptypb.Empty{})) require.NoError(t, err) - // Drain the stream for stream.Receive() { } require.NoError(t, stream.Err()) @@ -161,18 +162,15 @@ func TestTraceContextPropagation_ServerStream(t *testing.T) { assert.True(t, serverTraceID.IsValid(), "server trace ID should be valid") assert.Equal(t, clientTraceID, serverTraceID, "server must see the same trace ID as the client (streaming)") - assert.Equal(t, clientSpanID, serverSpanID, - "server must see the client's span ID as the remote parent (streaming)") - t.Logf("client trace: %s/%s", clientTraceID, clientSpanID) - t.Logf("server trace: %s/%s", serverTraceID, serverSpanID) + t.Logf("client trace: %s", clientTraceID) + t.Logf("server trace: %s", serverTraceID) } // TestTraceContextPropagation_NoTraceContext verifies that a no-op propagator // prevents trace context from reaching the server, even when the client has // an active span. This proves the interceptor respects the propagator config. func TestTraceContextPropagation_NoTraceContext(t *testing.T) { - // Set up a real tracer so the client has a valid span tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) defer func() { _ = tp.Shutdown(context.Background()) }() @@ -183,9 +181,13 @@ func TestTraceContextPropagation_NoTraceContext(t *testing.T) { otel.SetTextMapPropagator(prevProp) }() otel.SetTracerProvider(tp) - // No-op propagator — simulates a deployment without OTel propagation configured otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator()) + serverInt, err := tracing.ConnectServerTraceInterceptor() + require.NoError(t, err) + clientInt, err := tracing.ConnectClientTraceInterceptor() + require.NoError(t, err) + var serverTraceID trace.TraceID mux := http.NewServeMux() @@ -195,7 +197,7 @@ func TestTraceContextPropagation_NoTraceContext(t *testing.T) { serverTraceID = trace.SpanContextFromContext(ctx).TraceID() return connect.NewResponse(&emptypb.Empty{}), nil }, - connect.WithInterceptors(tracing.ConnectServerTraceInterceptor()), + connect.WithInterceptors(serverInt), ) mux.Handle("/test.v1.TestService/", handler) @@ -205,20 +207,20 @@ func TestTraceContextPropagation_NoTraceContext(t *testing.T) { client := connect.NewClient[emptypb.Empty, emptypb.Empty]( srv.Client(), srv.URL+"/test.v1.TestService/Ping", - connect.WithInterceptors(tracing.ConnectClientTraceInterceptor()), + connect.WithInterceptors(clientInt), ) - // Start a real span — the client has a valid trace context locally ctx, span := tp.Tracer("test").Start(context.Background(), "client-call") clientTraceID := span.SpanContext().TraceID() require.True(t, clientTraceID.IsValid(), "client must have a valid trace ID for this test") - _, err := client.CallUnary(ctx, connect.NewRequest(&emptypb.Empty{})) + _, err = client.CallUnary(ctx, connect.NewRequest(&emptypb.Empty{})) span.End() require.NoError(t, err) - // The no-op propagator should prevent the trace context from being injected, - // so the server never sees it despite the client having an active span. - assert.False(t, serverTraceID.IsValid(), - "server should not see a trace ID when no propagator is configured") + // With a no-op propagator, the client's trace context is not injected into + // headers. otelconnect still creates a server span, but it starts a new + // independent trace — the trace IDs must differ. + assert.NotEqual(t, clientTraceID, serverTraceID, + "server should have a different trace ID when no propagator is configured") } From b10ceb6e21a57fa5479228e430030d9ab7f8946b Mon Sep 17 00:00:00 2001 From: Paul Flynn Date: Fri, 10 Apr 2026 13:02:19 -0400 Subject: [PATCH 5/6] fix(service): ERS trace interceptor ordering, harden no-op test Move ERS trace interceptor before auth configuration so tracing is the outermost wrapper (spans cover auth latency). Add mutex and validity assertion to no-propagator test per review feedback. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Paul Flynn --- service/pkg/server/start.go | 14 +++++++------- service/tracing/connect_interceptor_test.go | 17 +++++++++++++---- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/service/pkg/server/start.go b/service/pkg/server/start.go index a0dea672c6..2efb25879b 100644 --- a/service/pkg/server/start.go +++ b/service/pkg/server/start.go @@ -372,6 +372,13 @@ func setupERSConnection(cfg *config.Config, oidcconfig *auth.OIDCConfiguration, ersConnectRPCConn := &sdk.ConnectRPCConnection{} + // OTel tracing and metrics for outbound ERS Connect RPCs (outermost interceptor) + ersTraceInt, err := tracing.ConnectClientTraceInterceptor() + if err != nil { + return nil, fmt.Errorf("failed to create ERS trace interceptor: %w", err) + } + ersConnectRPCConn.Options = append(ersConnectRPCConn.Options, connect.WithInterceptors(ersTraceInt)) + // Configure TLS tlsConfig := configureTLSForERS(cfg, ersConnectRPCConn) @@ -388,13 +395,6 @@ func setupERSConnection(cfg *config.Config, oidcconfig *auth.OIDCConfiguration, } ersConnectRPCConn.Endpoint = cfg.SDKConfig.EntityResolutionConnection.Endpoint - // OTel tracing and metrics for outbound ERS Connect RPCs - ersTraceInt, err := tracing.ConnectClientTraceInterceptor() - if err != nil { - return nil, fmt.Errorf("failed to create ERS trace interceptor: %w", err) - } - ersConnectRPCConn.Options = append(ersConnectRPCConn.Options, connect.WithInterceptors(ersTraceInt)) - logger.Info("added with custom ers connection", slog.String("ers_connection_endpoint", ersConnectRPCConn.Endpoint)) return ersConnectRPCConn, nil } diff --git a/service/tracing/connect_interceptor_test.go b/service/tracing/connect_interceptor_test.go index 27d433ceaf..7f95ee7442 100644 --- a/service/tracing/connect_interceptor_test.go +++ b/service/tracing/connect_interceptor_test.go @@ -188,13 +188,18 @@ func TestTraceContextPropagation_NoTraceContext(t *testing.T) { clientInt, err := tracing.ConnectClientTraceInterceptor() require.NoError(t, err) - var serverTraceID trace.TraceID + var ( + mu sync.Mutex + serverTraceID trace.TraceID + ) mux := http.NewServeMux() handler := connect.NewUnaryHandler( "/test.v1.TestService/Ping", func(ctx context.Context, _ *connect.Request[emptypb.Empty]) (*connect.Response[emptypb.Empty], error) { + mu.Lock() serverTraceID = trace.SpanContextFromContext(ctx).TraceID() + mu.Unlock() return connect.NewResponse(&emptypb.Empty{}), nil }, connect.WithInterceptors(serverInt), @@ -218,9 +223,13 @@ func TestTraceContextPropagation_NoTraceContext(t *testing.T) { span.End() require.NoError(t, err) - // With a no-op propagator, the client's trace context is not injected into - // headers. otelconnect still creates a server span, but it starts a new - // independent trace — the trace IDs must differ. + mu.Lock() + defer mu.Unlock() + + // otelconnect still creates a server span, so serverTraceID must be valid. + // But with a no-op propagator, the client's trace context is not injected + // into headers — the server starts a new independent trace. + require.True(t, serverTraceID.IsValid(), "server span should still be created") assert.NotEqual(t, clientTraceID, serverTraceID, "server should have a different trace ID when no propagator is configured") } From c19e4ca915e1e3ca2874ebef35217b39835acc35 Mon Sep 17 00:00:00 2001 From: Paul Flynn Date: Fri, 10 Apr 2026 15:25:14 -0400 Subject: [PATCH 6/6] fix(service): make ERS trace interceptor failure non-fatal Match the IPC path's error handling: log and continue without tracing rather than preventing the service from starting. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Paul Flynn --- service/pkg/server/start.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/service/pkg/server/start.go b/service/pkg/server/start.go index 2efb25879b..25e617fddb 100644 --- a/service/pkg/server/start.go +++ b/service/pkg/server/start.go @@ -373,11 +373,11 @@ func setupERSConnection(cfg *config.Config, oidcconfig *auth.OIDCConfiguration, ersConnectRPCConn := &sdk.ConnectRPCConnection{} // OTel tracing and metrics for outbound ERS Connect RPCs (outermost interceptor) - ersTraceInt, err := tracing.ConnectClientTraceInterceptor() - if err != nil { - return nil, fmt.Errorf("failed to create ERS trace interceptor: %w", err) + if ersTraceInt, err := tracing.ConnectClientTraceInterceptor(); err != nil { + logger.Error("failed to create ERS trace interceptor", slog.String("error", err.Error())) + } else { + ersConnectRPCConn.Options = append(ersConnectRPCConn.Options, connect.WithInterceptors(ersTraceInt)) } - ersConnectRPCConn.Options = append(ersConnectRPCConn.Options, connect.WithInterceptors(ersTraceInt)) // Configure TLS tlsConfig := configureTLSForERS(cfg, ersConnectRPCConn)