From daafaeae48f448d7cce18d79d8a32023067d5523 Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Wed, 21 Jan 2026 10:18:30 -0500 Subject: [PATCH 1/6] Add GetLaunchpadCurrencyCirculatingSupply utility --- go.mod | 2 +- go.sum | 4 ++-- ocp/currency/reserve.go | 39 ++++++++++++++++++++++++++++++++++ ocp/worker/currency/reserve.go | 14 ++++-------- 4 files changed, 46 insertions(+), 13 deletions(-) create mode 100644 ocp/currency/reserve.go diff --git a/go.mod b/go.mod index c870383..e1ea6f2 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( filippo.io/edwards25519 v1.1.0 github.com/aws/aws-sdk-go-v2 v0.17.0 github.com/code-payments/code-vm-indexer v1.2.0 - github.com/code-payments/ocp-protobuf-api v0.8.0 + github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121144749-40e6cb475f50 github.com/emirpasic/gods v1.12.0 github.com/envoyproxy/protoc-gen-validate v1.2.1 github.com/golang/protobuf v1.5.4 diff --git a/go.sum b/go.sum index 70987d7..8b25a69 100644 --- a/go.sum +++ b/go.sum @@ -78,8 +78,8 @@ github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/code-payments/code-vm-indexer v1.2.0 h1:rSHpBMiT9BKgmKcXg/VIoi/h0t7jNxGx07Qz59m+6Q0= github.com/code-payments/code-vm-indexer v1.2.0/go.mod h1:vn91YN2qNqb+gGJeZe2+l+TNxVmEEiRHXXnIn2Y40h8= -github.com/code-payments/ocp-protobuf-api v0.8.0 h1:Fm1dH7dFQu/xn5ohuPrJqcju8pWtIi4upH5f97KVrvY= -github.com/code-payments/ocp-protobuf-api v0.8.0/go.mod h1:tw6BooY5a8l6CtSZnKOruyKII0W04n89pcM4BizrgG8= +github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121144749-40e6cb475f50 h1:19DpNyp6YYbLN6JzL4K+YOBA+x8X41RPY+U1NCLHsGU= +github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121144749-40e6cb475f50/go.mod h1:tw6BooY5a8l6CtSZnKOruyKII0W04n89pcM4BizrgG8= github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 h1:NmTXa/uVnDyp0TY5MKi197+3HWcnYWfnHGyaFthlnGw= github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= diff --git a/ocp/currency/reserve.go b/ocp/currency/reserve.go new file mode 100644 index 0000000..8c78666 --- /dev/null +++ b/ocp/currency/reserve.go @@ -0,0 +1,39 @@ +package currency + +import ( + "context" + "errors" + "time" + + "github.com/code-payments/ocp-server/ocp/common" + ocp_data "github.com/code-payments/ocp-server/ocp/data" + "github.com/code-payments/ocp-server/solana" + "github.com/code-payments/ocp-server/solana/currencycreator" + "github.com/code-payments/ocp-server/solana/token" +) + +// GetLaunchpadCurrencyCirculatingSupply gets the current circulating supply in +// quarks for a launchpad currency directly from the blockchain +func GetLaunchpadCurrencyCirculatingSupply(ctx context.Context, data ocp_data.Provider, mint *common.Account) (uint64, time.Time, error) { + metadataRecord, err := data.GetCurrencyMetadata(ctx, mint.PublicKey().ToBase58()) + if err != nil { + return 0, time.Time{}, err + } + + accounts, err := common.GetLaunchpadCurrencyAccounts(metadataRecord) + if err != nil { + return 0, time.Time{}, err + } + + ai, err := data.GetBlockchainAccountInfo(ctx, accounts.VaultMint.PublicKey().ToBase58(), solana.CommitmentFinalized) + if err != nil { + return 0, time.Time{}, err + } + + var tokenAccount token.Account + if !tokenAccount.Unmarshal(ai.Data) { + return 0, time.Time{}, errors.New("invalid token account state") + } + + return currencycreator.DefaultMintMaxQuarkSupply - tokenAccount.Amount, time.Now(), nil +} diff --git a/ocp/worker/currency/reserve.go b/ocp/worker/currency/reserve.go index 2201389..99f7dab 100644 --- a/ocp/worker/currency/reserve.go +++ b/ocp/worker/currency/reserve.go @@ -9,14 +9,12 @@ import ( "github.com/code-payments/ocp-server/metrics" "github.com/code-payments/ocp-server/ocp/common" "github.com/code-payments/ocp-server/ocp/config" + currency_util "github.com/code-payments/ocp-server/ocp/currency" ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/currency" "github.com/code-payments/ocp-server/ocp/worker" "github.com/code-payments/ocp-server/retry" "github.com/code-payments/ocp-server/retry/backoff" - "github.com/code-payments/ocp-server/solana" - "github.com/code-payments/ocp-server/solana/currencycreator" - "github.com/code-payments/ocp-server/solana/token" ) type reserveRuntime struct { @@ -74,20 +72,16 @@ func (p *reserveRuntime) Start(runtimeCtx context.Context, interval time.Duratio func (p *reserveRuntime) UpdateAllLaunchpadCurrencyReserves(ctx context.Context) error { err1 := func() error { jeffyMintAccount, _ := common.NewAccountFromPublicKeyString(config.JeffyMintPublicKey) - jeffyVaultAccount, _ := common.NewAccountFromPublicKeyString("BMYftxDcbLDTzRCkLmQ9amwNgqsZ74A1wsd1gURum3Ep") - var tokenAccount token.Account - ai, err := p.data.GetBlockchainAccountInfo(ctx, jeffyVaultAccount.PublicKey().ToBase58(), solana.CommitmentFinalized) + jeffyCirculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, jeffyMintAccount) if err != nil { return err } - tokenAccount.Unmarshal(ai.Data) - jeffyVaultBalance := tokenAccount.Amount return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ Mint: jeffyMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: currencycreator.DefaultMintMaxQuarkSupply - jeffyVaultBalance, - Time: time.Now(), + SupplyFromBonding: jeffyCirculatingSupply, + Time: ts, }) }() From dc81e8b508c9af18888444104f0472b5766ef477 Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Wed, 21 Jan 2026 12:00:19 -0500 Subject: [PATCH 2/6] Implement StreamLiveMintData RPC --- go.mod | 2 +- go.sum | 4 +- ocp/auth/encoding.go | 4 +- ocp/auth/encoding_test.go | 4 +- ocp/auth/signature.go | 2 +- ocp/rpc/currency/server.go | 202 ++++++++++++++++-- ocp/rpc/currency/stream.go | 129 ++++++++++++ ocp/rpc/currency/worker.go | 419 +++++++++++++++++++++++++++++++++++++ 8 files changed, 742 insertions(+), 24 deletions(-) create mode 100644 ocp/rpc/currency/stream.go create mode 100644 ocp/rpc/currency/worker.go diff --git a/go.mod b/go.mod index e1ea6f2..1e00173 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( filippo.io/edwards25519 v1.1.0 github.com/aws/aws-sdk-go-v2 v0.17.0 github.com/code-payments/code-vm-indexer v1.2.0 - github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121144749-40e6cb475f50 + github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121164058-c08592cc5e84 github.com/emirpasic/gods v1.12.0 github.com/envoyproxy/protoc-gen-validate v1.2.1 github.com/golang/protobuf v1.5.4 diff --git a/go.sum b/go.sum index 8b25a69..fbb0578 100644 --- a/go.sum +++ b/go.sum @@ -78,8 +78,8 @@ github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/code-payments/code-vm-indexer v1.2.0 h1:rSHpBMiT9BKgmKcXg/VIoi/h0t7jNxGx07Qz59m+6Q0= github.com/code-payments/code-vm-indexer v1.2.0/go.mod h1:vn91YN2qNqb+gGJeZe2+l+TNxVmEEiRHXXnIn2Y40h8= -github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121144749-40e6cb475f50 h1:19DpNyp6YYbLN6JzL4K+YOBA+x8X41RPY+U1NCLHsGU= -github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121144749-40e6cb475f50/go.mod h1:tw6BooY5a8l6CtSZnKOruyKII0W04n89pcM4BizrgG8= +github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121164058-c08592cc5e84 h1:OPCtG0vM4p0LU5xQJpHdX6pYTYQlEoZZrmk8WhgIKb4= +github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121164058-c08592cc5e84/go.mod h1:tw6BooY5a8l6CtSZnKOruyKII0W04n89pcM4BizrgG8= github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 h1:NmTXa/uVnDyp0TY5MKi197+3HWcnYWfnHGyaFthlnGw= github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= diff --git a/ocp/auth/encoding.go b/ocp/auth/encoding.go index 61e3c62..090c154 100644 --- a/ocp/auth/encoding.go +++ b/ocp/auth/encoding.go @@ -43,7 +43,9 @@ var wireTypes = map[protoreflect.Kind]protowire.Type{ protoreflect.GroupKind: protowire.StartGroupType, } -func forceConsistentMarshal(m proto.Message) ([]byte, error) { +// ForceConsistentMarshal marshals a proto message using field number ordering +// for consistent serialization across different language implementations. +func ForceConsistentMarshal(m proto.Message) ([]byte, error) { out, err := consistentMarshal(nil, m.ProtoReflect()) if err != nil { return nil, err diff --git a/ocp/auth/encoding_test.go b/ocp/auth/encoding_test.go index 83feb83..a76cef0 100644 --- a/ocp/auth/encoding_test.go +++ b/ocp/auth/encoding_test.go @@ -42,7 +42,7 @@ func TestProtoEncoding_CrossLanguageSupport(t *testing.T) { require.NoError(t, err) assert.Equal(t, goValue, base64.StdEncoding.EncodeToString(marshalled)) - marshalled, err = forceConsistentMarshal(&msg) + marshalled, err = ForceConsistentMarshal(&msg) require.NoError(t, err) assert.Equal(t, otherLanguageValue, base64.StdEncoding.EncodeToString(marshalled)) } @@ -83,7 +83,7 @@ func TestProtoEncoding_SDKTestParity(t *testing.T) { require.NoError(t, err) assert.NotEqual(t, marshalled, expected) - marshalled, err = forceConsistentMarshal(&msg) + marshalled, err = ForceConsistentMarshal(&msg) require.NoError(t, err) assert.Equal(t, marshalled, expected) } diff --git a/ocp/auth/signature.go b/ocp/auth/signature.go index b77b786..18aab8c 100644 --- a/ocp/auth/signature.go +++ b/ocp/auth/signature.go @@ -63,7 +63,7 @@ type marshalStrategy func(proto.Message) ([]byte, error) // defaultMarshalStrategies are the default marshal strategies var defaultMarshalStrategies = []marshalStrategy{ - forceConsistentMarshal, + ForceConsistentMarshal, proto.Marshal, // todo: deprecate this option } diff --git a/ocp/rpc/currency/server.go b/ocp/rpc/currency/server.go index 702e20e..6ba8454 100644 --- a/ocp/rpc/currency/server.go +++ b/ocp/rpc/currency/server.go @@ -6,13 +6,16 @@ import ( "strings" "time" + "github.com/google/uuid" + "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" + commonpb "github.com/code-payments/ocp-protobuf-api/generated/go/common/v1" currencypb "github.com/code-payments/ocp-protobuf-api/generated/go/currency/v1" - "github.com/pkg/errors" "github.com/code-payments/ocp-server/cache" currency_lib "github.com/code-payments/ocp-server/currency" @@ -23,10 +26,16 @@ import ( currency_util "github.com/code-payments/ocp-server/ocp/currency" ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/currency" + "github.com/code-payments/ocp-server/protoutil" "github.com/code-payments/ocp-server/solana/currencycreator" timelock_token "github.com/code-payments/ocp-server/solana/timelock/v1" ) +const ( + streamPingDelay = 5 * time.Second + streamInitialRecvTimeout = 250 * time.Millisecond +) + type currencyServer struct { log *zap.Logger data ocp_data.Provider @@ -34,6 +43,8 @@ type currencyServer struct { exchangeRateHistoryCache cache.Cache reserveHistoryCache cache.Cache + liveMintStateWorker *liveMintStateWorker + currencypb.UnimplementedCurrencyServer } @@ -50,6 +61,21 @@ func NewCurrencyServer( } } +func NewCurrencyServerWithStateWorker( + log *zap.Logger, + data ocp_data.Provider, +) currencypb.CurrencyServer { + return ¤cyServer{ + log: log, + data: data, + + exchangeRateHistoryCache: cache.NewCache(1_000), + reserveHistoryCache: cache.NewCache(1_000), + + liveMintStateWorker: newLiveMintStateWorker(log, data), + } +} + func (s *currencyServer) GetAllRates(ctx context.Context, req *currencypb.GetAllRatesRequest) (resp *currencypb.GetAllRatesResponse, err error) { log := s.log.With(zap.String("method", "GetAllRates")) log = client.InjectLoggingMetadata(ctx, log) @@ -75,6 +101,22 @@ func (s *currencyServer) GetAllRates(ctx context.Context, req *currencypb.GetAll }, nil } +func (s *currencyServer) loadExchangeRatesForTime(ctx context.Context, t time.Time) (*currency.MultiRateRecord, error) { + record, err := s.data.GetAllExchangeRates(ctx, t) + if err != nil { + return nil, errors.Wrap(err, "failed to get price record by date") + } + return record, nil +} + +func (s *currencyServer) loadExchangeRatesLatest(ctx context.Context) (*currency.MultiRateRecord, error) { + latest, err := s.data.GetAllExchangeRates(ctx, currency_util.GetLatestExchangeRateTime()) + if err != nil { + return nil, errors.Wrap(err, "failed to get latest price record") + } + return latest, nil +} + func (s *currencyServer) GetMints(ctx context.Context, req *currencypb.GetMintsRequest) (*currencypb.GetMintsResponse, error) { log := s.log.With(zap.String("method", "GetMints")) log = client.InjectLoggingMetadata(ctx, log) @@ -353,22 +395,6 @@ func (s *currencyServer) GetHistoricalMintData(ctx context.Context, req *currenc }, nil } -func (s *currencyServer) loadExchangeRatesForTime(ctx context.Context, t time.Time) (*currency.MultiRateRecord, error) { - record, err := s.data.GetAllExchangeRates(ctx, t) - if err != nil { - return nil, errors.Wrap(err, "failed to get price record by date") - } - return record, nil -} - -func (s *currencyServer) loadExchangeRatesLatest(ctx context.Context) (*currency.MultiRateRecord, error) { - latest, err := s.data.GetAllExchangeRates(ctx, currency_util.GetLatestExchangeRateTime()) - if err != nil { - return nil, errors.Wrap(err, "failed to get latest price record") - } - return latest, nil -} - func (s *currencyServer) getCachedReserveHistory( ctx context.Context, mint string, @@ -483,3 +509,145 @@ func getTimeRangeForPredefinedRange(predefinedRange currencypb.GetHistoricalMint return now.Add(-100 * 365 * 24 * time.Hour), now, query.IntervalDay } } + +func (s *currencyServer) StreamLiveMintData( + streamer currencypb.Currency_StreamLiveMintDataServer, +) error { + ctx := streamer.Context() + + log := s.log.With(zap.String("method", "StreamLiveMintData")) + log = client.InjectLoggingMetadata(ctx, log) + + // Wait for the initial request to get the list of mints + req, err := protoutil.BoundedReceive[currencypb.StreamLiveMintDataRequest](ctx, streamer, streamInitialRecvTimeout) + if err != nil { + log.With(zap.Error(err)).Debug("error receiving initial request") + return err + } + + // Must be a request type + request := req.GetRequest() + if request == nil { + return status.Error(codes.InvalidArgument, "first message must be a request") + } + + // Parse requested mints + var requestedMints []*common.Account + for _, protoMint := range request.GetMints() { + mint, err := common.NewAccountFromProto(protoMint) + if err != nil { + log.With(zap.Error(err)).Warn("invalid mint") + return status.Error(codes.Internal, "") + } + requestedMints = append(requestedMints, mint) + } + + // Generate unique stream ID + streamID := uuid.New().String() + log = log.With(zap.String("stream_id", streamID)) + + // Register stream with state worker + stream := s.liveMintStateWorker.RegisterStream(streamID, requestedMints) + defer s.liveMintStateWorker.UnregisterStream(streamID) + + log.Debug("stream registered") + + // Wait for initial data to be available + if err := s.liveMintStateWorker.WaitForData(ctx); err != nil { + log.With(zap.Error(err)).Debug("context cancelled while waiting for data") + return status.Error(codes.Canceled, "") + } + + // Initial flush: send current exchange rates if the stream wants them + if stream.wantsExchangeRates() { + exchangeRates := s.liveMintStateWorker.GetExchangeRates() + if exchangeRates != nil && exchangeRates.SignedResponse != nil { + if err := streamer.Send(exchangeRates.SignedResponse); err != nil { + log.With(zap.Error(err)).Debug("failed to send initial exchange rates") + return err + } + } + } + + // Initial flush: send current reserve states + reserveStates := s.liveMintStateWorker.GetReserveStates() + if len(reserveStates) > 0 { + // Filter based on requested mints and build batch response + var filtered []*currencypb.VerifiedLaunchpadCurrencyReserveState + for _, state := range reserveStates { + if stream.wantsMint(state.Mint.PublicKey().ToBase58()) && state.SignedState != nil { + filtered = append(filtered, state.SignedState) + } + } + if len(filtered) > 0 { + resp := ¤cypb.StreamLiveMintDataResponse{ + Type: ¤cypb.StreamLiveMintDataResponse_Data{ + Data: ¤cypb.StreamLiveMintDataResponse_LiveData{ + Type: ¤cypb.StreamLiveMintDataResponse_LiveData_LaunchpadCurrencyReserveStates{ + LaunchpadCurrencyReserveStates: ¤cypb.VerifiedLaunchapdCurrencyReserveStateBatch{ + ReserveStates: filtered, + }, + }, + }, + }, + } + if err := streamer.Send(resp); err != nil { + log.With(zap.Error(err)).Debug("failed to send initial reserve states") + return err + } + } + } + + log.Debug("initial flush complete") + + // Set up ping/pong health monitoring + sendPingCh := time.After(0) // Send first ping immediately + streamHealthCh := protoutil.MonitorStreamHealth(ctx, log, streamer, func(req *currencypb.StreamLiveMintDataRequest) bool { + return req.GetPong() != nil + }) + + // Main loop: listen on stream channel and send updates + for { + select { + case update, ok := <-stream.streamCh: + if !ok { + log.Debug("stream channel closed") + return status.Error(codes.Aborted, "stream closed") + } + + if update.response == nil { + continue + } + + if err := streamer.Send(update.response); err != nil { + log.With(zap.Error(err)).Debug("failed to send update") + return err + } + + case <-sendPingCh: + log.Debug("sending ping to client") + sendPingCh = time.After(streamPingDelay) + + err := streamer.Send(¤cypb.StreamLiveMintDataResponse{ + Type: ¤cypb.StreamLiveMintDataResponse_Ping{ + Ping: &commonpb.ServerPing{ + Timestamp: timestamppb.Now(), + PingDelay: durationpb.New(streamPingDelay), + }, + }, + }) + if err != nil { + log.Debug("failed to send ping, stream is unhealthy") + return status.Error(codes.Aborted, "terminating unhealthy stream") + } + + case <-streamHealthCh: + log.Debug("stream is unhealthy, terminating") + return status.Error(codes.Aborted, "terminating unhealthy stream") + + case <-ctx.Done(): + log.Debug("context cancelled") + return status.Error(codes.Canceled, "") + } + } +} diff --git a/ocp/rpc/currency/stream.go b/ocp/rpc/currency/stream.go new file mode 100644 index 0000000..b81fa88 --- /dev/null +++ b/ocp/rpc/currency/stream.go @@ -0,0 +1,129 @@ +package currency + +import ( + "sync" + "time" + + "github.com/pkg/errors" + + currencypb "github.com/code-payments/ocp-protobuf-api/generated/go/currency/v1" + + "github.com/code-payments/ocp-server/ocp/common" +) + +// streamUpdate represents an update to send to streams (pre-signed) +type streamUpdate struct { + // Pre-signed response ready to send directly + response *currencypb.StreamLiveMintDataResponse +} + +type liveMintDataStream struct { + sync.Mutex + + id string + mints map[string]struct{} // mints this stream subscribes to + streamCh chan *streamUpdate + closed bool +} + +func newLiveMintDataStream(id string, mints []*common.Account, bufferSize int) *liveMintDataStream { + mintSet := make(map[string]struct{}, len(mints)) + for _, mint := range mints { + mintSet[mint.PublicKey().ToBase58()] = struct{}{} + } + + return &liveMintDataStream{ + id: id, + mints: mintSet, + streamCh: make(chan *streamUpdate, bufferSize), + } +} + +func (s *liveMintDataStream) notifyExchangeRates(data *liveExchangeRateData, timeout time.Duration) error { + if data.SignedResponse == nil { + return errors.New("exchange rates missing pre-signed response") + } + update := &streamUpdate{ + response: data.SignedResponse, + } + return s.notify(update, timeout) +} + +func (s *liveMintDataStream) notifyReserveStates(states []*liveReserveStateData, timeout time.Duration) error { + // Filter reserve states based on subscribed mints + var filtered []*currencypb.VerifiedLaunchpadCurrencyReserveState + for _, state := range states { + if s.wantsMint(state.Mint.PublicKey().ToBase58()) { + if state.SignedState == nil { + return errors.New("reserve state missing pre-signed state") + } + filtered = append(filtered, state.SignedState) + } + } + + // Only send if there are relevant updates + if len(filtered) == 0 { + return nil + } + + // Build the response with the filtered pre-signed states + response := ¤cypb.StreamLiveMintDataResponse{ + Type: ¤cypb.StreamLiveMintDataResponse_Data{ + Data: ¤cypb.StreamLiveMintDataResponse_LiveData{ + Type: ¤cypb.StreamLiveMintDataResponse_LiveData_LaunchpadCurrencyReserveStates{ + LaunchpadCurrencyReserveStates: ¤cypb.VerifiedLaunchapdCurrencyReserveStateBatch{ + ReserveStates: filtered, + }, + }, + }, + }, + } + + update := &streamUpdate{ + response: response, + } + return s.notify(update, timeout) +} + +func (s *liveMintDataStream) notify(update *streamUpdate, timeout time.Duration) error { + s.Lock() + + if s.closed { + s.Unlock() + return errors.New("cannot notify closed stream") + } + + select { + case s.streamCh <- update: + case <-time.After(timeout): + s.Unlock() + s.close() + return errors.New("timed out sending data to streamCh") + } + + s.Unlock() + return nil +} + +func (s *liveMintDataStream) close() { + s.Lock() + defer s.Unlock() + + if s.closed { + return + } + + s.closed = true + close(s.streamCh) +} + +func (s *liveMintDataStream) wantsMint(mint string) bool { + _, ok := s.mints[mint] + return ok +} + +func (s *liveMintDataStream) wantsExchangeRates() bool { + // Exchange rates are only for core mint + _, ok := s.mints[common.CoreMintAccount.PublicKey().ToBase58()] + return ok +} diff --git a/ocp/rpc/currency/worker.go b/ocp/rpc/currency/worker.go new file mode 100644 index 0000000..df505f8 --- /dev/null +++ b/ocp/rpc/currency/worker.go @@ -0,0 +1,419 @@ +package currency + +import ( + "context" + "crypto/ed25519" + "sync" + "time" + + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" + + commonpb "github.com/code-payments/ocp-protobuf-api/generated/go/common/v1" + currencypb "github.com/code-payments/ocp-protobuf-api/generated/go/currency/v1" + + "github.com/code-payments/ocp-server/ocp/auth" + "github.com/code-payments/ocp-server/ocp/common" + "github.com/code-payments/ocp-server/ocp/config" + currency_util "github.com/code-payments/ocp-server/ocp/currency" + ocp_data "github.com/code-payments/ocp-server/ocp/data" + "github.com/code-payments/ocp-server/ocp/data/currency" +) + +const ( + exchangeRatePollInterval = 5 * time.Minute + reserveStatePollInterval = 15 * time.Second + streamNotifyTimeout = 10 * time.Second + streamBufferSize = 100 +) + +var ( + jeffyMintAccount, _ = common.NewAccountFromPublicKeyString(config.JeffyMintPublicKey) +) + +// trackedLaunchpadMints is the hardcoded set of launchpad mints to track +// (excludes core mint as it only has exchange rate data) +var trackedLaunchpadMints = []*common.Account{ + jeffyMintAccount, +} + +// liveExchangeRateData represents live exchange rate data with its pre-signed response +type liveExchangeRateData struct { + Rates map[string]float64 + Timestamp time.Time + SignedResponse *currencypb.StreamLiveMintDataResponse +} + +// liveReserveStateData represents live launchpad currency reserve state with its pre-signed response +type liveReserveStateData struct { + Mint *common.Account + SupplyFromBonding uint64 + Timestamp time.Time + SignedState *currencypb.VerifiedLaunchpadCurrencyReserveState +} + +type liveMintStateWorker struct { + log *zap.Logger + data ocp_data.Provider + + stateMu sync.RWMutex + exchangeRates *liveExchangeRateData + launchpadReserves map[string]*liveReserveStateData + + streamsMu sync.RWMutex + streams map[string]*liveMintDataStream + + dataReady chan struct{} // closed when initial data is loaded + dataReadyOnce sync.Once + + // Track initial load completion + initMu sync.Mutex + exchangeRatesLoaded bool + reserveStatesLoadedMints map[string]struct{} + + ctx context.Context + cancel context.CancelFunc +} + +func newLiveMintStateWorker(log *zap.Logger, data ocp_data.Provider) *liveMintStateWorker { + ctx, cancel := context.WithCancel(context.Background()) + return &liveMintStateWorker{ + log: log, + data: data, + launchpadReserves: make(map[string]*liveReserveStateData), + streams: make(map[string]*liveMintDataStream), + dataReady: make(chan struct{}), + reserveStatesLoadedMints: make(map[string]struct{}), + ctx: ctx, + cancel: cancel, + } +} + +// Start begins the polling goroutines for exchange rates and reserve state +func (m *liveMintStateWorker) Start(ctx context.Context) error { + go m.pollExchangeRates(ctx) + go m.pollReserveState(ctx) + return nil +} + +// Stop cancels the polling goroutines and closes all streams +func (m *liveMintStateWorker) Stop() { + m.cancel() + + m.streamsMu.Lock() + defer m.streamsMu.Unlock() + + for _, stream := range m.streams { + stream.close() + } + m.streams = make(map[string]*liveMintDataStream) +} + +// RegisterStream creates and registers a new stream for the given mints +func (m *liveMintStateWorker) RegisterStream(id string, mints []*common.Account) *liveMintDataStream { + stream := newLiveMintDataStream(id, mints, streamBufferSize) + + m.streamsMu.Lock() + m.streams[id] = stream + m.streamsMu.Unlock() + + return stream +} + +// UnregisterStream removes a stream and closes it +func (m *liveMintStateWorker) UnregisterStream(id string) { + m.streamsMu.Lock() + stream, ok := m.streams[id] + if ok { + delete(m.streams, id) + } + m.streamsMu.Unlock() + + if stream != nil { + stream.close() + } +} + +// WaitForData blocks until initial data is loaded or context is cancelled +func (m *liveMintStateWorker) WaitForData(ctx context.Context) error { + select { + case <-m.dataReady: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// GetExchangeRates returns the current pre-signed exchange rate data +func (m *liveMintStateWorker) GetExchangeRates() *liveExchangeRateData { + m.stateMu.RLock() + defer m.stateMu.RUnlock() + + return m.exchangeRates +} + +// GetReserveStates returns all current pre-signed launchpad currency reserve states +func (m *liveMintStateWorker) GetReserveStates() []*liveReserveStateData { + m.stateMu.RLock() + defer m.stateMu.RUnlock() + + result := make([]*liveReserveStateData, 0, len(m.launchpadReserves)) + for _, data := range m.launchpadReserves { + result = append(result, data) + } + return result +} + +func (m *liveMintStateWorker) tryMarkDataReady() { + m.initMu.Lock() + defer m.initMu.Unlock() + + // Check if all data has been loaded + if !m.exchangeRatesLoaded { + return + } + if len(m.reserveStatesLoadedMints) < len(trackedLaunchpadMints) { + return + } + + m.dataReadyOnce.Do(func() { + close(m.dataReady) + }) +} + +func (m *liveMintStateWorker) markExchangeRatesLoaded() { + m.initMu.Lock() + m.exchangeRatesLoaded = true + m.initMu.Unlock() + m.tryMarkDataReady() +} + +func (m *liveMintStateWorker) markReserveStateLoaded(mint string) { + m.initMu.Lock() + m.reserveStatesLoadedMints[mint] = struct{}{} + m.initMu.Unlock() +} + +func (m *liveMintStateWorker) pollExchangeRates(ctx context.Context) { + log := m.log.With(zap.String("poller", "exchange_rates")) + + // Initial poll immediately + m.fetchAndUpdateExchangeRates(ctx, log) + + ticker := time.NewTicker(exchangeRatePollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-m.ctx.Done(): + return + case <-ticker.C: + m.fetchAndUpdateExchangeRates(ctx, log) + } + } +} + +func (m *liveMintStateWorker) fetchAndUpdateExchangeRates(ctx context.Context, log *zap.Logger) { + rates, err := m.data.GetAllExchangeRates(ctx, time.Now()) + if err != nil { + log.With(zap.Error(err)).Warn("failed to fetch exchange rates") + return + } + + // Sign the exchange rates once when fetched + signedResponse, err := m.signExchangeRates(rates) + if err != nil { + log.With(zap.Error(err)).Warn("failed to sign exchange rates") + return + } + + m.stateMu.Lock() + m.exchangeRates = &liveExchangeRateData{ + Rates: rates.Rates, + Timestamp: rates.Time, + SignedResponse: signedResponse, + } + m.stateMu.Unlock() + + m.notifyExchangeRates() + m.markExchangeRatesLoaded() +} + +func (m *liveMintStateWorker) pollReserveState(ctx context.Context) { + log := m.log.With(zap.String("poller", "reserve_state")) + + // Initial poll immediately + m.fetchAndUpdateReserveState(ctx, log) + + ticker := time.NewTicker(reserveStatePollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-m.ctx.Done(): + return + case <-ticker.C: + m.fetchAndUpdateReserveState(ctx, log) + } + } +} + +func (m *liveMintStateWorker) fetchAndUpdateReserveState(ctx context.Context, log *zap.Logger) { + var updatedStates []*liveReserveStateData + + for _, mint := range trackedLaunchpadMints { + mintAddr := mint.PublicKey().ToBase58() + + supply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, m.data, mint) + if err != nil { + log.With( + zap.Error(err), + zap.String("mint", mintAddr), + ).Warn("failed to fetch launchpad currency circulating supply") + continue + } + + // Sign the reserve state once when fetched + signedState, err := m.signReserveState(mint, supply, ts) + if err != nil { + log.With( + zap.Error(err), + zap.String("mint", mintAddr), + ).Warn("failed to sign reserve state") + continue + } + + // Track that this mint was loaded (before updating launchpadReserves) + m.markReserveStateLoaded(mintAddr) + + stateData := &liveReserveStateData{ + Mint: mint, + SupplyFromBonding: supply, + Timestamp: ts, + SignedState: signedState, + } + + m.stateMu.Lock() + m.launchpadReserves[mintAddr] = stateData + m.stateMu.Unlock() + + updatedStates = append(updatedStates, stateData) + } + + if len(updatedStates) > 0 { + m.notifyReserveStates(updatedStates) + } + + // Try to mark data ready after all reserve states are processed + m.tryMarkDataReady() +} + +func (m *liveMintStateWorker) notifyExchangeRates() { + m.stateMu.RLock() + data := m.exchangeRates + m.stateMu.RUnlock() + + if data == nil { + return + } + + m.streamsMu.RLock() + streams := make([]*liveMintDataStream, 0, len(m.streams)) + for _, stream := range m.streams { + streams = append(streams, stream) + } + m.streamsMu.RUnlock() + + for _, stream := range streams { + if stream.wantsExchangeRates() { + if err := stream.notifyExchangeRates(data, streamNotifyTimeout); err != nil { + m.log.With( + zap.Error(err), + zap.String("stream_id", stream.id), + ).Debug("failed to notify stream of exchange rates") + } + } + } +} + +func (m *liveMintStateWorker) notifyReserveStates(states []*liveReserveStateData) { + m.streamsMu.RLock() + streams := make([]*liveMintDataStream, 0, len(m.streams)) + for _, stream := range m.streams { + streams = append(streams, stream) + } + m.streamsMu.RUnlock() + + for _, stream := range streams { + if err := stream.notifyReserveStates(states, streamNotifyTimeout); err != nil { + m.log.With( + zap.Error(err), + zap.String("stream_id", stream.id), + ).Debug("failed to notify stream of reserve states") + } + } +} + +// signExchangeRates creates a pre-signed response for exchange rates. +func (m *liveMintStateWorker) signExchangeRates(rates *currency.MultiRateRecord) (*currencypb.StreamLiveMintDataResponse, error) { + subsidizer := common.GetSubsidizer() + + // Build and sign each exchange rate individually + var verifiedRates []*currencypb.VerifiedCoreMintFiatExchangeRate + for code, rate := range rates.Rates { + exchangeRate := ¤cypb.CoreMintFiatExchangeRate{ + CurrencyCode: code, + ExchangeRate: rate, + Timestamp: timestamppb.New(rates.Time), + } + + // Sign the individual exchange rate + messageBytes, err := auth.ForceConsistentMarshal(exchangeRate) + if err != nil { + return nil, err + } + signature := ed25519.Sign(subsidizer.PrivateKey().ToBytes(), messageBytes) + + verifiedRates = append(verifiedRates, ¤cypb.VerifiedCoreMintFiatExchangeRate{ + ExchangeRate: exchangeRate, + Signature: &commonpb.Signature{Value: signature}, + }) + } + + return ¤cypb.StreamLiveMintDataResponse{ + Type: ¤cypb.StreamLiveMintDataResponse_Data{ + Data: ¤cypb.StreamLiveMintDataResponse_LiveData{ + Type: ¤cypb.StreamLiveMintDataResponse_LiveData_CoreMintFiatExchangeRates{ + CoreMintFiatExchangeRates: ¤cypb.VerifiedCoreMintFiatExchangeRateBatch{ + ExchangeRates: verifiedRates, + }, + }, + }, + }, + }, nil +} + +// signReserveState creates a pre-signed verified state for a reserve state. +func (m *liveMintStateWorker) signReserveState(mint *common.Account, supplyFromBonding uint64, ts time.Time) (*currencypb.VerifiedLaunchpadCurrencyReserveState, error) { + reserveState := ¤cypb.LaunchpadCurrencyReserveState{ + Mint: mint.ToProto(), + SupplyFromBonding: supplyFromBonding, + Timestamp: timestamppb.New(ts), + } + + subsidizer := common.GetSubsidizer() + messageBytes, err := auth.ForceConsistentMarshal(reserveState) + if err != nil { + return nil, err + } + signature := ed25519.Sign(subsidizer.PrivateKey().ToBytes(), messageBytes) + + return ¤cypb.VerifiedLaunchpadCurrencyReserveState{ + ReserveState: reserveState, + Signature: &commonpb.Signature{Value: signature}, + }, nil +} From 5e7ae9ae8367f305f8c222adb8b1654d3012324a Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Wed, 21 Jan 2026 13:09:42 -0500 Subject: [PATCH 3/6] Remove double currency server constructor --- ocp/rpc/currency/server.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/ocp/rpc/currency/server.go b/ocp/rpc/currency/server.go index 6ba8454..1825768 100644 --- a/ocp/rpc/currency/server.go +++ b/ocp/rpc/currency/server.go @@ -58,19 +58,6 @@ func NewCurrencyServer( exchangeRateHistoryCache: cache.NewCache(1_000), reserveHistoryCache: cache.NewCache(1_000), - } -} - -func NewCurrencyServerWithStateWorker( - log *zap.Logger, - data ocp_data.Provider, -) currencypb.CurrencyServer { - return ¤cyServer{ - log: log, - data: data, - - exchangeRateHistoryCache: cache.NewCache(1_000), - reserveHistoryCache: cache.NewCache(1_000), liveMintStateWorker: newLiveMintStateWorker(log, data), } From 4c081b125c726c186ade176905b7438a97d2b480 Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Wed, 21 Jan 2026 13:15:12 -0500 Subject: [PATCH 4/6] Start currency state worker on init --- ocp/rpc/currency/server.go | 15 +++++++++------ ocp/rpc/currency/worker.go | 22 +++++++++++----------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/ocp/rpc/currency/server.go b/ocp/rpc/currency/server.go index 1825768..f66d13e 100644 --- a/ocp/rpc/currency/server.go +++ b/ocp/rpc/currency/server.go @@ -52,6 +52,9 @@ func NewCurrencyServer( log *zap.Logger, data ocp_data.Provider, ) currencypb.CurrencyServer { + liveMintStateWorker := newLiveMintStateWorker(log, data) + liveMintStateWorker.start(context.Background()) + return ¤cyServer{ log: log, data: data, @@ -59,7 +62,7 @@ func NewCurrencyServer( exchangeRateHistoryCache: cache.NewCache(1_000), reserveHistoryCache: cache.NewCache(1_000), - liveMintStateWorker: newLiveMintStateWorker(log, data), + liveMintStateWorker: liveMintStateWorker, } } @@ -534,20 +537,20 @@ func (s *currencyServer) StreamLiveMintData( log = log.With(zap.String("stream_id", streamID)) // Register stream with state worker - stream := s.liveMintStateWorker.RegisterStream(streamID, requestedMints) - defer s.liveMintStateWorker.UnregisterStream(streamID) + stream := s.liveMintStateWorker.registerStream(streamID, requestedMints) + defer s.liveMintStateWorker.unregisterStream(streamID) log.Debug("stream registered") // Wait for initial data to be available - if err := s.liveMintStateWorker.WaitForData(ctx); err != nil { + if err := s.liveMintStateWorker.waitForData(ctx); err != nil { log.With(zap.Error(err)).Debug("context cancelled while waiting for data") return status.Error(codes.Canceled, "") } // Initial flush: send current exchange rates if the stream wants them if stream.wantsExchangeRates() { - exchangeRates := s.liveMintStateWorker.GetExchangeRates() + exchangeRates := s.liveMintStateWorker.getExchangeRates() if exchangeRates != nil && exchangeRates.SignedResponse != nil { if err := streamer.Send(exchangeRates.SignedResponse); err != nil { log.With(zap.Error(err)).Debug("failed to send initial exchange rates") @@ -557,7 +560,7 @@ func (s *currencyServer) StreamLiveMintData( } // Initial flush: send current reserve states - reserveStates := s.liveMintStateWorker.GetReserveStates() + reserveStates := s.liveMintStateWorker.getReserveStates() if len(reserveStates) > 0 { // Filter based on requested mints and build batch response var filtered []*currencypb.VerifiedLaunchpadCurrencyReserveState diff --git a/ocp/rpc/currency/worker.go b/ocp/rpc/currency/worker.go index df505f8..45a8362 100644 --- a/ocp/rpc/currency/worker.go +++ b/ocp/rpc/currency/worker.go @@ -89,15 +89,15 @@ func newLiveMintStateWorker(log *zap.Logger, data ocp_data.Provider) *liveMintSt } } -// Start begins the polling goroutines for exchange rates and reserve state -func (m *liveMintStateWorker) Start(ctx context.Context) error { +// start begins the polling goroutines for exchange rates and reserve state +func (m *liveMintStateWorker) start(ctx context.Context) error { go m.pollExchangeRates(ctx) go m.pollReserveState(ctx) return nil } -// Stop cancels the polling goroutines and closes all streams -func (m *liveMintStateWorker) Stop() { +// stop cancels the polling goroutines and closes all streams +func (m *liveMintStateWorker) stop() { m.cancel() m.streamsMu.Lock() @@ -109,8 +109,8 @@ func (m *liveMintStateWorker) Stop() { m.streams = make(map[string]*liveMintDataStream) } -// RegisterStream creates and registers a new stream for the given mints -func (m *liveMintStateWorker) RegisterStream(id string, mints []*common.Account) *liveMintDataStream { +// registerStream creates and registers a new stream for the given mints +func (m *liveMintStateWorker) registerStream(id string, mints []*common.Account) *liveMintDataStream { stream := newLiveMintDataStream(id, mints, streamBufferSize) m.streamsMu.Lock() @@ -120,8 +120,8 @@ func (m *liveMintStateWorker) RegisterStream(id string, mints []*common.Account) return stream } -// UnregisterStream removes a stream and closes it -func (m *liveMintStateWorker) UnregisterStream(id string) { +// unregisterStream removes a stream and closes it +func (m *liveMintStateWorker) unregisterStream(id string) { m.streamsMu.Lock() stream, ok := m.streams[id] if ok { @@ -135,7 +135,7 @@ func (m *liveMintStateWorker) UnregisterStream(id string) { } // WaitForData blocks until initial data is loaded or context is cancelled -func (m *liveMintStateWorker) WaitForData(ctx context.Context) error { +func (m *liveMintStateWorker) waitForData(ctx context.Context) error { select { case <-m.dataReady: return nil @@ -145,7 +145,7 @@ func (m *liveMintStateWorker) WaitForData(ctx context.Context) error { } // GetExchangeRates returns the current pre-signed exchange rate data -func (m *liveMintStateWorker) GetExchangeRates() *liveExchangeRateData { +func (m *liveMintStateWorker) getExchangeRates() *liveExchangeRateData { m.stateMu.RLock() defer m.stateMu.RUnlock() @@ -153,7 +153,7 @@ func (m *liveMintStateWorker) GetExchangeRates() *liveExchangeRateData { } // GetReserveStates returns all current pre-signed launchpad currency reserve states -func (m *liveMintStateWorker) GetReserveStates() []*liveReserveStateData { +func (m *liveMintStateWorker) getReserveStates() []*liveReserveStateData { m.stateMu.RLock() defer m.stateMu.RUnlock() From 38c593c78caaa8651638ea22cad266510c6dd5ec Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Wed, 21 Jan 2026 13:28:54 -0500 Subject: [PATCH 5/6] Pull ocp-protobuf-api v0.9.0 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 1e00173..82857c6 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( filippo.io/edwards25519 v1.1.0 github.com/aws/aws-sdk-go-v2 v0.17.0 github.com/code-payments/code-vm-indexer v1.2.0 - github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121164058-c08592cc5e84 + github.com/code-payments/ocp-protobuf-api v0.9.0 github.com/emirpasic/gods v1.12.0 github.com/envoyproxy/protoc-gen-validate v1.2.1 github.com/golang/protobuf v1.5.4 diff --git a/go.sum b/go.sum index fbb0578..71a95c3 100644 --- a/go.sum +++ b/go.sum @@ -78,8 +78,8 @@ github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/code-payments/code-vm-indexer v1.2.0 h1:rSHpBMiT9BKgmKcXg/VIoi/h0t7jNxGx07Qz59m+6Q0= github.com/code-payments/code-vm-indexer v1.2.0/go.mod h1:vn91YN2qNqb+gGJeZe2+l+TNxVmEEiRHXXnIn2Y40h8= -github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121164058-c08592cc5e84 h1:OPCtG0vM4p0LU5xQJpHdX6pYTYQlEoZZrmk8WhgIKb4= -github.com/code-payments/ocp-protobuf-api v0.8.1-0.20260121164058-c08592cc5e84/go.mod h1:tw6BooY5a8l6CtSZnKOruyKII0W04n89pcM4BizrgG8= +github.com/code-payments/ocp-protobuf-api v0.9.0 h1:BWRgd3smO8cz6MUm5DxNqXClp1D2pn2F4VUuna9jzxM= +github.com/code-payments/ocp-protobuf-api v0.9.0/go.mod h1:tw6BooY5a8l6CtSZnKOruyKII0W04n89pcM4BizrgG8= github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 h1:NmTXa/uVnDyp0TY5MKi197+3HWcnYWfnHGyaFthlnGw= github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= From a3572328add4a5f518517da162efb08f5720e253 Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Wed, 21 Jan 2026 13:45:20 -0500 Subject: [PATCH 6/6] Make currency worker polling intervals configurable --- ocp/rpc/currency/config.go | 36 ++++++++++++++++++++++++++++++++++++ ocp/rpc/currency/server.go | 3 ++- ocp/rpc/currency/stream.go | 5 +++++ ocp/rpc/currency/worker.go | 15 +++++---------- 4 files changed, 48 insertions(+), 11 deletions(-) create mode 100644 ocp/rpc/currency/config.go diff --git a/ocp/rpc/currency/config.go b/ocp/rpc/currency/config.go new file mode 100644 index 0000000..33fccbf --- /dev/null +++ b/ocp/rpc/currency/config.go @@ -0,0 +1,36 @@ +package currency + +import ( + "time" + + "github.com/code-payments/ocp-server/config" + "github.com/code-payments/ocp-server/config/env" +) + +const ( + envConfigPrefix = "CURRENCY_SERVICE_" + + ExchangeRatePollIntervalConfigEnvName = envConfigPrefix + "EXCHANGE_RATE_POLL_INTERVAL" + defaultExchangeRatePollInterval = 5 * time.Minute + + ReserveStatePollIntervalConfigEnvName = envConfigPrefix + "RESERVE_STATE_POLL_INTERVAL" + defaultReserveStatePollInterval = 15 * time.Second +) + +type conf struct { + exchangeRatePollInterval config.Duration + reserveStatePollInterval config.Duration +} + +// ConfigProvider defines how config values are pulled +type ConfigProvider func() *conf + +// WithEnvConfigs returns configuration pulled from environment variables +func WithEnvConfigs() ConfigProvider { + return func() *conf { + return &conf{ + exchangeRatePollInterval: env.NewDurationConfig(ExchangeRatePollIntervalConfigEnvName, defaultExchangeRatePollInterval), + reserveStatePollInterval: env.NewDurationConfig(ReserveStatePollIntervalConfigEnvName, defaultReserveStatePollInterval), + } + } +} diff --git a/ocp/rpc/currency/server.go b/ocp/rpc/currency/server.go index f66d13e..9996106 100644 --- a/ocp/rpc/currency/server.go +++ b/ocp/rpc/currency/server.go @@ -51,8 +51,9 @@ type currencyServer struct { func NewCurrencyServer( log *zap.Logger, data ocp_data.Provider, + configProvider ConfigProvider, ) currencypb.CurrencyServer { - liveMintStateWorker := newLiveMintStateWorker(log, data) + liveMintStateWorker := newLiveMintStateWorker(log, data, configProvider()) liveMintStateWorker.start(context.Background()) return ¤cyServer{ diff --git a/ocp/rpc/currency/stream.go b/ocp/rpc/currency/stream.go index b81fa88..d3afdc4 100644 --- a/ocp/rpc/currency/stream.go +++ b/ocp/rpc/currency/stream.go @@ -11,6 +11,11 @@ import ( "github.com/code-payments/ocp-server/ocp/common" ) +const ( + streamNotifyTimeout = 10 * time.Second + streamBufferSize = 100 +) + // streamUpdate represents an update to send to streams (pre-signed) type streamUpdate struct { // Pre-signed response ready to send directly diff --git a/ocp/rpc/currency/worker.go b/ocp/rpc/currency/worker.go index 45a8362..23e1079 100644 --- a/ocp/rpc/currency/worker.go +++ b/ocp/rpc/currency/worker.go @@ -20,13 +20,6 @@ import ( "github.com/code-payments/ocp-server/ocp/data/currency" ) -const ( - exchangeRatePollInterval = 5 * time.Minute - reserveStatePollInterval = 15 * time.Second - streamNotifyTimeout = 10 * time.Second - streamBufferSize = 100 -) - var ( jeffyMintAccount, _ = common.NewAccountFromPublicKeyString(config.JeffyMintPublicKey) ) @@ -54,6 +47,7 @@ type liveReserveStateData struct { type liveMintStateWorker struct { log *zap.Logger + conf *conf data ocp_data.Provider stateMu sync.RWMutex @@ -75,10 +69,11 @@ type liveMintStateWorker struct { cancel context.CancelFunc } -func newLiveMintStateWorker(log *zap.Logger, data ocp_data.Provider) *liveMintStateWorker { +func newLiveMintStateWorker(log *zap.Logger, data ocp_data.Provider, conf *conf) *liveMintStateWorker { ctx, cancel := context.WithCancel(context.Background()) return &liveMintStateWorker{ log: log, + conf: conf, data: data, launchpadReserves: make(map[string]*liveReserveStateData), streams: make(map[string]*liveMintDataStream), @@ -200,7 +195,7 @@ func (m *liveMintStateWorker) pollExchangeRates(ctx context.Context) { // Initial poll immediately m.fetchAndUpdateExchangeRates(ctx, log) - ticker := time.NewTicker(exchangeRatePollInterval) + ticker := time.NewTicker(m.conf.exchangeRatePollInterval.Get(ctx)) defer ticker.Stop() for { @@ -247,7 +242,7 @@ func (m *liveMintStateWorker) pollReserveState(ctx context.Context) { // Initial poll immediately m.fetchAndUpdateReserveState(ctx, log) - ticker := time.NewTicker(reserveStatePollInterval) + ticker := time.NewTicker(m.conf.reserveStatePollInterval.Get(ctx)) defer ticker.Stop() for {