Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c33e351
feat: add claimsync package and integrate with bridgesync
joanestebanr Mar 11, 2026
b6a7027
feat: migrate claim types to claimsync and add RPC interface (pm285)
joanestebanr Mar 16, 2026
aaaaf0b
feat: add TrueFalseAutoMode, claimsync RPC by syncerID, AutoStart and…
joanestebanr Mar 17, 2026
f763697
feat: lxbridge doesn't sync claims, that have been spplited to new lx…
joanestebanr Mar 17, 2026
289b379
fix: Unittest and linter
joanestebanr Mar 17, 2026
0ee3e40
fix: unittest
joanestebanr Mar 17, 2026
e3bf86b
fix: unittest
joanestebanr Mar 17, 2026
3ccacd9
fix: unittest
joanestebanr Mar 18, 2026
1f7b33d
fix: ut
joanestebanr Mar 18, 2026
d937b0a
feat: coverage
joanestebanr Mar 18, 2026
dadffff
feat: coverage
joanestebanr Mar 18, 2026
01947a3
feat: coverage
joanestebanr Mar 18, 2026
77bb924
feat: coverage
joanestebanr Mar 18, 2026
d02662d
feat: coverage
joanestebanr Mar 18, 2026
247862f
feat: coverage
joanestebanr Mar 18, 2026
c670f92
fix: validator nil ptr
joanestebanr Mar 18, 2026
76bdf02
fix: claimsync issue if first block has no events
joanestebanr Mar 19, 2026
1ea89c1
fix: lint
joanestebanr Mar 19, 2026
f018cd4
fix: error in FEP that get stuck because claimsyncer was not started
joanestebanr Mar 19, 2026
f883259
fix: start claimsync
joanestebanr Mar 19, 2026
71d18bd
feat: refactor claimsync initial block setup via InitialBlockClaimSyn…
joanestebanr Mar 20, 2026
489b733
feat: fix ut, reduce verbosity
joanestebanr Mar 20, 2026
f74f02a
fix: remove unneeded function GeneratePreBuildParams
joanestebanr Mar 20, 2026
3133a30
fix: nil pointer in bridge service
joanestebanr Mar 20, 2026
9956a2c
feat: coverage
joanestebanr Mar 20, 2026
3cb7173
fix: PR comments
joanestebanr Mar 20, 2026
fcc4789
fix: PR comments
joanestebanr Mar 20, 2026
d3a7b4c
fix: coverage
joanestebanr Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,12 @@ packages:
config:
dir: "{{ .InterfaceDir }}/mocks"
all: true
github.com/agglayer/aggkit/claimsync:
config:
dir: "{{ .InterfaceDir }}/mocks"
interfaces:
ClaimSyncer:
github.com/agglayer/aggkit/claimsync/types:
config:
dir: "{{ .InterfaceDir }}/mocks"
all: true
52 changes: 37 additions & 15 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/agglayer/aggkit/aggsender/trigger"
"github.com/agglayer/aggkit/aggsender/types"
"github.com/agglayer/aggkit/aggsender/validator"
claimsynctypes "github.com/agglayer/aggkit/claimsync/types"
aggkitcommon "github.com/agglayer/aggkit/common"
"github.com/agglayer/aggkit/db/compatibility"
"github.com/agglayer/aggkit/log"
Expand All @@ -37,17 +38,19 @@ type RateLimiter interface {
type AggSender struct {
log aggkitcommon.Logger

storage db.AggSenderStorage
aggLayerClient agglayer.AgglayerClientInterface
compatibilityStoragedChecker compatibility.CompatibilityChecker
certStatusChecker types.CertificateStatusChecker
certQuerier types.CertificateQuerier
rollupDataQuerier types.RollupDataQuerier
validatorPoller types.ValidatorPoller
localValidator types.CertificateValidateAndSigner
storage db.AggSenderStorage
aggLayerClient agglayer.AgglayerClientInterface
compatibilityStoragedChecker compatibility.CompatibilityChecker
certStatusChecker types.CertificateStatusChecker
certQuerier types.CertificateQuerier
rollupDataQuerier types.RollupDataQuerier
initialBlockClaimSyncerSetter types.InitialBlockClaimSyncerSetter
validatorPoller types.ValidatorPoller
localValidator types.CertificateValidateAndSigner

l1Client aggkittypes.BaseEthereumClienter
l1InfoTreeSyncer types.L1InfoTreeSyncer
l2ClaimSyncer claimsynctypes.ClaimSyncer

certificateSendTrigger types.CertificateSendTrigger

Expand All @@ -67,6 +70,7 @@ func New(
aggLayerClient agglayer.AgglayerClientInterface,
l1InfoTreeSyncer types.L1InfoTreeSyncer,
l2Syncer types.L2BridgeSyncer,
l2ClaimSyncer claimsynctypes.ClaimSyncer,
l1Client aggkittypes.BaseEthereumClienter,
l2Client aggkittypes.BaseEthereumClienter,
rollupDataQuerier types.RollupDataQuerier,
Expand Down Expand Up @@ -102,6 +106,7 @@ func New(
aggLayerClient,
l1InfoTreeSyncer,
l2Syncer,
l2ClaimSyncer,
l1Client,
l2Client,
rollupDataQuerier,
Expand All @@ -119,6 +124,7 @@ func newAggsender(
aggLayerClient agglayer.AgglayerClientInterface,
l1InfoTreeSyncer types.L1InfoTreeSyncer,
l2Syncer types.L2BridgeSyncer,
l2ClaimSyncer claimsynctypes.ClaimSyncer,
l1Client aggkittypes.BaseEthereumClienter,
l2Client aggkittypes.BaseEthereumClienter,
rollupDataQuerier types.RollupDataQuerier,
Expand All @@ -144,10 +150,18 @@ func newAggsender(

certQuerier := query.NewCertificateQuerier(
l2Syncer,
l2ClaimSyncer,
aggchainFEPCaller,
aggLayerClient,
initialLER,
)
l2OriginNetwork := l2Syncer.OriginNetwork()
initialBlockClaimSyncerSetter := query.NewSetInitialBlockToClaimSyncer(
certQuerier,
aggLayerClient,
l2OriginNetwork,
logger,
)

flowManager, err := flows.NewBuilderFlow(
ctx,
Expand All @@ -158,6 +172,7 @@ func newAggsender(
l2Client,
l1InfoTreeSyncer,
l2Syncer,
l2ClaimSyncer,
rollupDataQuerier,
committeeQuerier,
certQuerier,
Expand All @@ -169,8 +184,6 @@ func newAggsender(

logger.Infof("Aggsender Config: %s.", cfg.String())

l2OriginNetwork := l2Syncer.OriginNetwork()

compatibilityStoragedChecker := compatibility.NewCompatibilityCheck(
cfg.RequireStorageContentCompatibility,
func(ctx context.Context) (db.RuntimeData, error) {
Expand Down Expand Up @@ -228,9 +241,11 @@ func newAggsender(
),
certStatusChecker: statuschecker.NewCertStatusChecker(
logger, storage, aggLayerClient, certQuerier, l2OriginNetwork),
l1Client: l1Client,
l1InfoTreeSyncer: l1InfoTreeSyncer,
certificateSendTrigger: certificateSendTrigger,
l1Client: l1Client,
l1InfoTreeSyncer: l1InfoTreeSyncer,
l2ClaimSyncer: l2ClaimSyncer,
certificateSendTrigger: certificateSendTrigger,
initialBlockClaimSyncerSetter: initialBlockClaimSyncerSetter,
}, nil
}

Expand Down Expand Up @@ -269,9 +284,16 @@ func (a *AggSender) Start(ctx context.Context) {
a.log.Info("AggSender started")
metrics.Register()
a.status.Start(time.Now().UTC())

a.status.SetStatus(types.StatusCheckingDBCompatibility, a.log)
a.checkDBCompatibility(ctx)
a.status.SetStatus(types.StatusCheckingInitialStage, a.log)
a.certStatusChecker.CheckInitialStatus(ctx, a.cfg.DelayBetweenRetries.Duration, a.status)
a.status.SetStatus(types.StartingClaimSyncerStage, a.log)
err := a.initialBlockClaimSyncerSetter.SetClaimSyncerNextRequiredBlock(ctx, a.l2ClaimSyncer, nil)
if err != nil {
a.log.Panicf("error setting next required block for claim syncer: %v", err)
}
a.status.SetStatus(types.StatusFlowCheckingInitialStage, a.log)
if err := a.flow.CheckInitialStatus(ctx); err != nil {
a.log.Panicf("error checking flow Initial Status: %v", err)
}
Expand Down Expand Up @@ -320,7 +342,7 @@ func (a *AggSender) sendCertificates(ctx context.Context, returnAfterNIterations
a.certificateSendTrigger.OnIdle()
}

a.status.Status = types.StatusCertificateStage
a.status.SetStatus(types.StatusCertificateStage, a.log)
iteration := 0
for {
select {
Expand Down
20 changes: 13 additions & 7 deletions aggsender/aggsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
aggsendertypes "github.com/agglayer/aggkit/aggsender/types"
"github.com/agglayer/aggkit/bridgesync"
bridgetypes "github.com/agglayer/aggkit/bridgesync/types"
claimsynctypes "github.com/agglayer/aggkit/claimsync/types"
aggkitcommon "github.com/agglayer/aggkit/common"
"github.com/agglayer/aggkit/config/types"
mocksdb "github.com/agglayer/aggkit/db/compatibility/mocks"
Expand Down Expand Up @@ -95,7 +96,7 @@ func TestAggSenderStart(t *testing.T) {
sendTrigger.EXPECT().Status().Return("test status").Once()
sendTrigger.EXPECT().OnIdle().Maybe()
bridgeL2SyncerMock.EXPECT().OriginNetwork().Return(uint32(2))
bridgeL2SyncerMock.EXPECT().GetLastProcessedBlock(mock.Anything).Return(uint64(0), nil)
bridgeL2SyncerMock.EXPECT().GetLastProcessedBlock(mock.Anything).Return(uint64(0), true, nil)
aggLayerMock.EXPECT().GetLatestPendingCertificateHeader(mock.Anything, mock.Anything).Return(nil, nil).Twice()
aggLayerMock.EXPECT().GetLatestSettledCertificateHeader(mock.Anything, mock.Anything).Return(nil, nil).Twice()
rollupQuerierMock.EXPECT().GetRollupChainID().Return(uint64(1234), nil)
Expand All @@ -118,6 +119,7 @@ func TestAggSenderStart(t *testing.T) {
aggLayerMock,
mockL1InfoTreeSyncer, // l1 info tree syncer
bridgeL2SyncerMock,
nil, // claim syncer
nil, // l1 client
nil, // l2 client
rollupQuerierMock,
Expand Down Expand Up @@ -262,7 +264,7 @@ func TestSendCertificate_NoClaims(t *testing.T) {
Status: agglayertypes.Settled,
}, nil).Once()
mockStorage.EXPECT().SaveLastSentCertificate(mock.Anything, mock.Anything).Return(nil).Once()
mockL2BridgeQuerier.EXPECT().GetLastProcessedBlock(mock.Anything).Return(uint64(50), nil)
mockL2BridgeQuerier.EXPECT().GetLastProcessedBlock(mock.Anything).Return(uint64(50), true, nil)
mockL2BridgeQuerier.EXPECT().GetBridgesAndClaims(mock.Anything, uint64(11), uint64(50)).Return([]bridgesync.Bridge{
{
BlockNum: 30,
Expand All @@ -276,8 +278,8 @@ func TestSendCertificate_NoClaims(t *testing.T) {
Metadata: []byte("metadata"),
DepositCount: 1,
},
}, []bridgesync.Claim{}, nil).Once()
mockL2BridgeQuerier.EXPECT().GetUnsetClaimsForBlockRange(mock.Anything, uint64(11), uint64(50)).Return([]bridgetypes.Unclaim{}, nil).Once()
}, []claimsynctypes.Claim{}, nil).Once()
mockL2BridgeQuerier.EXPECT().GetUnsetClaimsForBlockRange(mock.Anything, uint64(11), uint64(50)).Return([]claimsynctypes.Unclaim{}, nil).Once()
mockL1Querier.EXPECT().GetTargetL1InfoRoot(ctx).Return(&treetypes.Root{}, nil, nil).Once()
mockL2BridgeQuerier.EXPECT().GetExitRootByIndex(mock.Anything, uint32(1)).Return(common.Hash{}, nil).Once()
mockL2BridgeQuerier.EXPECT().OriginNetwork().Return(uint32(1)).Once()
Expand Down Expand Up @@ -555,6 +557,7 @@ func TestNewAggSender(t *testing.T) {
mockAgglayerClient,
mockL1InfoTreeSyncer, // l1 info tree syncer
mockBridgeSyncer,
nil, // claim syncer
nil, // l1 client
nil, // l2 client
mockRollupQuerier,
Expand All @@ -575,6 +578,9 @@ func TestAggSenderStartFailFlowCheckInitialStatus(t *testing.T) {
testData := newAggsenderTestData(t, testDataFlagMockStorage|testDataFlagMockFlow|testDataFlagMockStatusChecker)
testData.sut.cfg.RequireStorageContentCompatibility = false
testData.certStatusCheckerMock.EXPECT().CheckInitialStatus(mock.Anything, mock.Anything, testData.sut.status).Once()
mockInitialBlockSetter := mocks.NewInitialBlockClaimSyncerSetter(t)
mockInitialBlockSetter.EXPECT().SetClaimSyncerNextRequiredBlock(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
testData.sut.initialBlockClaimSyncerSetter = mockInitialBlockSetter
testData.flowMock.EXPECT().CheckInitialStatus(mock.Anything).Return(fmt.Errorf("error")).Once()

require.Panics(t, func() {
Expand Down Expand Up @@ -808,14 +814,14 @@ func NewBridgesData(t *testing.T, num int, blockNum []uint64) []bridgesync.Bridg
return res
}

func NewClaimData(t *testing.T, num int, blockNum []uint64) []bridgesync.Claim {
func NewClaimData(t *testing.T, num int, blockNum []uint64) []claimsynctypes.Claim {
t.Helper()
if num == 0 {
num = len(blockNum)
}
res := make([]bridgesync.Claim, 0)
res := make([]claimsynctypes.Claim, 0)
for i := 0; i < num; i++ {
res = append(res, bridgesync.Claim{
res = append(res, claimsynctypes.Claim{
BlockNum: blockNum[i%len(blockNum)],
BlockPos: 0,
})
Expand Down
41 changes: 26 additions & 15 deletions aggsender/aggsender_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,41 @@ package aggsender

import (
"context"
"errors"

"github.com/agglayer/aggkit/agglayer"
"github.com/agglayer/aggkit/aggsender/metrics"
"github.com/agglayer/aggkit/aggsender/types"
"github.com/agglayer/aggkit/aggsender/validator"
v1 "github.com/agglayer/aggkit/aggsender/validator/proto/v1"
claimsynctypes "github.com/agglayer/aggkit/claimsync/types"
aggkitcommon "github.com/agglayer/aggkit/common"
configtypes "github.com/agglayer/aggkit/config/types"
"github.com/agglayer/aggkit/grpc"
signertypes "github.com/agglayer/go_signer/signer/types"
ethcommon "github.com/ethereum/go-ethereum/common"
)

var (
ErrNilCertificate = errors.New("aggsender-validator nil certificate")
ErrMetadataNotCompatible = errors.New("aggsender-validator metadata not compatible with the current version")
)

type AggsenderValidator struct {
log aggkitcommon.Logger
validator types.CertificateValidator
validatorService *grpc.Server
cfg validator.Config
log aggkitcommon.Logger
validator types.CertificateValidator
validatorService *grpc.Server
initialBlockClaimSyncerSetter types.InitialBlockClaimSyncerSetter
l2ClaimSyncer claimsynctypes.ClaimSyncer
cfg validator.Config
}

func NewAggsenderValidator(ctx context.Context,
logger aggkitcommon.Logger,
cfg validator.Config,
l2ClaimSyncer claimsynctypes.ClaimSyncer,
flow types.AggsenderVerifierFlow,
l1InfoTreeDataQuerier validator.L1InfoTreeRootByLeafQuerier,
aggLayerClient agglayer.AggLayerClientCertificateIDQuerier,
certQuerier types.CertificateQuerier,
aggchainFEPQuerier types.AggchainFEPRollupQuerier,
initialLER ethcommon.Hash,
signer signertypes.Signer) (*AggsenderValidator, error) {
signer signertypes.Signer,
initialBlockClaimSyncerSetter types.InitialBlockClaimSyncerSetter) (*AggsenderValidator, error) {
validatorCert := validator.NewAggsenderValidator(
logger, flow, l1InfoTreeDataQuerier, certQuerier, initialLER)
grpcServer, err := grpc.NewServer(cfg.ServerConfig)
Expand All @@ -51,14 +51,25 @@ func NewAggsenderValidator(ctx context.Context,
signer,
))
return &AggsenderValidator{
log: logger,
validator: validatorCert,
validatorService: grpcServer,
cfg: cfg,
log: logger,
validator: validatorCert,
validatorService: grpcServer,
cfg: cfg,
l2ClaimSyncer: l2ClaimSyncer,
initialBlockClaimSyncerSetter: initialBlockClaimSyncerSetter,
}, nil
}

// Start starts the AggsenderValidator service.
func (a *AggsenderValidator) Start(ctx context.Context) {
metrics.Register()
// The validator only attempts once: if it fails, it stops.
rh := aggkitcommon.NewRetryHandler([]configtypes.Duration{{Duration: a.cfg.DelayBetweenRetries.Duration}},
1)
err := a.initialBlockClaimSyncerSetter.SetClaimSyncerNextRequiredBlock(ctx, a.l2ClaimSyncer, rh)
if err != nil {
a.log.Panicf("failed to set claim syncer next required block: %v", err)
}
a.validatorService.Start(ctx)
}

Expand Down
Loading
Loading