From 541efb71c0bfcda85f6e8ca55e98f40b5c8d8297 Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Tue, 6 Jan 2026 13:30:17 -0500 Subject: [PATCH] Refactor swap flows to remove transaction signature collection after funding --- go.mod | 2 +- go.sum | 4 +- ocp/data/swap/memory/store.go | 4 +- ocp/data/swap/postgres/model.go | 41 +- ocp/data/swap/swap.go | 14 +- ocp/data/swap/tests/tests.go | 15 +- ocp/rpc/transaction/config.go | 6 +- ocp/rpc/transaction/errors.go | 77 +--- ocp/rpc/transaction/swap.go | 762 +++++++++++--------------------- ocp/worker/swap/config.go | 9 +- ocp/worker/swap/runtime.go | 1 - ocp/worker/swap/util.go | 270 +++-------- ocp/worker/swap/worker.go | 81 +--- 13 files changed, 393 insertions(+), 893 deletions(-) diff --git a/go.mod b/go.mod index d5b9b7b..44dae3f 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( filippo.io/edwards25519 v1.1.0 github.com/aws/aws-sdk-go-v2 v0.17.0 github.com/code-payments/code-vm-indexer v1.2.0 - github.com/code-payments/ocp-protobuf-api v0.6.0 + github.com/code-payments/ocp-protobuf-api v0.7.0 github.com/emirpasic/gods v1.12.0 github.com/envoyproxy/protoc-gen-validate v1.2.1 github.com/golang/protobuf v1.5.4 diff --git a/go.sum b/go.sum index 13e0488..e19533f 100644 --- a/go.sum +++ b/go.sum @@ -78,8 +78,8 @@ github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/code-payments/code-vm-indexer v1.2.0 h1:rSHpBMiT9BKgmKcXg/VIoi/h0t7jNxGx07Qz59m+6Q0= github.com/code-payments/code-vm-indexer v1.2.0/go.mod h1:vn91YN2qNqb+gGJeZe2+l+TNxVmEEiRHXXnIn2Y40h8= -github.com/code-payments/ocp-protobuf-api v0.6.0 h1:dv/QWox20Z8RRHEwPSlWMAJPDMIvkzw97FESZArS8WA= -github.com/code-payments/ocp-protobuf-api v0.6.0/go.mod h1:tw6BooY5a8l6CtSZnKOruyKII0W04n89pcM4BizrgG8= +github.com/code-payments/ocp-protobuf-api v0.7.0 h1:pHIVYXmDus32LEzaj92qDWKYrPawuzBIJ+Xlzzf9udg= +github.com/code-payments/ocp-protobuf-api v0.7.0/go.mod h1:tw6BooY5a8l6CtSZnKOruyKII0W04n89pcM4BizrgG8= github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 h1:NmTXa/uVnDyp0TY5MKi197+3HWcnYWfnHGyaFthlnGw= github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= diff --git a/ocp/data/swap/memory/store.go b/ocp/data/swap/memory/store.go index 3edbf79..e9a00dd 100644 --- a/ocp/data/swap/memory/store.go +++ b/ocp/data/swap/memory/store.go @@ -6,9 +6,8 @@ import ( "sync" "time" - "github.com/code-payments/ocp-server/ocp/data/swap" "github.com/code-payments/ocp-server/database/query" - "github.com/code-payments/ocp-server/pointer" + "github.com/code-payments/ocp-server/ocp/data/swap" ) type ById []*swap.Record @@ -43,7 +42,6 @@ func (s *store) Save(_ context.Context, data *swap.Record) error { data.Version++ - item.TransactionSignature = pointer.StringCopy(data.TransactionSignature) item.TransactionBlob = data.TransactionBlob item.State = data.State item.Version = data.Version diff --git a/ocp/data/swap/postgres/model.go b/ocp/data/swap/postgres/model.go index 331f658..26fb8a8 100644 --- a/ocp/data/swap/postgres/model.go +++ b/ocp/data/swap/postgres/model.go @@ -7,10 +7,9 @@ import ( "github.com/jmoiron/sqlx" - "github.com/code-payments/ocp-server/ocp/data/swap" pgutil "github.com/code-payments/ocp-server/database/postgres" q "github.com/code-payments/ocp-server/database/query" - "github.com/code-payments/ocp-server/pointer" + "github.com/code-payments/ocp-server/ocp/data/swap" ) const ( @@ -18,22 +17,22 @@ const ( ) type model struct { - Id sql.NullInt64 `db:"id"` - SwapId string `db:"swap_id"` - Owner string `db:"owner"` - FromMint string `db:"from_mint"` - ToMint string `db:"to_mint"` - Amount uint64 `db:"amount"` - FundingId string `db:"funding_id"` - FundingSource uint8 `db:"funding_source"` - Nonce string `db:"nonce"` - Blockhash string `db:"blockhash"` - ProofSignature string `db:"proof_signature"` - TransactionSignature sql.NullString `db:"transaction_signature"` - TransactionBlob []byte `db:"transaction_blob"` - State uint8 `db:"state"` - Version uint64 `db:"version"` - CreatedAt time.Time `db:"created_at"` + Id sql.NullInt64 `db:"id"` + SwapId string `db:"swap_id"` + Owner string `db:"owner"` + FromMint string `db:"from_mint"` + ToMint string `db:"to_mint"` + Amount uint64 `db:"amount"` + FundingId string `db:"funding_id"` + FundingSource uint8 `db:"funding_source"` + Nonce string `db:"nonce"` + Blockhash string `db:"blockhash"` + ProofSignature string `db:"proof_signature"` + TransactionSignature string `db:"transaction_signature"` + TransactionBlob []byte `db:"transaction_blob"` + State uint8 `db:"state"` + Version uint64 `db:"version"` + CreatedAt time.Time `db:"created_at"` } func toModel(obj *swap.Record) (*model, error) { @@ -57,7 +56,7 @@ func toModel(obj *swap.Record) (*model, error) { Nonce: obj.Nonce, Blockhash: obj.Blockhash, ProofSignature: obj.ProofSignature, - TransactionSignature: sql.NullString{String: *pointer.StringOrDefault(obj.TransactionSignature, ""), Valid: obj.TransactionSignature != nil}, + TransactionSignature: obj.TransactionSignature, TransactionBlob: obj.TransactionBlob, State: uint8(obj.State), Version: obj.Version, @@ -78,7 +77,7 @@ func fromModel(m *model) *swap.Record { Nonce: m.Nonce, Blockhash: m.Blockhash, ProofSignature: m.ProofSignature, - TransactionSignature: pointer.StringIfValid(m.TransactionSignature.Valid, m.TransactionSignature.String), + TransactionSignature: m.TransactionSignature, TransactionBlob: m.TransactionBlob, State: swap.State(m.State), Version: m.Version, @@ -94,7 +93,7 @@ func (m *model) dbSave(ctx context.Context, db *sqlx.DB) error { ON CONFLICT (swap_id) DO UPDATE - SET transaction_signature = $11, transaction_blob = $12, state = $13, version = ` + tableName + `.version + 1 + SET transaction_blob = $12, state = $13, version = ` + tableName + `.version + 1 WHERE ` + tableName + `.swap_id = $1 AND ` + tableName + `.version = $14 RETURNING diff --git a/ocp/data/swap/swap.go b/ocp/data/swap/swap.go index 1e94395..b338c03 100644 --- a/ocp/data/swap/swap.go +++ b/ocp/data/swap/swap.go @@ -3,8 +3,6 @@ package swap import ( "errors" "time" - - "github.com/code-payments/ocp-server/pointer" ) type State uint8 @@ -48,7 +46,7 @@ type Record struct { ProofSignature string - TransactionSignature *string + TransactionSignature string TransactionBlob []byte State State @@ -78,7 +76,7 @@ func (r *Record) Clone() Record { ProofSignature: r.ProofSignature, - TransactionSignature: pointer.StringCopy(r.TransactionSignature), + TransactionSignature: r.TransactionSignature, TransactionBlob: r.TransactionBlob, State: r.State, @@ -108,7 +106,7 @@ func (r *Record) CopyTo(dst *Record) { dst.ProofSignature = r.ProofSignature - dst.TransactionSignature = pointer.StringCopy(r.TransactionSignature) + dst.TransactionSignature = r.TransactionSignature dst.TransactionBlob = r.TransactionBlob dst.State = r.State @@ -159,14 +157,10 @@ func (r *Record) Validate() error { return errors.New("proof signature is required") } - if r.TransactionSignature != nil && len(*r.TransactionSignature) == 0 { + if len(r.TransactionSignature) == 0 { return errors.New("transaction signature is empty") } - if len(r.TransactionBlob) != 0 && r.TransactionSignature == nil { - return errors.New("transaction signature is missing") - } - return nil } diff --git a/ocp/data/swap/tests/tests.go b/ocp/data/swap/tests/tests.go index 4947667..d9b7807 100644 --- a/ocp/data/swap/tests/tests.go +++ b/ocp/data/swap/tests/tests.go @@ -9,9 +9,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/code-payments/ocp-server/ocp/data/swap" "github.com/code-payments/ocp-server/database/query" - "github.com/code-payments/ocp-server/pointer" + "github.com/code-payments/ocp-server/ocp/data/swap" ) func RunTests(t *testing.T, s swap.Store, teardown func()) { @@ -58,7 +57,7 @@ func testRoundTrip(t *testing.T, s swap.Store) { ProofSignature: "test_proof_signature", - TransactionSignature: pointer.String("test_transaction_signature"), + TransactionSignature: "test_transaction_signature", TransactionBlob: []byte("test_transaction_blob"), State: swap.StateFinalized, @@ -107,7 +106,7 @@ func testUpdateHappyPath(t *testing.T, s swap.Store) { ProofSignature: "test_proof_signature", - TransactionSignature: nil, + TransactionSignature: "test_transaction_signature", TransactionBlob: nil, State: swap.StateCreated, @@ -119,7 +118,6 @@ func testUpdateHappyPath(t *testing.T, s swap.Store) { assert.EqualValues(t, 1, expected.Id) assert.EqualValues(t, 1, expected.Version) - expected.TransactionSignature = pointer.String("test_transaction_signature") expected.TransactionBlob = []byte("transaction_blob") expected.State = swap.StateFinalized @@ -155,7 +153,7 @@ func testUpdateStaleRecord(t *testing.T, s swap.Store) { ProofSignature: "test_proof_signature", - TransactionSignature: pointer.String("test_transaction_signature"), + TransactionSignature: "test_transaction_signature", TransactionBlob: []byte("test_transaction_blob"), State: swap.StateFinalized, @@ -169,7 +167,6 @@ func testUpdateStaleRecord(t *testing.T, s swap.Store) { stale := expected.Clone() expected.State = swap.StateUnknown - expected.TransactionSignature = nil expected.TransactionBlob = nil stale.Version -= 1 @@ -214,7 +211,7 @@ func testGetAllByOwnerAndState(t *testing.T, s swap.Store) { ProofSignature: fmt.Sprintf("test_proof_signature_%d", i), - TransactionSignature: pointer.String(fmt.Sprintf("test_transaction_signature_%d", i)), + TransactionSignature: fmt.Sprintf("test_transaction_signature_%d", i), TransactionBlob: []byte(fmt.Sprintf("test_transaction_blob_%d", i)), State: swap.State(i % int(swap.StateCancelled+1)), @@ -281,7 +278,7 @@ func testGetAllByState(t *testing.T, s swap.Store) { ProofSignature: fmt.Sprintf("test_proof_signature_%d", i), - TransactionSignature: pointer.String(fmt.Sprintf("test_transaction_signature_%d", i)), + TransactionSignature: fmt.Sprintf("test_transaction_signature_%d", i), TransactionBlob: []byte(fmt.Sprintf("test_transaction_blob_%d", i)), State: state, diff --git a/ocp/rpc/transaction/config.go b/ocp/rpc/transaction/config.go index 2b014bd..8f9645b 100644 --- a/ocp/rpc/transaction/config.go +++ b/ocp/rpc/transaction/config.go @@ -19,13 +19,13 @@ const ( defaultDisableSwaps = false SubmitIntentTimeoutConfigEnvName = envConfigPrefix + "SUBMIT_INTENT_TIMEOUT" - defaultSubmitIntentTimeout = 5 * time.Second + defaultSubmitIntentTimeout = 15 * time.Second SwapTimeoutConfigEnvName = envConfigPrefix + "SWAP_TIMEOUT" - defaultSwapTimeout = 120 * time.Second + defaultSwapTimeout = time.Minute ClientReceiveTimeoutConfigEnvName = envConfigPrefix + "CLIENT_RECEIVE_TIMEOUT" - defaultClientReceiveTimeout = time.Second + defaultClientReceiveTimeout = 2 * time.Second FeeCollectorOwnerPublicKeyConfigEnvName = envConfigPrefix + "FEE_COLLECTOR_OWNER_PUBLIC_KEY" defaultFeeCollectorPublicKey = "invalid" // Ensure something valid is set diff --git a/ocp/rpc/transaction/errors.go b/ocp/rpc/transaction/errors.go index cf2f6d8..94db098 100644 --- a/ocp/rpc/transaction/errors.go +++ b/ocp/rpc/transaction/errors.go @@ -291,7 +291,7 @@ func handleSubmitIntentStructuredError(streamer transactionpb.Transaction_Submit return streamer.Send(errResp) } -func handleStartSwapError(streamer transactionpb.Transaction_StartSwapServer, err error) error { +func handleStatefulSwapError(streamer transactionpb.Transaction_StatefulSwapServer, err error) error { // gRPC status errors are passed through as is if _, ok := status.FromError(err); ok { return err @@ -300,24 +300,24 @@ func handleStartSwapError(streamer transactionpb.Transaction_StartSwapServer, er // Case 1: Errors that map to a Code error response switch err.(type) { case SwapValidationError: - return handleStartSwapStructuredError( + return handleStatefulSwapStructuredError( streamer, - transactionpb.StartSwapResponse_Error_INVALID_SWAP, + transactionpb.StatefulSwapResponse_Error_INVALID_SWAP, toReasonStringErrorDetails(err), ) case SwapDeniedError: - return handleStartSwapStructuredError( + return handleStatefulSwapStructuredError( streamer, - transactionpb.StartSwapResponse_Error_DENIED, + transactionpb.StatefulSwapResponse_Error_DENIED, toDeniedErrorDetails(err), ) } switch err { case ErrInvalidSignature: - return handleStartSwapStructuredError( + return handleStatefulSwapStructuredError( streamer, - transactionpb.StartSwapResponse_Error_SIGNATURE_ERROR, + transactionpb.StatefulSwapResponse_Error_SIGNATURE_ERROR, toReasonStringErrorDetails(err), ) case ErrNotImplemented: @@ -336,65 +336,10 @@ func handleStartSwapError(streamer transactionpb.Transaction_StartSwapServer, er return status.Error(codes.Internal, "rpc server failure") } -func handleStartSwapStructuredError(streamer transactionpb.Transaction_StartSwapServer, code transactionpb.StartSwapResponse_Error_Code, errorDetails ...*transactionpb.ErrorDetails) error { - errResp := &transactionpb.StartSwapResponse{ - Response: &transactionpb.StartSwapResponse_Error_{ - Error: &transactionpb.StartSwapResponse_Error{ - Code: code, - ErrorDetails: errorDetails, - }, - }, - } - return streamer.Send(errResp) -} - -func handleSwapError(streamer transactionpb.Transaction_SwapServer, err error) error { - // gRPC status errors are passed through as is - if _, ok := status.FromError(err); ok { - return err - } - - // Case 1: Errors that map to a Code error response - switch err.(type) { - case SwapValidationError: - return handleSwapStructuredError( - streamer, - transactionpb.SwapResponse_Error_INVALID_SWAP, - toReasonStringErrorDetails(err), - ) - case SwapDeniedError: - return handleSwapStructuredError( - streamer, - transactionpb.SwapResponse_Error_DENIED, - toDeniedErrorDetails(err), - ) - } - - switch err { - case ErrInvalidSignature: - return handleSwapStructuredError( - streamer, - transactionpb.SwapResponse_Error_SIGNATURE_ERROR, - toReasonStringErrorDetails(err), - ) - case ErrNotImplemented: - return status.Error(codes.Unimplemented, err.Error()) - } - - // Case 2: Errors that map to gRPC status errors - switch err { - case ErrTimedOutReceivingRequest, context.DeadlineExceeded: - return status.Error(codes.DeadlineExceeded, err.Error()) - case context.Canceled: - return status.Error(codes.Canceled, err.Error()) - } - return status.Error(codes.Internal, "rpc server failure") -} - -func handleSwapStructuredError(streamer transactionpb.Transaction_SwapServer, code transactionpb.SwapResponse_Error_Code, errorDetails ...*transactionpb.ErrorDetails) error { - errResp := &transactionpb.SwapResponse{ - Response: &transactionpb.SwapResponse_Error_{ - Error: &transactionpb.SwapResponse_Error{ +func handleStatefulSwapStructuredError(streamer transactionpb.Transaction_StatefulSwapServer, code transactionpb.StatefulSwapResponse_Error_Code, errorDetails ...*transactionpb.ErrorDetails) error { + errResp := &transactionpb.StatefulSwapResponse{ + Response: &transactionpb.StatefulSwapResponse_Error_{ + Error: &transactionpb.StatefulSwapResponse_Error{ Code: code, ErrorDetails: errorDetails, }, diff --git a/ocp/rpc/transaction/swap.go b/ocp/rpc/transaction/swap.go index 7747d00..32e93e2 100644 --- a/ocp/rpc/transaction/swap.go +++ b/ocp/rpc/transaction/swap.go @@ -8,7 +8,6 @@ import ( "time" "github.com/mr-tron/base58/base58" - "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -29,65 +28,76 @@ import ( "github.com/code-payments/ocp-server/solana" ) -func (s *transactionServer) StartSwap(streamer transactionpb.Transaction_StartSwapServer) error { - ctx := streamer.Context() +func (s *transactionServer) StatefulSwap(streamer transactionpb.Transaction_StatefulSwapServer) error { + // Bound the total RPC. Keeping the timeout higher to see where we land because + // there's a lot of stuff happening in this method. + ctx, cancel := context.WithTimeout(streamer.Context(), s.conf.swapTimeout.Get(streamer.Context())) + defer cancel() - log := s.log.With(zap.String("method", "StartSwap")) + log := s.log.With(zap.String("method", "StatefulSwap")) log = client.InjectLoggingMetadata(ctx, log) if s.conf.disableSwaps.Get(ctx) { - return handleStartSwapError(streamer, status.Error(codes.Unavailable, "temporarily unavailable")) + return handleStatefulSwapError(streamer, status.Error(codes.Unavailable, "temporarily unavailable")) } - req, err := protoutil.BoundedReceive[transactionpb.StartSwapRequest](ctx, streamer, s.conf.clientReceiveTimeout.Get(ctx)) + req, err := protoutil.BoundedReceive[transactionpb.StatefulSwapRequest](ctx, streamer, s.conf.clientReceiveTimeout.Get(ctx)) if err != nil { log.With(zap.Error(err)).Info("error receiving request from client") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } - startReq := req.GetStart() - if startReq == nil { - return handleStartSwapError(streamer, status.Error(codes.InvalidArgument, "StartSwapRequest.Start is nil")) + initiateReq := req.GetInitiate() + if initiateReq == nil { + return handleStatefulSwapError(streamer, status.Error(codes.InvalidArgument, "StatefulSwapRequest.Initiate is nil")) } - owner, err := common.NewAccountFromProto(startReq.Owner) + owner, err := common.NewAccountFromProto(initiateReq.Owner) if err != nil { log.With(zap.Error(err)).Warn("invalid owner account") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } log = log.With(zap.String("owner", owner.PublicKey().ToBase58())) - reqSignature := startReq.Signature - startReq.Signature = nil - if err := s.auth.Authenticate(ctx, owner, startReq, reqSignature); err != nil { - return handleStartSwapError(streamer, err) + swapAuthority, err := common.NewAccountFromProto(initiateReq.SwapAuthority) + if err != nil { + log.With(zap.Error(err)).Warn("invalid swap authority") + return handleStatefulSwapError(streamer, err) + } + log = log.With(zap.String("swap_authority", swapAuthority.PublicKey().ToBase58())) + + reqSignature := initiateReq.Signature + initiateReq.Signature = nil + if err := s.auth.Authenticate(ctx, owner, initiateReq, reqSignature); err != nil { + return handleStatefulSwapError(streamer, err) } - startCurrencyCreatorSwapReq := startReq.GetCurrencyCreator() - if startCurrencyCreatorSwapReq == nil { - return handleStartSwapError(streamer, status.Error(codes.InvalidArgument, "StartSwapRequest.Start.CurrencyCreator is nil")) + initiateCurrencyCreatorSwapReq := initiateReq.GetCurrencyCreator() + if initiateCurrencyCreatorSwapReq == nil { + return handleStatefulSwapError(streamer, status.Error(codes.InvalidArgument, "StatefulSwapRequest.Initiate.CurrencyCreator is nil")) } - swapId := base58.Encode(startCurrencyCreatorSwapReq.Id.Value) + swapId := base58.Encode(initiateCurrencyCreatorSwapReq.Id.Value) log = log.With(zap.String("swap_id", swapId)) - fromMint, err := common.NewAccountFromProto(startCurrencyCreatorSwapReq.FromMint) + fromMint, err := common.NewAccountFromProto(initiateCurrencyCreatorSwapReq.FromMint) if err != nil { log.With(zap.Error(err)).Warn("invalid source mint account") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } log = log.With(zap.String("from_mint", fromMint.PublicKey().ToBase58())) - toMint, err := common.NewAccountFromProto(startCurrencyCreatorSwapReq.ToMint) + toMint, err := common.NewAccountFromProto(initiateCurrencyCreatorSwapReq.ToMint) if err != nil { log.With(zap.Error(err)).Warn("invalid destination mint account") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } log = log.With(zap.String("to_mint", toMint.PublicKey().ToBase58())) log = log.With( - zap.String("funding_source", startCurrencyCreatorSwapReq.FundingSource.String()), - zap.String("funding_id", startCurrencyCreatorSwapReq.FundingId), + zap.Uint64("amount", initiateCurrencyCreatorSwapReq.Amount), + zap.String("funding_source", initiateCurrencyCreatorSwapReq.FundingSource.String()), + zap.String("funding_id", initiateCurrencyCreatorSwapReq.FundingId), ) // @@ -97,17 +107,17 @@ func (s *transactionServer) StartSwap(streamer transactionpb.Transaction_StartSw ownerManagemntState, err := common.GetOwnerManagementState(ctx, s.data, owner) if err != nil { log.With(zap.Error(err)).Warn("failure getting owner management state") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } if ownerManagemntState != common.OwnerManagementStateOcpAccount { - return handleStartSwapError(streamer, NewSwapDeniedError("not an ocp account")) + return handleStatefulSwapError(streamer, NewSwapDeniedError("not an ocp account")) } - allow, err := s.antispamGuard.AllowSwap(ctx, swap.FundingSource(startCurrencyCreatorSwapReq.FundingSource), owner, fromMint, toMint) + allow, err := s.antispamGuard.AllowSwap(ctx, swap.FundingSource(initiateCurrencyCreatorSwapReq.FundingSource), owner, fromMint, toMint) if err != nil { - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } else if !allow { - return handleStartSwapError(streamer, NewSwapDeniedError("rate limited")) + return handleStatefulSwapError(streamer, NewSwapDeniedError("rate limited")) } // @@ -116,117 +126,139 @@ func (s *transactionServer) StartSwap(streamer transactionpb.Transaction_StartSw _, err = s.data.GetSwapById(ctx, swapId) if err == nil { - return handleStartSwapError(streamer, NewSwapDeniedError("attempt to reuse swap id")) + return handleStatefulSwapError(streamer, NewSwapDeniedError("attempt to reuse swap id")) } else if err != swap.ErrNotFound { log.With(zap.Error(err)).Warn("failure checking for existing swap record by id") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } - _, err = s.data.GetSwapByFundingId(ctx, startCurrencyCreatorSwapReq.FundingId) + _, err = s.data.GetSwapByFundingId(ctx, initiateCurrencyCreatorSwapReq.FundingId) if err == nil { - return handleStartSwapError(streamer, NewSwapDeniedError("attempt to reuse swap funding id")) + return handleStatefulSwapError(streamer, NewSwapDeniedError("attempt to reuse swap funding id")) } else if err != swap.ErrNotFound { log.With(zap.Error(err)).Warn("failure checking for existing swap record by funding id") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) + } + + if owner.PublicKey().ToBase58() == swapAuthority.PublicKey().ToBase58() { + return handleStatefulSwapError(streamer, NewSwapValidationError("owner cannot be swap authority")) } if bytes.Equal(fromMint.PublicKey().ToBytes(), toMint.PublicKey().ToBytes()) { - return handleStartSwapError(streamer, NewSwapValidationError("must swap between two different mints")) + return handleStatefulSwapError(streamer, NewSwapValidationError("must swap between two different mints")) } - if startCurrencyCreatorSwapReq.Amount == 0 { - return handleStartSwapError(streamer, NewSwapValidationError("amount must be positive")) + if initiateCurrencyCreatorSwapReq.Amount == 0 { + return handleStatefulSwapError(streamer, NewSwapValidationError("amount must be positive")) } sourceVmConfig, err := common.GetVmConfigForMint(ctx, s.data, fromMint) if err == common.ErrUnsupportedMint { - return handleStartSwapError(streamer, NewSwapValidationError("invalid source mint")) + return handleStatefulSwapError(streamer, NewSwapValidationError("invalid source mint")) } else if err != nil { log.With(zap.Error(err)).Warn("failure getting source vm config") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } destinationVmConfig, err := common.GetVmConfigForMint(ctx, s.data, toMint) if err == common.ErrUnsupportedMint { - return handleStartSwapError(streamer, NewSwapValidationError("invalid destination mint")) + return handleStatefulSwapError(streamer, NewSwapValidationError("invalid destination mint")) } else if err != nil { log.With(zap.Error(err)).Warn("failure getting destination vm config") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } ownerSourceTimelockVault, err := owner.ToTimelockVault(sourceVmConfig) if err != nil { log.With(zap.Error(err)).Warn("failure getting owner source timelock vault") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } _, err = s.data.GetTimelockByVault(ctx, ownerSourceTimelockVault.PublicKey().ToBase58()) if err == timelock.ErrTimelockNotFound { - return handleStartSwapError(streamer, NewSwapValidationError("source timelock vault account not opened")) + return handleStatefulSwapError(streamer, NewSwapValidationError("source timelock vault account not opened")) } else if err != nil { log.With(zap.Error(err)).Warn("failure getting source timelock record") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } ownerDestinationTimelockVault, err := owner.ToTimelockVault(destinationVmConfig) if err != nil { log.With(zap.Error(err)).Warn("failure getting owner destination timelock vault") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } _, err = s.data.GetTimelockByVault(ctx, ownerDestinationTimelockVault.PublicKey().ToBase58()) if err == timelock.ErrTimelockNotFound { - return handleStartSwapError(streamer, NewSwapValidationError("destination timelock vault account not opened")) + return handleStatefulSwapError(streamer, NewSwapValidationError("destination timelock vault account not opened")) } else if err != nil { log.With(zap.Error(err)).Warn("failure getting destination timelock record") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } - switch startCurrencyCreatorSwapReq.FundingSource { + switch initiateCurrencyCreatorSwapReq.FundingSource { case transactionpb.FundingSource_FUNDING_SOURCE_SUBMIT_INTENT: - decodedFundingId, err := base58.Decode(startCurrencyCreatorSwapReq.FundingId) + decodedFundingId, err := base58.Decode(initiateCurrencyCreatorSwapReq.FundingId) if err != nil || len(decodedFundingId) != ed25519.PublicKeySize { log.With(zap.Error(err)).Warn("invalid funding id") - return handleStartSwapError(streamer, NewSwapValidationError("funding id is not a public key")) + return handleStatefulSwapError(streamer, NewSwapValidationError("funding id is not a public key")) } - _, err = s.data.GetIntent(ctx, startCurrencyCreatorSwapReq.FundingId) + _, err = s.data.GetIntent(ctx, initiateCurrencyCreatorSwapReq.FundingId) if err == nil { - return handleStartSwapError(streamer, NewSwapValidationError("funding intent already exists")) + return handleStatefulSwapError(streamer, NewSwapValidationError("funding intent already exists")) } else if err != intent.ErrIntentNotFound { log.With(zap.Error(err)).Warn("failure getting funding intent record") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } balance, err := balance.CalculateFromCache(ctx, s.data, ownerSourceTimelockVault) if err != nil { log.With(zap.Error(err)).Warn("failure getting owner source timelock vault balance") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } - if balance < startCurrencyCreatorSwapReq.Amount { - return handleStartSwapError(streamer, NewSwapValidationError("insufficient balance")) + if balance < initiateCurrencyCreatorSwapReq.Amount { + return handleStatefulSwapError(streamer, NewSwapValidationError("insufficient balance")) } case transactionpb.FundingSource_FUNDING_SOURCE_EXTERNAL_WALLET: - decodedFundingId, err := base58.Decode(startCurrencyCreatorSwapReq.FundingId) + decodedFundingId, err := base58.Decode(initiateCurrencyCreatorSwapReq.FundingId) if err != nil || len(decodedFundingId) != ed25519.SignatureSize { log.With(zap.Error(err)).Warn("invalid funding id") - return handleStartSwapError(streamer, NewSwapValidationError("funding id is not a signature")) - } - - _, err = s.data.GetTransaction(ctx, startCurrencyCreatorSwapReq.FundingId) - if err == nil { - // Current flows expect that client will call StartSwap before submitting the transaction - return handleStartSwapError(streamer, NewSwapValidationError("funding transaction already exists")) - } else if err != solana.ErrSignatureNotFound { - log.With(zap.Error(err)).Warn("failure getting funding transaction") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, NewSwapValidationError("funding id is not a signature")) } default: - return handleStartSwapError(streamer, NewSwapDeniedErrorf("funding source %s is not supported", startCurrencyCreatorSwapReq.FundingSource)) + return handleStatefulSwapError(streamer, NewSwapDeniedErrorf("funding source %s is not supported", initiateCurrencyCreatorSwapReq.FundingSource)) } // - // Section: Server parameters + // Section: Verified metadata signature verification + // + + verifiedMetadata := &transactionpb.VerifiedSwapMetadata{ + Kind: &transactionpb.VerifiedSwapMetadata_CurrencyCreator{ + CurrencyCreator: &transactionpb.VerifiedCurrencyCreatorSwapMetadata{ + ClientParameters: initiateCurrencyCreatorSwapReq, + }, + }, + } + + metadataSignature := initiateReq.ProofSignature + if err := s.auth.Authenticate(ctx, owner, verifiedMetadata, metadataSignature); err != nil { + return handleStatefulSwapStructuredError(streamer, transactionpb.StatefulSwapResponse_Error_SIGNATURE_ERROR) + } + + // + // Section: On-demand account creation + // + + err = vm.EnsureVirtualTimelockAccountIsInitialized(ctx, s.data, s.vmIndexerClient, toMint, owner, true) + if err != nil { + log.With(zap.Error(err)).Warn("timed out waiting for destination timelock account initialization") + return handleStatefulSwapError(streamer, err) + } + + // + // Section: Transaction construction // noncePool, err := transaction_util.SelectNoncePool( @@ -237,74 +269,189 @@ func (s *transactionServer) StartSwap(streamer transactionpb.Transaction_StartSw ) if err != nil { log.With(zap.Error(err)).Warn("failure selecting nonce pool") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } selectedNonce, err := noncePool.GetNonce(ctx) if err != nil { log.With(zap.Error(err)).Warn("failure selecting available nonce") - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } defer func() { selectedNonce.ReleaseIfNotReserved(ctx) }() - serverParameters := &transactionpb.StartSwapResponse_ServerParameters_CurrencyCreator{ - Nonce: selectedNonce.Account.ToProto(), - Blockhash: &commonpb.Blockhash{Value: selectedNonce.Blockhash[:]}, + var swapHandler SwapHandler + if common.IsCoreMint(fromMint) { + swapHandler = NewCurrencyCreatorBuySwapHandler( + s.data, + s.vmIndexerClient, + owner, + swapAuthority, + toMint, + initiateCurrencyCreatorSwapReq.Amount, + selectedNonce.Account, + ) + } else if common.IsCoreMint(toMint) { + swapHandler = NewCurrencyCreatorSellSwapHandler( + s.data, + s.vmIndexerClient, + owner, + swapAuthority, + fromMint, + initiateCurrencyCreatorSwapReq.Amount, + selectedNonce.Account, + ) + } else { + swapHandler = NewCurrencyCreatorBuySellSwapHandler( + s.data, + s.vmIndexerClient, + owner, + swapAuthority, + fromMint, + toMint, + initiateCurrencyCreatorSwapReq.Amount, + selectedNonce.Account, + ) } - if err := streamer.Send(&transactionpb.StartSwapResponse{ - Response: &transactionpb.StartSwapResponse_ServerParameters_{ - ServerParameters: &transactionpb.StartSwapResponse_ServerParameters{ - Kind: &transactionpb.StartSwapResponse_ServerParameters_CurrencyCreator_{ - CurrencyCreator: serverParameters, - }, - }, - }, - }); err != nil { - return handleStartSwapError(streamer, err) + + var alts []solana.AddressLookupTable + for _, mint := range []*common.Account{fromMint, toMint} { + if common.IsCoreMint(mint) { + continue + } + + alt, err := transaction_util.GetAltForMint(ctx, s.data, mint) + if err != nil { + log.With(zap.Error(err)).Warn("failure getting alt") + return handleStatefulSwapError(streamer, err) + } + alts = append(alts, alt) } - req, err = protoutil.BoundedReceive[transactionpb.StartSwapRequest](ctx, streamer, s.conf.clientReceiveTimeout.Get(ctx)) + ixns, err := swapHandler.MakeInstructions(ctx) if err != nil { - log.With(zap.Error(err)).Info("error receiving request from client") - return handleStartSwapError(streamer, err) + log.With(zap.Error(err)).Warn("failure making instructions") + return handleStatefulSwapError(streamer, err) } + txn := solana.NewV0Transaction( + common.GetSubsidizer().PublicKey().ToBytes(), + alts, + ixns, + ) + + txn.SetBlockhash(selectedNonce.Blockhash) + + marshalledTxnMessage := txn.Message.Marshal() + // - // Section: Verified metadata signing + // Section: Server parameters // - submitSignatureReq := req.GetSubmitSignature() - if submitSignatureReq == nil { - return handleStartSwapError(streamer, status.Error(codes.InvalidArgument, "StartSwapRequest.SubmitSignature is nil")) + serverParameters := swapHandler.GetServerParameters() + + protoAlts := make([]*commonpb.SolanaAddressLookupTable, len(alts)) + for i, alt := range alts { + protoAlts[i] = transaction_util.ToProtoAlt(alt) } - verifiedMetadata := &transactionpb.VerifiedSwapMetadata{ - Kind: &transactionpb.VerifiedSwapMetadata_CurrencyCreator{ - CurrencyCreator: &transactionpb.VerifiedCurrencyCreatorSwapMetadata{ - ClientParameters: startCurrencyCreatorSwapReq, - ServerParameters: serverParameters, + protoServerParameters := &transactionpb.StatefulSwapResponse_ServerParameters_CurrencyCreator{ + Payer: common.GetSubsidizer().ToProto(), + Nonce: selectedNonce.Account.ToProto(), + Blockhash: &commonpb.Blockhash{Value: selectedNonce.Blockhash[:]}, + Alts: protoAlts, + ComputeUnitLimit: serverParameters.ComputeUnitLimit, + ComputeUnitPrice: serverParameters.ComputeUnitPrice, + MemoValue: serverParameters.MemoValue, + MemoryAccount: serverParameters.MemoryAccount.ToProto(), + MemoryIndex: uint32(serverParameters.MemoryIndex), + } + if err := streamer.Send(&transactionpb.StatefulSwapResponse{ + Response: &transactionpb.StatefulSwapResponse_ServerParameters_{ + ServerParameters: &transactionpb.StatefulSwapResponse_ServerParameters{ + Kind: &transactionpb.StatefulSwapResponse_ServerParameters_CurrencyCreator_{ + CurrencyCreator: protoServerParameters, + }, }, }, + }); err != nil { + return handleStatefulSwapError(streamer, err) } - metadataSignature := submitSignatureReq.Signature - if err := s.auth.Authenticate(ctx, owner, verifiedMetadata, metadataSignature); err != nil { - return handleStartSwapStructuredError(streamer, transactionpb.StartSwapResponse_Error_SIGNATURE_ERROR) + // + // Section: Transaction signing + // + + req, err = protoutil.BoundedReceive[transactionpb.StatefulSwapRequest](ctx, streamer, s.conf.clientReceiveTimeout.Get(ctx)) + if err != nil { + log.With(zap.Error(err)).Info("error receiving request from client") + return err + } + + submitSignaturesReq := req.GetSubmitSignatures() + if submitSignaturesReq == nil { + return handleStatefulSwapError(streamer, status.Error(codes.InvalidArgument, "StatefulSwapRequest.SubmitSignatures is nil")) + } + + for i := range txn.Message.Header.NumSignatures { + account := txn.Message.Accounts[i] + + var isClientSignature bool + var protoSignature *commonpb.Signature + + if bytes.Equal(account, owner.PublicKey().ToBytes()) { + isClientSignature = true + protoSignature = submitSignaturesReq.TransactionSignatures[0] + } else if bytes.Equal(account, swapAuthority.PublicKey().ToBytes()) { + isClientSignature = true + protoSignature = submitSignaturesReq.TransactionSignatures[1] + } + + if !isClientSignature { + continue + } + + if !ed25519.Verify( + account, + marshalledTxnMessage, + protoSignature.Value, + ) { + return handleStatefulSwapStructuredError( + streamer, + transactionpb.StatefulSwapResponse_Error_SIGNATURE_ERROR, + toInvalidTxnSignatureErrorDetails(0, txn, protoSignature), + ) + } + + copy(txn.Signatures[i][:], protoSignature.Value) } + err = txn.Sign( + common.GetSubsidizer().PrivateKey().ToBytes(), + sourceVmConfig.Authority.PrivateKey().ToBytes(), + destinationVmConfig.Authority.PrivateKey().ToBytes(), + ) + if err != nil { + log.With(zap.Error(err)).Info("failure signing transaction") + return handleStatefulSwapError(streamer, err) + } + + marshalledTxn := txn.Marshal() + + txnSignature := base58.Encode(txn.Signature()) + // // Section: Swap state DB commit // var initialState swap.State - switch startCurrencyCreatorSwapReq.FundingSource { + switch initiateCurrencyCreatorSwapReq.FundingSource { case transactionpb.FundingSource_FUNDING_SOURCE_SUBMIT_INTENT: initialState = swap.StateCreated case transactionpb.FundingSource_FUNDING_SOURCE_EXTERNAL_WALLET: initialState = swap.StateFunding default: - return handleStartSwapError(streamer, NewSwapDeniedErrorf("funding source %s is not supported", startCurrencyCreatorSwapReq.FundingSource)) + return handleStatefulSwapError(streamer, NewSwapDeniedErrorf("funding source %s is not supported", initiateCurrencyCreatorSwapReq.FundingSource)) } record := &swap.Record{ @@ -312,20 +459,20 @@ func (s *transactionServer) StartSwap(streamer transactionpb.Transaction_StartSw Owner: owner.PublicKey().ToBase58(), FromMint: fromMint.PublicKey().ToBase58(), ToMint: toMint.PublicKey().ToBase58(), - Amount: startCurrencyCreatorSwapReq.Amount, - FundingSource: swap.FundingSource(startCurrencyCreatorSwapReq.FundingSource), - FundingId: startCurrencyCreatorSwapReq.FundingId, + Amount: initiateCurrencyCreatorSwapReq.Amount, + FundingSource: swap.FundingSource(initiateCurrencyCreatorSwapReq.FundingSource), + FundingId: initiateCurrencyCreatorSwapReq.FundingId, Nonce: selectedNonce.Account.PublicKey().ToBase58(), Blockhash: base58.Encode(selectedNonce.Blockhash[:]), - ProofSignature: base58.Encode(submitSignatureReq.Signature.Value), - TransactionSignature: nil, - TransactionBlob: nil, + ProofSignature: base58.Encode(initiateReq.ProofSignature.Value), + TransactionSignature: txnSignature, + TransactionBlob: marshalledTxn, State: initialState, CreatedAt: time.Now(), } err = s.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { - err = selectedNonce.MarkReservedWithSignature(ctx, record.ProofSignature) + err = selectedNonce.MarkReservedWithSignature(ctx, txnSignature) if err != nil { log.With(zap.Error(err)).Warn("failure reserving nonce") return err @@ -340,21 +487,21 @@ func (s *transactionServer) StartSwap(streamer transactionpb.Transaction_StartSw return nil }) if err != nil { - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } // // Section: Final RPC response // - err = streamer.Send(&transactionpb.StartSwapResponse{ - Response: &transactionpb.StartSwapResponse_Success_{ - Success: &transactionpb.StartSwapResponse_Success{ - Code: transactionpb.StartSwapResponse_Success_OK, + err = streamer.Send(&transactionpb.StatefulSwapResponse{ + Response: &transactionpb.StatefulSwapResponse_Success_{ + Success: &transactionpb.StatefulSwapResponse_Success{ + Code: transactionpb.StatefulSwapResponse_Success_OK, }, }, }) - return handleStartSwapError(streamer, err) + return handleStatefulSwapError(streamer, err) } func (s *transactionServer) GetSwap(ctx context.Context, req *transactionpb.GetSwapRequest) (*transactionpb.GetSwapResponse, error) { @@ -428,20 +575,14 @@ func (s *transactionServer) GetPendingSwaps(ctx context.Context, req *transactio }, nil } + // Swap is created, but requires client to initiate the funding createdSwaps, err := s.data.GetAllSwapsByOwnerAndState(ctx, owner.PublicKey().ToBase58(), swap.StateCreated) if err != nil && err != swap.ErrNotFound { log.With(zap.Error(err)).Warn("failure getting swaps in CREATED state") return nil, status.Error(codes.Internal, "") } - fundedSwaps, err := s.data.GetAllSwapsByOwnerAndState(ctx, owner.PublicKey().ToBase58(), swap.StateFunded) - if err != nil && err != swap.ErrNotFound { - log.With(zap.Error(err)).Warn("failure getting swaps in FUNDED state") - return nil, status.Error(codes.Internal, "") - } - allPendingSwaps := createdSwaps - allPendingSwaps = append(allPendingSwaps, fundedSwaps...) if len(allPendingSwaps) == 0 { return &transactionpb.GetPendingSwapsResponse{ @@ -466,365 +607,6 @@ func (s *transactionServer) GetPendingSwaps(ctx context.Context, req *transactio }, nil } -func (s *transactionServer) Swap(streamer transactionpb.Transaction_SwapServer) error { - // Bound the total RPC. Keeping the timeout higher to see where we land because - // there's a lot of stuff happening in this method. - ctx, cancel := context.WithTimeout(streamer.Context(), s.conf.swapTimeout.Get(streamer.Context())) - defer cancel() - - log := s.log.With(zap.String("method", "Swap")) - log = client.InjectLoggingMetadata(ctx, log) - - if s.conf.disableSwaps.Get(ctx) { - return handleSwapError(streamer, status.Error(codes.Unavailable, "temporarily unavailable")) - } - - req, err := protoutil.BoundedReceive[transactionpb.SwapRequest](ctx, streamer, s.conf.clientReceiveTimeout.Get(ctx)) - if err != nil { - log.With(zap.Error(err)).Info("error receiving request from client") - return handleSwapError(streamer, err) - } - - initiateReq := req.GetInitiate() - if initiateReq == nil { - return handleSwapError(streamer, status.Error(codes.InvalidArgument, "SwapRequest.Initiate is nil")) - } - - if initiateReq.GetStateless() != nil { - return handleSwapError(streamer, status.Error(codes.InvalidArgument, "SwapRequest.Initiate.Stateless is not currently supporteds")) - } - - statefulReq := initiateReq.GetStateful() - if statefulReq == nil { - return handleSwapError(streamer, status.Error(codes.InvalidArgument, "SwapRequest.Initiate.Stateful is nil")) - } - - owner, err := common.NewAccountFromProto(statefulReq.Owner) - if err != nil { - log.With(zap.Error(err)).Warn("invalid owner account") - return handleSwapError(streamer, err) - } - log = log.With(zap.String("owner", owner.PublicKey().ToBase58())) - - reqSignature := statefulReq.Signature - statefulReq.Signature = nil - if err := s.auth.Authenticate(ctx, owner, statefulReq, reqSignature); err != nil { - return err - } - - swapId := base58.Encode(statefulReq.SwapId.Value) - log = log.With(zap.String("swap_id", swapId)) - - swapRecord, err := s.data.GetSwapById(ctx, swapId) - if err == swap.ErrNotFound { - return handleSwapError(streamer, NewSwapValidationError("swap state not found")) - } else if err != nil { - log.With(zap.Error(err)).Warn("failure getting swap record") - return handleSwapError(streamer, err) - } - - swapAuthority, err := common.NewAccountFromProto(statefulReq.SwapAuthority) - if err != nil { - log.With(zap.Error(err)).Warn("invalid swap authority") - return handleSwapError(streamer, err) - } - log = log.With(zap.String("swap_authority", swapAuthority.PublicKey().ToBase58())) - - fromMint, err := common.NewAccountFromPublicKeyString(swapRecord.FromMint) - if err != nil { - log.With(zap.Error(err)).Warn("invalid mint") - return handleSwapError(streamer, err) - } - log = log.With(zap.String("from_mint", fromMint.PublicKey().ToBase58())) - - toMint, err := common.NewAccountFromPublicKeyString(swapRecord.ToMint) - if err != nil { - log.With(zap.Error(err)).Warn("invalid mint") - return handleSwapError(streamer, err) - } - log = log.With(zap.String("to_mint", toMint.PublicKey().ToBase58())) - - nonce, err := common.NewAccountFromPublicKeyString(swapRecord.Nonce) - if err != nil { - log.With(zap.Error(err)).Warn("invalid nonce") - return handleSwapError(streamer, err) - } - - decodedBlockhash, err := base58.Decode(swapRecord.Blockhash) - if err != nil { - log.With(zap.Error(err)).Warn("invalid blockhash") - return handleSwapError(streamer, err) - } - blockhash := solana.Blockhash(decodedBlockhash) - - sourceVmConfig, err := common.GetVmConfigForMint(ctx, s.data, fromMint) - if err != nil { - log.With(zap.Error(err)).Warn("failure getting source vm config") - return handleSwapError(streamer, err) - } - - destinationVmConfig, err := common.GetVmConfigForMint(ctx, s.data, toMint) - if err != nil { - log.With(zap.Error(err)).Warn("failure getting destination vm config") - return handleSwapError(streamer, err) - } - - ownerSourceVmSwapAta, err := owner.ToVmSwapAta(sourceVmConfig) - if err != nil { - log.With(zap.Error(err)).Warn("failure getting owner source vm swap ata") - return handleSwapError(streamer, err) - } - - // - // Section: Validation - // - - if swapRecord.Owner != owner.PublicKey().ToBase58() { - return handleSwapError(streamer, NewSwapDeniedError("not the owner of this swap")) - } - - if swapRecord.State != swap.StateFunded { - return handleSwapError(streamer, NewSwapDeniedErrorf("swap state is %s", swapRecord.State)) - } - - if owner.PublicKey().ToBase58() == swapAuthority.PublicKey().ToBase58() { - return handleSwapError(streamer, NewSwapValidationError("owner cannot be swap authority")) - } - - // todo: for any of these invalid funding cases, we should cancel the swap - intentRecord, err := s.data.GetIntent(ctx, swapRecord.FundingId) - if err != nil { - log.With(zap.Error(err)).Warn("failure getting funding intent record") - return handleSwapError(streamer, errors.New("unexpected nonce state")) - } - if intentRecord.IntentType != intent.SendPublicPayment { - return handleSwapError(streamer, NewSwapValidationError("funding intent is invalid")) - } - if intentRecord.SendPublicPaymentMetadata.Quantity < swapRecord.Amount { - return handleSwapError(streamer, NewSwapValidationError("funding intent is invalid")) - } - if intentRecord.SendPublicPaymentMetadata.DestinationTokenAccount != ownerSourceVmSwapAta.PublicKey().ToBase58() { - return handleSwapError(streamer, NewSwapValidationError("funding intent is invalid")) - } - - // - // Section: On-demand account creation - // - - err = vm.EnsureVirtualTimelockAccountIsInitialized(ctx, s.data, s.vmIndexerClient, toMint, owner, true) - if err != nil { - log.With(zap.Error(err)).Warn("timed out waiting for destination timelock account initialization") - return handleSwapError(streamer, err) - } - - // - // Section: Transaction construction - // - - var swapHandler SwapHandler - if common.IsCoreMint(fromMint) { - swapHandler = NewCurrencyCreatorBuySwapHandler( - s.data, - s.vmIndexerClient, - owner, - swapAuthority, - toMint, - swapRecord.Amount, - nonce, - ) - } else if common.IsCoreMint(toMint) { - swapHandler = NewCurrencyCreatorSellSwapHandler( - s.data, - s.vmIndexerClient, - owner, - swapAuthority, - fromMint, - swapRecord.Amount, - nonce, - ) - } else { - swapHandler = NewCurrencyCreatorBuySellSwapHandler( - s.data, - s.vmIndexerClient, - owner, - swapAuthority, - fromMint, - toMint, - swapRecord.Amount, - nonce, - ) - } - - var alts []solana.AddressLookupTable - for _, mint := range []*common.Account{fromMint, toMint} { - if common.IsCoreMint(mint) { - continue - } - - alt, err := transaction_util.GetAltForMint(ctx, s.data, mint) - if err != nil { - log.With(zap.Error(err)).Warn("failure getting alt") - return handleSwapError(streamer, err) - } - alts = append(alts, alt) - } - - ixns, err := swapHandler.MakeInstructions(ctx) - if err != nil { - log.With(zap.Error(err)).Warn("failure making instructions") - return handleSwapError(streamer, err) - } - - txn := solana.NewV0Transaction( - common.GetSubsidizer().PublicKey().ToBytes(), - alts, - ixns, - ) - - txn.SetBlockhash(solana.Blockhash(blockhash)) - - marshalledTxnMessage := txn.Message.Marshal() - - // - // Section: Server parameters - // - - serverParameters := swapHandler.GetServerParameters() - - protoAlts := make([]*commonpb.SolanaAddressLookupTable, len(alts)) - for i, alt := range alts { - protoAlts[i] = transaction_util.ToProtoAlt(alt) - } - - protoServerParameters := &transactionpb.SwapResponse{ - Response: &transactionpb.SwapResponse_ServerParameters_{ - ServerParameters: &transactionpb.SwapResponse_ServerParameters{ - Kind: &transactionpb.SwapResponse_ServerParameters_CurrencyCreatorStateful_{ - CurrencyCreatorStateful: &transactionpb.SwapResponse_ServerParameters_CurrencyCreatorStateful{ - Payer: common.GetSubsidizer().ToProto(), - Alts: protoAlts, - ComputeUnitLimit: serverParameters.ComputeUnitLimit, - ComputeUnitPrice: serverParameters.ComputeUnitPrice, - MemoValue: serverParameters.MemoValue, - MemoryAccount: serverParameters.MemoryAccount.ToProto(), - MemoryIndex: uint32(serverParameters.MemoryIndex), - }, - }, - }, - }, - } - if err := streamer.Send(protoServerParameters); err != nil { - return handleSwapError(streamer, err) - } - - // - // Section: Transaction signing - // - - req, err = protoutil.BoundedReceive[transactionpb.SwapRequest](ctx, streamer, s.conf.clientReceiveTimeout.Get(ctx)) - if err != nil { - log.With(zap.Error(err)).Info("error receiving request from client") - return err - } - - submitSignaturesReq := req.GetSubmitSignatures() - if submitSignaturesReq == nil { - return handleSwapError(streamer, status.Error(codes.InvalidArgument, "SwapRequest.SubmitSignatures is nil")) - } - - for i := range txn.Message.Header.NumSignatures { - account := txn.Message.Accounts[i] - - var isClientSignature bool - var protoSignature *commonpb.Signature - - if bytes.Equal(account, owner.PublicKey().ToBytes()) { - isClientSignature = true - protoSignature = submitSignaturesReq.Signatures[0] - } else if bytes.Equal(account, swapAuthority.PublicKey().ToBytes()) { - isClientSignature = true - protoSignature = submitSignaturesReq.Signatures[1] - } - - if !isClientSignature { - continue - } - - if !ed25519.Verify( - account, - marshalledTxnMessage, - protoSignature.Value, - ) { - return handleSwapStructuredError( - streamer, - transactionpb.SwapResponse_Error_SIGNATURE_ERROR, - toInvalidTxnSignatureErrorDetails(0, txn, protoSignature), - ) - } - - copy(txn.Signatures[i][:], protoSignature.Value) - } - - err = txn.Sign( - common.GetSubsidizer().PrivateKey().ToBytes(), - sourceVmConfig.Authority.PrivateKey().ToBytes(), - destinationVmConfig.Authority.PrivateKey().ToBytes(), - ) - if err != nil { - log.With(zap.Error(err)).Info("failure signing transaction") - return handleSwapError(streamer, err) - } - - marshalledTxn := txn.Marshal() - - txnSignature := base58.Encode(txn.Signature()) - - // - // Section: Swap state transition - // - - err = s.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { - err := transaction_util.UpdateNonceSignature( - ctx, - s.data, - nonce.PublicKey().ToBase58(), - swapRecord.ProofSignature, - txnSignature, - ) - if err != nil { - log.With(zap.Error(err)).Warn("failure updating nonce record") - return err - } - - swapRecord.State = swap.StateSubmitting - swapRecord.TransactionSignature = &txnSignature - swapRecord.TransactionBlob = marshalledTxn - err = s.data.SaveSwap(ctx, swapRecord) - if err != nil { - log.With(zap.Error(err)).Warn("failure updating swap record") - return err - } - - return nil - }) - if err != nil { - return handleSwapError(streamer, err) - } - - // - // Section: Final RPC response - // - - err = streamer.Send(&transactionpb.SwapResponse{ - Response: &transactionpb.SwapResponse_Success_{ - Success: &transactionpb.SwapResponse_Success{ - Code: transactionpb.SwapResponse_Success_SWAP_SUBMITTED, - }, - }, - }) - return handleSwapError(streamer, err) -} - func toProtoSwap(record *swap.Record) (*transactionpb.SwapMetadata, error) { decodedSwapId, err := base58.Decode(record.SwapId) if err != nil { @@ -841,16 +623,6 @@ func toProtoSwap(record *swap.Record) (*transactionpb.SwapMetadata, error) { return nil, err } - nonce, err := common.NewAccountFromPublicKeyString(record.Nonce) - if err != nil { - return nil, err - } - - decodedBlockhash, err := base58.Decode(record.Blockhash) - if err != nil { - return nil, err - } - decodedSignature, err := base58.Decode(record.ProofSignature) if err != nil { return nil, err @@ -860,7 +632,7 @@ func toProtoSwap(record *swap.Record) (*transactionpb.SwapMetadata, error) { VerifiedMetadata: &transactionpb.VerifiedSwapMetadata{ Kind: &transactionpb.VerifiedSwapMetadata_CurrencyCreator{ CurrencyCreator: &transactionpb.VerifiedCurrencyCreatorSwapMetadata{ - ClientParameters: &transactionpb.StartSwapRequest_Start_CurrencyCreator{ + ClientParameters: &transactionpb.StatefulSwapRequest_Initiate_CurrencyCreator{ Id: &commonpb.SwapId{Value: decodedSwapId}, FromMint: fromMint.ToProto(), ToMint: toMint.ToProto(), @@ -868,10 +640,6 @@ func toProtoSwap(record *swap.Record) (*transactionpb.SwapMetadata, error) { FundingSource: transactionpb.FundingSource(record.FundingSource), FundingId: record.FundingId, }, - ServerParameters: &transactionpb.StartSwapResponse_ServerParameters_CurrencyCreator{ - Nonce: nonce.ToProto(), - Blockhash: &commonpb.Blockhash{Value: decodedBlockhash}, - }, }, }, }, diff --git a/ocp/worker/swap/config.go b/ocp/worker/swap/config.go index c710e0c..417ccfd 100644 --- a/ocp/worker/swap/config.go +++ b/ocp/worker/swap/config.go @@ -14,19 +14,15 @@ const ( defaultFulfillmentBatchSize = 100 ClientTimeoutToFundConfigEnvName = envConfigPrefix + "CLIENT_TIMEOUT_TO_FUND" - defaultClientTimeoutToFund = 3 * time.Minute - - ClientTimeoutToSwapConfigEnvName = envConfigPrefix + "CLIENT_TIMEOUT_TO_SWAP" - defaultClientTimeoutToSwap = 5 * time.Minute + defaultClientTimeoutToFund = 30 * time.Second ExternalWalletFinalizationTimeoutConfigEnvName = envConfigPrefix + "EXTERNAL_WALLET_FINALIZATION_TIMEOUT" - defaultExternalWalletFinalizationTimeout = 90 * time.Second + defaultExternalWalletFinalizationTimeout = 30 * time.Second ) type conf struct { batchSize config.Uint64 clientTimeoutToFund config.Duration - clientTimeoutToSwap config.Duration externalWalletFinalizationTimeout config.Duration } @@ -39,7 +35,6 @@ func WithEnvConfigs() ConfigProvider { return &conf{ batchSize: env.NewUint64Config(BatchSizeConfigEnvName, defaultFulfillmentBatchSize), clientTimeoutToFund: env.NewDurationConfig(ClientTimeoutToFundConfigEnvName, defaultClientTimeoutToFund), - clientTimeoutToSwap: env.NewDurationConfig(ClientTimeoutToSwapConfigEnvName, defaultClientTimeoutToSwap), externalWalletFinalizationTimeout: env.NewDurationConfig(ExternalWalletFinalizationTimeoutConfigEnvName, defaultExternalWalletFinalizationTimeout), } } diff --git a/ocp/worker/swap/runtime.go b/ocp/worker/swap/runtime.go index 5ee929e..d5c2c61 100644 --- a/ocp/worker/swap/runtime.go +++ b/ocp/worker/swap/runtime.go @@ -39,7 +39,6 @@ func (p *runtime) Start(ctx context.Context, interval time.Duration) error { swap.StateFunding, swap.StateFunded, swap.StateSubmitting, - swap.StateCancelling, } { go func(state swap.State) { diff --git a/ocp/worker/swap/util.go b/ocp/worker/swap/util.go index 5bdf811..7d95282 100644 --- a/ocp/worker/swap/util.go +++ b/ocp/worker/swap/util.go @@ -19,12 +19,7 @@ import ( "github.com/code-payments/ocp-server/ocp/data/swap" "github.com/code-payments/ocp-server/ocp/data/transaction" transaction_util "github.com/code-payments/ocp-server/ocp/transaction" - vm_util "github.com/code-payments/ocp-server/ocp/vm" "github.com/code-payments/ocp-server/solana" - compute_budget "github.com/code-payments/ocp-server/solana/computebudget" - "github.com/code-payments/ocp-server/solana/memo" - "github.com/code-payments/ocp-server/solana/system" - "github.com/code-payments/ocp-server/solana/vm" ) func (p *runtime) validateSwapState(record *swap.Record, states ...swap.State) error { @@ -44,6 +39,16 @@ func (p *runtime) markSwapFunded(ctx context.Context, record *swap.Record) error return p.data.SaveSwap(ctx, record) } +func (p *runtime) markSwapSubmitting(ctx context.Context, record *swap.Record) error { + err := p.validateSwapState(record, swap.StateFunded) + if err != nil { + return err + } + + record.State = swap.StateSubmitting + return p.data.SaveSwap(ctx, record) +} + func (p *runtime) markSwapFinalized(ctx context.Context, record *swap.Record) error { return p.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { err := p.validateSwapState(record, swap.StateSubmitting) @@ -80,30 +85,9 @@ func (p *runtime) markSwapFailed(ctx context.Context, record *swap.Record) error }) } -func (p *runtime) markSwapCancelling(ctx context.Context, record *swap.Record, txn *solana.Transaction) error { - return p.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { - err := p.validateSwapState(record, swap.StateFunded) - if err != nil { - return err - } - - txnSignature := base58.Encode(txn.Signature()) - - err = transaction_util.UpdateNonceSignature(ctx, p.data, record.Nonce, record.ProofSignature, txnSignature) - if err != nil { - return err - } - - record.TransactionSignature = &txnSignature - record.TransactionBlob = txn.Marshal() - record.State = swap.StateCancelling - return p.data.SaveSwap(ctx, record) - }) -} - func (p *runtime) markSwapCancelled(ctx context.Context, record *swap.Record) error { return p.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { - err := p.validateSwapState(record, swap.StateCreated, swap.StateFunding, swap.StateCancelling) + err := p.validateSwapState(record, swap.StateCreated, swap.StateFunding) if err != nil { return err } @@ -114,11 +98,6 @@ func (p *runtime) markSwapCancelled(ctx context.Context, record *swap.Record) er if err != nil { return err } - case swap.StateCancelling: - err = p.markNonceReleasedDueToSubmittedTransaction(ctx, record) - if err != nil { - return err - } } record.TransactionBlob = nil @@ -139,7 +118,7 @@ func (p *runtime) submitTransaction(ctx context.Context, record *swap.Record) er return errors.Wrap(err, "error unmarshalling transaction") } - if base58.Encode(txn.Signature()) != *record.TransactionSignature { + if base58.Encode(txn.Signature()) != record.TransactionSignature { return errors.New("unexpected transaction signature") } @@ -171,7 +150,7 @@ func (p *runtime) updateBalancesForFinalizedSwap(ctx context.Context, record *sw return 0, err } - tokenBalances, err := p.data.GetBlockchainTransactionTokenBalances(ctx, *record.TransactionSignature) + tokenBalances, err := p.data.GetBlockchainTransactionTokenBalances(ctx, record.TransactionSignature) if err != nil { return 0, err } @@ -192,7 +171,7 @@ func (p *runtime) updateBalancesForFinalizedSwap(ctx context.Context, record *sw err = p.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { // For transaction history intentRecord := &intent.Record{ - IntentId: getSwapDepositIntentID(*record.TransactionSignature, ownerDestinationTimelockVault), + IntentId: getSwapDepositIntentID(record.TransactionSignature, ownerDestinationTimelockVault), IntentType: intent.ExternalDeposit, MintAccount: toMint.PublicKey().ToBase58(), @@ -215,7 +194,7 @@ func (p *runtime) updateBalancesForFinalizedSwap(ctx context.Context, record *sw // For tracking in cached balances externalDepositRecord := &deposit.Record{ - Signature: *record.TransactionSignature, + Signature: record.TransactionSignature, Destination: ownerDestinationTimelockVault.PublicKey().ToBase58(), Amount: uint64(deltaQuarksIntoOmnibus), UsdMarketValue: usdMarketValue, @@ -233,85 +212,6 @@ func (p *runtime) updateBalancesForFinalizedSwap(ctx context.Context, record *sw return uint64(deltaQuarksIntoOmnibus), nil } -func (p *runtime) updateBalancesForCancelledSwap(ctx context.Context, record *swap.Record) error { - owner, err := common.NewAccountFromPublicKeyString(record.Owner) - if err != nil { - return err - } - - fromMint, err := common.NewAccountFromPublicKeyString(record.FromMint) - if err != nil { - return err - } - - soureVmConfig, err := common.GetVmConfigForMint(ctx, p.data, fromMint) - if err != nil { - return err - } - - ownerSourceTimelockVault, err := owner.ToTimelockVault(soureVmConfig) - if err != nil { - return err - } - - tokenBalances, err := p.data.GetBlockchainTransactionTokenBalances(ctx, *record.TransactionSignature) - if err != nil { - return err - } - - deltaQuarksIntoOmnibus, err := transaction_util.GetDeltaQuarksFromTokenBalances(soureVmConfig.Omnibus, tokenBalances) - if err != nil { - return err - } - if deltaQuarksIntoOmnibus <= 0 { - return errors.New("delta quarks into destination vm omnibus is not positive") - } - - usdMarketValue, _, err := currency_util.CalculateUsdMarketValue(ctx, p.data, fromMint, uint64(deltaQuarksIntoOmnibus), time.Now()) - if err != nil { - return err - } - - return p.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { - // For transaction history - intentRecord := &intent.Record{ - IntentId: getSwapDepositIntentID(*record.TransactionSignature, ownerSourceTimelockVault), - IntentType: intent.ExternalDeposit, - - MintAccount: fromMint.PublicKey().ToBase58(), - - InitiatorOwnerAccount: owner.PublicKey().ToBase58(), - - ExternalDepositMetadata: &intent.ExternalDepositMetadata{ - DestinationTokenAccount: ownerSourceTimelockVault.PublicKey().ToBase58(), - Quantity: uint64(deltaQuarksIntoOmnibus), - UsdMarketValue: usdMarketValue, - }, - - State: intent.StateConfirmed, - CreatedAt: time.Now(), - } - err = p.data.SaveIntent(ctx, intentRecord) - if err != nil { - return err - } - - // For tracking in cached balances - externalDepositRecord := &deposit.Record{ - Signature: *record.TransactionSignature, - Destination: ownerSourceTimelockVault.PublicKey().ToBase58(), - Amount: uint64(deltaQuarksIntoOmnibus), - UsdMarketValue: usdMarketValue, - - Slot: tokenBalances.Slot, - ConfirmationState: transaction.ConfirmationFinalized, - - CreatedAt: time.Now(), - } - return p.data.SaveExternalDeposit(ctx, externalDepositRecord) - }) -} - func (p *runtime) notifySwapFinalized(ctx context.Context, swapRecord *swap.Record) error { owner, err := common.NewAccountFromPublicKeyString(swapRecord.Owner) if err != nil { @@ -349,93 +249,6 @@ func (p *runtime) notifySwapFinalized(ctx context.Context, swapRecord *swap.Reco return p.integration.OnSwapFinalized(ctx, owner, toMint, currencyName, fundingIntentRecord.SendPublicPaymentMetadata.ExchangeCurrency, amountReceived) } -// todo: put this in transaction utility package -func (p *runtime) makeCancellationTransaction(ctx context.Context, record *swap.Record) (*solana.Transaction, error) { - owner, err := common.NewAccountFromPublicKeyString(record.Owner) - if err != nil { - return nil, err - } - - fromMint, err := common.NewAccountFromPublicKeyString(record.FromMint) - if err != nil { - return nil, err - } - - nonce, err := common.NewAccountFromPublicKeyString(record.Nonce) - if err != nil { - return nil, err - } - - decodedBlockhash, err := base58.Decode(record.Blockhash) - if err != nil { - return nil, err - } - - sourceVmConfig, err := common.GetVmConfigForMint(ctx, p.data, fromMint) - if err != nil { - return nil, err - } - - sourceOwnerVmSwapPdaAccounts, err := owner.GetVmSwapAccounts(sourceVmConfig) - if err != nil { - return nil, err - } - - memoryAccount, memoryIndex, err := vm_util.GetVirtualTimelockAccountLocationInMemory(ctx, p.vmIndexerClient, sourceVmConfig.Vm, owner) - if err != nil { - return nil, err - } - - txn := solana.NewLegacyTransaction( - common.GetSubsidizer().PublicKey().ToBytes(), - system.AdvanceNonce(nonce.PublicKey().ToBytes(), common.GetSubsidizer().PublicKey().ToBytes()), - compute_budget.SetComputeUnitLimit(50_000), - compute_budget.SetComputeUnitPrice(1_000), - memo.Instruction("cancel_swap_v0"), - vm.NewCancelSwapInstruction( - &vm.CancelSwapInstructionAccounts{ - VmAuthority: sourceVmConfig.Authority.PublicKey().ToBytes(), - Vm: sourceVmConfig.Vm.PublicKey().ToBytes(), - VmMemory: memoryAccount.PublicKey().ToBytes(), - Swapper: owner.PublicKey().ToBytes(), - SwapPda: sourceOwnerVmSwapPdaAccounts.Pda.PublicKey().ToBytes(), - SwapAta: sourceOwnerVmSwapPdaAccounts.Ata.PublicKey().ToBytes(), - VmOmnibus: sourceVmConfig.Omnibus.PublicKey().ToBytes(), - }, - &vm.CancelSwapInstructionArgs{ - AccountIndex: memoryIndex, - Amount: record.Amount, - Bump: sourceOwnerVmSwapPdaAccounts.PdaBump, - }, - ), - vm.NewCloseSwapAccountIfEmptyInstruction( - &vm.CloseSwapAccountIfEmptyInstructionAccounts{ - VmAuthority: sourceVmConfig.Authority.PublicKey().ToBytes(), - Vm: sourceVmConfig.Vm.PublicKey().ToBytes(), - Swapper: owner.PublicKey().ToBytes(), - SwapPda: sourceOwnerVmSwapPdaAccounts.Pda.PublicKey().ToBytes(), - SwapAta: sourceOwnerVmSwapPdaAccounts.Ata.PublicKey().ToBytes(), - Destination: common.GetSubsidizer().PublicKey().ToBytes(), - }, - &vm.CloseSwapAccountIfEmptyInstructionArgs{ - Bump: sourceOwnerVmSwapPdaAccounts.PdaBump, - }, - ), - ) - - txn.SetBlockhash(solana.Blockhash(decodedBlockhash)) - - err = txn.Sign( - common.GetSubsidizer().PrivateKey().ToBytes(), - sourceVmConfig.Authority.PrivateKey().ToBytes(), - ) - if err != nil { - return nil, err - } - - return &txn, nil -} - func (p *runtime) markNonceReleasedDueToSubmittedTransaction(ctx context.Context, record *swap.Record) error { err := p.validateSwapState(record, swap.StateSubmitting, swap.StateCancelling) if err != nil { @@ -447,7 +260,7 @@ func (p *runtime) markNonceReleasedDueToSubmittedTransaction(ctx context.Context return err } - if *record.TransactionSignature != nonceRecord.Signature { + if record.TransactionSignature != nonceRecord.Signature { return errors.New("unexpected nonce signature") } @@ -474,7 +287,7 @@ func (p *runtime) markNonceAvailableDueToCancelledSwap(ctx context.Context, reco return err } - if record.ProofSignature != nonceRecord.Signature { + if record.TransactionSignature != nonceRecord.Signature { return errors.New("unexpected nonce signature") } @@ -491,9 +304,51 @@ func (p *runtime) markNonceAvailableDueToCancelledSwap(ctx context.Context, reco return p.data.SaveNonce(ctx, nonceRecord) } -func (p *runtime) validateExternalWalletFundingTransaction(ctx context.Context, record *swap.Record) (bool, error) { +func (p *runtime) validateIntentFunding(ctx context.Context, record *swap.Record) (bool, error) { + if record.FundingSource != swap.FundingSourceSubmitIntent { + return false, errors.New("invalid funding source") + } + + owner, err := common.NewAccountFromPublicKeyString(record.Owner) + if err != nil { + return false, errors.Wrap(err, "error parsing owner") + } + + fromMint, err := common.NewAccountFromPublicKeyString(record.FromMint) + if err != nil { + return false, errors.Wrap(err, "error parsing from mint") + } + + sourceVmConfig, err := common.GetVmConfigForMint(ctx, p.data, fromMint) + if err != nil { + return false, errors.Wrap(err, "error getting vm config for source mint") + } + + swapAta, err := owner.ToVmSwapAta(sourceVmConfig) + if err != nil { + return false, errors.Wrap(err, "error getting swap ata") + } + + intentRecord, err := p.data.GetIntent(ctx, record.FundingId) + if err != nil { + return false, errors.Wrap(err, "error getting intent") + } + + if intentRecord.IntentType != intent.SendPublicPayment { + return false, nil + } + if intentRecord.SendPublicPaymentMetadata.Quantity < record.Amount { + return false, nil + } + if intentRecord.SendPublicPaymentMetadata.DestinationTokenAccount != swapAta.PublicKey().ToBase58() { + return false, nil + } + return true, nil +} + +func (p *runtime) validateExternalWalletFunding(ctx context.Context, record *swap.Record) (bool, error) { if record.FundingSource != swap.FundingSourceExternalWallet { - return false, errors.New("invalid funding source`") + return false, errors.New("invalid funding source") } owner, err := common.NewAccountFromPublicKeyString(record.Owner) @@ -526,10 +381,9 @@ func (p *runtime) validateExternalWalletFundingTransaction(ctx context.Context, return false, errors.Wrap(err, "error getting delta quarks from token balances") } - if deltaQuarks != int64(record.Amount) { + if deltaQuarks < int64(record.Amount) { return false, nil } - return true, nil } diff --git a/ocp/worker/swap/worker.go b/ocp/worker/swap/worker.go index 82ddea2..297933c 100644 --- a/ocp/worker/swap/worker.go +++ b/ocp/worker/swap/worker.go @@ -87,8 +87,6 @@ func (p *runtime) handle(ctx context.Context, record *swap.Record) error { err = p.handleStateFunded(ctx, record) case swap.StateSubmitting: err = p.handleStateSubmitting(ctx, record) - case swap.StateCancelling: - err = p.handleStateCancelling(ctx, record) } if err != nil { log.With(zap.Error(err)).Warn("failure processing swap") @@ -128,7 +126,7 @@ func (p *runtime) handleStateFunding(ctx context.Context, record *swap.Record) e case intent.StateConfirmed: return p.markSwapFunded(ctx, record) case intent.StateFailed: - // todo: Should never happen, but maybe cancel the swap? + // todo: Recovery flow to put back source funds into the source VM return errors.New("funding intent failed") default: return nil @@ -145,16 +143,6 @@ func (p *runtime) handleStateFunding(ctx context.Context, record *swap.Record) e if finalizedTxn.Err != nil || finalizedTxn.Meta.Err != nil { return p.markSwapCancelled(ctx, record) } - - // Validate the funding transaction deposited exactly the expected amount - // into the user's swap ATA - valid, err := p.validateExternalWalletFundingTransaction(ctx, record) - if err != nil { - return errors.Wrap(err, "error validating external wallet funding transaction") - } else if !valid { - return p.markSwapCancelled(ctx, record) - } - return p.markSwapFunded(ctx, record) } @@ -166,7 +154,7 @@ func (p *runtime) handleStateFunding(ctx context.Context, record *swap.Record) e return nil default: - return errors.New("unknown funding source") + return errors.New("unsupported funding source") } } @@ -175,34 +163,28 @@ func (p *runtime) handleStateFunded(ctx context.Context, record *swap.Record) er return err } - // Determine the starting point for the timeout based on the funding source - var fundedAt time.Time + var isValid bool + var err error switch record.FundingSource { case swap.FundingSourceSubmitIntent: - intentRecord, err := p.data.GetIntent(ctx, record.FundingId) + isValid, err = p.validateIntentFunding(ctx, record) if err != nil { - return err + return errors.Wrap(err, "error validating intent funding") } - fundedAt = intentRecord.CreatedAt case swap.FundingSourceExternalWallet: - fundedAt = record.CreatedAt - default: - return errors.New("unknown funding source") - } - - // Cancel the swap if the client hasn't signed the swap transaction within a - // reasonable amount of time. The funds for the swap will be deposited back - // into the source VM. - if time.Since(fundedAt) > p.conf.clientTimeoutToSwap.Get(ctx) { - txn, err := p.makeCancellationTransaction(ctx, record) + isValid, err = p.validateExternalWalletFunding(ctx, record) if err != nil { - return err + return errors.Wrap(err, "error validating external wallet funding") } - - return p.markSwapCancelling(ctx, record, txn) + default: + return errors.New("unsupported funding source") } - return nil + if !isValid { + // todo: Return funds if the amount was wrong + return p.markSwapCancelled(ctx, record) + } + return p.markSwapSubmitting(ctx, record) } func (p *runtime) handleStateSubmitting(ctx context.Context, record *swap.Record) error { @@ -212,7 +194,7 @@ func (p *runtime) handleStateSubmitting(ctx context.Context, record *swap.Record // Monitor for a finalized swap transaction - finalizedTxn, err := p.data.GetBlockchainTransaction(ctx, *record.TransactionSignature, solana.CommitmentFinalized) + finalizedTxn, err := p.data.GetBlockchainTransaction(ctx, record.TransactionSignature, solana.CommitmentFinalized) if err != nil && err != solana.ErrSignatureNotFound { return errors.Wrap(err, "error getting finalized transaction") } @@ -244,34 +226,3 @@ func (p *runtime) handleStateSubmitting(ctx context.Context, record *swap.Record return p.submitTransaction(ctx, record) } - -func (p *runtime) handleStateCancelling(ctx context.Context, record *swap.Record) error { - if err := p.validateSwapState(record, swap.StateCancelling); err != nil { - return err - } - - // Monitor for a finalized cancellation transaction - - finalizedTxn, err := p.data.GetBlockchainTransaction(ctx, *record.TransactionSignature, solana.CommitmentFinalized) - if err != nil && err != solana.ErrSignatureNotFound { - return errors.Wrap(err, "error getting finalized transaction") - } - - if finalizedTxn != nil { - if finalizedTxn.Err != nil || finalizedTxn.Meta.Err != nil { - // todo: Try again? - return p.markSwapCancelled(ctx, record) - } else { - err = p.updateBalancesForCancelledSwap(ctx, record) - if err != nil { - return errors.Wrap(err, "error updating balances") - } - - return p.markSwapCancelled(ctx, record) - } - } - - // Otherwise, continually retry submitting the transaction - - return p.submitTransaction(ctx, record) -}