Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions service/authorization/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"github.com/opentdf/platform/service/pkg/config"
"github.com/opentdf/platform/service/pkg/db"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -154,10 +152,6 @@ func (as AuthorizationService) IsReady(ctx context.Context) error {
}

func (as *AuthorizationService) GetDecisionsByToken(ctx context.Context, req *connect.Request[authorization.GetDecisionsByTokenRequest]) (*connect.Response[authorization.GetDecisionsByTokenResponse], error) {
// Extract trace context from the incoming request
propagator := otel.GetTextMapPropagator()
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header()))

ctx, span := as.Start(ctx, "GetDecisionsByToken")
defer span.End()

Expand Down
18 changes: 0 additions & 18 deletions service/authorization/v2/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
ctxAuth "github.com/opentdf/platform/service/pkg/auth"
"github.com/opentdf/platform/service/pkg/cache"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/wrapperspb"
)
Expand Down Expand Up @@ -141,10 +139,6 @@ func (as *Service) GetEntitlements(ctx context.Context, req *connect.Request[aut
ctx, span := as.Start(ctx, "GetEntitlements")
defer span.End()

// Extract trace context from the incoming request
propagator := otel.GetTextMapPropagator()
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header()))

entityIdentifier := req.Msg.GetEntityIdentifier()
withComprehensiveHierarchy := req.Msg.GetWithComprehensiveHierarchy()

Expand Down Expand Up @@ -172,10 +166,6 @@ func (as *Service) GetDecision(ctx context.Context, req *connect.Request[authzV2
ctx, span := as.Start(ctx, "GetDecision")
defer span.End()

// Extract trace context from the incoming request
propagator := otel.GetTextMapPropagator()
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header()))

pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk, as.cache, as.config.AllowDirectEntitlements)
if err != nil {
return nil, statusifyError(ctx, as.logger, errors.Join(ErrFailedToInitPDP, err))
Expand Down Expand Up @@ -222,10 +212,6 @@ func (as *Service) GetDecisionMultiResource(ctx context.Context, req *connect.Re
ctx, span := as.Start(ctx, "GetDecisionMultiResource")
defer span.End()

// Extract trace context from the incoming request
propagator := otel.GetTextMapPropagator()
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header()))

pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk, as.cache, as.config.AllowDirectEntitlements)
if err != nil {
return nil, statusifyError(ctx, as.logger, errors.Join(ErrFailedToInitPDP, err))
Expand Down Expand Up @@ -275,10 +261,6 @@ func (as *Service) GetDecisionBulk(ctx context.Context, req *connect.Request[aut
ctx, span := as.Start(ctx, "GetDecisionBulk")
defer span.End()

// Extract trace context from the incoming request
propagator := otel.GetTextMapPropagator()
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header()))

pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk, as.cache, as.config.AllowDirectEntitlements)
if err != nil {
return nil, statusifyError(ctx, as.logger, errors.Join(ErrFailedToInitPDP, err))
Expand Down
1 change: 1 addition & 0 deletions service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
connectrpc.com/connect v1.19.1
connectrpc.com/grpchealth v1.4.0
connectrpc.com/grpcreflect v1.3.0
connectrpc.com/otelconnect v0.9.0
connectrpc.com/validate v0.6.0
github.com/Masterminds/squirrel v1.5.4
github.com/Nerzal/gocloak/v13 v13.9.0
Expand Down
2 changes: 2 additions & 0 deletions service/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ connectrpc.com/grpchealth v1.4.0 h1:MJC96JLelARPgZTiRF9KRfY/2N9OcoQvF2EWX07v2IE=
connectrpc.com/grpchealth v1.4.0/go.mod h1:WhW6m1EzTmq3Ky1FE8EfkIpSDc6TfUx2M2KqZO3ts/Q=
connectrpc.com/grpcreflect v1.3.0 h1:Y4V+ACf8/vOb1XOc251Qun7jMB75gCUNw6llvB9csXc=
connectrpc.com/grpcreflect v1.3.0/go.mod h1:nfloOtCS8VUQOQ1+GTdFzVg2CJo4ZGaat8JIovCtDYs=
connectrpc.com/otelconnect v0.9.0 h1:NggB3pzRC3pukQWaYbRHJulxuXvmCKCKkQ9hbrHAWoA=
connectrpc.com/otelconnect v0.9.0/go.mod h1:AEkVLjCPXra+ObGFCOClcJkNjS7zPaQSqvO0lCyjfZc=
connectrpc.com/validate v0.6.0 h1:DcrgDKt2ZScrUs/d/mh9itD2yeEa0UbBBa+i0mwzx+4=
connectrpc.com/validate v0.6.0/go.mod h1:ihrpI+8gVbLH1fvVWJL1I3j0CfWnF8P/90LsmluRiZs=
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
Expand Down
14 changes: 14 additions & 0 deletions service/internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,13 @@ func pprofHandler(h http.Handler) http.Handler {
func newConnectRPC(c Config, authInt connect.Interceptor, ints []connect.Interceptor, logger *logger.Logger) (*ConnectRPC, error) {
interceptors := make([]connect.HandlerOption, 0)

// OTel tracing and metrics for incoming Connect requests, before all other interceptors
serverTraceInt, err := tracing.ConnectServerTraceInterceptor()
if err != nil {
return nil, fmt.Errorf("failed to create server trace interceptor: %w", err)
}
interceptors = append(interceptors, connect.WithInterceptors(serverTraceInt))

if c.Auth.Enabled {
if authInt == nil {
return nil, errors.New("authentication enabled but no interceptor provided")
Expand Down Expand Up @@ -597,6 +604,13 @@ func (s OpenTDFServer) Stop() {
func (s inProcessServer) Conn() *sdk.ConnectRPCConnection {
var clientInterceptors []connect.Interceptor

// OTel tracing and metrics for outbound IPC Connect RPCs
if clientTraceInt, err := tracing.ConnectClientTraceInterceptor(); err != nil {
s.logger.Error("failed to create IPC client trace interceptor", slog.String("error", err.Error()))
} else {
clientInterceptors = append(clientInterceptors, clientTraceInt)
}

// Add audit interceptor
clientInterceptors = append(clientInterceptors, sdkAudit.MetadataAddingConnectInterceptor())

Expand Down
16 changes: 8 additions & 8 deletions service/internal/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,32 +554,32 @@ func TestNewConnectRPC(t *testing.T) {
authEnabled: true,
authInt: noopInterceptor(),
extraInts: []connect.Interceptor{noopInterceptor(), noopInterceptor()},
wantIntLen: 3,
wantDescription: "1 auth + 1 extras + 1 validation/audit",
wantIntLen: 4,
wantDescription: "1 trace + 1 auth + 1 extras + 1 validation/audit",
},
{
name: "auth enabled no extras",
authEnabled: true,
authInt: noopInterceptor(),
extraInts: nil,
wantIntLen: 2,
wantDescription: "1 auth + 1 validation/audit",
wantIntLen: 3,
wantDescription: "1 trace + 1 auth + 1 validation/audit",
},
{
name: "auth disabled no extras",
authEnabled: false,
authInt: nil,
extraInts: nil,
wantIntLen: 1,
wantDescription: "1 validation/audit only",
wantIntLen: 2,
wantDescription: "1 trace + 1 validation/audit only",
},
{
name: "auth disabled with extras",
authEnabled: false,
authInt: nil,
extraInts: []connect.Interceptor{noopInterceptor()},
wantIntLen: 2,
wantDescription: "1 extras + 1 validation/audit",
wantIntLen: 3,
wantDescription: "1 trace + 1 extras + 1 validation/audit",
},
{
name: "auth enabled but nil authInt returns error",
Expand Down
7 changes: 7 additions & 0 deletions service/pkg/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,13 @@ func setupERSConnection(cfg *config.Config, oidcconfig *auth.OIDCConfiguration,

ersConnectRPCConn := &sdk.ConnectRPCConnection{}

// OTel tracing and metrics for outbound ERS Connect RPCs (outermost interceptor)
if ersTraceInt, err := tracing.ConnectClientTraceInterceptor(); err != nil {
logger.Error("failed to create ERS trace interceptor", slog.String("error", err.Error()))
} else {
ersConnectRPCConn.Options = append(ersConnectRPCConn.Options, connect.WithInterceptors(ersTraceInt))
}

// Configure TLS
tlsConfig := configureTLSForERS(cfg, ersConnectRPCConn)

Expand Down
29 changes: 29 additions & 0 deletions service/tracing/connect_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package tracing

import (
"connectrpc.com/connect"
"connectrpc.com/otelconnect"
)

// ConnectClientTraceInterceptor returns a Connect interceptor backed by
// otelconnect that injects OpenTelemetry trace context into outbound requests
// and creates per-RPC spans and metrics.
func ConnectClientTraceInterceptor() (connect.Interceptor, error) {
return otelconnect.NewInterceptor(
otelconnect.WithoutTraceEvents(),
)
}

// ConnectServerTraceInterceptor returns a Connect interceptor backed by
// otelconnect that extracts OpenTelemetry trace context from incoming requests
// and creates per-RPC spans and metrics.
//
// WithTrustRemote makes server spans children of the incoming trace rather
// than linked root spans. WithoutServerPeerAttributes reduces cardinality.
func ConnectServerTraceInterceptor() (connect.Interceptor, error) {
return otelconnect.NewInterceptor(
otelconnect.WithTrustRemote(),
otelconnect.WithoutServerPeerAttributes(),
otelconnect.WithoutTraceEvents(),
)
}
Loading
Loading