From 2e05c593d97bc3bb32e9d2e17da9e25cc6e036aa Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 26 Jul 2024 01:00:03 +0200 Subject: [PATCH 1/3] shwap availability integration --- share/availability/full/availability.go | 28 +- share/availability/full/availability_test.go | 90 +++--- .../availability/full/reconstruction_test.go | 284 ------------------ share/availability/full/testing.go | 56 ---- share/availability/light/availability.go | 10 +- share/availability/light/availability_test.go | 234 ++++++--------- share/availability/light/testing.go | 107 ------- share/availability/test/corrupt_data.go | 130 -------- share/availability/test/testing.go | 163 ---------- share/getter.go | 100 ------ share/ipld/corrupted_data_test.go | 51 ---- share/mocks/getter.go | 83 ----- share/shwap/getters/cascade.go | 2 +- share/shwap/p2p/bitswap/getter.go | 6 +- 14 files changed, 156 insertions(+), 1188 deletions(-) delete mode 100644 share/availability/full/reconstruction_test.go delete mode 100644 share/availability/full/testing.go delete mode 100644 share/availability/light/testing.go delete mode 100644 share/availability/test/corrupt_data.go delete mode 100644 share/availability/test/testing.go delete mode 100644 share/getter.go delete mode 100644 share/ipld/corrupted_data_test.go delete mode 100644 share/mocks/getter.go diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index 9058a8eb0..8448d6cf8 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -5,14 +5,13 @@ import ( "errors" "fmt" - "github.com/filecoin-project/dagstore" logging "github.com/ipfs/go-log/v2" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/eds/byzantine" - "github.com/celestiaorg/celestia-node/share/ipld" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/store" ) var log = logging.Logger("share/full") @@ -21,14 +20,14 @@ var log = logging.Logger("share/full") // recovery technique. It is considered "full" because it is required // to download enough shares to fully reconstruct the data square. type ShareAvailability struct { - store *eds.Store - getter share.Getter + store *store.Store + getter shwap.Getter } // NewShareAvailability creates a new full ShareAvailability. func NewShareAvailability( - store *eds.Store, - getter share.Getter, + store *store.Store, + getter shwap.Getter, ) *ShareAvailability { return &ShareAvailability{ store: store, @@ -40,8 +39,9 @@ func NewShareAvailability( // enough Shares from the network. func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header.ExtendedHeader) error { dah := header.DAH - // short-circuit if the given root is an empty data square, to avoid datastore hit + // if the data square is empty, we can safely link the header height in the store to an empty EDS. if share.DataHash(dah.Hash()).IsEmptyEDS() { + fa.store.Put(ctx, dah, header.Height(), share.EmptyEDS()) return nil } @@ -54,14 +54,10 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header } // a hack to avoid loading the whole EDS in mem if we store it already. - if ok, _ := fa.store.Has(ctx, dah.Hash()); ok { + if ok, _ := fa.store.HasByHeight(ctx, header.Height()); ok { return nil } - adder := ipld.NewProofsAdder(len(dah.RowRoots), false) - ctx = ipld.CtxWithProofsAdder(ctx, adder) - defer adder.Purge() - eds, err := fa.getter.GetEDS(ctx, header) if err != nil { if errors.Is(err, context.Canceled) { @@ -69,14 +65,14 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header } log.Errorw("availability validation failed", "root", dah.String(), "err", err.Error()) var byzantineErr *byzantine.ErrByzantine - if errors.Is(err, share.ErrNotFound) || errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &byzantineErr) { + if errors.Is(err, shwap.ErrNotFound) || errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &byzantineErr) { return share.ErrNotAvailable } return err } - err = fa.store.Put(ctx, dah.Hash(), eds) - if err != nil && !errors.Is(err, dagstore.ErrShardExists) { + err = fa.store.Put(ctx, dah, header.Height(), eds) + if err != nil { return fmt.Errorf("full availability: failed to store eds: %w", err) } return nil diff --git a/share/availability/full/availability_test.go b/share/availability/full/availability_test.go index 03e4a4676..f8d14ac5e 100644 --- a/share/availability/full/availability_test.go +++ b/share/availability/full/availability_test.go @@ -3,75 +3,89 @@ package full import ( "context" "testing" + "time" "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/celestiaorg/celestia-node/header/headertest" "github.com/celestiaorg/celestia-node/share" - availability_test "github.com/celestiaorg/celestia-node/share/availability/test" "github.com/celestiaorg/celestia-node/share/eds/edstest" - "github.com/celestiaorg/celestia-node/share/mocks" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/getters/mock" + "github.com/celestiaorg/celestia-node/store" ) -func TestShareAvailableOverMocknet_Full(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) +func TestSharesAvailable(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - net := availability_test.NewTestDAGNet(ctx, t) - _, root := RandNode(net, 32) - - eh := headertest.RandExtendedHeaderWithRoot(t, root) - nd := Node(net) - net.ConnectAll() + // RandServiceWithSquare creates a NewShareAvailability inside, so we can test it + eds := edstest.RandEDS(t, 16) + roots, err := share.NewAxisRoots(eds) + eh := headertest.RandExtendedHeaderWithRoot(t, roots) - err := nd.SharesAvailable(ctx, eh) - assert.NoError(t, err) -} + getter := mock.NewMockGetter(gomock.NewController(t)) + getter.EXPECT().GetEDS(gomock.Any(), eh).Return(eds, nil) -func TestSharesAvailable_Full(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + avail := NewShareAvailability(store, getter) + err = avail.SharesAvailable(ctx, eh) + require.NoError(t, err) - // RandServiceWithSquare creates a NewShareAvailability inside, so we can test it - getter, roots := GetterWithRandSquare(t, 16) + // Check if the store has the root + has, err := store.HasByHash(ctx, roots.Hash()) + require.NoError(t, err) + require.True(t, has) - eh := headertest.RandExtendedHeaderWithRoot(t, roots) - avail := TestAvailability(t, getter) - err := avail.SharesAvailable(ctx, eh) - assert.NoError(t, err) + // Check if the store has the root linked to the height + has, err = store.HasByHeight(ctx, eh.Height()) + require.NoError(t, err) + require.True(t, has) } -func TestSharesAvailable_StoresToEDSStore(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) +func TestSharesAvailable_StoredEds(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - // RandServiceWithSquare creates a NewShareAvailability inside, so we can test it - getter, roots := GetterWithRandSquare(t, 16) + eds := edstest.RandEDS(t, 4) + roots, err := share.NewAxisRoots(eds) eh := headertest.RandExtendedHeaderWithRoot(t, roots) - avail := TestAvailability(t, getter) - err := avail.SharesAvailable(ctx, eh) - assert.NoError(t, err) + require.NoError(t, err) + + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + avail := NewShareAvailability(store, nil) + + err = store.Put(ctx, roots, eh.Height(), eds) + require.NoError(t, err) - has, err := avail.store.Has(ctx, roots.Hash()) - assert.NoError(t, err) - assert.True(t, has) + has, err := store.HasByHeight(ctx, eh.Height()) + require.NoError(t, err) + require.True(t, has) + + err = avail.SharesAvailable(ctx, eh) + require.NoError(t, err) + + has, err = store.HasByHeight(ctx, eh.Height()) + require.NoError(t, err) + require.True(t, has) } -func TestSharesAvailable_Full_ErrNotAvailable(t *testing.T) { +func TestSharesAvailable_ErrNotAvailable(t *testing.T) { ctrl := gomock.NewController(t) - getter := mocks.NewMockGetter(ctrl) - ctx, cancel := context.WithCancel(context.Background()) + getter := mock.NewMockGetter(ctrl) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() eds := edstest.RandEDS(t, 4) roots, err := share.NewAxisRoots(eds) eh := headertest.RandExtendedHeaderWithRoot(t, roots) require.NoError(t, err) - avail := TestAvailability(t, getter) - errors := []error{share.ErrNotFound, context.DeadlineExceeded} + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + avail := NewShareAvailability(store, getter) + + errors := []error{shwap.ErrNotFound, context.DeadlineExceeded} for _, getterErr := range errors { getter.EXPECT().GetEDS(gomock.Any(), gomock.Any()).Return(nil, getterErr) err := avail.SharesAvailable(ctx, eh) diff --git a/share/availability/full/reconstruction_test.go b/share/availability/full/reconstruction_test.go deleted file mode 100644 index 31edb3b6d..000000000 --- a/share/availability/full/reconstruction_test.go +++ /dev/null @@ -1,284 +0,0 @@ -//go:build !race - -package full - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" - - "github.com/celestiaorg/celestia-node/header/headertest" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/availability/light" - availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/eds" -) - -func init() { - eds.RetrieveQuadrantTimeout = time.Millisecond * 100 // to speed up tests -} - -// TestShareAvailable_OneFullNode asserts that a full node can ensure -// data is available (reconstruct data square) while being connected to -// light nodes only. -func TestShareAvailable_OneFullNode(t *testing.T) { - // NOTE: Numbers are taken from the original 'Fraud and Data Availability Proofs' paper - light.DefaultSampleAmount = 20 // s - const ( - origSquareSize = 16 // k - lightNodes = 69 // c - ) - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - - net := availability_test.NewTestDAGNet(ctx, t) - source, root := RandNode(net, origSquareSize) // make a source node, a.k.a bridge - eh := headertest.RandExtendedHeader(t) - eh.DAH = root - full := Node(net) // make a full availability service which reconstructs data - - // ensure there is no connection between source and full nodes - // so that full reconstructs from the light nodes only - net.Disconnect(source.ID(), full.ID()) - - errg, errCtx := errgroup.WithContext(ctx) - errg.Go(func() error { - return full.SharesAvailable(errCtx, eh) - }) - - lights := make([]*availability_test.TestNode, lightNodes) - for i := 0; i < len(lights); i++ { - lights[i] = light.Node(net) - go func(i int) { - err := lights[i].SharesAvailable(ctx, eh) - if err != nil { - t.Log("light errors:", err) - } - }(i) - } - - for i := 0; i < len(lights); i++ { - net.Connect(lights[i].ID(), source.ID()) - } - - for i := 0; i < len(lights); i++ { - net.Connect(lights[i].ID(), full.ID()) - } - - err := errg.Wait() - require.NoError(t, err) -} - -// TestShareAvailable_ConnectedFullNodes asserts that two connected full nodes -// can ensure data availability via two isolated light node subnetworks. Full -// nodes start their availability process first, then light node start -// availability process and connect to full node and only after light node -// connect to the source node which has the data. After light node connect to the -// source, full node must be able to finish the availability process started in -// the beginning. -func TestShareAvailable_ConnectedFullNodes(t *testing.T) { - // NOTE: Numbers are taken from the original 'Fraud and Data Availability Proofs' paper - light.DefaultSampleAmount = 20 // s - const ( - origSquareSize = 16 // k - lightNodes = 60 // c - ) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - - net := availability_test.NewTestDAGNet(ctx, t) - source, root := RandNode(net, origSquareSize) - eh := headertest.RandExtendedHeader(t) - eh.DAH = root - - // create two full nodes and ensure they are disconnected - full1 := Node(net) - full2 := Node(net) - - // pre-connect fulls - net.Connect(full1.ID(), full2.ID()) - // ensure fulls and source are not connected - // so that fulls take data from light nodes only - net.Disconnect(full1.ID(), source.ID()) - net.Disconnect(full2.ID(), source.ID()) - - // start reconstruction for fulls - errg, errCtx := errgroup.WithContext(ctx) - errg.Go(func() error { - return full1.SharesAvailable(errCtx, eh) - }) - errg.Go(func() error { - return full2.SharesAvailable(errCtx, eh) - }) - - // create light nodes and start sampling for them immediately - lights1, lights2 := make( - []*availability_test.TestNode, lightNodes/2), - make([]*availability_test.TestNode, lightNodes/2) - for i := 0; i < len(lights1); i++ { - lights1[i] = light.Node(net) - go func(i int) { - err := lights1[i].SharesAvailable(ctx, eh) - if err != nil { - t.Log("light1 errors:", err) - } - }(i) - - lights2[i] = light.Node(net) - go func(i int) { - err := lights2[i].SharesAvailable(ctx, eh) - if err != nil { - t.Log("light2 errors:", err) - } - }(i) - } - - // shape topology - for i := 0; i < len(lights1); i++ { - // ensure lights1 are only connected to full1 - net.Connect(lights1[i].ID(), full1.ID()) - net.Disconnect(lights1[i].ID(), full2.ID()) - // ensure lights2 are only connected to full2 - net.Connect(lights2[i].ID(), full2.ID()) - net.Disconnect(lights2[i].ID(), full1.ID()) - } - - // start connection lights with sources - for i := 0; i < len(lights1); i++ { - net.Connect(lights1[i].ID(), source.ID()) - net.Connect(lights2[i].ID(), source.ID()) - } - - err := errg.Wait() - require.NoError(t, err) -} - -// TestShareAvailable_DisconnectedFullNodes asserts that two disconnected full -// nodes cannot ensure data is available (reconstruct data square) while being -// connected to isolated light nodes subnetworks, which do not have enough nodes -// to reconstruct the data, but once ShareAvailability nodes connect, they can -// collectively reconstruct it. -// -//nolint:dupword -func TestShareAvailable_DisconnectedFullNodes(t *testing.T) { - // S - Source - // L - Light Node - // F - Full Node - // ── - connection - // - // Topology: - // NOTE: There are more Light Nodes in practice - // ┌─┬─┬─S─┬─┬─┐ - // │ │ │ │ │ │ - // │ │ │ │ │ │ - // │ │ │ │ │ │ - // L L L L L L - // │ │ │ │ │ │ - // └─┴─┤ ├─┴─┘ - // F└───┘F - // - - // NOTE: Numbers are taken from the original 'Fraud and Data Availability Proofs' paper - light.DefaultSampleAmount = 20 // s - const ( - origSquareSize = 16 // k - lightNodes = 32 // c - total number of nodes on two subnetworks - ) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) - defer cancel() - - net := availability_test.NewTestDAGNet(ctx, t) - source, root := RandNode(net, origSquareSize) - eh := headertest.RandExtendedHeader(t) - eh.DAH = root - - // create light nodes and start sampling for them immediately - lights1, lights2 := make( - []*availability_test.TestNode, lightNodes/2), - make([]*availability_test.TestNode, lightNodes/2) - - var wg sync.WaitGroup - wg.Add(lightNodes) - for i := 0; i < len(lights1); i++ { - lights1[i] = light.Node(net) - go func(i int) { - defer wg.Done() - err := lights1[i].SharesAvailable(ctx, eh) - if err != nil { - t.Log("light1 errors:", err) - } - }(i) - - lights2[i] = light.Node(net) - go func(i int) { - defer wg.Done() - err := lights2[i].SharesAvailable(ctx, eh) - if err != nil { - t.Log("light2 errors:", err) - } - }(i) - } - - // create two full nodes and ensure they are disconnected - full1 := Node(net) - full2 := Node(net) - net.Disconnect(full1.ID(), full2.ID()) - - // ensure fulls and source are not connected - // so that fulls take data from light nodes only - net.Disconnect(full1.ID(), source.ID()) - net.Disconnect(full2.ID(), source.ID()) - - // shape topology - for i := 0; i < len(lights1); i++ { - // ensure lights1 are only connected to source and full1 - net.Connect(lights1[i].ID(), source.ID()) - net.Connect(lights1[i].ID(), full1.ID()) - net.Disconnect(lights1[i].ID(), full2.ID()) - // ensure lights2 are only connected to source and full2 - net.Connect(lights2[i].ID(), source.ID()) - net.Connect(lights2[i].ID(), full2.ID()) - net.Disconnect(lights2[i].ID(), full1.ID()) - } - - // start reconstruction for fulls that should fail - ctxErr, cancelErr := context.WithTimeout(ctx, time.Second*5) - errg, errCtx := errgroup.WithContext(ctxErr) - errg.Go(func() error { - return full1.SharesAvailable(errCtx, eh) - }) - errg.Go(func() error { - return full2.SharesAvailable(errCtx, eh) - }) - - // check that any of the fulls cannot reconstruct on their own - err := errg.Wait() - require.ErrorIs(t, err, share.ErrNotAvailable) - cancelErr() - - // but after they connect - net.Connect(full1.ID(), full2.ID()) - - // with clean caches from the previous try - full1.ClearStorage() - full2.ClearStorage() - - // they both should be able to reconstruct the block - errg, bctx := errgroup.WithContext(ctx) - errg.Go(func() error { - return full1.SharesAvailable(bctx, eh) - }) - errg.Go(func() error { - return full2.SharesAvailable(bctx, eh) - }) - require.NoError(t, errg.Wait()) - // wait for all routines to finish before exit, in case there are any errors to log - wg.Wait() -} diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go deleted file mode 100644 index ca3539807..000000000 --- a/share/availability/full/testing.go +++ /dev/null @@ -1,56 +0,0 @@ -package full - -import ( - "context" - "testing" - "time" - - "github.com/ipfs/go-datastore" - "github.com/stretchr/testify/require" - - "github.com/celestiaorg/celestia-node/share" - availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/getters" - "github.com/celestiaorg/celestia-node/share/ipld" - "github.com/celestiaorg/celestia-node/share/p2p/discovery" -) - -// GetterWithRandSquare provides a share.Getter filled with 'n' NMT -// trees of 'n' random shares, essentially storing a whole square. -func GetterWithRandSquare(t *testing.T, n int) (share.Getter, *share.AxisRoots) { - bServ := ipld.NewMemBlockservice() - getter := getters.NewIPLDGetter(bServ) - return getter, availability_test.RandFillBS(t, n, bServ) -} - -// RandNode creates a Full Node filled with a random block of the given size. -func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_test.TestNode, *share.AxisRoots) { - nd := Node(dn) - return nd, availability_test.RandFillBS(dn.T, squareSize, nd.BlockService) -} - -// Node creates a new empty Full Node. -func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { - nd := dn.NewTestNode() - nd.Getter = getters.NewIPLDGetter(nd.BlockService) - nd.Availability = TestAvailability(dn.T, nd.Getter) - return nd -} - -func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability { - params := discovery.DefaultParameters() - params.AdvertiseInterval = time.Second - params.PeersLimit = 10 - - store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore()) - require.NoError(t, err) - err = store.Start(context.Background()) - require.NoError(t, err) - - t.Cleanup(func() { - err = store.Stop(context.Background()) - require.NoError(t, err) - }) - return NewShareAvailability(store, getter) -} diff --git a/share/availability/light/availability.go b/share/availability/light/availability.go index c4300a29f..5b1e144de 100644 --- a/share/availability/light/availability.go +++ b/share/availability/light/availability.go @@ -12,7 +12,7 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/getters" + "github.com/celestiaorg/celestia-node/share/shwap" ) var ( @@ -26,7 +26,7 @@ var ( // its availability. It is assumed that there are a lot of lightAvailability instances // on the network doing sampling over the same Root to collectively verify its availability. type ShareAvailability struct { - getter share.Getter + getter shwap.Getter params Parameters // TODO(@Wondertan): Once we come to parallelized DASer, this lock becomes a contention point @@ -38,7 +38,7 @@ type ShareAvailability struct { // NewShareAvailability creates a new light Availability. func NewShareAvailability( - getter share.Getter, + getter shwap.Getter, ds datastore.Batching, opts ...Option, ) *ShareAvailability { @@ -100,10 +100,6 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header return err } - // indicate to the share.Getter that a blockservice session should be created. This - // functionality is optional and must be supported by the used share.Getter. - ctx = getters.WithSession(ctx) - var ( failedSamplesLock sync.Mutex failedSamples []Sample diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index 07fbe28b3..b8ec9db78 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -3,28 +3,47 @@ package light import ( "context" _ "embed" - "strconv" + "sync" "testing" + "github.com/golang/mock/gomock" + "github.com/ipfs/go-datastore" "github.com/stretchr/testify/require" + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/header/headertest" "github.com/celestiaorg/celestia-node/share" - availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/ipld" - "github.com/celestiaorg/celestia-node/share/sharetest" + "github.com/celestiaorg/celestia-node/share/eds/edstest" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/getters/mock" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" ) func TestSharesAvailableCaches(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - getter, eh := GetterWithRandSquare(t, 16) - dah := eh.DAH - avail := TestAvailability(getter) + eds := edstest.RandEDS(t, 16) + roots, err := share.NewAxisRoots(eds) + require.NoError(t, err) + eh := headertest.RandExtendedHeaderWithRoot(t, roots) + + getter := mock.NewMockGetter(gomock.NewController(t)) + getter.EXPECT(). + GetShare(gomock.Any(), eh, gomock.Any(), gomock.Any()). + DoAndReturn( + func(_ context.Context, _ *header.ExtendedHeader, row, col int) (share.Share, error) { + return eds.GetCell(uint(row), uint(col)), nil + }). + AnyTimes() + + ds := datastore.NewMapDatastore() + avail := NewShareAvailability(getter, ds) // cache doesn't have eds yet - has, err := avail.ds.Has(ctx, rootKey(dah)) + has, err := avail.ds.Has(ctx, rootKey(roots)) require.NoError(t, err) require.False(t, has) @@ -32,7 +51,7 @@ func TestSharesAvailableCaches(t *testing.T) { require.NoError(t, err) // is now stored success result - result, err := avail.ds.Get(ctx, rootKey(dah)) + result, err := avail.ds.Get(ctx, rootKey(roots)) require.NoError(t, err) failed, err := decodeSamples(result) require.NoError(t, err) @@ -43,15 +62,21 @@ func TestSharesAvailableHitsCache(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - getter, _ := GetterWithRandSquare(t, 16) - avail := TestAvailability(getter) + // create getter that always return ErrNotFound + getter := mock.NewMockGetter(gomock.NewController(t)) + getter.EXPECT(). + GetShare(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, shrex.ErrNotFound). + AnyTimes() + + ds := datastore.NewMapDatastore() + avail := NewShareAvailability(getter, ds) - // create new roots, that is not available by getter - bServ := ipld.NewMemBlockservice() - roots := availability_test.RandFillBS(t, 16, bServ) + // generate random header + roots := edstest.RandomAxisRoots(t, 16) eh := headertest.RandExtendedHeaderWithRoot(t, roots) - // blockstore doesn't actually have the eds + // store doesn't actually have the eds err := avail.SharesAvailable(ctx, eh) require.ErrorIs(t, err, share.ErrNotAvailable) @@ -68,9 +93,11 @@ func TestSharesAvailableEmptyRoot(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - getter, _ := GetterWithRandSquare(t, 16) - avail := TestAvailability(getter) + getter := mock.NewMockGetter(gomock.NewController(t)) + ds := datastore.NewMapDatastore() + avail := NewShareAvailability(getter, ds) + // request for empty eds eh := headertest.RandExtendedHeaderWithRoot(t, share.EmptyEDSRoots()) err := avail.SharesAvailable(ctx, eh) require.NoError(t, err) @@ -80,16 +107,22 @@ func TestSharesAvailableFailed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - getter, _ := GetterWithRandSquare(t, 16) - avail := TestAvailability(getter) + getter := mock.NewMockGetter(gomock.NewController(t)) + ds := datastore.NewMapDatastore() + avail := NewShareAvailability(getter, ds) // create new eds, that is not available by getter - bServ := ipld.NewMemBlockservice() - roots := availability_test.RandFillBS(t, 16, bServ) + eds := edstest.RandEDS(t, 16) + roots, err := share.NewAxisRoots(eds) + require.NoError(t, err) eh := headertest.RandExtendedHeaderWithRoot(t, roots) - // blockstore doesn't actually have the eds, so it should fail - err := avail.SharesAvailable(ctx, eh) + // getter doesn't have the eds, so it should fail + getter.EXPECT(). + GetShare(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, shrex.ErrNotFound). + AnyTimes() + err = avail.SharesAvailable(ctx, eh) require.ErrorIs(t, err, share.ErrNotAvailable) // cache should have failed results now @@ -116,142 +149,45 @@ func TestSharesAvailableFailed(t *testing.T) { require.Empty(t, onceGetter.available) } -func TestShareAvailableOverMocknet_Light(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - net := availability_test.NewTestDAGNet(ctx, t) - _, root := RandNode(net, 16) - eh := headertest.RandExtendedHeader(t) - eh.DAH = root - nd := Node(net) - net.ConnectAll() - - err := nd.SharesAvailable(ctx, eh) - require.NoError(t, err) +type onceGetter struct { + *sync.Mutex + available map[Sample]struct{} } -func TestGetShare(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - n := 16 - getter, eh := GetterWithRandSquare(t, n) - - for i := range make([]bool, n) { - for j := range make([]bool, n) { - sh, err := getter.GetShare(ctx, eh, i, j) - require.NotNil(t, sh) - require.NoError(t, err) - } +func newOnceGetter() onceGetter { + return onceGetter{ + Mutex: &sync.Mutex{}, + available: make(map[Sample]struct{}), } } -func TestService_GetSharesByNamespace(t *testing.T) { - tests := []struct { - squareSize int - expectedShareCount int - }{ - {squareSize: 4, expectedShareCount: 2}, - {squareSize: 16, expectedShareCount: 2}, - {squareSize: 128, expectedShareCount: 2}, - } - - for _, tt := range tests { - t.Run("size: "+strconv.Itoa(tt.squareSize), func(t *testing.T) { - getter, bServ := EmptyGetter() - totalShares := tt.squareSize * tt.squareSize - randShares := sharetest.RandShares(t, totalShares) - idx1 := (totalShares - 1) / 2 - idx2 := totalShares / 2 - if tt.expectedShareCount > 1 { - // make it so that two rows have the same namespace - copy(share.GetNamespace(randShares[idx2]), share.GetNamespace(randShares[idx1])) - } - root := availability_test.FillBS(t, bServ, randShares) - eh := headertest.RandExtendedHeader(t) - eh.DAH = root - randNamespace := share.GetNamespace(randShares[idx1]) - - shares, err := getter.GetSharesByNamespace(context.Background(), eh, randNamespace) - require.NoError(t, err) - require.NoError(t, shares.Verify(root, randNamespace)) - flattened := shares.Flatten() - require.Len(t, flattened, tt.expectedShareCount) - for _, value := range flattened { - require.Equal(t, randNamespace, share.GetNamespace(value)) - } - if tt.expectedShareCount > 1 { - // idx1 is always smaller than idx2 - require.Equal(t, randShares[idx1], flattened[0]) - require.Equal(t, randShares[idx2], flattened[1]) - } - }) - t.Run("last two rows of a 4x4 square that have the same namespace have valid NMT proofs", func(t *testing.T) { - squareSize := 4 - totalShares := squareSize * squareSize - getter, bServ := EmptyGetter() - randShares := sharetest.RandShares(t, totalShares) - lastNID := share.GetNamespace(randShares[totalShares-1]) - for i := totalShares / 2; i < totalShares; i++ { - copy(share.GetNamespace(randShares[i]), lastNID) - } - root := availability_test.FillBS(t, bServ, randShares) - eh := headertest.RandExtendedHeader(t) - eh.DAH = root - - shares, err := getter.GetSharesByNamespace(context.Background(), eh, lastNID) - require.NoError(t, err) - require.NoError(t, shares.Verify(root, lastNID)) - }) +func (m onceGetter) AddSamples(samples []Sample) { + m.Lock() + defer m.Unlock() + for _, s := range samples { + m.available[s] = struct{}{} } } -func TestGetShares(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - n := 16 - getter, eh := GetterWithRandSquare(t, n) - - eds, err := getter.GetEDS(ctx, eh) - require.NoError(t, err) - gotRoots, err := share.NewAxisRoots(eds) - require.NoError(t, err) - - require.True(t, eh.DAH.Equals(gotRoots)) +func (m onceGetter) GetShare(_ context.Context, _ *header.ExtendedHeader, row, col int) (share.Share, error) { + m.Lock() + defer m.Unlock() + s := Sample{Row: uint16(row), Col: uint16(col)} + if _, ok := m.available[s]; ok { + delete(m.available, s) + return share.Share{}, nil + } + return share.Share{}, share.ErrNotAvailable } -func TestService_GetSharesByNamespaceNotFound(t *testing.T) { - getter, eh := GetterWithRandSquare(t, 1) - eh.DAH.RowRoots = nil - - emptyShares, err := getter.GetSharesByNamespace(context.Background(), eh, sharetest.RandV0Namespace()) - require.NoError(t, err) - require.Empty(t, emptyShares.Flatten()) +func (m onceGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { + panic("not implemented") } -func BenchmarkService_GetSharesByNamespace(b *testing.B) { - tests := []struct { - amountShares int - }{ - {amountShares: 4}, - {amountShares: 16}, - {amountShares: 128}, - } - - for _, tt := range tests { - b.Run(strconv.Itoa(tt.amountShares), func(b *testing.B) { - t := &testing.T{} - getter, eh := GetterWithRandSquare(t, tt.amountShares) - root := eh.DAH - randNamespace := root.RowRoots[(len(root.RowRoots)-1)/2][:share.NamespaceSize] - root.RowRoots[(len(root.RowRoots) / 2)] = root.RowRoots[(len(root.RowRoots)-1)/2] - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := getter.GetSharesByNamespace(context.Background(), eh, randNamespace) - require.NoError(t, err) - } - }) - } +func (m onceGetter) GetSharesByNamespace( + _ context.Context, + _ *header.ExtendedHeader, + _ share.Namespace, +) (shwap.NamespaceData, error) { + panic("not implemented") } diff --git a/share/availability/light/testing.go b/share/availability/light/testing.go deleted file mode 100644 index 82874f7d1..000000000 --- a/share/availability/light/testing.go +++ /dev/null @@ -1,107 +0,0 @@ -package light - -import ( - "context" - "sync" - "testing" - - "github.com/ipfs/boxo/blockservice" - "github.com/ipfs/go-datastore" - - "github.com/celestiaorg/rsmt2d" - - "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/header/headertest" - "github.com/celestiaorg/celestia-node/share" - availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/getters" - "github.com/celestiaorg/celestia-node/share/ipld" -) - -// GetterWithRandSquare provides a share.Getter filled with 'n' NMT trees of 'n' random shares, -// essentially storing a whole square. -func GetterWithRandSquare(t *testing.T, n int) (share.Getter, *header.ExtendedHeader) { - bServ := ipld.NewMemBlockservice() - getter := getters.NewIPLDGetter(bServ) - root := availability_test.RandFillBS(t, n, bServ) - eh := headertest.RandExtendedHeader(t) - eh.DAH = root - - return getter, eh -} - -// EmptyGetter provides an unfilled share.Getter with corresponding blockservice.BlockService than -// can be filled by the test. -func EmptyGetter() (share.Getter, blockservice.BlockService) { - bServ := ipld.NewMemBlockservice() - getter := getters.NewIPLDGetter(bServ) - return getter, bServ -} - -// RandNode creates a Light Node filled with a random block of the given size. -func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_test.TestNode, *share.AxisRoots) { - nd := Node(dn) - return nd, availability_test.RandFillBS(dn.T, squareSize, nd.BlockService) -} - -// Node creates a new empty Light Node. -func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { - nd := dn.NewTestNode() - nd.Getter = getters.NewIPLDGetter(nd.BlockService) - nd.Availability = TestAvailability(nd.Getter) - return nd -} - -func TestAvailability(getter share.Getter) *ShareAvailability { - ds := datastore.NewMapDatastore() - return NewShareAvailability(getter, ds) -} - -func SubNetNode(sn *availability_test.SubNet) *availability_test.TestNode { - nd := Node(sn.TestDagNet) - sn.AddNode(nd) - return nd -} - -type onceGetter struct { - *sync.Mutex - available map[Sample]struct{} -} - -func newOnceGetter() onceGetter { - return onceGetter{ - Mutex: &sync.Mutex{}, - available: make(map[Sample]struct{}), - } -} - -func (m onceGetter) AddSamples(samples []Sample) { - m.Lock() - defer m.Unlock() - for _, s := range samples { - m.available[s] = struct{}{} - } -} - -func (m onceGetter) GetShare(_ context.Context, _ *header.ExtendedHeader, row, col int) (share.Share, error) { - m.Lock() - defer m.Unlock() - s := Sample{Row: uint16(row), Col: uint16(col)} - if _, ok := m.available[s]; ok { - delete(m.available, s) - return share.Share{}, nil - } - return share.Share{}, share.ErrNotAvailable -} - -func (m onceGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { - panic("not implemented") -} - -func (m onceGetter) GetSharesByNamespace( - _ context.Context, - _ *header.ExtendedHeader, - _ share.Namespace, -) (share.NamespacedShares, error) { - panic("not implemented") -} diff --git a/share/availability/test/corrupt_data.go b/share/availability/test/corrupt_data.go deleted file mode 100644 index 1ff553f8b..000000000 --- a/share/availability/test/corrupt_data.go +++ /dev/null @@ -1,130 +0,0 @@ -package availability_test - -import ( - "context" - "crypto/rand" - "fmt" - mrand "math/rand" - "testing" - - "github.com/ipfs/boxo/blockstore" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" -) - -var _ blockstore.Blockstore = (*FraudulentBlockstore)(nil) - -// CorruptBlock is a block where the cid doesn't match the data. It fulfills the blocks.Block -// interface. -type CorruptBlock struct { - cid cid.Cid - data []byte -} - -func (b *CorruptBlock) RawData() []byte { - return b.data -} - -func (b *CorruptBlock) Cid() cid.Cid { - return b.cid -} - -func (b *CorruptBlock) String() string { - return fmt.Sprintf("[Block %s]", b.Cid()) -} - -func (b *CorruptBlock) Loggable() map[string]interface{} { - return map[string]interface{}{ - "block": b.Cid().String(), - } -} - -func NewCorruptBlock(data []byte, fakeCID cid.Cid) *CorruptBlock { - return &CorruptBlock{ - fakeCID, - data, - } -} - -// FraudulentBlockstore is a mock blockstore.Blockstore that saves both corrupted and original data -// for every block it receives. If FraudulentBlockstore.Attacking is true, it will serve the -// corrupted data on requests. -type FraudulentBlockstore struct { - ds.Datastore - Attacking bool -} - -func (fb FraudulentBlockstore) Has(context.Context, cid.Cid) (bool, error) { - return false, nil -} - -func (fb FraudulentBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { - key := cid.String() - if fb.Attacking { - key = "corrupt_get" + key - } - - data, err := fb.Datastore.Get(ctx, ds.NewKey(key)) - if err != nil { - return nil, err - } - return NewCorruptBlock(data, cid), nil -} - -func (fb FraudulentBlockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { - key := cid.String() - if fb.Attacking { - key = "corrupt_size" + key - } - - return fb.Datastore.GetSize(ctx, ds.NewKey(key)) -} - -func (fb FraudulentBlockstore) Put(ctx context.Context, block blocks.Block) error { - err := fb.Datastore.Put(ctx, ds.NewKey(block.Cid().String()), block.RawData()) - if err != nil { - return err - } - - // create data that doesn't match the CID with arbitrary lengths between 1 and - // len(block.RawData())*2 - corrupted := make([]byte, 1+mrand.Int()%(len(block.RawData())*2-1)) //nolint:gosec - _, _ = rand.Read(corrupted) - return fb.Datastore.Put(ctx, ds.NewKey("corrupt"+block.Cid().String()), corrupted) -} - -func (fb FraudulentBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { - for _, b := range blocks { - err := fb.Put(ctx, b) - if err != nil { - return err - } - } - return nil -} - -func (fb FraudulentBlockstore) DeleteBlock(context.Context, cid.Cid) error { - panic("implement me") -} - -func (fb FraudulentBlockstore) AllKeysChan(context.Context) (<-chan cid.Cid, error) { - panic("implement me") -} - -func (fb FraudulentBlockstore) HashOnRead(bool) { - panic("implement me") -} - -// MockNode creates a TestNode that uses a FraudulentBlockstore to simulate serving corrupted data. -func MockNode(t *testing.T, net *TestDagNet) (*TestNode, *FraudulentBlockstore) { - t.Helper() - dstore := dssync.MutexWrap(ds.NewMapDatastore()) - mockBS := &FraudulentBlockstore{ - Datastore: dstore, - Attacking: false, - } - provider := net.NewTestNodeWithBlockstore(dstore, mockBS) - return provider, mockBS -} diff --git a/share/availability/test/testing.go b/share/availability/test/testing.go deleted file mode 100644 index 6fb4af30e..000000000 --- a/share/availability/test/testing.go +++ /dev/null @@ -1,163 +0,0 @@ -package availability_test - -import ( - "context" - "testing" - - "github.com/ipfs/boxo/bitswap" - "github.com/ipfs/boxo/bitswap/network" - "github.com/ipfs/boxo/blockservice" - "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/boxo/routing/offline" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - record "github.com/libp2p/go-libp2p-record" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" - "github.com/stretchr/testify/require" - - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/ipld" - "github.com/celestiaorg/celestia-node/share/sharetest" -) - -// RandFillBS fills the given BlockService with a random block of a given size. -func RandFillBS(t *testing.T, n int, bServ blockservice.BlockService) *share.AxisRoots { - shares := sharetest.RandShares(t, n*n) - return FillBS(t, bServ, shares) -} - -// FillBS fills the given BlockService with the given shares. -func FillBS(t *testing.T, bServ blockservice.BlockService, shares []share.Share) *share.AxisRoots { - eds, err := ipld.AddShares(context.TODO(), shares, bServ) - require.NoError(t, err) - roots, err := share.NewAxisRoots(eds) - require.NoError(t, err) - return roots -} - -type TestNode struct { - net *TestDagNet - share.Getter - share.Availability - blockservice.BlockService - host.Host -} - -// ClearStorage cleans up the storage of the node. -func (n *TestNode) ClearStorage() { - keys, err := n.Blockstore().AllKeysChan(n.net.ctx) - require.NoError(n.net.T, err) - - for k := range keys { - err := n.DeleteBlock(n.net.ctx, k) - require.NoError(n.net.T, err) - } -} - -type TestDagNet struct { - ctx context.Context - T *testing.T - net mocknet.Mocknet - nodes []*TestNode -} - -// NewTestDAGNet creates a new testing swarm utility to spawn different nodes and test how they -// interact and/or exchange data. -func NewTestDAGNet(ctx context.Context, t *testing.T) *TestDagNet { - return &TestDagNet{ - ctx: ctx, - T: t, - net: mocknet.New(), - } -} - -// NewTestNodeWithBlockstore creates a new plain TestNode with the given blockstore that can serve -// and request data. -func (dn *TestDagNet) NewTestNodeWithBlockstore(dstore ds.Datastore, bstore blockstore.Blockstore) *TestNode { - hst, err := dn.net.GenPeer() - require.NoError(dn.T, err) - routing := offline.NewOfflineRouter(dstore, record.NamespacedValidator{}) - bs := bitswap.New( - dn.ctx, - network.NewFromIpfsHost(hst, routing), - bstore, - bitswap.ProvideEnabled(false), // disable routines for DHT content provides, as we don't use them - bitswap.EngineBlockstoreWorkerCount(1), // otherwise it spawns 128 routines which is too much for tests - bitswap.EngineTaskWorkerCount(2), - bitswap.TaskWorkerCount(2), - bitswap.SetSimulateDontHavesOnTimeout(false), - bitswap.SetSendDontHaves(false), - ) - nd := &TestNode{ - net: dn, - BlockService: ipld.NewBlockservice(bstore, bs), - Host: hst, - } - dn.nodes = append(dn.nodes, nd) - return nd -} - -// NewTestNode creates a plain network node that can serve and request data. -func (dn *TestDagNet) NewTestNode() *TestNode { - dstore := dssync.MutexWrap(ds.NewMapDatastore()) - bstore := blockstore.NewBlockstore(dstore) - return dn.NewTestNodeWithBlockstore(dstore, bstore) -} - -// ConnectAll connects all the peers on registered on the TestDagNet. -func (dn *TestDagNet) ConnectAll() { - err := dn.net.LinkAll() - require.NoError(dn.T, err) - - err = dn.net.ConnectAllButSelf() - require.NoError(dn.T, err) -} - -// Connect connects two given peers. -func (dn *TestDagNet) Connect(peerA, peerB peer.ID) { - _, err := dn.net.LinkPeers(peerA, peerB) - require.NoError(dn.T, err) - _, err = dn.net.ConnectPeers(peerA, peerB) - require.NoError(dn.T, err) -} - -// Disconnect disconnects two peers. -// It does a hard disconnect, meaning that disconnected peers won't be able to reconnect on their -// own but only with DagNet.Connect or TestDagNet.ConnectAll. -func (dn *TestDagNet) Disconnect(peerA, peerB peer.ID) { - err := dn.net.UnlinkPeers(peerA, peerB) - require.NoError(dn.T, err) - err = dn.net.DisconnectPeers(peerA, peerB) - require.NoError(dn.T, err) -} - -type SubNet struct { - *TestDagNet - nodes []*TestNode -} - -func (dn *TestDagNet) SubNet() *SubNet { - return &SubNet{dn, nil} -} - -func (sn *SubNet) AddNode(nd *TestNode) { - sn.nodes = append(sn.nodes, nd) -} - -func (sn *SubNet) ConnectAll() { - nodes := sn.nodes - for _, n1 := range nodes { - for _, n2 := range nodes { - if n1 == n2 { - continue - } - _, err := sn.net.LinkPeers(n1.ID(), n2.ID()) - require.NoError(sn.T, err) - - _, err = sn.net.ConnectPeers(n1.ID(), n2.ID()) - require.NoError(sn.T, err) - } - } -} diff --git a/share/getter.go b/share/getter.go deleted file mode 100644 index 0d6a5c708..000000000 --- a/share/getter.go +++ /dev/null @@ -1,100 +0,0 @@ -package share - -import ( - "context" - "errors" - "fmt" - - "github.com/celestiaorg/nmt" - "github.com/celestiaorg/rsmt2d" - - "github.com/celestiaorg/celestia-node/header" -) - -var ( - // ErrNotFound is used to indicate that requested data could not be found. - ErrNotFound = errors.New("share: data not found") - // ErrOutOfBounds is used to indicate that a passed row or column index is out of bounds of the - // square size. - ErrOutOfBounds = errors.New("share: row or column index is larger than square size") -) - -// Getter interface provides a set of accessors for shares by the Root. -// Automatically verifies integrity of shares(exceptions possible depending on the implementation). -// -//go:generate mockgen -destination=mocks/getter.go -package=mocks . Getter -type Getter interface { - // GetShare gets a Share by coordinates in EDS. - GetShare(ctx context.Context, header *header.ExtendedHeader, row, col int) (Share, error) - - // GetEDS gets the full EDS identified by the given extended header. - GetEDS(context.Context, *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) - - // GetSharesByNamespace gets all shares from an EDS within the given namespace. - // Shares are returned in a row-by-row order if the namespace spans multiple rows. - // Inclusion of returned data could be verified using Verify method on NamespacedShares. - // If no shares are found for target namespace non-inclusion could be also verified by calling - // Verify method. - GetSharesByNamespace(context.Context, *header.ExtendedHeader, Namespace) (NamespacedShares, error) -} - -// NamespacedShares represents all shares with proofs within a specific namespace of an EDS. -type NamespacedShares []NamespacedRow - -// Flatten returns the concatenated slice of all NamespacedRow shares. -func (ns NamespacedShares) Flatten() []Share { - var shares []Share - for _, row := range ns { - shares = append(shares, row.Shares...) - } - return shares -} - -// NamespacedRow represents all shares with proofs within a specific namespace of a single EDS row. -type NamespacedRow struct { - Shares []Share `json:"shares"` - Proof *nmt.Proof `json:"proof"` -} - -// Verify validates NamespacedShares by checking every row with nmt inclusion proof. -func (ns NamespacedShares) Verify(root *AxisRoots, namespace Namespace) error { - var originalRoots [][]byte - for _, row := range root.RowRoots { - if !namespace.IsOutsideRange(row, row) { - originalRoots = append(originalRoots, row) - } - } - - if len(originalRoots) != len(ns) { - return fmt.Errorf("amount of rows differs between root and namespace shares: expected %d, got %d", - len(originalRoots), len(ns)) - } - - for i, row := range ns { - if row.Proof == nil && row.Shares == nil { - return fmt.Errorf("row verification failed: no proofs and shares") - } - // verify row data against row hash from original root - if !row.Verify(originalRoots[i], namespace) { - return fmt.Errorf("row verification failed: row %d doesn't match original root: %s", i, root.String()) - } - } - return nil -} - -// Verify validates the row using nmt inclusion proof. -func (row *NamespacedRow) Verify(rowRoot []byte, namespace Namespace) bool { - // construct nmt leaves from shares by prepending namespace - leaves := make([][]byte, 0, len(row.Shares)) - for _, shr := range row.Shares { - leaves = append(leaves, append(GetNamespace(shr), shr...)) - } - - // verify namespace - return row.Proof.VerifyNamespace( - NewSHA256Hasher(), - namespace.ToNMT(), - leaves, - rowRoot, - ) -} diff --git a/share/ipld/corrupted_data_test.go b/share/ipld/corrupted_data_test.go deleted file mode 100644 index 0d0af6dd3..000000000 --- a/share/ipld/corrupted_data_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package ipld_test - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/celestiaorg/celestia-node/header/headertest" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/availability/full" - availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/getters" -) - -// sharesAvailableTimeout is an arbitrarily picked interval of time in which a TestNode is expected -// to be able to complete a SharesAvailable request from a connected peer in a TestDagNet. -const sharesAvailableTimeout = 2 * time.Second - -// TestNamespaceHasher_CorruptedData is an integration test that verifies that the NamespaceHasher -// of a recipient of corrupted data will not panic, and will throw away the corrupted data. -func TestNamespaceHasher_CorruptedData(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - net := availability_test.NewTestDAGNet(ctx, t) - - requester := full.Node(net) - provider, mockBS := availability_test.MockNode(t, net) - provider.Availability = full.TestAvailability(t, getters.NewIPLDGetter(provider.BlockService)) - net.ConnectAll() - - // before the provider starts attacking, we should be able to retrieve successfully. We pass a size - // 16 block, but this is not important to the test and any valid block size behaves the same. - root := availability_test.RandFillBS(t, 16, provider.BlockService) - - eh := headertest.RandExtendedHeaderWithRoot(t, root) - getCtx, cancelGet := context.WithTimeout(ctx, sharesAvailableTimeout) - t.Cleanup(cancelGet) - err := requester.SharesAvailable(getCtx, eh) - require.NoError(t, err) - - // clear the storage of the requester so that it must retrieve again, then start attacking - // we reinitialize the node to clear the eds store - requester = full.Node(net) - mockBS.Attacking = true - getCtx, cancelGet = context.WithTimeout(ctx, sharesAvailableTimeout) - t.Cleanup(cancelGet) - err = requester.SharesAvailable(getCtx, eh) - require.ErrorIs(t, err, share.ErrNotAvailable) -} diff --git a/share/mocks/getter.go b/share/mocks/getter.go deleted file mode 100644 index 738e2b246..000000000 --- a/share/mocks/getter.go +++ /dev/null @@ -1,83 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/celestiaorg/celestia-node/share (interfaces: Getter) - -// Package mocks is a generated GoMock package. -package mocks - -import ( - context "context" - reflect "reflect" - - header "github.com/celestiaorg/celestia-node/header" - share "github.com/celestiaorg/celestia-node/share" - rsmt2d "github.com/celestiaorg/rsmt2d" - gomock "github.com/golang/mock/gomock" -) - -// MockGetter is a mock of Getter interface. -type MockGetter struct { - ctrl *gomock.Controller - recorder *MockGetterMockRecorder -} - -// MockGetterMockRecorder is the mock recorder for MockGetter. -type MockGetterMockRecorder struct { - mock *MockGetter -} - -// NewMockGetter creates a new mock instance. -func NewMockGetter(ctrl *gomock.Controller) *MockGetter { - mock := &MockGetter{ctrl: ctrl} - mock.recorder = &MockGetterMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockGetter) EXPECT() *MockGetterMockRecorder { - return m.recorder -} - -// GetEDS mocks base method. -func (m *MockGetter) GetEDS(arg0 context.Context, arg1 *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetEDS", arg0, arg1) - ret0, _ := ret[0].(*rsmt2d.ExtendedDataSquare) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetEDS indicates an expected call of GetEDS. -func (mr *MockGetterMockRecorder) GetEDS(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEDS", reflect.TypeOf((*MockGetter)(nil).GetEDS), arg0, arg1) -} - -// GetShare mocks base method. -func (m *MockGetter) GetShare(arg0 context.Context, arg1 *header.ExtendedHeader, arg2, arg3 int) ([]byte, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetShare", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].([]byte) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetShare indicates an expected call of GetShare. -func (mr *MockGetterMockRecorder) GetShare(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShare", reflect.TypeOf((*MockGetter)(nil).GetShare), arg0, arg1, arg2, arg3) -} - -// GetSharesByNamespace mocks base method. -func (m *MockGetter) GetSharesByNamespace(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 share.Namespace) (share.NamespacedShares, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetSharesByNamespace", arg0, arg1, arg2) - ret0, _ := ret[0].(share.NamespacedShares) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetSharesByNamespace indicates an expected call of GetSharesByNamespace. -func (mr *MockGetterMockRecorder) GetSharesByNamespace(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSharesByNamespace", reflect.TypeOf((*MockGetter)(nil).GetSharesByNamespace), arg0, arg1, arg2) -} diff --git a/share/shwap/getters/cascade.go b/share/shwap/getters/cascade.go index ac1733ed3..255e2994e 100644 --- a/share/shwap/getters/cascade.go +++ b/share/shwap/getters/cascade.go @@ -52,7 +52,7 @@ func (cg *CascadeGetter) GetShare( upperBound := len(header.DAH.RowRoots) if row >= upperBound || col >= upperBound { - err := share.ErrOutOfBounds + err := shwap.ErrOutOfBounds span.RecordError(err) return nil, err } diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index ad4e9f5db..8947fc362 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -161,7 +161,7 @@ func (g *Getter) GetSharesByNamespace( ctx context.Context, hdr *header.ExtendedHeader, ns share.Namespace, -) (share.NamespacedShares, error) { +) (shwap.NamespaceData, error) { if err := ns.ValidateForData(); err != nil { return nil, err } @@ -183,10 +183,10 @@ func (g *Getter) GetSharesByNamespace( } // TODO(@Wondertan): this must use shwap types eventually - nsShrs := make(share.NamespacedShares, len(blks)) + nsShrs := make(shwap.NamespaceData, len(blks)) for i, blk := range blks { rnd := blk.(*RowNamespaceDataBlock).Container - nsShrs[i] = share.NamespacedRow{ + nsShrs[i] = shwap.RowNamespaceData{ Shares: rnd.Shares, Proof: rnd.Proof, } From 3ae0db2d64f0b64e44e1cd482ed3f9210aa6e0b5 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Thu, 1 Aug 2024 10:55:49 +0200 Subject: [PATCH 2/3] review --- share/availability/full/availability.go | 4 +- .../availability/full/reconstruction_test.go | 284 ++++++++++++++++++ share/availability/full/testing.go | 57 ++++ share/availability/light/testing.go | 108 +++++++ share/availability/test/testing.go | 163 ++++++++++ share/ipld/corrupted_data_test.go | 51 ---- share/shwap/p2p/bitswap/getter.go | 1 - 7 files changed, 614 insertions(+), 54 deletions(-) create mode 100644 share/availability/full/reconstruction_test.go create mode 100644 share/availability/full/testing.go create mode 100644 share/availability/light/testing.go create mode 100644 share/availability/test/testing.go delete mode 100644 share/ipld/corrupted_data_test.go diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index 8448d6cf8..7554a7904 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -41,8 +41,8 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header dah := header.DAH // if the data square is empty, we can safely link the header height in the store to an empty EDS. if share.DataHash(dah.Hash()).IsEmptyEDS() { - fa.store.Put(ctx, dah, header.Height(), share.EmptyEDS()) - return nil + err := fa.store.Put(ctx, dah, header.Height(), share.EmptyEDS()) + return fmt.Errorf("put empty EDS: %w", err) } // we assume the caller of this method has already performed basic validation on the diff --git a/share/availability/full/reconstruction_test.go b/share/availability/full/reconstruction_test.go new file mode 100644 index 000000000..fb8101cc3 --- /dev/null +++ b/share/availability/full/reconstruction_test.go @@ -0,0 +1,284 @@ +// //go:build !race +package full + +// +//import ( +// "context" +// "sync" +// "testing" +// "time" +// +// "github.com/stretchr/testify/require" +// "golang.org/x/sync/errgroup" +// +// "github.com/celestiaorg/celestia-node/header/headertest" +// "github.com/celestiaorg/celestia-node/share" +// "github.com/celestiaorg/celestia-node/share/availability/light" +// availability_test "github.com/celestiaorg/celestia-node/share/availability/test" +// "github.com/celestiaorg/celestia-node/share/eds" +//) +// +//func init() { +// eds.RetrieveQuadrantTimeout = time.Millisecond * 100 // to speed up tests +//} +// +//// TestShareAvailable_OneFullNode asserts that a full node can ensure +//// data is available (reconstruct data square) while being connected to +//// light nodes only. +//func TestShareAvailable_OneFullNode(t *testing.T) { +// // NOTE: Numbers are taken from the original 'Fraud and Data Availability Proofs' paper +// light.DefaultSampleAmount = 20 // s +// const ( +// origSquareSize = 16 // k +// lightNodes = 69 // c +// ) +// +// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) +// defer cancel() +// +// net := availability_test.NewTestDAGNet(ctx, t) +// source, root := RandNode(net, origSquareSize) // make a source node, a.k.a bridge +// eh := headertest.RandExtendedHeader(t) +// eh.DAH = root +// full := Node(net) // make a full availability service which reconstructs data +// +// // ensure there is no connection between source and full nodes +// // so that full reconstructs from the light nodes only +// net.Disconnect(source.ID(), full.ID()) +// +// errg, errCtx := errgroup.WithContext(ctx) +// errg.Go(func() error { +// return full.SharesAvailable(errCtx, eh) +// }) +// +// lights := make([]*availability_test.TestNode, lightNodes) +// for i := 0; i < len(lights); i++ { +// lights[i] = light.Node(net) +// go func(i int) { +// err := lights[i].SharesAvailable(ctx, eh) +// if err != nil { +// t.Log("light errors:", err) +// } +// }(i) +// } +// +// for i := 0; i < len(lights); i++ { +// net.Connect(lights[i].ID(), source.ID()) +// } +// +// for i := 0; i < len(lights); i++ { +// net.Connect(lights[i].ID(), full.ID()) +// } +// +// err := errg.Wait() +// require.NoError(t, err) +//} +// +//// TestShareAvailable_ConnectedFullNodes asserts that two connected full nodes +//// can ensure data availability via two isolated light node subnetworks. Full +//// nodes start their availability process first, then light node start +//// availability process and connect to full node and only after light node +//// connect to the source node which has the data. After light node connect to the +//// source, full node must be able to finish the availability process started in +//// the beginning. +//func TestShareAvailable_ConnectedFullNodes(t *testing.T) { +// // NOTE: Numbers are taken from the original 'Fraud and Data Availability Proofs' paper +// light.DefaultSampleAmount = 20 // s +// const ( +// origSquareSize = 16 // k +// lightNodes = 60 // c +// ) +// +// ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) +// defer cancel() +// +// net := availability_test.NewTestDAGNet(ctx, t) +// source, root := RandNode(net, origSquareSize) +// eh := headertest.RandExtendedHeader(t) +// eh.DAH = root +// +// // create two full nodes and ensure they are disconnected +// full1 := Node(net) +// full2 := Node(net) +// +// // pre-connect fulls +// net.Connect(full1.ID(), full2.ID()) +// // ensure fulls and source are not connected +// // so that fulls take data from light nodes only +// net.Disconnect(full1.ID(), source.ID()) +// net.Disconnect(full2.ID(), source.ID()) +// +// // start reconstruction for fulls +// errg, errCtx := errgroup.WithContext(ctx) +// errg.Go(func() error { +// return full1.SharesAvailable(errCtx, eh) +// }) +// errg.Go(func() error { +// return full2.SharesAvailable(errCtx, eh) +// }) +// +// // create light nodes and start sampling for them immediately +// lights1, lights2 := make( +// []*availability_test.TestNode, lightNodes/2), +// make([]*availability_test.TestNode, lightNodes/2) +// for i := 0; i < len(lights1); i++ { +// lights1[i] = light.Node(net) +// go func(i int) { +// err := lights1[i].SharesAvailable(ctx, eh) +// if err != nil { +// t.Log("light1 errors:", err) +// } +// }(i) +// +// lights2[i] = light.Node(net) +// go func(i int) { +// err := lights2[i].SharesAvailable(ctx, eh) +// if err != nil { +// t.Log("light2 errors:", err) +// } +// }(i) +// } +// +// // shape topology +// for i := 0; i < len(lights1); i++ { +// // ensure lights1 are only connected to full1 +// net.Connect(lights1[i].ID(), full1.ID()) +// net.Disconnect(lights1[i].ID(), full2.ID()) +// // ensure lights2 are only connected to full2 +// net.Connect(lights2[i].ID(), full2.ID()) +// net.Disconnect(lights2[i].ID(), full1.ID()) +// } +// +// // start connection lights with sources +// for i := 0; i < len(lights1); i++ { +// net.Connect(lights1[i].ID(), source.ID()) +// net.Connect(lights2[i].ID(), source.ID()) +// } +// +// err := errg.Wait() +// require.NoError(t, err) +//} +// +//// TestShareAvailable_DisconnectedFullNodes asserts that two disconnected full +//// nodes cannot ensure data is available (reconstruct data square) while being +//// connected to isolated light nodes subnetworks, which do not have enough nodes +//// to reconstruct the data, but once ShareAvailability nodes connect, they can +//// collectively reconstruct it. +//// +////nolint:dupword +//func TestShareAvailable_DisconnectedFullNodes(t *testing.T) { +// // S - Source +// // L - Light Node +// // F - Full Node +// // ── - connection +// // +// // Topology: +// // NOTE: There are more Light Nodes in practice +// // ┌─┬─┬─S─┬─┬─┐ +// // │ │ │ │ │ │ +// // │ │ │ │ │ │ +// // │ │ │ │ │ │ +// // L L L L L L +// // │ │ │ │ │ │ +// // └─┴─┤ ├─┴─┘ +// // F└───┘F +// // +// +// // NOTE: Numbers are taken from the original 'Fraud and Data Availability Proofs' paper +// light.DefaultSampleAmount = 20 // s +// const ( +// origSquareSize = 16 // k +// lightNodes = 32 // c - total number of nodes on two subnetworks +// ) +// +// ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) +// defer cancel() +// +// net := availability_test.NewTestDAGNet(ctx, t) +// source, root := RandNode(net, origSquareSize) +// eh := headertest.RandExtendedHeader(t) +// eh.DAH = root +// +// // create light nodes and start sampling for them immediately +// lights1, lights2 := make( +// []*availability_test.TestNode, lightNodes/2), +// make([]*availability_test.TestNode, lightNodes/2) +// +// var wg sync.WaitGroup +// wg.Add(lightNodes) +// for i := 0; i < len(lights1); i++ { +// lights1[i] = light.Node(net) +// go func(i int) { +// defer wg.Done() +// err := lights1[i].SharesAvailable(ctx, eh) +// if err != nil { +// t.Log("light1 errors:", err) +// } +// }(i) +// +// lights2[i] = light.Node(net) +// go func(i int) { +// defer wg.Done() +// err := lights2[i].SharesAvailable(ctx, eh) +// if err != nil { +// t.Log("light2 errors:", err) +// } +// }(i) +// } +// +// // create two full nodes and ensure they are disconnected +// full1 := Node(net) +// full2 := Node(net) +// net.Disconnect(full1.ID(), full2.ID()) +// +// // ensure fulls and source are not connected +// // so that fulls take data from light nodes only +// net.Disconnect(full1.ID(), source.ID()) +// net.Disconnect(full2.ID(), source.ID()) +// +// // shape topology +// for i := 0; i < len(lights1); i++ { +// // ensure lights1 are only connected to source and full1 +// net.Connect(lights1[i].ID(), source.ID()) +// net.Connect(lights1[i].ID(), full1.ID()) +// net.Disconnect(lights1[i].ID(), full2.ID()) +// // ensure lights2 are only connected to source and full2 +// net.Connect(lights2[i].ID(), source.ID()) +// net.Connect(lights2[i].ID(), full2.ID()) +// net.Disconnect(lights2[i].ID(), full1.ID()) +// } +// +// // start reconstruction for fulls that should fail +// ctxErr, cancelErr := context.WithTimeout(ctx, time.Second*5) +// errg, errCtx := errgroup.WithContext(ctxErr) +// errg.Go(func() error { +// return full1.SharesAvailable(errCtx, eh) +// }) +// errg.Go(func() error { +// return full2.SharesAvailable(errCtx, eh) +// }) +// +// // check that any of the fulls cannot reconstruct on their own +// err := errg.Wait() +// require.ErrorIs(t, err, share.ErrNotAvailable) +// cancelErr() +// +// // but after they connect +// net.Connect(full1.ID(), full2.ID()) +// +// // with clean caches from the previous try +// full1.ClearStorage() +// full2.ClearStorage() +// +// // they both should be able to reconstruct the block +// errg, bctx := errgroup.WithContext(ctx) +// errg.Go(func() error { +// return full1.SharesAvailable(bctx, eh) +// }) +// errg.Go(func() error { +// return full2.SharesAvailable(bctx, eh) +// }) +// require.NoError(t, errg.Wait()) +// // wait for all routines to finish before exit, in case there are any errors to log +// wg.Wait() +//} diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go new file mode 100644 index 000000000..8351f7400 --- /dev/null +++ b/share/availability/full/testing.go @@ -0,0 +1,57 @@ +package full + +// +//import ( +// "context" +// "testing" +// "time" +// +// "github.com/ipfs/go-datastore" +// "github.com/stretchr/testify/require" +// +// "github.com/celestiaorg/celestia-node/share" +// availability_test "github.com/celestiaorg/celestia-node/share/availability/test" +// "github.com/celestiaorg/celestia-node/share/eds" +// "github.com/celestiaorg/celestia-node/share/getters" +// "github.com/celestiaorg/celestia-node/share/ipld" +// "github.com/celestiaorg/celestia-node/share/p2p/discovery" +//) +// +//// GetterWithRandSquare provides a share.Getter filled with 'n' NMT +//// trees of 'n' random shares, essentially storing a whole square. +//func GetterWithRandSquare(t *testing.T, n int) (share.Getter, *share.AxisRoots) { +// bServ := ipld.NewMemBlockservice() +// getter := getters.NewIPLDGetter(bServ) +// return getter, availability_test.RandFillBS(t, n, bServ) +//} +// +//// RandNode creates a Full Node filled with a random block of the given size. +//func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_test.TestNode, *share.AxisRoots) { +// nd := Node(dn) +// return nd, availability_test.RandFillBS(dn.T, squareSize, nd.BlockService) +//} +// +//// Node creates a new empty Full Node. +//func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { +// nd := dn.NewTestNode() +// nd.Getter = getters.NewIPLDGetter(nd.BlockService) +// nd.Availability = TestAvailability(dn.T, nd.Getter) +// return nd +//} +// +//func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability { +// params := discovery.DefaultParameters() +// params.AdvertiseInterval = time.Second +// params.PeersLimit = 10 +// +// store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore()) +// require.NoError(t, err) +// err = store.Start(context.Background()) +// require.NoError(t, err) +// +// t.Cleanup(func() { +// err = store.Stop(context.Background()) +// require.NoError(t, err) +// }) +// return NewShareAvailability(store, getter) +//} diff --git a/share/availability/light/testing.go b/share/availability/light/testing.go new file mode 100644 index 000000000..673712f52 --- /dev/null +++ b/share/availability/light/testing.go @@ -0,0 +1,108 @@ +package light + +// +//import ( +// "context" +// "sync" +// "testing" +// +// "github.com/ipfs/boxo/blockservice" +// "github.com/ipfs/go-datastore" +// +// "github.com/celestiaorg/rsmt2d" +// +// "github.com/celestiaorg/celestia-node/header" +// "github.com/celestiaorg/celestia-node/header/headertest" +// "github.com/celestiaorg/celestia-node/share" +// availability_test "github.com/celestiaorg/celestia-node/share/availability/test" +// "github.com/celestiaorg/celestia-node/share/getters" +// "github.com/celestiaorg/celestia-node/share/ipld" +//) +// +//// GetterWithRandSquare provides a share.Getter filled with 'n' NMT trees of 'n' random shares, +//// essentially storing a whole square. +//func GetterWithRandSquare(t *testing.T, n int) (share.Getter, *header.ExtendedHeader) { +// bServ := ipld.NewMemBlockservice() +// getter := getters.NewIPLDGetter(bServ) +// root := availability_test.RandFillBS(t, n, bServ) +// eh := headertest.RandExtendedHeader(t) +// eh.DAH = root +// +// return getter, eh +//} +// +//// EmptyGetter provides an unfilled share.Getter with corresponding blockservice.BlockService than +//// can be filled by the test. +//func EmptyGetter() (share.Getter, blockservice.BlockService) { +// bServ := ipld.NewMemBlockservice() +// getter := getters.NewIPLDGetter(bServ) +// return getter, bServ +//} +// +//// RandNode creates a Light Node filled with a random block of the given size. +//func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_test.TestNode, *share.AxisRoots) { +// nd := Node(dn) +// return nd, availability_test.RandFillBS(dn.T, squareSize, nd.BlockService) +//} +// +//// Node creates a new empty Light Node. +//func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { +// nd := dn.NewTestNode() +// nd.Getter = getters.NewIPLDGetter(nd.BlockService) +// nd.Availability = TestAvailability(nd.Getter) +// return nd +//} +// +//func TestAvailability(getter share.Getter) *ShareAvailability { +// ds := datastore.NewMapDatastore() +// return NewShareAvailability(getter, ds) +//} +// +//func SubNetNode(sn *availability_test.SubNet) *availability_test.TestNode { +// nd := Node(sn.TestDagNet) +// sn.AddNode(nd) +// return nd +//} +// +//type onceGetter struct { +// *sync.Mutex +// available map[Sample]struct{} +//} +// +//func newOnceGetter() onceGetter { +// return onceGetter{ +// Mutex: &sync.Mutex{}, +// available: make(map[Sample]struct{}), +// } +//} +// +//func (m onceGetter) AddSamples(samples []Sample) { +// m.Lock() +// defer m.Unlock() +// for _, s := range samples { +// m.available[s] = struct{}{} +// } +//} +// +//func (m onceGetter) GetShare(_ context.Context, _ *header.ExtendedHeader, row, col int) (share.Share, error) { +// m.Lock() +// defer m.Unlock() +// s := Sample{Row: uint16(row), Col: uint16(col)} +// if _, ok := m.available[s]; ok { +// delete(m.available, s) +// return share.Share{}, nil +// } +// return share.Share{}, share.ErrNotAvailable +//} +// +//func (m onceGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { +// panic("not implemented") +//} +// +//func (m onceGetter) GetSharesByNamespace( +// _ context.Context, +// _ *header.ExtendedHeader, +// _ share.Namespace, +//) (share.NamespacedShares, error) { +// panic("not implemented") +//} diff --git a/share/availability/test/testing.go b/share/availability/test/testing.go new file mode 100644 index 000000000..6fb4af30e --- /dev/null +++ b/share/availability/test/testing.go @@ -0,0 +1,163 @@ +package availability_test + +import ( + "context" + "testing" + + "github.com/ipfs/boxo/bitswap" + "github.com/ipfs/boxo/bitswap/network" + "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/routing/offline" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + record "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/ipld" + "github.com/celestiaorg/celestia-node/share/sharetest" +) + +// RandFillBS fills the given BlockService with a random block of a given size. +func RandFillBS(t *testing.T, n int, bServ blockservice.BlockService) *share.AxisRoots { + shares := sharetest.RandShares(t, n*n) + return FillBS(t, bServ, shares) +} + +// FillBS fills the given BlockService with the given shares. +func FillBS(t *testing.T, bServ blockservice.BlockService, shares []share.Share) *share.AxisRoots { + eds, err := ipld.AddShares(context.TODO(), shares, bServ) + require.NoError(t, err) + roots, err := share.NewAxisRoots(eds) + require.NoError(t, err) + return roots +} + +type TestNode struct { + net *TestDagNet + share.Getter + share.Availability + blockservice.BlockService + host.Host +} + +// ClearStorage cleans up the storage of the node. +func (n *TestNode) ClearStorage() { + keys, err := n.Blockstore().AllKeysChan(n.net.ctx) + require.NoError(n.net.T, err) + + for k := range keys { + err := n.DeleteBlock(n.net.ctx, k) + require.NoError(n.net.T, err) + } +} + +type TestDagNet struct { + ctx context.Context + T *testing.T + net mocknet.Mocknet + nodes []*TestNode +} + +// NewTestDAGNet creates a new testing swarm utility to spawn different nodes and test how they +// interact and/or exchange data. +func NewTestDAGNet(ctx context.Context, t *testing.T) *TestDagNet { + return &TestDagNet{ + ctx: ctx, + T: t, + net: mocknet.New(), + } +} + +// NewTestNodeWithBlockstore creates a new plain TestNode with the given blockstore that can serve +// and request data. +func (dn *TestDagNet) NewTestNodeWithBlockstore(dstore ds.Datastore, bstore blockstore.Blockstore) *TestNode { + hst, err := dn.net.GenPeer() + require.NoError(dn.T, err) + routing := offline.NewOfflineRouter(dstore, record.NamespacedValidator{}) + bs := bitswap.New( + dn.ctx, + network.NewFromIpfsHost(hst, routing), + bstore, + bitswap.ProvideEnabled(false), // disable routines for DHT content provides, as we don't use them + bitswap.EngineBlockstoreWorkerCount(1), // otherwise it spawns 128 routines which is too much for tests + bitswap.EngineTaskWorkerCount(2), + bitswap.TaskWorkerCount(2), + bitswap.SetSimulateDontHavesOnTimeout(false), + bitswap.SetSendDontHaves(false), + ) + nd := &TestNode{ + net: dn, + BlockService: ipld.NewBlockservice(bstore, bs), + Host: hst, + } + dn.nodes = append(dn.nodes, nd) + return nd +} + +// NewTestNode creates a plain network node that can serve and request data. +func (dn *TestDagNet) NewTestNode() *TestNode { + dstore := dssync.MutexWrap(ds.NewMapDatastore()) + bstore := blockstore.NewBlockstore(dstore) + return dn.NewTestNodeWithBlockstore(dstore, bstore) +} + +// ConnectAll connects all the peers on registered on the TestDagNet. +func (dn *TestDagNet) ConnectAll() { + err := dn.net.LinkAll() + require.NoError(dn.T, err) + + err = dn.net.ConnectAllButSelf() + require.NoError(dn.T, err) +} + +// Connect connects two given peers. +func (dn *TestDagNet) Connect(peerA, peerB peer.ID) { + _, err := dn.net.LinkPeers(peerA, peerB) + require.NoError(dn.T, err) + _, err = dn.net.ConnectPeers(peerA, peerB) + require.NoError(dn.T, err) +} + +// Disconnect disconnects two peers. +// It does a hard disconnect, meaning that disconnected peers won't be able to reconnect on their +// own but only with DagNet.Connect or TestDagNet.ConnectAll. +func (dn *TestDagNet) Disconnect(peerA, peerB peer.ID) { + err := dn.net.UnlinkPeers(peerA, peerB) + require.NoError(dn.T, err) + err = dn.net.DisconnectPeers(peerA, peerB) + require.NoError(dn.T, err) +} + +type SubNet struct { + *TestDagNet + nodes []*TestNode +} + +func (dn *TestDagNet) SubNet() *SubNet { + return &SubNet{dn, nil} +} + +func (sn *SubNet) AddNode(nd *TestNode) { + sn.nodes = append(sn.nodes, nd) +} + +func (sn *SubNet) ConnectAll() { + nodes := sn.nodes + for _, n1 := range nodes { + for _, n2 := range nodes { + if n1 == n2 { + continue + } + _, err := sn.net.LinkPeers(n1.ID(), n2.ID()) + require.NoError(sn.T, err) + + _, err = sn.net.ConnectPeers(n1.ID(), n2.ID()) + require.NoError(sn.T, err) + } + } +} diff --git a/share/ipld/corrupted_data_test.go b/share/ipld/corrupted_data_test.go deleted file mode 100644 index 0d0af6dd3..000000000 --- a/share/ipld/corrupted_data_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package ipld_test - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/celestiaorg/celestia-node/header/headertest" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/availability/full" - availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/getters" -) - -// sharesAvailableTimeout is an arbitrarily picked interval of time in which a TestNode is expected -// to be able to complete a SharesAvailable request from a connected peer in a TestDagNet. -const sharesAvailableTimeout = 2 * time.Second - -// TestNamespaceHasher_CorruptedData is an integration test that verifies that the NamespaceHasher -// of a recipient of corrupted data will not panic, and will throw away the corrupted data. -func TestNamespaceHasher_CorruptedData(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - net := availability_test.NewTestDAGNet(ctx, t) - - requester := full.Node(net) - provider, mockBS := availability_test.MockNode(t, net) - provider.Availability = full.TestAvailability(t, getters.NewIPLDGetter(provider.BlockService)) - net.ConnectAll() - - // before the provider starts attacking, we should be able to retrieve successfully. We pass a size - // 16 block, but this is not important to the test and any valid block size behaves the same. - root := availability_test.RandFillBS(t, 16, provider.BlockService) - - eh := headertest.RandExtendedHeaderWithRoot(t, root) - getCtx, cancelGet := context.WithTimeout(ctx, sharesAvailableTimeout) - t.Cleanup(cancelGet) - err := requester.SharesAvailable(getCtx, eh) - require.NoError(t, err) - - // clear the storage of the requester so that it must retrieve again, then start attacking - // we reinitialize the node to clear the eds store - requester = full.Node(net) - mockBS.Attacking = true - getCtx, cancelGet = context.WithTimeout(ctx, sharesAvailableTimeout) - t.Cleanup(cancelGet) - err = requester.SharesAvailable(getCtx, eh) - require.ErrorIs(t, err, share.ErrNotAvailable) -} diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index 8947fc362..8f9711bb8 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -182,7 +182,6 @@ func (g *Getter) GetSharesByNamespace( return nil, err } - // TODO(@Wondertan): this must use shwap types eventually nsShrs := make(shwap.NamespaceData, len(blks)) for i, blk := range blks { rnd := blk.(*RowNamespaceDataBlock).Container From fd7f04cdd2fd0109ee20f90e8e4d1faa2be8722c Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Thu, 1 Aug 2024 11:54:32 +0200 Subject: [PATCH 3/3] wrap non-nil --- share/availability/full/availability.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index 7554a7904..f4f0465dd 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -42,7 +42,10 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header // if the data square is empty, we can safely link the header height in the store to an empty EDS. if share.DataHash(dah.Hash()).IsEmptyEDS() { err := fa.store.Put(ctx, dah, header.Height(), share.EmptyEDS()) - return fmt.Errorf("put empty EDS: %w", err) + if err != nil { + return fmt.Errorf("put empty EDS: %w", err) + } + return nil } // we assume the caller of this method has already performed basic validation on the