From f8a0aba796f719a2a79d15e9677760a7445e3398 Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Tue, 9 Dec 2025 14:00:13 -0500 Subject: [PATCH] Refactor metrics into a proper provider interface --- grpc/app/app.go | 46 +-- grpc/metrics/interceptor.go | 352 ++++++++++++++++++ grpc/metrics/new_relic_server_interceptor.go | 357 ------------------- metrics/constants.go | 3 +- metrics/events.go | 8 +- metrics/metrics.go | 14 +- metrics/newrelic/provider.go | 120 +++++++ metrics/noop/provider.go | 65 ++++ metrics/provider.go | 79 ++++ metrics/tracing.go | 24 +- ocp/transaction/nonce_pool.go | 13 +- ocp/transaction/nonce_pool_test.go | 3 +- ocp/worker/account/gift_card.go | 13 +- ocp/worker/currency/exchange_rate.go | 11 +- ocp/worker/currency/reserve.go | 11 +- ocp/worker/geyser/backup.go | 17 +- ocp/worker/geyser/consumer.go | 11 +- ocp/worker/nonce/allocator.go | 9 +- ocp/worker/nonce/keys.go | 19 +- ocp/worker/nonce/pool.go | 11 +- ocp/worker/sequencer/worker.go | 11 +- ocp/worker/swap/worker.go | 11 +- 22 files changed, 723 insertions(+), 485 deletions(-) create mode 100644 grpc/metrics/interceptor.go delete mode 100644 grpc/metrics/new_relic_server_interceptor.go create mode 100644 metrics/newrelic/provider.go create mode 100644 metrics/noop/provider.go create mode 100644 metrics/provider.go diff --git a/grpc/app/app.go b/grpc/app/app.go index 7c0d5c8..6aac7b1 100644 --- a/grpc/app/app.go +++ b/grpc/app/app.go @@ -23,17 +23,18 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/health" - healthgrpc "google.golang.org/grpc/health/grpc_health_v1" + health_grpc "google.golang.org/grpc/health/grpc_health_v1" "github.com/code-payments/ocp-server/grpc/client" "github.com/code-payments/ocp-server/grpc/headers" - "github.com/code-payments/ocp-server/grpc/metrics" + grpc_metrics "github.com/code-payments/ocp-server/grpc/metrics" "github.com/code-payments/ocp-server/grpc/protobuf/validation" + "github.com/code-payments/ocp-server/metrics" + newrelic_metrics "github.com/code-payments/ocp-server/metrics/newrelic" + noop_metrics "github.com/code-payments/ocp-server/metrics/noop" "github.com/code-payments/ocp-server/osutil" ) -// todo: Better metrics provider abstraction so we're not directly tied to NR - // App is a long lived application that services network requests. // It is expected that App's have gRPC services, but is not a hard requirement. // @@ -44,7 +45,7 @@ type App interface { // Init initializes the application in a blocking fashion. When Init returns, it // is expected that the application is ready to start receiving requests (provided // there are gRPC handlers installed). - Init(log *zap.Logger, metricsProvider *newrelic.Application, config Config) error + Init(log *zap.Logger, metricsProvider metrics.Provider, config Config) error // RegisterWithGRPC provides a mechanism for the application to register gRPC services // with the gRPC server. @@ -104,7 +105,7 @@ func Run(app App, options ...Option) error { log = zap.New(getLogCore(getLogLevel(config.LogLevel))) - var metricsProvider *newrelic.Application + var metricsProvider metrics.Provider if len(config.NewRelicLicenseKey) > 0 { nr, err := newrelic.NewApplication( newrelic.ConfigFromEnvironment(), @@ -118,14 +119,17 @@ func Run(app App, options ...Option) error { os.Exit(1) } - metricsProvider = nr + nrProvider := newrelic_metrics.NewProvider(nr) + metricsProvider = nrProvider - nrLogCore, err := nrzap.WrapBackgroundCore(getLogCore(getLogLevel(config.LogLevel)), metricsProvider) + nrLogCore, err := nrzap.WrapBackgroundCore(getLogCore(getLogLevel(config.LogLevel)), nrProvider.Application()) if err != nil { log.With(zap.Error(err)).Error("error wrapping logs with new relic") os.Exit(1) } log = zap.New(nrLogCore) + } else { + metricsProvider = noop_metrics.NewProvider() } if len(config.AppName) == 0 { @@ -227,33 +231,21 @@ func Run(app App, options ...Option) error { } } + // Metrics interceptor should be near the top of the chain, so we can + // capture as many calls as possible. However, it does need to be after + // headers since it relies on certain header values being present. defaultUnaryServerInterceptors := []grpc.UnaryServerInterceptor{ headers.UnaryServerInterceptor(), + grpc_metrics.UnaryServerInterceptor(metricsProvider), validation.UnaryServerInterceptor(log), client.MinVersionUnaryServerInterceptor(), } defaultStreamServerInterceptors := []grpc.StreamServerInterceptor{ headers.StreamServerInterceptor(), + grpc_metrics.StreamServerInterceptor(metricsProvider), validation.StreamServerInterceptor(log), client.MinVersionStreamServerInterceptor(), } - if metricsProvider != nil { - // Metrics interceptor should be near the top of the chain, so we can - // capture as many calls as possible. However, it does need to be after - // headers since it relies on certain header values being present. - defaultUnaryServerInterceptors = []grpc.UnaryServerInterceptor{ - headers.UnaryServerInterceptor(), - metrics.CustomNewRelicUnaryServerInterceptor(metricsProvider), - validation.UnaryServerInterceptor(log), - client.MinVersionUnaryServerInterceptor(), - } - defaultStreamServerInterceptors = []grpc.StreamServerInterceptor{ - headers.StreamServerInterceptor(), - metrics.CustomNewRelicStreamServerInterceptor(metricsProvider), - validation.StreamServerInterceptor(log), - client.MinVersionStreamServerInterceptor(), - } - } opts := opts{ unaryServerInterceptors: defaultUnaryServerInterceptors, @@ -280,8 +272,8 @@ func Run(app App, options ...Option) error { app.RegisterWithGRPC(secureServ) app.RegisterWithGRPC(insecureServ) - healthgrpc.RegisterHealthServer(secureServ, health.NewServer()) - healthgrpc.RegisterHealthServer(insecureServ, health.NewServer()) + health_grpc.RegisterHealthServer(secureServ, health.NewServer()) + health_grpc.RegisterHealthServer(insecureServ, health.NewServer()) secureServShutdownCh := make(chan struct{}) inssecureServShutdownCh := make(chan struct{}) diff --git a/grpc/metrics/interceptor.go b/grpc/metrics/interceptor.go new file mode 100644 index 0000000..e81cbbd --- /dev/null +++ b/grpc/metrics/interceptor.go @@ -0,0 +1,352 @@ +package metrics + +import ( + "context" + "fmt" + "net/http" + "net/url" + "strings" + + grpc_core "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" + + "github.com/code-payments/ocp-server/grpc" + "github.com/code-payments/ocp-server/grpc/client" + "github.com/code-payments/ocp-server/metrics" +) + +const ( + grpcRequestPackageAttributeKey = "grpc.request.package" + grpcRequestServiceAttributeKey = "grpc.request.service" + grpcRequestMethodAttributeKey = "grpc.request.method" + + grpcResponseStatusCodeAttributeKey = "grpc.response.statusCode" + grpcResponseStatusMessageAttributeKey = "grpc.response.statusMessage" + grpcResponseStatusCodeLevelAttributeKey = "grpc.response.statusCodeLevel" + + resultCodeAttributeKey = "code.response.resultCode" + resultCodeLevelAttributeKey = "code.response.resultCodeLevel" + + clientUserAgentAttributeKey = "grpc.client.userAgent" + + infoLevel = "info" + warningLevel = "warning" + errorLevel = "error" +) + +type traceStatusCodeHandler func(metrics.Trace, *status.Status) +type traceResultCodeHandler func(metrics.Trace, string) + +var ( + traceStatusCodeHandlers = map[codes.Code]traceStatusCodeHandler{ + codes.OK: infoTraceStatusCodeHandler, + codes.Aborted: infoTraceStatusCodeHandler, + codes.AlreadyExists: infoTraceStatusCodeHandler, + codes.Canceled: infoTraceStatusCodeHandler, + codes.DataLoss: infoTraceStatusCodeHandler, + codes.DeadlineExceeded: infoTraceStatusCodeHandler, + codes.FailedPrecondition: infoTraceStatusCodeHandler, + codes.InvalidArgument: infoTraceStatusCodeHandler, + codes.NotFound: infoTraceStatusCodeHandler, + codes.OutOfRange: infoTraceStatusCodeHandler, + codes.PermissionDenied: infoTraceStatusCodeHandler, + codes.ResourceExhausted: infoTraceStatusCodeHandler, + codes.Unauthenticated: infoTraceStatusCodeHandler, + codes.Unimplemented: infoTraceStatusCodeHandler, + + codes.Internal: warningTraceStatusCodeHandler, + codes.Unavailable: warningTraceStatusCodeHandler, + codes.Unknown: warningTraceStatusCodeHandler, + } + defaultTraceStatusCodeHandler = infoTraceStatusCodeHandler + + traceResultCodeHandlers = map[string]traceResultCodeHandler{ + "OK": infoTraceResultCodeHandler, + "NOT_FOUND": infoTraceResultCodeHandler, + + "DENIED": warningTraceResultCodeHandler, + } + defaultTraceResultCodeHandler = infoTraceResultCodeHandler +) + +func infoTraceStatusCodeHandler(trace metrics.Trace, s *status.Status) { + trace.SetResponse(nil).WriteHeader(int(s.Code())) + trace.AddAttribute(grpcResponseStatusCodeAttributeKey, s.Code().String()) + trace.AddAttribute(grpcResponseStatusMessageAttributeKey, s.Message()) + trace.AddAttribute(grpcResponseStatusCodeLevelAttributeKey, infoLevel) +} + +func warningTraceStatusCodeHandler(trace metrics.Trace, s *status.Status) { + trace.SetResponse(nil).WriteHeader(int(s.Code())) + trace.AddAttribute(grpcResponseStatusCodeAttributeKey, s.Code().String()) + trace.AddAttribute(grpcResponseStatusMessageAttributeKey, s.Message()) + trace.AddAttribute(grpcResponseStatusCodeLevelAttributeKey, warningLevel) +} + +func errorTraceStatusCodeHandler(trace metrics.Trace, s *status.Status) { + trace.SetResponse(nil).WriteHeader(int(s.Code())) + trace.AddAttribute(grpcResponseStatusCodeAttributeKey, s.Code().String()) + trace.AddAttribute(grpcResponseStatusMessageAttributeKey, s.Message()) + trace.AddAttribute(grpcResponseStatusCodeLevelAttributeKey, errorLevel) + trace.OnError(fmt.Errorf("gRPC Status: %s - %s", s.Code().String(), s.Message())) +} + +func infoTraceResultCodeHandler(trace metrics.Trace, resultCode string) { + trace.AddAttribute(resultCodeAttributeKey, resultCode) + trace.AddAttribute(resultCodeLevelAttributeKey, infoLevel) +} + +func warningTraceResultCodeHandler(trace metrics.Trace, resultCode string) { + trace.AddAttribute(resultCodeAttributeKey, resultCode) + trace.AddAttribute(resultCodeLevelAttributeKey, warningLevel) +} + +func errorTraceResultCodeHandler(trace metrics.Trace, resultCode string) { + trace.AddAttribute(resultCodeAttributeKey, resultCode) + trace.AddAttribute(resultCodeLevelAttributeKey, errorLevel) + trace.OnError(fmt.Errorf("Code RPC Result: %s", resultCode)) +} + +// UnaryServerInterceptor creates a unary server interceptor that uses the +// generic metrics.Provider interface. +func UnaryServerInterceptor(provider metrics.Provider) grpc_core.UnaryServerInterceptor { + if provider == nil { + return func(ctx context.Context, req interface{}, info *grpc_core.UnaryServerInfo, handler grpc_core.UnaryHandler) (interface{}, error) { + return handler(ctx, req) + } + } + + return func(ctx context.Context, req interface{}, info *grpc_core.UnaryServerInfo, handler grpc_core.UnaryHandler) (interface{}, error) { + // Inject the provider to allow for any custom metrics, events, etc + // in downstream code. + ctx = context.WithValue(ctx, metrics.ProviderContextKey, provider) + + trace := startProviderTrace(ctx, provider, info.FullMethod) + defer trace.End() + + ctx = metrics.NewContext(ctx, trace) + + includeParsedFullMethodName(trace, info.FullMethod) + includeClientMetadata(ctx, trace) + + resp, err := handler(ctx, req) + includeGRPCStatusCode(trace, err) + if err != nil { + return nil, err + } + + reflected := resp.(proto.Message).ProtoReflect() + includeCodeResultCodeForUnaryCall(trace, reflected) + + return resp, nil + } +} + +// StreamServerInterceptor creates a stream server interceptor that uses the +// generic metrics.Provider interface. +func StreamServerInterceptor(provider metrics.Provider) grpc_core.StreamServerInterceptor { + if provider == nil { + return func(srv interface{}, ss grpc_core.ServerStream, info *grpc_core.StreamServerInfo, handler grpc_core.StreamHandler) error { + return handler(srv, ss) + } + } + + return func(srv interface{}, ss grpc_core.ServerStream, info *grpc_core.StreamServerInfo, handler grpc_core.StreamHandler) error { + // Inject the provider to allow for any custom metrics, events, etc + // in downstream code. + ctx := context.WithValue(ss.Context(), metrics.ProviderContextKey, provider) + + trace := startProviderTrace(ctx, provider, info.FullMethod) + defer trace.End() + + ctx = metrics.NewContext(ctx, trace) + + includeParsedFullMethodName(trace, info.FullMethod) + includeClientMetadata(ctx, trace) + + err := handler(srv, newWrappedStream(ctx, trace, ss)) + includeGRPCStatusCode(trace, err) + return err + } +} + +type wrappedStream struct { + ctx context.Context + trace metrics.Trace + grpc_core.ServerStream +} + +func (w *wrappedStream) Context() context.Context { + return w.ctx +} + +func (w *wrappedStream) SendMsg(m interface{}) error { + reflected := m.(proto.Message).ProtoReflect() + includeOcpResultCodeForServerStreamCall(w.trace, reflected) + return w.ServerStream.SendMsg(m) +} + +func newWrappedStream(ctx context.Context, trace metrics.Trace, wrapped grpc_core.ServerStream) grpc_core.ServerStream { + return &wrappedStream{ctx, trace, wrapped} +} + +func startProviderTrace(ctx context.Context, provider metrics.Provider, fullMethod string) metrics.Trace { + method := strings.TrimPrefix(fullMethod, "/") + + // todo: we may not want to include all headers, especially if they contain + // sensitive information + var hdrs http.Header + if md, ok := metadata.FromIncomingContext(ctx); ok { + hdrs = make(http.Header, len(md)) + for k, vs := range md { + for _, v := range vs { + hdrs.Add(k, v) + } + } + } + + target := hdrs.Get(":authority") + u := getURL(method, target) + + trace := provider.StartTrace(method) + trace.SetRequest(metrics.Request{ + Header: hdrs, + URL: u, + Method: method, + Transport: "HTTP", + }) + + return trace +} + +func includeGRPCStatusCode(trace metrics.Trace, err error) { + grpcStatus := status.Convert(err) + handler, ok := traceStatusCodeHandlers[grpcStatus.Code()] + if !ok { + handler = defaultTraceStatusCodeHandler + } + handler(trace, grpcStatus) +} + +func includeCodeResultCodeForUnaryCall(trace metrics.Trace, reflected protoreflect.Message) { + // Check whether the response message has an enum called Result + resultEnumDescriptor := reflected.Descriptor().Enums().ByName("Result") + if resultEnumDescriptor == nil { + return + } + + // Check whether the response message has a field called result + resultFieldDescriptor := reflected.Descriptor().Fields().ByName("result") + if resultFieldDescriptor == nil { + return + } + + // This is the only sketchy part of the implementation. It'll panic if + // the field isn't an enum. It seems unlikely, because we've already + // determined an enum named Result exists, so we'd expect a reasonable + // field name of result. + resultEnumNumber := reflected.Get(resultFieldDescriptor).Enum() + + resultEnum := resultEnumDescriptor.Values().ByNumber(resultEnumNumber) + if resultEnum == nil { + return + } + + // Augment the trace + resultCode := strings.ToUpper(string(resultEnum.Name())) + handler, ok := traceResultCodeHandlers[resultCode] + if !ok { + defaultTraceResultCodeHandler(trace, resultCode) + } else { + handler(trace, resultCode) + } +} + +func includeOcpResultCodeForServerStreamCall(trace metrics.Trace, reflected protoreflect.Message) { + // Check whether the response message has a field set called success or error + var respMessage protoreflect.Message + resultMessageFieldDescriptor := reflected.Descriptor().Fields().ByName("success") + if resultMessageFieldDescriptor != nil { + respMessage = reflected.Get(resultMessageFieldDescriptor).Message() + } + if respMessage == nil || !respMessage.IsValid() { + resultMessageFieldDescriptor := reflected.Descriptor().Fields().ByName("error") + if resultMessageFieldDescriptor != nil { + respMessage = reflected.Get(resultMessageFieldDescriptor).Message() + } + } + if respMessage == nil || !respMessage.IsValid() { + return + } + + // Check whether the response message has an enum called Code + resultEnumDescriptor := respMessage.Descriptor().Enums().ByName("Code") + if resultEnumDescriptor == nil { + return + } + + // Check whether the response message has a field called code + resultEnumFieldDescriptor := respMessage.Descriptor().Fields().ByName("code") + if resultEnumFieldDescriptor == nil { + return + } + + // This is the only sketchy part of the implementation. It'll panic if + // the field isn't an enum. It seems unlikely, because we've already + // determined an enum named Result exists, so we'd expect a reasonable + // field name of result. + resultEnumNumber := respMessage.Get(resultEnumFieldDescriptor).Enum() + + resultEnum := resultEnumDescriptor.Values().ByNumber(resultEnumNumber) + if resultEnum == nil { + return + } + + // Augment the trace + resultCode := strings.ToUpper(string(resultEnum.Name())) + handler, ok := traceResultCodeHandlers[resultCode] + if !ok { + defaultTraceResultCodeHandler(trace, resultCode) + } else { + handler(trace, resultCode) + } +} + +func includeParsedFullMethodName(trace metrics.Trace, fullMethodName string) { + packageName, serviceName, methodName, err := grpc.ParseFullMethodName(fullMethodName) + if err != nil { + return + } + + trace.AddAttribute(grpcRequestPackageAttributeKey, packageName) + trace.AddAttribute(grpcRequestServiceAttributeKey, serviceName) + trace.AddAttribute(grpcRequestMethodAttributeKey, methodName) +} + +func includeClientMetadata(ctx context.Context, trace metrics.Trace) { + userAgent, err := client.GetUserAgent(ctx) + if err == nil { + trace.AddAttribute(clientUserAgentAttributeKey, userAgent.String()) + } +} + +func getURL(method, target string) *url.URL { + var host string + // target can be anything from + // https://github.com/grpc/grpc/blob/master/doc/naming.md + // see https://godoc.org/google.golang.org/grpc#DialContext + if strings.HasPrefix(target, "unix:") { + host = "localhost" + } else { + host = strings.TrimPrefix(target, "dns:///") + } + return &url.URL{ + Scheme: "grpc", + Host: host, + Path: method, + } +} diff --git a/grpc/metrics/new_relic_server_interceptor.go b/grpc/metrics/new_relic_server_interceptor.go deleted file mode 100644 index 019e9db..0000000 --- a/grpc/metrics/new_relic_server_interceptor.go +++ /dev/null @@ -1,357 +0,0 @@ -package metrics - -import ( - "context" - "net/http" - "net/url" - "strings" - - "github.com/newrelic/go-agent/v3/newrelic" - grpc_core "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/reflect/protoreflect" - - "github.com/code-payments/ocp-server/grpc" - "github.com/code-payments/ocp-server/grpc/client" - "github.com/code-payments/ocp-server/metrics" -) - -type statusCodeHandler func(*newrelic.Transaction, *status.Status) -type resultCodeHandler func(*newrelic.Transaction, string) - -const ( - grpcRequestPackageAttributeKey = "grpc.request.package" - grpcRequestServiceAttributeKey = "grpc.request.service" - grpcRequestMethodAttributeKey = "grpc.request.method" - - grpcResponseStatusCodeAttributeKey = "grpc.response.statusCode" - grpcResponseStatusMessageAttributeKey = "grpc.response.statusMessage" - grpcResponseStatusCodeLevelAttributeKey = "grpc.response.statusCodeLevel" - - resultCodeAttributeKey = "code.response.resultCode" - resultCodeLevelAttributeKey = "code.response.resultCodeLevel" - - clientUserAgentAttributeKey = "grpc.client.userAgent" - - infoLevel = "info" - warningLevel = "warning" - errorLevel = "error" -) - -var ( - statusCodeHandlers = map[codes.Code]statusCodeHandler{ - codes.OK: infoStatusCodeHandler, - codes.Aborted: infoStatusCodeHandler, - codes.AlreadyExists: infoStatusCodeHandler, - codes.Canceled: infoStatusCodeHandler, - codes.DataLoss: infoStatusCodeHandler, - codes.DeadlineExceeded: infoStatusCodeHandler, - codes.FailedPrecondition: infoStatusCodeHandler, - codes.InvalidArgument: infoStatusCodeHandler, - codes.NotFound: infoStatusCodeHandler, - codes.OutOfRange: infoStatusCodeHandler, - codes.PermissionDenied: infoStatusCodeHandler, - codes.ResourceExhausted: infoStatusCodeHandler, - codes.Unauthenticated: infoStatusCodeHandler, - codes.Unimplemented: infoStatusCodeHandler, - - codes.Internal: warningStatusCodeHandler, - codes.Unavailable: warningStatusCodeHandler, - codes.Unknown: warningStatusCodeHandler, - } - defaultStatusCodeHandler = infoStatusCodeHandler - - resultCodeHandlers = map[string]resultCodeHandler{ - "OK": infoResultCodeHandler, - "NOT_FOUND": infoResultCodeHandler, - - "DENIED": warningResultCodeHandler, - } - defaultResultCodeHandler = infoResultCodeHandler -) - -func infoStatusCodeHandler(m *newrelic.Transaction, s *status.Status) { - m.SetWebResponse(nil).WriteHeader(int(codes.OK)) - m.AddAttribute(grpcResponseStatusCodeAttributeKey, s.Code().String()) - m.AddAttribute(grpcResponseStatusMessageAttributeKey, s.Message()) - m.AddAttribute(grpcResponseStatusCodeLevelAttributeKey, infoLevel) -} - -func warningStatusCodeHandler(m *newrelic.Transaction, s *status.Status) { - m.SetWebResponse(nil).WriteHeader(int(codes.OK)) - m.AddAttribute(grpcResponseStatusCodeAttributeKey, s.Code().String()) - m.AddAttribute(grpcResponseStatusMessageAttributeKey, s.Message()) - m.AddAttribute(grpcResponseStatusCodeLevelAttributeKey, warningLevel) -} - -func errorStatusCodeHandler(m *newrelic.Transaction, s *status.Status) { - m.SetWebResponse(nil).WriteHeader(int(codes.OK)) - m.AddAttribute(grpcResponseStatusCodeAttributeKey, s.Code().String()) - m.AddAttribute(grpcResponseStatusMessageAttributeKey, s.Message()) - m.AddAttribute(grpcResponseStatusCodeLevelAttributeKey, errorLevel) - m.NoticeError(&newrelic.Error{ - Message: s.Message(), - Class: "gRPC Status: " + s.Code().String(), - }) -} - -func infoResultCodeHandler(m *newrelic.Transaction, resultCode string) { - m.AddAttribute(resultCodeAttributeKey, resultCode) - m.AddAttribute(resultCodeLevelAttributeKey, infoLevel) -} - -func warningResultCodeHandler(m *newrelic.Transaction, resultCode string) { - m.AddAttribute(resultCodeAttributeKey, resultCode) - m.AddAttribute(resultCodeLevelAttributeKey, warningLevel) -} - -func errorResultCodeHandler(m *newrelic.Transaction, resultCode string) { - m.AddAttribute(resultCodeAttributeKey, resultCode) - m.AddAttribute(resultCodeLevelAttributeKey, errorLevel) - m.NoticeError(&newrelic.Error{ - Class: "Code RPC Result: " + resultCode, - }) -} - -// CustomNewRelicUnaryServerInterceptor is a custom implementation of the New -// Relic unary interceptor. -func CustomNewRelicUnaryServerInterceptor(app *newrelic.Application) grpc_core.UnaryServerInterceptor { - if app == nil { - return func(ctx context.Context, req interface{}, info *grpc_core.UnaryServerInfo, handler grpc_core.UnaryHandler) (interface{}, error) { - return handler(ctx, req) - } - } - - return func(ctx context.Context, req interface{}, info *grpc_core.UnaryServerInfo, handler grpc_core.UnaryHandler) (interface{}, error) { - // Inject the application to allow for any custom metrics, events, etc - // in downstream code. - ctx = context.WithValue(ctx, metrics.NewRelicContextKey, app) - - m := startTransaction(ctx, app, info.FullMethod) - defer m.End() - - ctx = newrelic.NewContext(ctx, m) - - includeParsedFullMethodName(m, info.FullMethod) - includeClientMetadata(ctx, m) - - resp, err := handler(ctx, req) - includeGRPCStatusCode(m, err) - if err != nil { - return nil, err - } - - reflected := resp.(proto.Message).ProtoReflect() - includeCodeResultCodeForUnaryCall(m, reflected) - - return resp, nil - } -} - -func CustomNewRelicStreamServerInterceptor(app *newrelic.Application) grpc_core.StreamServerInterceptor { - if app == nil { - return func(srv interface{}, ss grpc_core.ServerStream, info *grpc_core.StreamServerInfo, handler grpc_core.StreamHandler) error { - return handler(srv, ss) - } - } - - return func(srv interface{}, ss grpc_core.ServerStream, info *grpc_core.StreamServerInfo, handler grpc_core.StreamHandler) error { - // Inject the application to allow for any custom metrics, events, etc - // in downstream code. - ctx := context.WithValue(ss.Context(), metrics.NewRelicContextKey, app) - - m := startTransaction(ctx, app, info.FullMethod) - defer m.End() - - ctx = newrelic.NewContext(ctx, m) - - includeParsedFullMethodName(m, info.FullMethod) - includeClientMetadata(ctx, m) - - err := handler(srv, newWrappedStream(ctx, m, ss)) - includeGRPCStatusCode(m, err) - return err - } -} - -type wrappedStream struct { - ctx context.Context - txn *newrelic.Transaction - grpc_core.ServerStream -} - -func (w *wrappedStream) Context() context.Context { - return w.ctx -} - -func (w *wrappedStream) SendMsg(m interface{}) error { - reflected := m.(proto.Message).ProtoReflect() - includeCodeResultCodeForServerStreamCall(w.txn, reflected) - return w.ServerStream.SendMsg(m) -} - -func newWrappedStream(ctx context.Context, m *newrelic.Transaction, wrapped grpc_core.ServerStream) grpc_core.ServerStream { - return &wrappedStream{ctx, m, wrapped} -} - -func startTransaction(ctx context.Context, app *newrelic.Application, fullMethod string) *newrelic.Transaction { - method := strings.TrimPrefix(fullMethod, "/") - - // todo: we may not want to include all headers, especially if they contain - // sensitive information - var hdrs http.Header - if md, ok := metadata.FromIncomingContext(ctx); ok { - hdrs = make(http.Header, len(md)) - for k, vs := range md { - for _, v := range vs { - hdrs.Add(k, v) - } - } - } - - target := hdrs.Get(":authority") - url := getURL(method, target) - - webReq := newrelic.WebRequest{ - Header: hdrs, - URL: url, - Method: method, - Transport: newrelic.TransportHTTP, - } - txn := app.StartTransaction(method) - txn.SetWebRequest(webReq) - - return txn -} - -func getURL(method, target string) *url.URL { - var host string - // target can be anything from - // https://github.com/grpc/grpc/blob/master/doc/naming.md - // see https://godoc.org/google.golang.org/grpc#DialContext - if strings.HasPrefix(target, "unix:") { - host = "localhost" - } else { - host = strings.TrimPrefix(target, "dns:///") - } - return &url.URL{ - Scheme: "grpc", - Host: host, - Path: method, - } -} - -func includeGRPCStatusCode(m *newrelic.Transaction, err error) { - grpcStatus := status.Convert(err) - handler, ok := statusCodeHandlers[grpcStatus.Code()] - if !ok { - handler = defaultStatusCodeHandler - } - handler(m, grpcStatus) -} - -func includeCodeResultCodeForUnaryCall(m *newrelic.Transaction, reflected protoreflect.Message) { - // Check whether the response message has an enum called Result - resultEnumDescriptor := reflected.Descriptor().Enums().ByName("Result") - if resultEnumDescriptor == nil { - return - } - - // Check whether the response message has a field called result - resultFieldDescriptor := reflected.Descriptor().Fields().ByName("result") - if resultFieldDescriptor == nil { - return - } - - // This is the only sketchy part of the implementation. It'll panic if - // the field isn't an enum. It seems unlikely, because we've already - // determined an enum named Result exists, so we'd expect a reasonable - // field name of result. - resultEnumNumber := reflected.Get(resultFieldDescriptor).Enum() - - resultEnum := resultEnumDescriptor.Values().ByNumber(resultEnumNumber) - if resultEnum == nil { - return - } - - // Augment the transaction - resultCode := strings.ToUpper(string(resultEnum.Name())) - handler, ok := resultCodeHandlers[resultCode] - if !ok { - defaultResultCodeHandler(m, resultCode) - } else { - handler(m, resultCode) - } -} - -// todo: currently assumes SubmitIntent standards -func includeCodeResultCodeForServerStreamCall(m *newrelic.Transaction, reflected protoreflect.Message) { - // Check whether the response message has a field set called success or error - var respMessage protoreflect.Message - resultMessageFieldDescriptor := reflected.Descriptor().Fields().ByName("success") - if resultMessageFieldDescriptor != nil { - respMessage = reflected.Get(resultMessageFieldDescriptor).Message() - } - if respMessage == nil || !respMessage.IsValid() { - resultMessageFieldDescriptor := reflected.Descriptor().Fields().ByName("error") - if resultMessageFieldDescriptor != nil { - respMessage = reflected.Get(resultMessageFieldDescriptor).Message() - } - } - if respMessage == nil || !respMessage.IsValid() { - return - } - - // Check whether the response message has an enum called Code - resultEnumDescriptor := respMessage.Descriptor().Enums().ByName("Code") - if resultEnumDescriptor == nil { - return - } - - // Check whether the response message has a field called code - resultEnumFieldDescriptor := respMessage.Descriptor().Fields().ByName("code") - if resultEnumFieldDescriptor == nil { - return - } - - // This is the only sketchy part of the implementation. It'll panic if - // the field isn't an enum. It seems unlikely, because we've already - // determined an enum named Result exists, so we'd expect a reasonable - // field name of result. - resultEnumNumber := respMessage.Get(resultEnumFieldDescriptor).Enum() - - resultEnum := resultEnumDescriptor.Values().ByNumber(resultEnumNumber) - if resultEnum == nil { - return - } - - // Augment the transaction - resultCode := strings.ToUpper(string(resultEnum.Name())) - handler, ok := resultCodeHandlers[resultCode] - if !ok { - defaultResultCodeHandler(m, resultCode) - } else { - handler(m, resultCode) - } -} - -func includeParsedFullMethodName(m *newrelic.Transaction, fullMethodName string) { - packageName, serviceName, methodName, err := grpc.ParseFullMethodName(fullMethodName) - if err != nil { - return - } - - m.AddAttribute(grpcRequestPackageAttributeKey, packageName) - m.AddAttribute(grpcRequestServiceAttributeKey, serviceName) - m.AddAttribute(grpcRequestMethodAttributeKey, methodName) -} - -func includeClientMetadata(ctx context.Context, m *newrelic.Transaction) { - userAgent, err := client.GetUserAgent(ctx) - if err == nil { - m.AddAttribute(clientUserAgentAttributeKey, userAgent.String()) - } -} diff --git a/metrics/constants.go b/metrics/constants.go index 3cb2522..78eb55d 100644 --- a/metrics/constants.go +++ b/metrics/constants.go @@ -1,5 +1,6 @@ package metrics const ( - NewRelicContextKey = "newrelic_context" + // ProviderContextKey is the context key used to store the metrics Provider + ProviderContextKey = "metrics_provider_context" ) diff --git a/metrics/events.go b/metrics/events.go index a7fce39..677f0a7 100644 --- a/metrics/events.go +++ b/metrics/events.go @@ -2,14 +2,12 @@ package metrics import ( "context" - - "github.com/newrelic/go-agent/v3/newrelic" ) // RecordEvent records a new event with a name and set of key-value pairs func RecordEvent(ctx context.Context, eventName string, kvPairs map[string]interface{}) { - nr, ok := ctx.Value(NewRelicContextKey).(*newrelic.Application) - if ok { - nr.RecordCustomEvent(eventName, kvPairs) + provider, ok := ctx.Value(ProviderContextKey).(Provider) + if ok && provider != nil { + provider.RecordEvent(eventName, kvPairs) } } diff --git a/metrics/metrics.go b/metrics/metrics.go index 3ad8a2a..17a09e5 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -3,22 +3,20 @@ package metrics import ( "context" "time" - - "github.com/newrelic/go-agent/v3/newrelic" ) // RecordCount records a count metric func RecordCount(ctx context.Context, metricName string, count uint64) { - nr, ok := ctx.Value(NewRelicContextKey).(*newrelic.Application) - if ok { - nr.RecordCustomMetric(metricName, float64(count)) + provider, ok := ctx.Value(ProviderContextKey).(Provider) + if ok && provider != nil { + provider.RecordCount(metricName, count) } } // RecordDuration records a duration metric func RecordDuration(ctx context.Context, metricName string, duration time.Duration) { - nr, ok := ctx.Value(NewRelicContextKey).(*newrelic.Application) - if ok { - nr.RecordCustomMetric(metricName, float64(duration/time.Millisecond)) + provider, ok := ctx.Value(ProviderContextKey).(Provider) + if ok && provider != nil { + provider.RecordDuration(metricName, duration) } } diff --git a/metrics/newrelic/provider.go b/metrics/newrelic/provider.go new file mode 100644 index 0000000..9f952c7 --- /dev/null +++ b/metrics/newrelic/provider.go @@ -0,0 +1,120 @@ +package newrelic + +import ( + "net/http" + "net/url" + "time" + + "github.com/newrelic/go-agent/v3/newrelic" + + "github.com/code-payments/ocp-server/metrics" +) + +// Provider wraps a New Relic application to implement the metrics.Provider interface +type Provider struct { + app *newrelic.Application +} + +// NewProvider creates a new New Relic metrics provider +func NewProvider(app *newrelic.Application) *Provider { + return &Provider{app: app} +} + +// Application returns the underlying New Relic application for cases where +// direct access is needed (e.g., log integration) +func (p *Provider) Application() *newrelic.Application { + return p.app +} + +// StartTrace starts a new trace +func (p *Provider) StartTrace(name string) metrics.Trace { + return &Trace{txn: p.app.StartTransaction(name)} +} + +// RecordEvent records a custom event with key-value attributes +func (p *Provider) RecordEvent(eventName string, attributes map[string]interface{}) { + p.app.RecordCustomEvent(eventName, attributes) +} + +// RecordCount records a count metric +func (p *Provider) RecordCount(metricName string, count uint64) { + p.app.RecordCustomMetric(metricName, float64(count)) +} + +// RecordDuration records a duration metric +func (p *Provider) RecordDuration(metricName string, duration time.Duration) { + p.app.RecordCustomMetric(metricName, float64(duration/time.Millisecond)) +} + +// Trace wraps a New Relic transaction +type Trace struct { + txn *newrelic.Transaction +} + +// StartSpan starts a new span within the trace +func (t *Trace) StartSpan(name string) metrics.Span { + return &Span{seg: t.txn.StartSegment(name)} +} + +// AddAttribute adds a key-value attribute to the trace +func (t *Trace) AddAttribute(key string, value interface{}) { + t.txn.AddAttribute(key, value) +} + +// OnError records an error on the trace +func (t *Trace) OnError(err error) { + t.txn.NoticeError(err) +} + +// SetRequest sets HTTP request information on the trace +func (t *Trace) SetRequest(r metrics.Request) { + var u *url.URL + if r.URL != nil { + u = r.URL.(*url.URL) + } + + transport := newrelic.TransportHTTP + switch r.Transport { + case "HTTP": + transport = newrelic.TransportHTTP + case "HTTPS": + transport = newrelic.TransportHTTPS + } + + t.txn.SetWebRequest(newrelic.WebRequest{ + Header: r.Header, + URL: u, + Method: r.Method, + Transport: transport, + }) +} + +// SetResponse sets the HTTP response writer for the trace +func (t *Trace) SetResponse(w http.ResponseWriter) http.ResponseWriter { + return t.txn.SetWebResponse(w) +} + +// End completes the trace +func (t *Trace) End() { + t.txn.End() +} + +// Unwrap returns the underlying New Relic transaction for advanced use cases +func (t *Trace) Unwrap() *newrelic.Transaction { + return t.txn +} + +// Span wraps a New Relic segment +type Span struct { + seg *newrelic.Segment +} + +// AddAttribute adds a key-value attribute to the span +func (s *Span) AddAttribute(key string, value interface{}) { + s.seg.AddAttribute(key, value) +} + +// End completes the span +func (s *Span) End() { + s.seg.End() +} diff --git a/metrics/noop/provider.go b/metrics/noop/provider.go new file mode 100644 index 0000000..28ec3d0 --- /dev/null +++ b/metrics/noop/provider.go @@ -0,0 +1,65 @@ +package noop + +import ( + "net/http" + "time" + + "github.com/code-payments/ocp-server/metrics" +) + +// Provider is a no-op metrics provider that discards all metrics. +// Use this when metrics collection is disabled. +type Provider struct{} + +// NewProvider creates a new no-op metrics provider +func NewProvider() *Provider { + return &Provider{} +} + +// StartTrace returns a no-op trace +func (p *Provider) StartTrace(name string) metrics.Trace { + return &Trace{} +} + +// RecordEvent is a no-op +func (p *Provider) RecordEvent(eventName string, attributes map[string]interface{}) {} + +// RecordCount is a no-op +func (p *Provider) RecordCount(metricName string, count uint64) {} + +// RecordDuration is a no-op +func (p *Provider) RecordDuration(metricName string, duration time.Duration) {} + +// Trace is a no-op trace +type Trace struct{} + +// StartSpan returns a no-op span +func (t *Trace) StartSpan(name string) metrics.Span { + return &Span{} +} + +// AddAttribute is a no-op +func (t *Trace) AddAttribute(key string, value interface{}) {} + +// OnError is a no-op +func (t *Trace) OnError(err error) {} + +// SetRequest is a no-op +func (t *Trace) SetRequest(r metrics.Request) {} + +// SetResponse returns the writer unchanged +func (t *Trace) SetResponse(w http.ResponseWriter) http.ResponseWriter { + return w +} + +// End is a no-op +func (t *Trace) End() {} + +// Span is a no-op span +type Span struct{} + +// AddAttribute is a no-op +func (s *Span) AddAttribute(key string, value interface{}) {} + +// End is a no-op +func (s *Span) End() {} diff --git a/metrics/provider.go b/metrics/provider.go new file mode 100644 index 0000000..adeece0 --- /dev/null +++ b/metrics/provider.go @@ -0,0 +1,79 @@ +package metrics + +import ( + "context" + "net/http" + "time" +) + +// Provider defines an abstract metrics provider that can record events, +// metrics, and traces. This allows swapping between different backends +// (New Relic, Datadog, Prometheus, no-op, etc.). +type Provider interface { + // StartTrace starts a new trace + StartTrace(name string) Trace + + // RecordEvent records a custom event with key-value attributes + RecordEvent(eventName string, attributes map[string]interface{}) + + // RecordCount records a count metric + RecordCount(metricName string, count uint64) + + // RecordDuration records a duration metric + RecordDuration(metricName string, duration time.Duration) +} + +// Trace represents an active trace that can contain multiple spans and attributes. +type Trace interface { + // StartSpan starts a new span within the trace + StartSpan(name string) Span + + // AddAttribute adds a key-value attribute to the trace + AddAttribute(key string, value interface{}) + + // OnError records an error on the trace + OnError(err error) + + // SetRequest sets HTTP request information on the trace + SetRequest(r Request) + + // SetResponse sets the HTTP response writer for the trace + SetResponse(w http.ResponseWriter) http.ResponseWriter + + // End completes the trace + End() +} + +// Request contains HTTP request information for tracing +type Request struct { + Header http.Header + URL interface{} // *url.URL + Method string + Transport string +} + +// Span represents a timed span within a trace for tracing individual operations. +type Span interface { + // AddAttribute adds a key-value attribute to the span + AddAttribute(key string, value interface{}) + + // End completes the span + End() +} + +// traceContextKey is the context key for storing the current trace +type traceContextKey struct{} + +// TraceKey is the context key for Trace +var TraceKey = traceContextKey{} + +// NewContext returns a new context with the trace attached +func NewContext(ctx context.Context, trace Trace) context.Context { + return context.WithValue(ctx, TraceKey, trace) +} + +// TraceFromContext retrieves the trace from context, if present +func TraceFromContext(ctx context.Context) Trace { + trace, _ := ctx.Value(TraceKey).(Trace) + return trace +} diff --git a/metrics/tracing.go b/metrics/tracing.go index 8ca9a9e..f757dfc 100644 --- a/metrics/tracing.go +++ b/metrics/tracing.go @@ -3,30 +3,28 @@ package metrics import ( "context" "fmt" - - "github.com/newrelic/go-agent/v3/newrelic" ) // TraceMethodCall traces a method call with a given struct/package and method names func TraceMethodCall(ctx context.Context, structOrPackageName, methodName string) *MethodTracer { - txn := newrelic.FromContext(ctx) - if txn == nil { + trace := TraceFromContext(ctx) + if trace == nil { return nil } - seg := txn.StartSegment(fmt.Sprintf("%s %s", structOrPackageName, methodName)) + span := trace.StartSpan(fmt.Sprintf("%s %s", structOrPackageName, methodName)) return &MethodTracer{ - txn: txn, - seg: seg, + trace: trace, + span: span, } } // MethodTracer collects analytics for a given method call within an existing // trace. type MethodTracer struct { - txn *newrelic.Transaction - seg *newrelic.Segment + trace Trace + span Span } // AddAttribute adds a key-value pair metadata to the method trace @@ -35,7 +33,7 @@ func (t *MethodTracer) AddAttribute(key string, value interface{}) { return } - t.seg.AddAttribute(key, value) + t.span.AddAttribute(key, value) } // AddAttributes adds a set of key-value pair metadata to the method trace @@ -45,7 +43,7 @@ func (t *MethodTracer) AddAttributes(attributes map[string]interface{}) { } for key, value := range attributes { - t.seg.AddAttribute(key, value) + t.span.AddAttribute(key, value) } } @@ -59,7 +57,7 @@ func (t *MethodTracer) OnError(err error) { return } - t.txn.NoticeError(err) + t.trace.OnError(err) } // End completes the trace for the method call. @@ -68,5 +66,5 @@ func (t *MethodTracer) End() { return } - t.seg.End() + t.span.End() } diff --git a/ocp/transaction/nonce_pool.go b/ocp/transaction/nonce_pool.go index 38aaef1..d3496c2 100644 --- a/ocp/transaction/nonce_pool.go +++ b/ocp/transaction/nonce_pool.go @@ -9,14 +9,13 @@ import ( "github.com/google/uuid" "github.com/mr-tron/base58/base58" - "github.com/newrelic/go-agent/v3/newrelic" "github.com/pkg/errors" "go.uber.org/zap" + "github.com/code-payments/ocp-server/metrics" "github.com/code-payments/ocp-server/ocp/common" ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/nonce" - "github.com/code-payments/ocp-server/metrics" "github.com/code-payments/ocp-server/pointer" "github.com/code-payments/ocp-server/solana" ) @@ -251,7 +250,7 @@ type LocalNoncePool struct { data ocp_data.Provider - metricsProvider *newrelic.Application + metricsProvider metrics.Provider env nonce.Environment envInstance string @@ -272,7 +271,7 @@ type LocalNoncePool struct { func NewLocalNoncePool( log *zap.Logger, data ocp_data.Provider, - metricsProvider *newrelic.Application, + metricsProvider metrics.Provider, env nonce.Environment, envInstance string, poolType nonce.Purpose, @@ -579,6 +578,10 @@ func (np *LocalNoncePool) metricsPoller() { } func (np *LocalNoncePool) recordPoolSizeMetricEvent() { + if np.metricsProvider == nil { + return + } + np.mu.Lock() if np.isClosed { np.mu.Unlock() @@ -591,7 +594,7 @@ func (np *LocalNoncePool) recordPoolSizeMetricEvent() { kvs["current_nonce_pool_size"] = size kvs["desired_nonce_pool_size"] = np.opts.desiredPoolSize - np.metricsProvider.RecordCustomEvent("LocalNoncePoolSizePollingCheck", kvs) + np.metricsProvider.RecordEvent("LocalNoncePoolSizePollingCheck", kvs) } func (np *LocalNoncePool) getBaseMetricKvs() map[string]interface{} { diff --git a/ocp/transaction/nonce_pool_test.go b/ocp/transaction/nonce_pool_test.go index a459f42..1111837 100644 --- a/ocp/transaction/nonce_pool_test.go +++ b/ocp/transaction/nonce_pool_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + noop_metrics "github.com/code-payments/ocp-server/metrics/noop" "github.com/code-payments/ocp-server/ocp/common" ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/nonce" @@ -295,7 +296,7 @@ func newLocalNoncePoolTest(t *testing.T) *localNoncePoolTest { pool, err := NewLocalNoncePool( log, data, - nil, + noop_metrics.NewProvider(), nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.PurposeClientIntent, diff --git a/ocp/worker/account/gift_card.go b/ocp/worker/account/gift_card.go index 810c561..ae7352d 100644 --- a/ocp/worker/account/gift_card.go +++ b/ocp/worker/account/gift_card.go @@ -10,7 +10,6 @@ import ( "time" "github.com/mr-tron/base58" - "github.com/newrelic/go-agent/v3/newrelic" "go.uber.org/zap" commonpb "github.com/code-payments/ocp-protobuf-api/generated/go/common/v1" @@ -40,17 +39,17 @@ func (p *runtime) giftCardAutoReturnWorker(runtimeCtx context.Context, interval func() (err error) { time.Sleep(delay) - nr := runtimeCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("account_runtime__handle_gift_card_auto_return") - defer m.End() - tracedCtx := newrelic.NewContext(runtimeCtx, m) + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("account_runtime__handle_gift_card_auto_return") + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) // todo: configurable batch size records, err := p.data.GetPrioritizedAccountInfosRequiringAutoReturnCheck(tracedCtx, GiftCardExpiry, 32) if err == account.ErrAccountInfoNotFound { return nil } else if err != nil { - m.NoticeError(err) + trace.OnError(err) return err } @@ -63,7 +62,7 @@ func (p *runtime) giftCardAutoReturnWorker(runtimeCtx context.Context, interval err := p.maybeInitiateGiftCardAutoReturn(tracedCtx, record) if err != nil { - m.NoticeError(err) + trace.OnError(err) } }(record) } diff --git a/ocp/worker/currency/exchange_rate.go b/ocp/worker/currency/exchange_rate.go index 5f50e0a..445e991 100644 --- a/ocp/worker/currency/exchange_rate.go +++ b/ocp/worker/currency/exchange_rate.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/newrelic/go-agent/v3/newrelic" "github.com/pkg/errors" "go.uber.org/zap" @@ -34,14 +33,14 @@ func (p *exchangeRateRuntime) Start(runtimeCtx context.Context, interval time.Du func() error { p.log.Debug("updating exchange rates") - nr := runtimeCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("currency_exchange_rate_runtime") - defer m.End() - tracedCtx := newrelic.NewContext(runtimeCtx, m) + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("currency_exchange_rate_runtime") + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) err := p.GetCurrentExchangeRates(tracedCtx) if err != nil { - m.NoticeError(err) + trace.OnError(err) p.log.With(zap.Error(err)).Warn("failed to process current rate data") } diff --git a/ocp/worker/currency/reserve.go b/ocp/worker/currency/reserve.go index b23750c..95b99b2 100644 --- a/ocp/worker/currency/reserve.go +++ b/ocp/worker/currency/reserve.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/newrelic/go-agent/v3/newrelic" "go.uber.org/zap" "github.com/code-payments/ocp-server/metrics" @@ -37,14 +36,14 @@ func (p *reserveRuntime) Start(runtimeCtx context.Context, interval time.Duratio func() error { p.log.Debug("updating exchange rates") - nr := runtimeCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("currency_reserve_runtime") - defer m.End() - tracedCtx := newrelic.NewContext(runtimeCtx, m) + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("currency_reserve_runtime") + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) err := p.UpdateAllLaunchpadCurrencyReserves(tracedCtx) if err != nil { - m.NoticeError(err) + trace.OnError(err) p.log.With(zap.Error(err)).Warn("failed to process current reserve data") } diff --git a/ocp/worker/geyser/backup.go b/ocp/worker/geyser/backup.go index 3b4a278..5b842db 100644 --- a/ocp/worker/geyser/backup.go +++ b/ocp/worker/geyser/backup.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/newrelic/go-agent/v3/newrelic" "go.uber.org/zap" "github.com/code-payments/ocp-server/database/query" @@ -46,10 +45,10 @@ func (p *runtime) backupTimelockStateWorker(runtimeCtx context.Context, state ti batchStart := time.Now() func() { - nr := runtimeCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("geyser_consumer_runtime__backup_timelock_state_worker") - defer m.End() - tracedCtx := newrelic.NewContext(runtimeCtx, m) + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("geyser_consumer_runtime__backup_timelock_state_worker") + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) timelockRecords, err := p.data.GetAllTimelocksByState( tracedCtx, @@ -121,10 +120,10 @@ func (p *runtime) backupExternalDepositWorker(runtimeCtx context.Context, interv select { case <-time.After(interval): func() { - nr := runtimeCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("geyser_consumer_runtime__backup_external_deposit_worker") - defer m.End() - tracedCtx := newrelic.NewContext(runtimeCtx, m) + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("geyser_consumer_runtime__backup_external_deposit_worker") + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) accountInfoRecords, err := p.data.GetPrioritizedAccountInfosRequiringDepositSync(tracedCtx, 256) if err == account.ErrAccountInfoNotFound { diff --git a/ocp/worker/geyser/consumer.go b/ocp/worker/geyser/consumer.go index d9c1102..5cebe7b 100644 --- a/ocp/worker/geyser/consumer.go +++ b/ocp/worker/geyser/consumer.go @@ -5,7 +5,6 @@ import ( "time" "github.com/mr-tron/base58" - "github.com/newrelic/go-agent/v3/newrelic" "github.com/pkg/errors" "go.uber.org/zap" @@ -77,10 +76,10 @@ func (p *runtime) programUpdateWorker(runtimeCtx context.Context, id int) { for update := range p.programUpdatesChan { func() { - nr := runtimeCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("geyser_consumer_runtime__program_update_worker") - defer m.End() - tracedCtx := newrelic.NewContext(runtimeCtx, m) + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("geyser_consumer_runtime__program_update_worker") + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) p.metricStatusLock.Lock() p.programUpdateWorkerMetrics[id].active = true @@ -120,7 +119,7 @@ func (p *runtime) programUpdateWorker(runtimeCtx context.Context, id int) { err = handler.Handle(tracedCtx, update) if err != nil { - m.NoticeError(err) + trace.OnError(err) log.With(zap.Error(err)).Warn("failed to process program account update") } diff --git a/ocp/worker/nonce/allocator.go b/ocp/worker/nonce/allocator.go index 4e6fc51..ca25382 100644 --- a/ocp/worker/nonce/allocator.go +++ b/ocp/worker/nonce/allocator.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/newrelic/go-agent/v3/newrelic" "go.uber.org/zap" "github.com/code-payments/ocp-server/metrics" @@ -24,10 +23,10 @@ func (p *runtime) generateNonceAccountsOnSolanaMainnet(runtimeCtx context.Contex func() (err error) { time.Sleep(time.Second) - nr := runtimeCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("nonce_runtime__nonce_accounts") - defer m.End() - tracedCtx := newrelic.NewContext(runtimeCtx, m) + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("nonce_runtime__nonce_accounts") + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) num_invalid, err := p.data.GetNonceCountByStateAndPurpose(tracedCtx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateInvalid, purpose) if err != nil { diff --git a/ocp/worker/nonce/keys.go b/ocp/worker/nonce/keys.go index ebe2326..93e3848 100644 --- a/ocp/worker/nonce/keys.go +++ b/ocp/worker/nonce/keys.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/newrelic/go-agent/v3/newrelic" "go.uber.org/zap" "github.com/code-payments/ocp-server/database/query" @@ -33,25 +32,23 @@ func (p *runtime) generateKey(ctx context.Context) (*vault.Record, error) { return key, nil } -func (p *runtime) generateKeys(ctx context.Context) error { +func (p *runtime) generateKeys(runtimeCtx context.Context) error { err := retry.Loop( func() (err error) { // Give the server some time to breath. time.Sleep(time.Second * 15) - nr := ctx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("nonce_runtime__vault_keys") - defer func() { - m.End() - *m = newrelic.Transaction{} - }() + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("nonce_runtime__vault_keys") + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) - res, err := p.data.GetKeyCountByState(ctx, vault.StateAvailable) + res, err := p.data.GetKeyCountByState(tracedCtx, vault.StateAvailable) if err != nil { return err } - reserveSize := ((p.conf.onDemandTransactionNoncePoolSize.Get(ctx) + p.conf.clientSwapNoncePoolSize.Get(ctx)) * 2) + reserveSize := ((p.conf.onDemandTransactionNoncePoolSize.Get(tracedCtx) + p.conf.clientSwapNoncePoolSize.Get(tracedCtx)) * 2) // If we have sufficient keys, don't generate any more. if res >= reserveSize { @@ -69,7 +66,7 @@ func (p *runtime) generateKeys(ctx context.Context) error { // We don't have enough in the reserve, so we need to generate some. for i := 0; i < int(missing); i++ { - key, err := p.generateKey(ctx) + key, err := p.generateKey(tracedCtx) if err != nil { p.log.With(zap.Error(err)).Warn("Failure generating key") continue diff --git a/ocp/worker/nonce/pool.go b/ocp/worker/nonce/pool.go index eac39c9..b00ba02 100644 --- a/ocp/worker/nonce/pool.go +++ b/ocp/worker/nonce/pool.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/newrelic/go-agent/v3/newrelic" "github.com/pkg/errors" "go.uber.org/zap" @@ -31,10 +30,10 @@ func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, inst func() (err error) { time.Sleep(delay) - nr := runtimeCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("nonce_runtime__handle_" + state.String()) - defer m.End() - tracedCtx := newrelic.NewContext(runtimeCtx, m) + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("nonce_runtime__handle_" + state.String()) + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) // Get a batch of nonce records in similar state (e.g. newly created, released, reserved, etc...) items, err := p.data.GetAllNonceByState( @@ -60,7 +59,7 @@ func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, inst err := p.handle(tracedCtx, record) if err != nil { - m.NoticeError(err) + trace.OnError(err) } }(item) } diff --git a/ocp/worker/sequencer/worker.go b/ocp/worker/sequencer/worker.go index b83f24f..7d81345 100644 --- a/ocp/worker/sequencer/worker.go +++ b/ocp/worker/sequencer/worker.go @@ -9,7 +9,6 @@ import ( "time" "github.com/mr-tron/base58" - "github.com/newrelic/go-agent/v3/newrelic" "github.com/pkg/errors" "go.uber.org/zap" @@ -31,10 +30,10 @@ func (p *runtime) worker(runtimeCtx context.Context, state fulfillment.State, in func() (err error) { time.Sleep(delay) - nr := runtimeCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("sequencer_runtime__handle_" + state.String()) - defer m.End() - tracedCtx := newrelic.NewContext(runtimeCtx, m) + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("sequencer_runtime__handle_" + state.String()) + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) // Get a batch of records in similar state (e.g. newly created, released, reserved, etc...) items, err := p.data.GetAllFulfillmentsByState( @@ -59,7 +58,7 @@ func (p *runtime) worker(runtimeCtx context.Context, state fulfillment.State, in err := p.handle(tracedCtx, record) if err != nil { - m.NoticeError(err) + trace.OnError(err) } }(item) } diff --git a/ocp/worker/swap/worker.go b/ocp/worker/swap/worker.go index 0af4979..2b76185 100644 --- a/ocp/worker/swap/worker.go +++ b/ocp/worker/swap/worker.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/newrelic/go-agent/v3/newrelic" "github.com/pkg/errors" "go.uber.org/zap" @@ -25,10 +24,10 @@ func (p *runtime) worker(runtimeCtx context.Context, state swap.State, interval func() (err error) { time.Sleep(delay) - nr := runtimeCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application) - m := nr.StartTransaction("swap_runtime__handle_" + state.String()) - defer m.End() - tracedCtx := newrelic.NewContext(runtimeCtx, m) + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("swap_runtime__handle_" + state.String()) + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) items, err := p.data.GetAllSwapsByState( tracedCtx, @@ -50,7 +49,7 @@ func (p *runtime) worker(runtimeCtx context.Context, state swap.State, interval err := p.handle(tracedCtx, record) if err != nil { - m.NoticeError(err) + trace.OnError(err) } }(item) }