Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
filippo.io/edwards25519 v1.1.0
github.com/aws/aws-sdk-go-v2 v0.17.0
github.com/code-payments/code-vm-indexer v1.2.0
github.com/code-payments/ocp-protobuf-api v0.8.0
github.com/code-payments/ocp-protobuf-api v0.9.0
github.com/emirpasic/gods v1.12.0
github.com/envoyproxy/protoc-gen-validate v1.2.1
github.com/golang/protobuf v1.5.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/code-payments/code-vm-indexer v1.2.0 h1:rSHpBMiT9BKgmKcXg/VIoi/h0t7jNxGx07Qz59m+6Q0=
github.com/code-payments/code-vm-indexer v1.2.0/go.mod h1:vn91YN2qNqb+gGJeZe2+l+TNxVmEEiRHXXnIn2Y40h8=
github.com/code-payments/ocp-protobuf-api v0.8.0 h1:Fm1dH7dFQu/xn5ohuPrJqcju8pWtIi4upH5f97KVrvY=
github.com/code-payments/ocp-protobuf-api v0.8.0/go.mod h1:tw6BooY5a8l6CtSZnKOruyKII0W04n89pcM4BizrgG8=
github.com/code-payments/ocp-protobuf-api v0.9.0 h1:BWRgd3smO8cz6MUm5DxNqXClp1D2pn2F4VUuna9jzxM=
github.com/code-payments/ocp-protobuf-api v0.9.0/go.mod h1:tw6BooY5a8l6CtSZnKOruyKII0W04n89pcM4BizrgG8=
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 h1:NmTXa/uVnDyp0TY5MKi197+3HWcnYWfnHGyaFthlnGw=
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
Expand Down
4 changes: 3 additions & 1 deletion ocp/auth/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ var wireTypes = map[protoreflect.Kind]protowire.Type{
protoreflect.GroupKind: protowire.StartGroupType,
}

func forceConsistentMarshal(m proto.Message) ([]byte, error) {
// ForceConsistentMarshal marshals a proto message using field number ordering
// for consistent serialization across different language implementations.
func ForceConsistentMarshal(m proto.Message) ([]byte, error) {
out, err := consistentMarshal(nil, m.ProtoReflect())
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions ocp/auth/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestProtoEncoding_CrossLanguageSupport(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, goValue, base64.StdEncoding.EncodeToString(marshalled))

marshalled, err = forceConsistentMarshal(&msg)
marshalled, err = ForceConsistentMarshal(&msg)
require.NoError(t, err)
assert.Equal(t, otherLanguageValue, base64.StdEncoding.EncodeToString(marshalled))
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestProtoEncoding_SDKTestParity(t *testing.T) {
require.NoError(t, err)
assert.NotEqual(t, marshalled, expected)

marshalled, err = forceConsistentMarshal(&msg)
marshalled, err = ForceConsistentMarshal(&msg)
require.NoError(t, err)
assert.Equal(t, marshalled, expected)
}
2 changes: 1 addition & 1 deletion ocp/auth/signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type marshalStrategy func(proto.Message) ([]byte, error)

// defaultMarshalStrategies are the default marshal strategies
var defaultMarshalStrategies = []marshalStrategy{
forceConsistentMarshal,
ForceConsistentMarshal,
proto.Marshal, // todo: deprecate this option
}

Expand Down
39 changes: 39 additions & 0 deletions ocp/currency/reserve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package currency

import (
"context"
"errors"
"time"

"github.com/code-payments/ocp-server/ocp/common"
ocp_data "github.com/code-payments/ocp-server/ocp/data"
"github.com/code-payments/ocp-server/solana"
"github.com/code-payments/ocp-server/solana/currencycreator"
"github.com/code-payments/ocp-server/solana/token"
)

// GetLaunchpadCurrencyCirculatingSupply gets the current circulating supply in
// quarks for a launchpad currency directly from the blockchain
func GetLaunchpadCurrencyCirculatingSupply(ctx context.Context, data ocp_data.Provider, mint *common.Account) (uint64, time.Time, error) {
metadataRecord, err := data.GetCurrencyMetadata(ctx, mint.PublicKey().ToBase58())
if err != nil {
return 0, time.Time{}, err
}

accounts, err := common.GetLaunchpadCurrencyAccounts(metadataRecord)
if err != nil {
return 0, time.Time{}, err
}

ai, err := data.GetBlockchainAccountInfo(ctx, accounts.VaultMint.PublicKey().ToBase58(), solana.CommitmentFinalized)
if err != nil {
return 0, time.Time{}, err
}

var tokenAccount token.Account
if !tokenAccount.Unmarshal(ai.Data) {
return 0, time.Time{}, errors.New("invalid token account state")
}

return currencycreator.DefaultMintMaxQuarkSupply - tokenAccount.Amount, time.Now(), nil
}
36 changes: 36 additions & 0 deletions ocp/rpc/currency/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package currency

import (
"time"

"github.com/code-payments/ocp-server/config"
"github.com/code-payments/ocp-server/config/env"
)

const (
envConfigPrefix = "CURRENCY_SERVICE_"

ExchangeRatePollIntervalConfigEnvName = envConfigPrefix + "EXCHANGE_RATE_POLL_INTERVAL"
defaultExchangeRatePollInterval = 5 * time.Minute

ReserveStatePollIntervalConfigEnvName = envConfigPrefix + "RESERVE_STATE_POLL_INTERVAL"
defaultReserveStatePollInterval = 15 * time.Second
)

type conf struct {
exchangeRatePollInterval config.Duration
reserveStatePollInterval config.Duration
}

// ConfigProvider defines how config values are pulled
type ConfigProvider func() *conf

// WithEnvConfigs returns configuration pulled from environment variables
func WithEnvConfigs() ConfigProvider {
return func() *conf {
return &conf{
exchangeRatePollInterval: env.NewDurationConfig(ExchangeRatePollIntervalConfigEnvName, defaultExchangeRatePollInterval),
reserveStatePollInterval: env.NewDurationConfig(ReserveStatePollIntervalConfigEnvName, defaultReserveStatePollInterval),
}
}
}
193 changes: 176 additions & 17 deletions ocp/rpc/currency/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import (
"strings"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

commonpb "github.com/code-payments/ocp-protobuf-api/generated/go/common/v1"
currencypb "github.com/code-payments/ocp-protobuf-api/generated/go/currency/v1"
"github.com/pkg/errors"

"github.com/code-payments/ocp-server/cache"
currency_lib "github.com/code-payments/ocp-server/currency"
Expand All @@ -23,30 +26,44 @@ import (
currency_util "github.com/code-payments/ocp-server/ocp/currency"
ocp_data "github.com/code-payments/ocp-server/ocp/data"
"github.com/code-payments/ocp-server/ocp/data/currency"
"github.com/code-payments/ocp-server/protoutil"
"github.com/code-payments/ocp-server/solana/currencycreator"
timelock_token "github.com/code-payments/ocp-server/solana/timelock/v1"
)

const (
streamPingDelay = 5 * time.Second
streamInitialRecvTimeout = 250 * time.Millisecond
)

type currencyServer struct {
log *zap.Logger
data ocp_data.Provider

exchangeRateHistoryCache cache.Cache
reserveHistoryCache cache.Cache

liveMintStateWorker *liveMintStateWorker

currencypb.UnimplementedCurrencyServer
}

func NewCurrencyServer(
log *zap.Logger,
data ocp_data.Provider,
configProvider ConfigProvider,
) currencypb.CurrencyServer {
liveMintStateWorker := newLiveMintStateWorker(log, data, configProvider())
liveMintStateWorker.start(context.Background())

return &currencyServer{
log: log,
data: data,

exchangeRateHistoryCache: cache.NewCache(1_000),
reserveHistoryCache: cache.NewCache(1_000),

liveMintStateWorker: liveMintStateWorker,
}
}

Expand Down Expand Up @@ -75,6 +92,22 @@ func (s *currencyServer) GetAllRates(ctx context.Context, req *currencypb.GetAll
}, nil
}

func (s *currencyServer) loadExchangeRatesForTime(ctx context.Context, t time.Time) (*currency.MultiRateRecord, error) {
record, err := s.data.GetAllExchangeRates(ctx, t)
if err != nil {
return nil, errors.Wrap(err, "failed to get price record by date")
}
return record, nil
}

func (s *currencyServer) loadExchangeRatesLatest(ctx context.Context) (*currency.MultiRateRecord, error) {
latest, err := s.data.GetAllExchangeRates(ctx, currency_util.GetLatestExchangeRateTime())
if err != nil {
return nil, errors.Wrap(err, "failed to get latest price record")
}
return latest, nil
}

func (s *currencyServer) GetMints(ctx context.Context, req *currencypb.GetMintsRequest) (*currencypb.GetMintsResponse, error) {
log := s.log.With(zap.String("method", "GetMints"))
log = client.InjectLoggingMetadata(ctx, log)
Expand Down Expand Up @@ -353,22 +386,6 @@ func (s *currencyServer) GetHistoricalMintData(ctx context.Context, req *currenc
}, nil
}

func (s *currencyServer) loadExchangeRatesForTime(ctx context.Context, t time.Time) (*currency.MultiRateRecord, error) {
record, err := s.data.GetAllExchangeRates(ctx, t)
if err != nil {
return nil, errors.Wrap(err, "failed to get price record by date")
}
return record, nil
}

func (s *currencyServer) loadExchangeRatesLatest(ctx context.Context) (*currency.MultiRateRecord, error) {
latest, err := s.data.GetAllExchangeRates(ctx, currency_util.GetLatestExchangeRateTime())
if err != nil {
return nil, errors.Wrap(err, "failed to get latest price record")
}
return latest, nil
}

func (s *currencyServer) getCachedReserveHistory(
ctx context.Context,
mint string,
Expand Down Expand Up @@ -483,3 +500,145 @@ func getTimeRangeForPredefinedRange(predefinedRange currencypb.GetHistoricalMint
return now.Add(-100 * 365 * 24 * time.Hour), now, query.IntervalDay
}
}

func (s *currencyServer) StreamLiveMintData(
streamer currencypb.Currency_StreamLiveMintDataServer,
) error {
ctx := streamer.Context()

log := s.log.With(zap.String("method", "StreamLiveMintData"))
log = client.InjectLoggingMetadata(ctx, log)

// Wait for the initial request to get the list of mints
req, err := protoutil.BoundedReceive[currencypb.StreamLiveMintDataRequest](ctx, streamer, streamInitialRecvTimeout)
if err != nil {
log.With(zap.Error(err)).Debug("error receiving initial request")
return err
}

// Must be a request type
request := req.GetRequest()
if request == nil {
return status.Error(codes.InvalidArgument, "first message must be a request")
}

// Parse requested mints
var requestedMints []*common.Account
for _, protoMint := range request.GetMints() {
mint, err := common.NewAccountFromProto(protoMint)
if err != nil {
log.With(zap.Error(err)).Warn("invalid mint")
return status.Error(codes.Internal, "")
}
requestedMints = append(requestedMints, mint)
}

// Generate unique stream ID
streamID := uuid.New().String()
log = log.With(zap.String("stream_id", streamID))

// Register stream with state worker
stream := s.liveMintStateWorker.registerStream(streamID, requestedMints)
defer s.liveMintStateWorker.unregisterStream(streamID)

log.Debug("stream registered")

// Wait for initial data to be available
if err := s.liveMintStateWorker.waitForData(ctx); err != nil {
log.With(zap.Error(err)).Debug("context cancelled while waiting for data")
return status.Error(codes.Canceled, "")
}

// Initial flush: send current exchange rates if the stream wants them
if stream.wantsExchangeRates() {
exchangeRates := s.liveMintStateWorker.getExchangeRates()
if exchangeRates != nil && exchangeRates.SignedResponse != nil {
if err := streamer.Send(exchangeRates.SignedResponse); err != nil {
log.With(zap.Error(err)).Debug("failed to send initial exchange rates")
return err
}
}
}

// Initial flush: send current reserve states
reserveStates := s.liveMintStateWorker.getReserveStates()
if len(reserveStates) > 0 {
// Filter based on requested mints and build batch response
var filtered []*currencypb.VerifiedLaunchpadCurrencyReserveState
for _, state := range reserveStates {
if stream.wantsMint(state.Mint.PublicKey().ToBase58()) && state.SignedState != nil {
filtered = append(filtered, state.SignedState)
}
}
if len(filtered) > 0 {
resp := &currencypb.StreamLiveMintDataResponse{
Type: &currencypb.StreamLiveMintDataResponse_Data{
Data: &currencypb.StreamLiveMintDataResponse_LiveData{
Type: &currencypb.StreamLiveMintDataResponse_LiveData_LaunchpadCurrencyReserveStates{
LaunchpadCurrencyReserveStates: &currencypb.VerifiedLaunchapdCurrencyReserveStateBatch{
ReserveStates: filtered,
},
},
},
},
}
if err := streamer.Send(resp); err != nil {
log.With(zap.Error(err)).Debug("failed to send initial reserve states")
return err
}
}
}

log.Debug("initial flush complete")

// Set up ping/pong health monitoring
sendPingCh := time.After(0) // Send first ping immediately
streamHealthCh := protoutil.MonitorStreamHealth(ctx, log, streamer, func(req *currencypb.StreamLiveMintDataRequest) bool {
return req.GetPong() != nil
})

// Main loop: listen on stream channel and send updates
for {
select {
case update, ok := <-stream.streamCh:
if !ok {
log.Debug("stream channel closed")
return status.Error(codes.Aborted, "stream closed")
}

if update.response == nil {
continue
}

if err := streamer.Send(update.response); err != nil {
log.With(zap.Error(err)).Debug("failed to send update")
return err
}

case <-sendPingCh:
log.Debug("sending ping to client")
sendPingCh = time.After(streamPingDelay)

err := streamer.Send(&currencypb.StreamLiveMintDataResponse{
Type: &currencypb.StreamLiveMintDataResponse_Ping{
Ping: &commonpb.ServerPing{
Timestamp: timestamppb.Now(),
PingDelay: durationpb.New(streamPingDelay),
},
},
})
if err != nil {
log.Debug("failed to send ping, stream is unhealthy")
return status.Error(codes.Aborted, "terminating unhealthy stream")
}

case <-streamHealthCh:
log.Debug("stream is unhealthy, terminating")
return status.Error(codes.Aborted, "terminating unhealthy stream")

case <-ctx.Done():
log.Debug("context cancelled")
return status.Error(codes.Canceled, "")
}
}
}
Loading
Loading