diff --git a/LOOP_ARCHITECTURE.md b/LOOP_ARCHITECTURE.md new file mode 100644 index 00000000..f9378d09 --- /dev/null +++ b/LOOP_ARCHITECTURE.md @@ -0,0 +1,312 @@ +# LOOP Architecture: Relayer vs RelayerSet + +Understanding the two different gRPC boundaries in the Chainlink LOOP architecture. + +## Overview + +There are **two separate gRPC boundaries** where chain-specific services (like Aptos) need to be implemented: + +1. **Relayer LOOP**: When an individual relayer runs as a separate process +2. **RelayerSet for Capabilities**: When capabilities run as separate processes and need to call back to relayers + +## Architecture Diagram + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Core Node Process │ +│ │ +│ ┌──────────────┐ │ +│ │ Relayer Set │ ◄─────────────────┐ │ +│ │ - EVM │ │ │ +│ │ - Solana │ │ │ +│ │ - Aptos │ │ │ +│ └──────────────┘ │ │ +│ │ │ │ +│ │ Embedded Mode │ LOOP Mode │ +│ ▼ │ │ +│ ┌──────────────┐ ┌─────┴──────┐ │ +│ │ Relayer │ │ RelayerSet │ │ +│ │ Logic │ │ Server │ │ +│ └──────────────┘ └─────┬──────┘ │ +│ │ │ +└──────────────────────────────────────┼──────────────────────┘ + │ gRPC + ┌──────────────────┼──────────────────────┐ + │ │ │ + ┌─────▼──────┐ ┌─────▼──────┐ │ + │ Capability │ │ Capability │ │ + │ Process 1 │ │ Process 2 │ │ + │ │ │ │ │ + │ ┌────────┐ │ │ ┌────────┐ │ │ + │ │Relayer │ │ │ │Relayer │ │ │ + │ │ Set │ │ │ │ Set │ │ │ + │ │ Client │ │ │ │ Client │ │ │ + │ └────────┘ │ │ └────────┘ │ │ + └────────────┘ └────────────┘ │ + │ + Capability Processes (LOOP Mode) │ + │ +└─────────────────────────────────────────────────────────────┘ +``` + +## The Two Packages + +### 1. `pkg/loop/internal/relayer/` - Standalone Relayer LOOP + +**Location**: `chainlink-common/pkg/loop/internal/relayer/aptos.go` + +**Purpose**: Allows a relayer to run as a separate LOOP process (optional feature). + +**Process Boundary**: +``` +Core Node ◄──[gRPC]──► Relayer Process (standalone) +``` + +**When Used**: +- When environment variable `CL_APTOS_PLUGIN_CMD` is set +- Relayer runs as external binary, not embedded in core node + +**Key Components**: + +```go +// AptosClient - Used by core node to call standalone relayer +type AptosClient struct { + grpcClient aptospb.AptosClient +} + +func (ac *AptosClient) AccountAPTBalance(ctx context.Context, req aptos.AccountAPTBalanceRequest) (*aptos.AccountAPTBalanceReply, error) { + // Convert Go types → Proto types + reply, err := ac.grpcClient.AccountAPTBalance(ctx, &aptospb.AccountAPTBalanceRequest{ + Address: req.Address[:], + }) + // Convert Proto types → Go types + return &aptos.AccountAPTBalanceReply{Value: reply.Value}, nil +} + +// aptosServer - Used by standalone relayer to receive calls +type aptosServer struct { + impl types.AptosService // Actual business logic +} + +func (s *aptosServer) AccountAPTBalance(ctx context.Context, req *aptospb.AccountAPTBalanceRequest) (*aptospb.AccountAPTBalanceReply, error) { + // Convert Proto types → Go types + reply, err := s.impl.AccountAPTBalance(ctx, aptos.AccountAPTBalanceRequest{ + Address: aptos.AccountAddress(req.Address), + }) + // Convert Go types → Proto types + return &aptospb.AccountAPTBalanceReply{Value: reply.Value}, nil +} +``` + +**Call Flow**: +``` +Core Node Logic + → AptosClient (proto conversion) + → [gRPC over network/pipe] + → aptosServer (proto conversion) + → Aptos Business Logic (in separate process) +``` + +--- + +### 2. `pkg/loop/internal/relayerset/` - RelayerSet for Capabilities + +**Location**: `chainlink-common/pkg/loop/internal/relayerset/aptos.go` + +**Purpose**: Allows capability processes to call back to relayers in the core node. + +**Process Boundary**: +``` +Capability Process ◄──[gRPC]──► Core Node (RelayerSet) +``` + +**When Used**: +- **Always** when capabilities run in LOOP mode +- Capability needs to call chain-specific functions +- Most common use case + +**Key Components**: + +```go +// aptosClient - Used by capability to call back to core node +// Adds RelayID to context so server knows which relayer to use +type aptosClient struct { + relayID types.RelayID // Which relayer? (e.g., "aptos.mainnet") + client aptospb.AptosClient +} + +func (ac *aptosClient) AccountAPTBalance(ctx context.Context, in *aptospb.AccountAPTBalanceRequest, opts ...grpc.CallOption) (*aptospb.AccountAPTBalanceReply, error) { + // Add RelayID to context metadata + ctx = appendRelayID(ctx, ac.relayID) + return ac.client.AccountAPTBalance(ctx, in, opts...) +} + +// aptosServer - Used by core node to receive calls from capabilities +type aptosServer struct { + parent *Server // Has access to RelayerSet +} + +func (as *aptosServer) AccountAPTBalance(ctx context.Context, req *aptospb.AccountAPTBalanceRequest) (*aptospb.AccountAPTBalanceReply, error) { + // Get the AptosService for the specific relayer + aptosService, err := as.parent.getAptosService(ctx) + + // Convert proto → Go types and call business logic + reply, err := aptosService.AccountAPTBalance(ctx, aptos.AccountAPTBalanceRequest{ + Address: aptos.AccountAddress(req.Address), + }) + + // Convert Go types → proto + return &aptospb.AccountAPTBalanceReply{Value: reply.Value}, nil +} + +// Helper to extract RelayID and get the right service +func (s *Server) getAptosService(ctx context.Context) (types.AptosService, error) { + relayID, err := readContextValue(ctx, metadataRelayID) + relayer, err := s.impl.Get(ctx, relayID) // Get specific relayer + return relayer.Aptos() // Get its Aptos service +} +``` + +**Call Flow**: +``` +Capability Logic + → aptosClient.AccountAPTBalance() [adds RelayID to context] + → [gRPC back to Core Node] + → RelayerSet Server + → Extract RelayID from context + → Get correct Aptos relayer from RelayerSet + → Call Aptos Business Logic +``` + +--- + +## Key Differences + +| Aspect | `relayer/` (Standalone Relayer) | `relayerset/` (Capability Callbacks) | +|--------|--------------------------------|-------------------------------------| +| **Process Boundary** | Core Node ↔ Relayer Process | Capability Process ↔ Core Node | +| **Direction** | Core calls relayer | Capability calls back to core | +| **RelayID Handling** | Not needed (single relayer) | Critical (multiple relayers) | +| **Client Purpose** | Proto conversion only | Add RelayID + forward call | +| **Server Purpose** | Call business logic directly | Find relayer + call its logic | +| **When Needed** | Optional (relayer LOOP mode) | Required (capability LOOP mode) | +| **Frequency of Use** | Rare (most relayers embedded) | Common (most capabilities LOOPified) | + +--- + +## Context Metadata Pattern + +The `relayerset` uses gRPC metadata to pass the `RelayID`: + +```go +// Client side: Add RelayID to context +func appendRelayID(ctx context.Context, relayID types.RelayID) context.Context { + return metadata.AppendToOutgoingContext(ctx, + metadataRelayID, relayID.String()) +} + +// Server side: Extract RelayID from context +func readContextValue(ctx context.Context, key string) (string, error) { + md, ok := metadata.FromIncomingContext(ctx) + values := md.Get(key) + return values[0], nil +} +``` + +This allows multiple relayers (EVM mainnet, Solana devnet, Aptos testnet, etc.) to share the same gRPC connection, with the server routing calls to the correct relayer based on metadata. + +--- + +## Which Package Do I Need? + +### For Most Implementations: Start with `relayerset/` + +If you're building a new chain integration (like Aptos), you'll almost certainly need `relayerset/` first because: + +1. ✅ Capabilities typically run in LOOP mode +2. ✅ Capabilities need to call chain-specific functions +3. ✅ This is the standard architecture for most chains + +### Optional: Add `relayer/` Later + +You only need `relayer/` if: + +1. You want the relayer itself to run as a separate process +2. You're isolating relayer logic for security/stability +3. You're implementing optional LOOP mode for relayers + +**Most chains only implement `relayerset/` initially.** + +--- + +## Implementation Order + +### Step 1: Proto + Types (Both packages share these) +1. Create `.proto` file: `pkg/chains/aptos/aptos.proto` +2. Create Go types: `pkg/types/chains/aptos/aptos.go` +3. Add interface: `pkg/types/relayer.go` +4. Run `make generate` to create gRPC stubs + +### Step 2: RelayerSet (Required for capabilities) +1. Create `pkg/loop/internal/relayerset/aptos.go` +2. Update `pkg/loop/internal/relayerset/server.go` +3. Update `pkg/loop/internal/relayerset/client.go` +4. Update `pkg/loop/internal/relayerset/relayer.go` +5. Register in `pkg/loop/internal/pb/relayerset/helper.go` + +### Step 3: Business Logic +1. Implement actual Aptos service in `chainlink-aptos/` +2. Connect to Aptos SDK/RPC + +### Step 4: Relayer LOOP (Optional) +1. Create `pkg/loop/internal/relayer/aptos.go` +2. Create standalone binary in `chainlink-aptos/cmd/` +3. Add environment variable support + +--- + +## Real-World Example + +### Capability Calling Aptos + +```go +// In capability process (e.g., WriteTarget capability) +func (w *WriteTarget) Execute(ctx context.Context, req capabilities.CapabilityRequest) { + // Get Aptos service from relayer set + aptosService, err := w.relayer.Aptos() + + // Call method - this goes over gRPC back to core node + balance, err := aptosService.AccountAPTBalance(ctx, + aptos.AccountAPTBalanceRequest{ + Address: accountAddr, + }) + + // Use result... +} +``` + +**What happens under the hood:** + +1. `w.relayer.Aptos()` returns an `aptosClient` wrapper +2. `AccountAPTBalance()` call: + - Adds RelayID ("aptos.mainnet") to context metadata + - Makes gRPC call back to core node +3. Core node's `aptosServer`: + - Extracts RelayID from metadata + - Finds Aptos mainnet relayer in RelayerSet + - Calls that relayer's `AccountAPTBalance()` +4. Result flows back through gRPC to capability + +--- + +## Summary + +- **`relayer/`**: For standalone relayer processes (optional, advanced) +- **`relayerset/`**: For capability-to-relayer calls (required, standard) + +**Most implementations only need `relayerset/` initially.** + +The key insight: gRPC boundaries can be in different places depending on what's LOOPified: +- If relayers are LOOPified → use `relayer/` +- If capabilities are LOOPified → use `relayerset/` ✅ (most common) + diff --git a/go.mod b/go.mod index 31a92c34..b017beb1 100644 --- a/go.mod +++ b/go.mod @@ -16,8 +16,8 @@ require ( github.com/prometheus/client_golang v1.22.0 github.com/shopspring/decimal v1.4.0 github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 - github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963 + github.com/smartcontractkit/chainlink-common v0.10.1-0.20260213162239-ce0002b6079e + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260210221717-2546aed27ebe github.com/stretchr/testify v1.11.1 github.com/valyala/fastjson v1.6.4 go.opentelemetry.io/otel v1.39.0 diff --git a/go.sum b/go.sum index 3105237d..72940086 100644 --- a/go.sum +++ b/go.sum @@ -324,12 +324,12 @@ github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 h1:6C8qej6f github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82/go.mod h1:xe4pgH49k4SsmkQq5OT8abwhWmnzkhpgnXeekbx2efw= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e h1:3zBkN2h2JzgjEntuV/YqqqCC9vNrBdwC5/FKfJi+1G8= -github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e/go.mod h1:TDyLV7/Y+lnZegvfeZXj5myOG0cKrsmuGnJ8OQQuPWo= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260213162239-ce0002b6079e h1:jtHLhpl3lP5oZBB73ImW2MHGr+IbQtAW9euiidPJF+w= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260213162239-ce0002b6079e/go.mod h1:13YN2kb3Vqpw2S7d4IwhX/578WPGC0JHN5JrOnAEsOc= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963 h1:DCLvEn4KkFzYbK/AYl4vJmf6EHaskPYvGDGdd0kOma0= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260210221717-2546aed27ebe h1:Vc4zoSc/j6/FdCQ7vcyHTTB7kzHI2f+lHCHqFuiCcJQ= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260210221717-2546aed27ebe/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index bfd499a5..01889315 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -14,7 +14,7 @@ require ( github.com/rs/zerolog v1.34.0 github.com/smartcontractkit/chain-selectors v1.0.89 github.com/smartcontractkit/chainlink-aptos v0.0.0-20251212131933-e5e85d6fa4d3 - github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e + github.com/smartcontractkit/chainlink-common v0.10.1-0.20260213162239-ce0002b6079e github.com/smartcontractkit/chainlink-deployments-framework v0.74.2 github.com/smartcontractkit/chainlink-testing-framework/framework v0.13.0 github.com/smartcontractkit/chainlink/core/scripts v0.0.0-20260114190217-6f3f008c67a6 @@ -348,7 +348,7 @@ require ( github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d // indirect github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0.0.0-20251211142334-5c3421fe2c8d // indirect github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d // indirect - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963 // indirect + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260210221717-2546aed27ebe // indirect github.com/smartcontractkit/chainlink-protos/job-distributor v0.17.0 // indirect github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index b28a2ce1..38acadee 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1307,8 +1307,8 @@ github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20251222203705-84e9 github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20251222203705-84e93cab86b5/go.mod h1:kDMTKjZB4pnhQVAdwVMzA0THXAxjaON58JSO+CYLYBg= github.com/smartcontractkit/chainlink-ccv v0.0.0-20260106165728-3d896e87cc56 h1:M6eS2r11Vbbll/bve5Us17cNYDlgs+dvrggPFVnhrgQ= github.com/smartcontractkit/chainlink-ccv v0.0.0-20260106165728-3d896e87cc56/go.mod h1:6N8NSPmsy+sxtRBmBUwWlDyxPyauS7HMDzUl/lyJw7Y= -github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e h1:3zBkN2h2JzgjEntuV/YqqqCC9vNrBdwC5/FKfJi+1G8= -github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e/go.mod h1:TDyLV7/Y+lnZegvfeZXj5myOG0cKrsmuGnJ8OQQuPWo= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260213162239-ce0002b6079e h1:jtHLhpl3lP5oZBB73ImW2MHGr+IbQtAW9euiidPJF+w= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260213162239-ce0002b6079e/go.mod h1:13YN2kb3Vqpw2S7d4IwhX/578WPGC0JHN5JrOnAEsOc= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20251211140724-319861e514c4 h1:NOUsjsMzNecbjiPWUQGlRSRAutEvCFrqqyETDJeh5q4= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20251211140724-319861e514c4/go.mod h1:Zpvul9sTcZNAZOVzt5vBl1XZGNvQebFpnpn3/KOQvOQ= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20251215152504-b1e41f508340 h1:PsjEI+5jZIz9AS4eOsLS5VpSWJINf38clXV3wryPyMk= @@ -1341,8 +1341,8 @@ github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0. github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:ATjAPIVJibHRcIfiG47rEQkUIOoYa6KDvWj3zwCAw6g= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d h1:AJy55QJ/pBhXkZjc7N+ATnWfxrcjq9BI9DmdtdjwDUQ= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:5JdppgngCOUS76p61zCinSCgOhPeYQ+OcDUuome5THQ= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963 h1:DCLvEn4KkFzYbK/AYl4vJmf6EHaskPYvGDGdd0kOma0= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260210221717-2546aed27ebe h1:Vc4zoSc/j6/FdCQ7vcyHTTB7kzHI2f+lHCHqFuiCcJQ= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260210221717-2546aed27ebe/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= github.com/smartcontractkit/chainlink-protos/job-distributor v0.17.0 h1:xHPmFDhff7QpeFxKsZfk+24j4AlnQiFjjRh5O87Peu4= github.com/smartcontractkit/chainlink-protos/job-distributor v0.17.0/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= diff --git a/relayer/aptos_service.go b/relayer/aptos_service.go new file mode 100644 index 00000000..3ae06b93 --- /dev/null +++ b/relayer/aptos_service.go @@ -0,0 +1,274 @@ +package relayer + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "time" + + aptosgosdk "github.com/aptos-labs/aptos-go-sdk" + "github.com/aptos-labs/aptos-go-sdk/bcs" + "github.com/google/uuid" + + "github.com/smartcontractkit/chainlink-aptos/relayer/chain" + "github.com/smartcontractkit/chainlink-aptos/relayer/utils" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + commonaptos "github.com/smartcontractkit/chainlink-common/pkg/types/chains/aptos" +) + +type aptosService struct { + commontypes.UnimplementedAptosService + chain chain.Chain + logger logger.Logger +} + +func (s *aptosService) AccountAPTBalance(ctx context.Context, req commonaptos.AccountAPTBalanceRequest) (*commonaptos.AccountAPTBalanceReply, error) { + client, err := s.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + sdkAddr := aptosgosdk.AccountAddress(req.Address[:]) + reply, err := client.AccountAPTBalance(sdkAddr) + if err != nil { + return nil, fmt.Errorf("failed to get account APT balance: %w", err) + } + return &commonaptos.AccountAPTBalanceReply{Value: reply}, nil +} + +func (s *aptosService) View(ctx context.Context, req commonaptos.ViewRequest) (*commonaptos.ViewReply, error) { + if req.Payload == nil { + return nil, fmt.Errorf("view payload is required") + } + + client, err := s.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + + sdkPayload := &aptosgosdk.ViewPayload{ + Module: aptosgosdk.ModuleId{ + Address: aptosgosdk.AccountAddress(req.Payload.Module.Address), + Name: req.Payload.Module.Name, + }, + Function: req.Payload.Function, + ArgTypes: convertTypeTagsToSDK(req.Payload.ArgTypes), + Args: req.Payload.Args, + } + + result, err := client.View(sdkPayload) + if err != nil { + return nil, fmt.Errorf("failed to call view function: %w", err) + } + + data, err := json.Marshal(result) + if err != nil { + return nil, fmt.Errorf("failed to marshal view result: %w", err) + } + + return &commonaptos.ViewReply{Data: data}, nil +} + +func (s *aptosService) TransactionByHash(ctx context.Context, req commonaptos.TransactionByHashRequest) (*commonaptos.TransactionByHashReply, error) { + client, err := s.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + + tx, err := client.TransactionByHash(req.Hash) + if err != nil { + return nil, fmt.Errorf("failed to get transaction by hash: %w", err) + } + + data, err := json.Marshal(tx.Inner) + if err != nil { + return nil, fmt.Errorf("failed to marshal transaction data: %w", err) + } + + return &commonaptos.TransactionByHashReply{ + Transaction: &commonaptos.Transaction{ + Type: commonaptos.TransactionVariant(tx.Type), + Hash: string(tx.Hash()), + Version: tx.Version(), + Success: tx.Success(), + Data: data, + }, + }, nil +} + +func (s *aptosService) SubmitTransaction(ctx context.Context, req commonaptos.SubmitTransactionRequest) (*commonaptos.SubmitTransactionReply, error) { + // Deserialize the BCS-encoded TransactionPayload (containing an EntryFunction) + var txPayload aptosgosdk.TransactionPayload + if err := bcs.Deserialize(&txPayload, req.EncodedPayload); err != nil { + return nil, fmt.Errorf("failed to deserialize transaction payload: %w", err) + } + + entryFn, ok := txPayload.Payload.(*aptosgosdk.EntryFunction) + if !ok { + return nil, fmt.Errorf("expected EntryFunction payload, got %T", txPayload.Payload) + } + + gasLimit := big.NewInt(int64(req.GasConfig.MaxGasAmount)) + accounts, err := s.chain.KeyStore().Accounts(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get accounts: %w", err) + } + + if len(accounts) == 0 { + return nil, errors.New("no enabled accounts available") + } + + // Find account with highest balance + publicKey, err := s.getAccountWithHighestBalance(ctx, accounts) + if err != nil { + return nil, fmt.Errorf("failed to determine account for SubmitTransaction: %w", err) + } + txID := uuid.New().String() + err = s.chain.TxManager().EnqueueCRE( + txID, + &commontypes.TxMeta{ + GasLimit: gasLimit, + }, + publicKey, + entryFn, + true, // simulateTx + ) + if err != nil { + return nil, fmt.Errorf("failed to enqueue transaction: %w", err) + } + + // Poll TxManager for status until terminal + var txStatus commontypes.TransactionStatus + for { + txStatus, err = s.chain.TxManager().GetStatus(txID) + if err != nil { + return nil, fmt.Errorf("failed to get transaction status: %w", err) + } + + switch txStatus { + case commontypes.Finalized: + return &commonaptos.SubmitTransactionReply{ + PendingTransaction: &commonaptos.PendingTransaction{ + Hash: txID, + }, + }, nil + case commontypes.Failed, commontypes.Fatal: + return nil, fmt.Errorf("transaction failed with status: %v", txStatus) + case commontypes.Pending, commontypes.Unknown, commontypes.Unconfirmed: + // still in progress, wait and retry + select { + case <-ctx.Done(): + return nil, fmt.Errorf("context cancelled while waiting for transaction: %w", ctx.Err()) + case <-time.After(500 * time.Millisecond): + continue + } + default: + return nil, fmt.Errorf("unexpected transaction status: %v", txStatus) + } + } +} + +// getAccountWithHighestBalance returns the public key of the account with the highest APT balance. +func (s *aptosService) getAccountWithHighestBalance(ctx context.Context, accounts []string) (string, error) { + if len(accounts) == 0 { + return "", errors.New("no accounts provided") + } + if len(accounts) == 1 { + s.logger.Debugw("only one enabled account for chain", "account", accounts[0]) + return accounts[0], nil + } + + client, err := s.chain.GetClient() + if err != nil { + return "", fmt.Errorf("failed to get client: %w", err) + } + + var highestBalance uint64 + var selectedAccount string + var foundAny bool + + for _, account := range accounts { + addr, err := utils.HexPublicKeyToAddress(account) + if err != nil { + s.logger.Warnw("failed to convert public key to address, skipping", "account", account, "error", err) + continue + } + + balance, err := client.AccountAPTBalance(addr) + if err != nil { + s.logger.Warnw("failed to get balance for account, skipping", "account", account, "address", addr.String(), "error", err) + continue + } + + if !foundAny || balance > highestBalance { + highestBalance = balance + selectedAccount = account + foundAny = true + } + } + + if !foundAny { + // Fallback to first account if all balance queries failed + return accounts[0], nil + } + + s.logger.Debugw("selected account with highest balance for chain", + "account", selectedAccount, + "balance", highestBalance, + "totalAccounts", len(accounts)) + + return selectedAccount, nil +} + +// convertTypeTagsToSDK converts common TypeTags to SDK TypeTags. +func convertTypeTagsToSDK(tags []commonaptos.TypeTag) []aptosgosdk.TypeTag { + out := make([]aptosgosdk.TypeTag, len(tags)) + for i, tag := range tags { + out[i] = aptosgosdk.TypeTag{Value: convertTypeTagImplToSDK(tag.Value)} + } + return out +} + +func convertTypeTagImplToSDK(impl commonaptos.TypeTagImpl) aptosgosdk.TypeTagImpl { + switch v := impl.(type) { + case commonaptos.BoolTag: + return &aptosgosdk.BoolTag{} + case commonaptos.U8Tag: + return &aptosgosdk.U8Tag{} + case commonaptos.U16Tag: + return &aptosgosdk.U16Tag{} + case commonaptos.U32Tag: + return &aptosgosdk.U32Tag{} + case commonaptos.U64Tag: + return &aptosgosdk.U64Tag{} + case commonaptos.U128Tag: + return &aptosgosdk.U128Tag{} + case commonaptos.U256Tag: + return &aptosgosdk.U256Tag{} + case commonaptos.AddressTag: + return &aptosgosdk.AddressTag{} + case commonaptos.SignerTag: + return &aptosgosdk.SignerTag{} + case commonaptos.VectorTag: + return &aptosgosdk.VectorTag{ + TypeParam: aptosgosdk.TypeTag{Value: convertTypeTagImplToSDK(v.ElementType.Value)}, + } + case commonaptos.StructTag: + typeParams := make([]aptosgosdk.TypeTag, len(v.TypeParams)) + for i, tp := range v.TypeParams { + typeParams[i] = aptosgosdk.TypeTag{Value: convertTypeTagImplToSDK(tp.Value)} + } + return &aptosgosdk.StructTag{ + Address: aptosgosdk.AccountAddress(v.Address), + Module: v.Module, + Name: v.Name, + TypeParams: typeParams, + } + case commonaptos.GenericTag: + return &aptosgosdk.GenericTag{Num: uint64(v.Index)} + default: + return nil + } +} diff --git a/relayer/chain/chain.go b/relayer/chain/chain.go index ad53db34..9ae975ce 100644 --- a/relayer/chain/chain.go +++ b/relayer/chain/chain.go @@ -39,6 +39,7 @@ type Chain interface { TxManager() *txm.AptosTxm LogPoller() *logpoller.AptosLogPoller GetClient() (aptos.AptosRpcClient, error) + KeyStore() loop.Keystore } type ChainOpts struct { @@ -73,10 +74,11 @@ var _ Chain = (*chain)(nil) type chain struct { starter commonutils.StartStopOnce - id string - cfg *config.TOMLConfig - lggr logger.Logger - ds sqlutil.DataSource + id string + cfg *config.TOMLConfig + lggr logger.Logger + ds sqlutil.DataSource + keyStore loop.Keystore // Sub-services txm *txm.AptosTxm @@ -113,10 +115,11 @@ func newChain(cfg *config.TOMLConfig, loopKs loop.Keystore, lggr logger.Logger, } ch := &chain{ - id: cfg.ChainID, - cfg: cfg, - lggr: logger.Named(lggr, "Chain"), - ds: ds, + id: cfg.ChainID, + cfg: cfg, + lggr: logger.Named(lggr, "Chain"), + ds: ds, + keyStore: loopKs, } ch.txm, err = txm.New(lggr, loopKs, *cfg.TransactionManager, ch.GetClient) @@ -169,6 +172,10 @@ func (c *chain) ChainID() string { return c.id } +func (c *chain) KeyStore() loop.Keystore { + return c.keyStore +} + // GetClient returns a client, randomly selecting one from available and valid nodes func (c *chain) GetClient() (aptos.AptosRpcClient, error) { var node *config.Node diff --git a/relayer/relay.go b/relayer/relay.go index 06f7f799..875b2dd3 100644 --- a/relayer/relay.go +++ b/relayer/relay.go @@ -21,6 +21,7 @@ import ( ) var _ types.Relayer = (*relayer)(nil) //nolint:staticcheck +var _ types.AptosService = (*relayer)(nil) type relayer struct { chain chain.Chain @@ -28,11 +29,13 @@ type relayer struct { starter utils.StartStopOnce stopCh services.StopChan + aptosService } func NewRelayer(lggr logger.Logger, chain chain.Chain, capRegistry core.CapabilitiesRegistry) (*relayer, error) { ctx := context.TODO() + // CAN I REMOVE THIS ? if chain.Config().Workflow != nil { capability, err := write_target.NewAptosWriteTarget(ctx, chain, lggr) if err != nil { @@ -49,6 +52,10 @@ func NewRelayer(lggr logger.Logger, chain chain.Chain, capRegistry core.Capabili chain: chain, lggr: lggr, stopCh: make(chan struct{}), + aptosService: aptosService{ + chain: chain, + logger: lggr, + }, }, nil } @@ -56,6 +63,10 @@ func (r *relayer) Name() string { return r.lggr.Name() } +func (r *relayer) Replay(ctx context.Context, fromBlock string, args map[string]any) error { + return errors.ErrUnsupported +} + // Start starts the relayer respecting the given context. func (r *relayer) Start(ctx context.Context) error { return r.starter.StartOnce("AptosRelayer", func() error { @@ -180,10 +191,6 @@ func (r *relayer) Solana() (types.SolanaService, error) { return nil, errors.New("SolanaService is not supported for aptos") } -func (r *relayer) Replay(ctx context.Context, fromBlock string, args map[string]any) error { - return errors.ErrUnsupported -} - // ChainService interface func (r *relayer) GetChainStatus(ctx context.Context) (types.ChainStatus, error) { return r.chain.GetChainStatus(ctx) @@ -204,3 +211,7 @@ func (r *relayer) ListNodeStatuses(ctx context.Context, pageSize int32, pageToke func (r *relayer) Transact(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error { return r.chain.Transact(ctx, from, to, amount, balanceCheck) } + +func (r *relayer) Aptos() (types.AptosService, error) { + return r, nil +} diff --git a/relayer/txm/txm.go b/relayer/txm/txm.go index 4c8f4e1b..fb7ea48f 100644 --- a/relayer/txm/txm.go +++ b/relayer/txm/txm.go @@ -221,6 +221,93 @@ func (a *AptosTxm) Enqueue(transactionID string, txMetadata *commontypes.TxMeta, return nil } +// EnqueueCRE is like Enqueue but accepts a deserialized EntryFunction directly, +// skipping the string-based function parsing and BCS serialisation of parameters. +// The EntryFunction already contains the module, function name, type tags, and +// pre-encoded BCS args. +func (a *AptosTxm) EnqueueCRE(transactionID string, txMetadata *commontypes.TxMeta, publicKey string, entryFunction *aptos.EntryFunction, simulateTx bool) error { + if entryFunction == nil { + return errors.New("entry function is required") + } + + if transactionID == "" { + transactionID = uuid.New().String() + } else { + a.transactionsLock.Lock() + _, transactionExists := a.transactions[transactionID] + a.transactionsLock.Unlock() + if transactionExists { + return errors.New("transaction already exists") + } + } + + ctxLogger := GetContexedTxLogger(a.baseLogger, transactionID, txMetadata) + + ed25519PublicKey, err := utils.HexPublicKeyToEd25519PublicKey(publicKey) + if err != nil { + return fmt.Errorf("failed to convert public key: %+w", err) + } + + acc := utils.Ed25519PublicKeyToAddress(ed25519PublicKey) + fromAddress := acc.String() + + fromAccountAddress := &aptos.AccountAddress{} + err = fromAccountAddress.ParseStringRelaxed(fromAddress) + if err != nil { + return fmt.Errorf("failed to parse from address: %+w", err) + } + + currentTimestamp := getTimestampSecs() + tx := &AptosTx{ + ID: transactionID, + Metadata: txMetadata, + Timestamp: currentTimestamp, + FromAddress: *fromAccountAddress, + PublicKey: ed25519PublicKey, + ContractAddress: entryFunction.Module.Address, + ModuleName: entryFunction.Module.Name, + FunctionName: entryFunction.Function, + TypeTags: entryFunction.ArgTypes, + BcsValues: entryFunction.Args, + Status: commontypes.Pending, + Simulate: simulateTx, + } + + a.transactionsLock.Lock() + if (currentTimestamp - a.transactionsLastPruneTime) > a.config.PruneIntervalSecs { + for txID, tx := range a.transactions { + if tx.Status != commontypes.Finalized && tx.Status != commontypes.Failed && tx.Status != commontypes.Fatal { + continue + } + if (currentTimestamp - tx.Timestamp) < a.config.PruneTxExpirationSecs { + continue + } + ctxLogger.Debugw("Pruning transaction", "status", tx.Status) + delete(a.transactions, txID) + } + a.transactionsLastPruneTime = currentTimestamp + } + a.transactions[transactionID] = tx + a.transactionsLock.Unlock() + + select { + case a.broadcastChan <- transactionID: + ctxLogger.Debugw("Tx enqueued", "fromAddr", fromAddress) + default: + // if the channel is full, we drop the transaction. + // we do this instead of setting the tx in `a.transactions` post-broadcast to avoid a race + // with the broadcastLoop, which expects to find the tx in `a.transactions` upon reception of + // the id. + a.transactionsLock.Lock() + delete(a.transactions, transactionID) + a.transactionsLock.Unlock() + + return fmt.Errorf("failed to enqueue tx: %+v", tx) + } + + return nil +} + func (a *AptosTxm) GetStatus(transactionID string) (commontypes.TransactionStatus, error) { if transactionID == "" { return commontypes.Unknown, errors.New("nil tx id") diff --git a/relayer/txm/txm_local_test.go b/relayer/txm/txm_local_test.go index 55b021b3..ff2fa303 100644 --- a/relayer/txm/txm_local_test.go +++ b/relayer/txm/txm_local_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/aptos-labs/aptos-go-sdk" + "github.com/aptos-labs/aptos-go-sdk/bcs" "github.com/google/uuid" "github.com/stretchr/testify/require" "golang.org/x/crypto/sha3" @@ -152,6 +153,75 @@ func runTxmTest(t *testing.T, logger logger.Logger, config Config, rpcURL string logger.Debugw("Counter value after test", "value", counterValue) require.Equal(t, expectedValue, counterValue) + + // submit all txs at once and wait for all afterwards + // helps testing reties and failure recoveries + var txIDsCRE []string + + accountBytes, err := bcs.Serialize(&accountAddress) + require.NoError(t, err) + + threeBytes, err := bcs.SerializeU64(3) + require.NoError(t, err) + fourBytes, err := bcs.SerializeU64(4) + require.NoError(t, err) + + for i := 0; i < iterations; i++ { + incrementId := uuid.New().String() + err := txm.EnqueueCRE( + incrementId, + getSampleTxMetadata(), + publicKeyHex, + &aptos.EntryFunction{ + Module: aptos.ModuleId{ + Address: accountAddress, + Name: "counter", + }, + Function: "increment", + ArgTypes: []aptos.TypeTag{}, + Args: [][]byte{ + accountBytes, + }, + }, + true, + ) + require.NoError(t, err) + expectedValue += 1 + txIDsCRE = append(txIDsCRE, incrementId) + + incrementMultId := uuid.New().String() + err = txm.EnqueueCRE( + incrementMultId, + getSampleTxMetadata(), + publicKeyHex, + &aptos.EntryFunction{ + Module: aptos.ModuleId{ + Address: accountAddress, + Name: "counter", + }, + Function: "increment_mult", + ArgTypes: []aptos.TypeTag{}, + Args: [][]byte{ + accountBytes, + threeBytes, + fourBytes, + }, + }, + true, + ) + require.NoError(t, err) + expectedValue += 3 * 4 + txIDsCRE = append(txIDsCRE, incrementMultId) + } + + for _, txId := range txIDsCRE { + waitForTxmId(t, txm, txId, time.Minute*2) + } + + counterValueCRE := testutils.ReadCounterValue(t, client, accountAddress) + logger.Debugw("Counter value after test", "value", counterValueCRE) + + require.Equal(t, expectedValue, counterValueCRE) } func deployTestModule(t *testing.T, txm *AptosTxm, fromAddress aptos.AccountAddress, publicKeyHex string) {