From bffa25ea882f2bb43a81154558af83424e31736f Mon Sep 17 00:00:00 2001 From: Martin Hutchinson Date: Tue, 22 Jul 2025 16:28:22 +0000 Subject: [PATCH 1/2] [VIndex] Refactoring and cleaning tech debt The next milestone is to add the output log. While prototyping this, it became clear that a lot of the naming is confusing. Renamed to make clear what is the input log, what is the write ahead log, etc. This makes space to add the output log parameters etc afterwards. As part of this, simplified the storage flags so that it takes only a root directory and then creates the input and output logs in there, along with the WAL file. Other map persistence will likely be added to this directory too in the future. Use skey->vkey method from formats, and deleted local method. Added a high level test for the verifiable index. --- go.mod | 5 +- go.sum | 4 +- vindex/README.md | 2 +- vindex/cmd/logandmap/main.go | 125 +++++++++++------------------------ vindex/map.go | 107 ++++++++++++++++-------------- vindex/map_test.go | 124 ++++++++++++++++++++++++++++++++-- 6 files changed, 220 insertions(+), 147 deletions(-) diff --git a/go.mod b/go.mod index 9bccebe..0bcdf9b 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,10 @@ go 1.24.1 require ( filippo.io/torchwood v0.5.1-0.20250605130057-fa65d721a6ce + github.com/google/go-cmp v0.7.0 github.com/gorilla/mux v1.8.1 - github.com/transparency-dev/formats v0.0.0-20250616090723-6ce2fd29df16 + github.com/transparency-dev/formats v0.0.0-20250723101439-be3b1008ec3a + github.com/transparency-dev/merkle v0.0.2 github.com/transparency-dev/tessera v0.2.1-0.20250722085756-7303218c6614 golang.org/x/mod v0.26.0 golang.org/x/sync v0.16.0 @@ -17,7 +19,6 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect - github.com/transparency-dev/merkle v0.0.2 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel v1.37.0 // indirect 
go.opentelemetry.io/otel/metric v1.37.0 // indirect diff --git a/go.sum b/go.sum index e8c69c2..7d91588 100644 --- a/go.sum +++ b/go.sum @@ -21,8 +21,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/transparency-dev/formats v0.0.0-20250616090723-6ce2fd29df16 h1:4yn2lO94tSuoT8fPm4NJZL5cRMHKOpGwmH/KM9f/ULI= -github.com/transparency-dev/formats v0.0.0-20250616090723-6ce2fd29df16/go.mod h1:v+kgcd91U14WBv5EshoX1nooq4SIZTZzYpQDBq6N55U= +github.com/transparency-dev/formats v0.0.0-20250723101439-be3b1008ec3a h1:l1RrmDw9xrVN/lbW/rzPJhjQ+dmsqOyItES9Ku/njFA= +github.com/transparency-dev/formats v0.0.0-20250723101439-be3b1008ec3a/go.mod h1:A4VaaPFBMEuwtuihpGY8wUOqRBy5plQww4NqfjK5E7c= github.com/transparency-dev/merkle v0.0.2 h1:Q9nBoQcZcgPamMkGn7ghV8XiTZ/kRxn1yCG81+twTK4= github.com/transparency-dev/merkle v0.0.2/go.mod h1:pqSy+OXefQ1EDUVmAJ8MUhHB9TXGuzVAT58PqBoHz1A= github.com/transparency-dev/tessera v0.2.1-0.20250722085756-7303218c6614 h1:EfxzWae/zdnfVA44jFIKohvd2JWlcanRzRYb1A/uiDU= diff --git a/vindex/README.md b/vindex/README.md index cafa73d..7a06989 100644 --- a/vindex/README.md +++ b/vindex/README.md @@ -195,7 +195,7 @@ LOG_PRIVATE_KEY=PRIVATE+KEY+logandmap+38581672+AXJ0FKWOcO2ch6WC8kP705Ed3Gxu7pVtZ ``` Running the above will run a web server hosting the following URLs: - - `/inputlog/` - the [tlog-tiles][] + - `/inputlog/` - the [tlog-tiles][] base URL for the input log - `/vindex/lookup` - the provisional [vindex lookup API](./api/api.go) - `/outputlog/` - TODO(mhutchinson): this is where the output log will be hosted diff --git a/vindex/cmd/logandmap/main.go b/vindex/cmd/logandmap/main.go index 012144c..640bb13 100644 --- a/vindex/cmd/logandmap/main.go +++ 
b/vindex/cmd/logandmap/main.go @@ -23,9 +23,7 @@ package main import ( "context" - "crypto/ed25519" "crypto/sha256" - "encoding/base64" "encoding/json" "errors" "flag" @@ -35,12 +33,13 @@ import ( "net/http" "os" "os/signal" - "strings" + "path" "syscall" "time" "github.com/gorilla/mux" "github.com/transparency-dev/formats/log" + fnote "github.com/transparency-dev/formats/note" "github.com/transparency-dev/incubator/vindex" "github.com/transparency-dev/tessera" "github.com/transparency-dev/tessera/api" @@ -51,10 +50,9 @@ import ( ) var ( - privKeyFile = flag.String("private_key", "", "Location of private key file. If unset, uses the contents of the LOG_PRIVATE_KEY environment variable.") - inputLogDir = flag.String("input_log_dir", "", "Root directory in which to store the log for the POSIX-based Input Log") - walPath = flag.String("walPath", "", "Path to use for the Write Ahead Log. If empty, a temporary file will be used.") - listen = flag.String("listen", ":8088", "Address to set up HTTP server listening on") + inputLogPrivKeyFile = flag.String("input_log_private_key", "", "Location of private key file. If unset, uses the contents of the INPUT_LOG_PRIVATE_KEY environment variable.") + storageDir = flag.String("storage_dir", "", "Root directory in which to store the data for the demo. 
This will create subdirectories for the Input Log, Output Log, and allocate space to store the verifiable map persistence.") + listen = flag.String("listen", ":8088", "Address to set up HTTP server listening on") ) func main() { @@ -76,53 +74,58 @@ type LogEntry struct { } func run(ctx context.Context) error { - if *inputLogDir == "" { - return errors.New("input_log_dir must be set") + if *storageDir == "" { + return errors.New("storage_dir must be set") + } + inputLogDir := path.Join(*storageDir, "inputlog") + walPath := path.Join(*storageDir, "index.wal") + + if err := os.MkdirAll(inputLogDir, 0o755); err != nil { + return fmt.Errorf("failed to create input log directory: %v", err) } // Gather the info needed for reading/writing checkpoints - s, v := getSignerVerifierOrDie() + ils, ilv := getInputLogSignerVerifierOrDie() // Set up a Tessera POSIX log - driver, err := posix.New(ctx, posix.Config{Path: *inputLogDir}) + ild, err := posix.New(ctx, posix.Config{Path: inputLogDir}) if err != nil { return fmt.Errorf("failed to create new log: %v", err) } - // Get a Tessera appender - appender, shutdown, reader, err := tessera.NewAppender(ctx, driver, tessera.NewAppendOptions(). - WithCheckpointSigner(s). + inputAppender, inputShutdown, inputReader, err := tessera.NewAppender(ctx, ild, tessera.NewAppendOptions(). + WithCheckpointSigner(ils). WithCheckpointInterval(10*time.Second). WithBatching(256, time.Second)) if err != nil { return fmt.Errorf("failed to get appender: %v", err) } defer func() { - _ = shutdown(ctx) + _ = inputShutdown(ctx) }() // Create the verifiable index connected to the LogReader. 
inputLog := logReaderSource{ - r: reader, + r: inputReader, } - logCpParseFn := func(cpRaw []byte) (*log.Checkpoint, error) { + inputLogCpParseFn := func(cpRaw []byte) (*log.Checkpoint, error) { // No witnesses required yet - cp, _, _, err := log.ParseCheckpoint(cpRaw, v.Name(), v) + cp, _, _, err := log.ParseCheckpoint(cpRaw, ilv.Name(), ilv) return cp, err } - vi, err := vindex.NewVerifiableIndex(ctx, inputLog, logCpParseFn, mapFnFromFlags(), walPathFromFlags()) + vi, err := vindex.NewVerifiableIndex(ctx, inputLog, inputLogCpParseFn, mapFnFromFlags(), walPath) if err != nil { return fmt.Errorf("failed to create vindex: %v", err) } // Submits new entries to the log in the background. - go submitEntries(ctx, appender) + go submitEntries(ctx, inputAppender) // Keeps the map synced with the latest published log state. go maintainMap(ctx, vi) // Run a web server to handle queries over the verifiable index. - go runWebServer(vi) + go runWebServer(vi, inputLogDir) <-ctx.Done() return nil } @@ -132,11 +135,11 @@ type logReaderSource struct { r tessera.LogReader } -func (s logReaderSource) GetCheckpoint(ctx context.Context) (checkpoint []byte, err error) { +func (s logReaderSource) Checkpoint(ctx context.Context) (checkpoint []byte, err error) { return s.r.ReadCheckpoint(ctx) } -func (s logReaderSource) StreamLeaves(ctx context.Context, start, end uint64) iter.Seq2[[]byte, error] { +func (s logReaderSource) Leaves(ctx context.Context, start, end uint64) iter.Seq2[[]byte, error] { bi := client.EntryBundles(ctx, 2, s.r.IntegratedSize, s.r.ReadEntryBundle, start, end-start) unbundleFn := func(bundle []byte) ([][]byte, error) { eb := &api.EntryBundle{} @@ -168,13 +171,13 @@ func maintainMap(ctx context.Context, vi *vindex.VerifiableIndex) { defer ticker.Stop() for { + if err := vi.Update(ctx); err != nil { + klog.Warning(err) + } select { case <-ctx.Done(): return case <-ticker.C: - if err := vi.Update(ctx); err != nil { - klog.Warning(err) - } } } } @@ -217,7 +220,7 @@ func 
submitEntries(ctx context.Context, appender *tessera.Appender) { } } -func runWebServer(vi *vindex.VerifiableIndex) { +func runWebServer(vi *vindex.VerifiableIndex, ild string) { web := NewServer(func(h [sha256.Size]byte) ([]uint64, error) { idxes, size := vi.Lookup(h) if size == 0 { @@ -226,7 +229,7 @@ func runWebServer(vi *vindex.VerifiableIndex) { return idxes, nil }) - ilfs := http.FileServer(http.Dir(*inputLogDir)) + ilfs := http.FileServer(http.Dir(ild)) r := mux.NewRouter() r.PathPrefix("/inputlog/").Handler(http.StripPrefix("/inputlog/", ilfs)) web.registerHandlers(r) @@ -240,69 +243,29 @@ func runWebServer(vi *vindex.VerifiableIndex) { klog.Infof("Started HTTP server listening on %s", *listen) } -// Read log private key from file or environment variable and generate the +// Read input log private key from file or environment variable and generate the // note Signer and Verifier pair for it. -func getSignerVerifierOrDie() (note.Signer, note.Verifier) { +func getInputLogSignerVerifierOrDie() (note.Signer, note.Verifier) { var privKey string var err error - if len(*privKeyFile) > 0 { - privKey, err = getKeyFile(*privKeyFile) + if len(*inputLogPrivKeyFile) > 0 { + privKey, err = getKeyFile(*inputLogPrivKeyFile) if err != nil { klog.Exitf("Unable to get private key: %v", err) } } else { - privKey = os.Getenv("LOG_PRIVATE_KEY") + privKey = os.Getenv("INPUT_LOG_PRIVATE_KEY") if len(privKey) == 0 { - klog.Exit("Supply private key file path using --private_key or set LOG_PRIVATE_KEY environment variable") + klog.Exit("Supply private key file path using --input_log_private_key or set INPUT_LOG_PRIVATE_KEY environment variable") } } - s, v, err := signerVerifierFromSkey(privKey) + s, v, err := fnote.NewEd25519SignerVerifier(privKey) if err != nil { klog.Exitf("Failed to get signer/verifier: %v", err) } return s, v } -// TODO(mhutchinson): move this into t-dev/formats. 
-func signerVerifierFromSkey(skey string) (note.Signer, note.Verifier, error) { - const algEd25519 = 1 - s, err := note.NewSigner(skey) - if err != nil { - return nil, nil, err - } - _, skey, _ = strings.Cut(skey, "+") - _, skey, _ = strings.Cut(skey, "+") - _, skey, _ = strings.Cut(skey, "+") - _, key64, _ := strings.Cut(skey, "+") - key, err := base64.StdEncoding.DecodeString(key64) - if err != nil { - return nil, nil, fmt.Errorf("failed to decode base64: %v", err) - } - - alg, key := key[0], key[1:] - switch alg { - default: - return nil, nil, errors.New("unsupported algorithm") - - case algEd25519: - if len(key) != ed25519.SeedSize { - return nil, nil, fmt.Errorf("expected key seed of size %d but got %d", ed25519.SeedSize, len(key)) - } - key := ed25519.NewKeyFromSeed(key) - publicKey := key.Public().(ed25519.PublicKey) - vkey, err := note.NewEd25519VerifierKey(s.Name(), publicKey) - if err != nil { - return nil, nil, fmt.Errorf("failed to generate verifier from key: %v", err) - - } - v, err := note.NewVerifier(vkey) - if err != nil { - return nil, nil, fmt.Errorf("failed to create verifier from vkey: %v", err) - } - return s, v, err - } -} - func getKeyFile(path string) (string, error) { k, err := os.ReadFile(path) if err != nil { @@ -326,15 +289,3 @@ func mapFnFromFlags() vindex.MapFn { } return mapFn } - -func walPathFromFlags() string { - if len(*walPath) > 0 { - return *walPath - } - f, err := os.CreateTemp("", "walPath") - if err != nil { - klog.Exitf("Failed to create temporary path for WAL: %s", err) - } - klog.Infof("Created temporary WAL at %s", f.Name()) - return f.Name() -} diff --git a/vindex/map.go b/vindex/map.go index bac7cdc..d268772 100644 --- a/vindex/map.go +++ b/vindex/map.go @@ -53,12 +53,11 @@ type MapFn func([]byte) [][32]byte // InputLog represents a connection to the input log from which map data will be built. // This can be a local or remote data source. 
type InputLog interface { - // GetCheckpoint returns the latest checkpoint committing to the input log state. - GetCheckpoint(ctx context.Context) (checkpoint []byte, err error) - // StreamLeaves returns all the leaves in the range [start, end), outputting them via - // the out channel. - // TODO(mhutchinson): This out channel would be better as a returned iterator. - StreamLeaves(ctx context.Context, start, end uint64) iter.Seq2[[]byte, error] + // Checkpoint returns the latest checkpoint committing to the input log state. + Checkpoint(ctx context.Context) (checkpoint []byte, err error) + // Leaves returns all the leaves in the range [start, end), outputting them via + // the returned iterator. + Leaves(ctx context.Context, start, end uint64) iter.Seq2[[]byte, error] } // OpenCheckpointFn is a function that parses a checkpoint, validating it, and returns a parsed @@ -72,15 +71,15 @@ type OpenCheckpointFn func(cpRaw []byte) (*log.Checkpoint, error) // path. // Note that only one IndexBuilder should exist for any given walPath at any time. The behaviour is unspecified, // but likely broken, if multiple processes are writing to the same file at any given time. 
-func NewVerifiableIndex(ctx context.Context, inputLog InputLog, logParseFn OpenCheckpointFn, mapFn MapFn, walPath string) (*VerifiableIndex, error) { - wal := &writeAheadLog{ +func NewVerifiableIndex(ctx context.Context, inputLog InputLog, inputLogParseFn OpenCheckpointFn, mapFn MapFn, walPath string) (*VerifiableIndex, error) { + wal := &walWriter{ walPath: walPath, } ws, err := wal.init() if err != nil { return nil, err } - reader, err := newLogReader(walPath) + reader, err := newWalReader(walPath) if err != nil { return nil, err } @@ -89,14 +88,17 @@ func NewVerifiableIndex(ctx context.Context, inputLog InputLog, logParseFn OpenC return nil, fmt.Errorf("InitStorage: %s", err) } b := &VerifiableIndex{ - inputLog: inputLog, - logParseFn: logParseFn, - mapFn: mapFn, - wal: wal, - reader: reader, - vindex: *mpt.NewTree(sha256.Sum256, vtreeStorage), - data: map[[32]byte][]uint64{}, - nextIndex: ws, + inputLog: inputLog, + inputLogParseFn: inputLogParseFn, + mapFn: mapFn, + walWriter: wal, + walReader: reader, + vindex: *mpt.NewTree(sha256.Sum256, vtreeStorage), + data: map[[32]byte][]uint64{}, + nextIndex: ws, + } + if err := b.buildMap(ctx); err != nil { + return nil, fmt.Errorf("failed to build map: %v", err) } return b, nil } @@ -104,30 +106,30 @@ func NewVerifiableIndex(ctx context.Context, inputLog InputLog, logParseFn OpenC // VerifiableIndex manages reading from the input log, mapping leaves, updating the WAL, // reading the WAL, and keeping the state of the in-memory index updated from the WAL. 
type VerifiableIndex struct { - inputLog InputLog - logParseFn OpenCheckpointFn - mapFn MapFn - wal *writeAheadLog - reader *logReader + inputLog InputLog + inputLogParseFn OpenCheckpointFn + mapFn MapFn + walWriter *walWriter + walReader *walReader indexMu sync.RWMutex // covers vindex and data vindex mpt.Tree data map[[32]byte][]uint64 - nextIndex uint64 // nextIndex is the next index in the log to consume - rawCp []byte // rawCp is the last checkpoint we started syncing to - cpSize uint64 // cpSize is the tree size of rawCp - mapSize uint64 // mapSize is the total number of leaves processed into the map - // servingSize is the size of the input log we are serving for. // This a temporary workaround not having an output log, which we will eventually read to get // the checkpoint size. servingSize uint64 + + nextIndex uint64 // nextIndex is the next index in the log to consume + inputLogCp []byte // rawCp is the last checkpoint we started syncing to + inputLogCpSize uint64 // cpSize is the tree size of rawCp. Used to sync on WAL. + mapSize uint64 // mapSize is the total number of leaves processed into the map } // Close ensures that any open connections are closed before returning. func (b *VerifiableIndex) Close() error { - return b.wal.close() + return b.walWriter.close() } // Lookup returns the values stored for the given key. @@ -158,29 +160,29 @@ func (b *VerifiableIndex) Lookup(key [sha256.Size]byte) (indices []uint64, size // Update checks the input log for a new Checkpoint, and ensures that the Verifiable Index // is updated to the corresponding size. 
func (b *VerifiableIndex) Update(ctx context.Context) error { - rawCp, err := b.inputLog.GetCheckpoint(ctx) + rawCp, err := b.inputLog.Checkpoint(ctx) if err != nil { return fmt.Errorf("failed to get latest checkpoint from DB: %s", err) } - cp, err := b.logParseFn(rawCp) + cp, err := b.inputLogParseFn(rawCp) if err != nil { return fmt.Errorf("failed to parse checkpoint: %s", err) } - if cp.Size == b.cpSize { + if cp.Size == b.inputLogCpSize { klog.V(1).Infof("No update needed: checkpoint size is still %d", b.servingSize) return nil } - b.cpSize = cp.Size - b.rawCp = rawCp - klog.Infof("Building map to log size of %d", b.cpSize) + b.inputLogCpSize = cp.Size + b.inputLogCp = rawCp + klog.Infof("Building map to log size of %d", b.inputLogCpSize) eg, cctx := errgroup.WithContext(ctx) eg.Go(func() error { return b.syncFromInputLog(cctx) }) eg.Go(func() error { return b.buildMap(cctx) }) err = eg.Wait() - b.servingSize = b.cpSize + b.servingSize = b.inputLogCpSize return err } @@ -193,10 +195,10 @@ func (b *VerifiableIndex) Update(ctx context.Context) error { // cloneDB, which performed this validation. Implementing this will require the index to store some // state alongside the WAL which contains a compact range of its current progress. 
func (b *VerifiableIndex) syncFromInputLog(ctx context.Context) error { - if b.cpSize > b.nextIndex { + if b.inputLogCpSize > b.nextIndex { ctx, done := context.WithCancel(ctx) defer done() - for l, err := range b.inputLog.StreamLeaves(ctx, b.nextIndex, b.cpSize) { + for l, err := range b.inputLog.Leaves(ctx, b.nextIndex, b.inputLogCpSize) { idx := b.nextIndex if err != nil { return fmt.Errorf("failed to read leaf at index %d: %v", idx, err) @@ -218,14 +220,14 @@ func (b *VerifiableIndex) syncFromInputLog(ctx context.Context) error { return mapErr } b.nextIndex++ - if len(hashes) == 0 && idx < b.cpSize-1 { + if len(hashes) == 0 && idx < b.inputLogCpSize-1 { // We can skip writing out values with no hashes, as long as we're // not at the end of the log. // If we are at the end of the log, we need to write out a value as a sentinel // even if there are no hashes. continue } - if err := b.wal.append(idx, hashes); err != nil { + if err := b.walWriter.append(idx, hashes); err != nil { return fmt.Errorf("failed to add index to entry for leaf %d: %v", idx, err) } } @@ -238,13 +240,13 @@ func (b *VerifiableIndex) syncFromInputLog(ctx context.Context) error { func (b *VerifiableIndex) buildMap(ctx context.Context) error { startWal := time.Now() updatedKeys := make(map[[32]byte]bool) // Allows us to efficiently update vindex after first init - for b.mapSize < b.cpSize { + for b.mapSize < b.inputLogCpSize { select { case <-ctx.Done(): return ctx.Err() default: } - idx, hashes, err := b.reader.next() + idx, hashes, err := b.walReader.next() if err != nil { if err != io.EOF { return err @@ -311,7 +313,10 @@ func (b *VerifiableIndex) buildMap(ctx context.Context) error { return nil } -type writeAheadLog struct { +// walWriter provides the methods needed by the processor of the Input Log when interacting +// with the WAL. init provides the index that this processor should start from, and append +// allows new mapped entries to be added to the WAL. 
+type walWriter struct { walPath string f *os.File } @@ -321,7 +326,7 @@ type writeAheadLog struct { // // Note that it returns the next expected index to avoid awkwardness with the meaning of 0, // which could mean 0 was successfully read from a previous run, or that there was no log. -func (l *writeAheadLog) init() (uint64, error) { +func (l *walWriter) init() (uint64, error) { ffs := os.O_WRONLY | os.O_APPEND idx, err := l.validate() @@ -342,14 +347,14 @@ func (l *writeAheadLog) init() (uint64, error) { return idx, err } -func (l *writeAheadLog) close() error { +func (l *walWriter) close() error { return l.f.Close() } // validate reads the file and determines what the last mapped log index was, and returns it. // The assumption is that all lines ending with a newline were written correctly. // If there are any errors in the file then this throws an error. -func (l *writeAheadLog) validate() (uint64, error) { +func (l *walWriter) validate() (uint64, error) { f, err := os.Open(l.walPath) if err != nil { return 0, err @@ -417,7 +422,7 @@ func (l *writeAheadLog) validate() (uint64, error) { return idx, err } -func (l *writeAheadLog) append(idx uint64, hashes [][32]byte) error { +func (l *walWriter) append(idx uint64, hashes [][32]byte) error { e, err := marshalWalEntry(idx, hashes) if err != nil { return fmt.Errorf("failed to marshal entry: %v", err) @@ -426,18 +431,18 @@ func (l *writeAheadLog) append(idx uint64, hashes [][32]byte) error { return err } -func newLogReader(path string) (*logReader, error) { +func newWalReader(path string) (*walReader, error) { f, err := os.Open(path) if err != nil { return nil, err } - return &logReader{ + return &walReader{ f: f, r: bufio.NewReader(f), }, nil } -type logReader struct { +type walReader struct { f *os.File r *bufio.Reader partial string @@ -447,7 +452,7 @@ type logReader struct { // TODO(mhutchinson): change this as it's inconvenient with EOF handling, // which should be common when reader hits the end of the file but 
more is // to be written. -func (r *logReader) next() (uint64, [][32]byte, error) { +func (r *walReader) next() (uint64, [][32]byte, error) { line, err := r.r.ReadString('\n') if err != nil { if err == io.EOF { @@ -462,7 +467,7 @@ func (r *logReader) next() (uint64, [][32]byte, error) { return unmarshalWalEntry(line) } -func (r *logReader) close() error { +func (r *walReader) close() error { return r.f.Close() } diff --git a/vindex/map_test.go b/vindex/map_test.go index 49e92d8..f31330e 100644 --- a/vindex/map_test.go +++ b/vindex/map_test.go @@ -17,17 +17,31 @@ package vindex import ( + "bytes" + "context" "crypto/sha256" "encoding/hex" "fmt" "io" + "iter" "os" "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/transparency-dev/formats/log" + fnote "github.com/transparency-dev/formats/note" + "github.com/transparency-dev/merkle/rfc6962" + "github.com/transparency-dev/merkle/testonly" + "golang.org/x/mod/sumdb/note" "golang.org/x/sync/errgroup" ) +const ( + skey = "PRIVATE+KEY+logandmap+38581672+AXJ0FKWOcO2ch6WC8kP705Ed3Gxu7pVtZLhfHAQwp+FE" + vkey = "logandmap+38581672+Ab/PCr1eCclRPRMBqw/r5An1xO71MCnImLiospEq6b4l" +) + func TestWriteAheadLog_init(t *testing.T) { testCases := []struct { desc string @@ -81,7 +95,7 @@ func TestWriteAheadLog_init(t *testing.T) { if err := f.Close(); err != nil { t.Fatal(err) } - wal := &writeAheadLog{ + wal := &walWriter{ walPath: f.Name(), } idx, err := wal.init() @@ -113,7 +127,7 @@ func TestWriteAheadLog_roundtrip(t *testing.T) { t.Fatal(err) } - wal := &writeAheadLog{ + wal := &walWriter{ walPath: f.Name(), } idx, err := wal.init() @@ -160,7 +174,7 @@ func TestWriteAndWriteLog(t *testing.T) { t.Fatal(err) } - wal := &writeAheadLog{ + wal := &walWriter{ walPath: f.Name(), } idx, err := wal.init() @@ -171,7 +185,7 @@ func TestWriteAndWriteLog(t *testing.T) { t.Fatalf("expected index %d, got %d", want, got) } - reader, err := newLogReader(f.Name()) + reader, err := newWalReader(f.Name()) if err != nil { t.Fatal(err) } @@ 
-263,6 +277,108 @@ func TestUnmarshal(t *testing.T) { } } +func TestVerifiableIndex(t *testing.T) { + ctx := context.Background() + s, v, err := fnote.NewEd25519SignerVerifier(skey) + if err != nil { + t.Fatal(err) + } + inputLog := &inMemoryTreeSource{ + t: testonly.New(rfc6962.DefaultHasher), + leaves: make([][]byte, 0), + s: s, + } + for _, str := range []string{"foo: 2", "bar: 5", "bar: 10", "foo: 8"} { + inputLog.Append(str) + } + + inputLogCpParseFn := func(cpRaw []byte) (*log.Checkpoint, error) { + cp, _, _, err := log.ParseCheckpoint(cpRaw, v.Name(), v) + return cp, err + } + mapFn := func(leaf []byte) [][32]byte { + key, _, found := bytes.Cut(leaf, []byte(":")) + if !found { + panic("colon not found") + } + return [][32]byte{sha256.Sum256(key)} + } + f, err := os.CreateTemp("", "testWal") + if err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + vi, err := NewVerifiableIndex(ctx, inputLog, inputLogCpParseFn, mapFn, f.Name()) + if err != nil { + t.Fatal(err) + } + + if err := vi.Update(ctx); err != nil { + t.Fatal(err) + } + + idxes, size := vi.Lookup(sha256.Sum256([]byte("foo"))) + if size != 4 { + t.Errorf("expected size 4 but got %d", size) + } + if want := []uint64{0, 3}; !cmp.Equal(idxes, want) { + t.Errorf("expected %v but got %v", want, idxes) + } + + idxes, size = vi.Lookup(sha256.Sum256([]byte("bar"))) + if size != 4 { + t.Errorf("expected size 4 but got %d", size) + } + if want := []uint64{1, 2}; !cmp.Equal(idxes, want) { + t.Errorf("expected %v but got %v", want, idxes) + } + + idxes, size = vi.Lookup(sha256.Sum256([]byte("banana"))) + if size != 4 { + t.Errorf("expected size 4 but got %d", size) + } + if idxes != nil { + t.Errorf("expected no results but got %+v", idxes) + } +} + +type inMemoryTreeSource struct { + t *testonly.Tree + leaves [][]byte + s note.Signer +} + +func (s *inMemoryTreeSource) Checkpoint(ctx context.Context) (checkpoint []byte, err error) { + rootHash := s.t.Hash() + size := 
uint64(len(s.leaves)) + + cp := log.Checkpoint{ + Origin: s.s.Name(), + Size: size, + Hash: rootHash, + } + n := &note.Note{Text: string(cp.Marshal())} + return note.Sign(n, s.s) +} + +func (s *inMemoryTreeSource) Leaves(ctx context.Context, start, end uint64) iter.Seq2[[]byte, error] { + return func(yield func([]byte, error) bool) { + for _, entry := range s.leaves { + if !yield(entry, nil) { + return + } + } + } +} + +func (s *inMemoryTreeSource) Append(leafStr string) { + leaf := []byte(leafStr) + s.leaves = append(s.leaves, leaf) + s.t.Append(leaf) +} + func mustHashEncode(data string) string { h := sha256.Sum256([]byte(data)) return hex.EncodeToString(h[:]) From 6eca03fc2d152aeacd4bfd1336eb71f0629e77e1 Mon Sep 17 00:00:00 2001 From: Martin Hutchinson Date: Thu, 24 Jul 2025 10:39:16 +0000 Subject: [PATCH 2/2] Move state out of the Verifiable Index This moves things to be shaped more like a pipeline; the mapper is responsible for reading from the log, verifying the contents, and passing state to build a new map via clear mechanisms. At the moment, this is just the WAL, but the raw log checkpoint will also need to be passed through so that it can make it to the output log. As part of this, a compact range of the currently verified state will also need to be persisted so that we can load that to incrementally verify progress after a restart. 
--- vindex/map.go | 187 ++++++++++++++++++++++++++++---------------------- 1 file changed, 104 insertions(+), 83 deletions(-) diff --git a/vindex/map.go b/vindex/map.go index d268772..1b2b39a 100644 --- a/vindex/map.go +++ b/vindex/map.go @@ -87,104 +87,60 @@ func NewVerifiableIndex(ctx context.Context, inputLog InputLog, inputLogParseFn if err := mpt.InitStorage(sha256.Sum256, vtreeStorage); err != nil { return nil, fmt.Errorf("InitStorage: %s", err) } - b := &VerifiableIndex{ + mapper := &inputLogMapper{ inputLog: inputLog, inputLogParseFn: inputLogParseFn, mapFn: mapFn, walWriter: wal, - walReader: reader, - vindex: *mpt.NewTree(sha256.Sum256, vtreeStorage), - data: map[[32]byte][]uint64{}, nextIndex: ws, } - if err := b.buildMap(ctx); err != nil { + b := &VerifiableIndex{ + mapper: mapper, + walReader: reader, + vindex: *mpt.NewTree(sha256.Sum256, vtreeStorage), + data: map[[32]byte][]uint64{}, + } + if err := b.buildMap(ctx, ws); err != nil { return nil, fmt.Errorf("failed to build map: %v", err) } return b, nil } -// VerifiableIndex manages reading from the input log, mapping leaves, updating the WAL, -// reading the WAL, and keeping the state of the in-memory index updated from the WAL. -type VerifiableIndex struct { +// inputLogMapper reads the Input Log, checking that the data matches the commitments, +// and updates the WAL and DB with the resulting information. +type inputLogMapper struct { inputLog InputLog inputLogParseFn OpenCheckpointFn mapFn MapFn walWriter *walWriter - walReader *walReader - - indexMu sync.RWMutex // covers vindex and data - vindex mpt.Tree - data map[[32]byte][]uint64 - - // servingSize is the size of the input log we are serving for. - // This a temporary workaround not having an output log, which we will eventually read to get - // the checkpoint size. 
- servingSize uint64 nextIndex uint64 // nextIndex is the next index in the log to consume - inputLogCp []byte // rawCp is the last checkpoint we started syncing to inputLogCpSize uint64 // cpSize is the tree size of rawCp. Used to sync on WAL. - mapSize uint64 // mapSize is the total number of leaves processed into the map } -// Close ensures that any open connections are closed before returning. -func (b *VerifiableIndex) Close() error { - return b.walWriter.close() +func (m *inputLogMapper) close() error { + return m.walWriter.close() } -// Lookup returns the values stored for the given key. -// TODO(mhutchinson): This needs to return verifiable stuff -func (b *VerifiableIndex) Lookup(key [sha256.Size]byte) (indices []uint64, size uint64) { - // Scope the lock to be as minimal as possible - lookupLocked := func(key [sha256.Size]byte) []uint64 { - b.indexMu.RLock() - defer b.indexMu.RUnlock() - return b.data[key] - } - - // TODO(mhutchinson): this should come from the latest map root in the (witnessed) output log. - // This map root, the witnessed output log checkpoint, and all proofs should also be served here. - size = b.servingSize - - allIndices := lookupLocked(key) - for i, idx := range allIndices { - if idx >= size { - // If we have indices past the current size we are serving, drop them. - // Doing this allows us to update b.data with new indices while still serving from it. - return allIndices[:i], size - } - } - return allIndices, size -} - -// Update checks the input log for a new Checkpoint, and ensures that the Verifiable Index -// is updated to the corresponding size. -func (b *VerifiableIndex) Update(ctx context.Context) error { - rawCp, err := b.inputLog.Checkpoint(ctx) +// available returns whether there is work to do. 
+// TODO(mhutchinson): this can probably be deleted +func (m *inputLogMapper) available(ctx context.Context) bool { + rawCp, err := m.inputLog.Checkpoint(ctx) if err != nil { - return fmt.Errorf("failed to get latest checkpoint from DB: %s", err) + klog.Warningf("Failed to get latest checkpoint from DB: %s", err) + return false } - cp, err := b.inputLogParseFn(rawCp) + cp, err := m.inputLogParseFn(rawCp) if err != nil { - return fmt.Errorf("failed to parse checkpoint: %s", err) + klog.Warningf("Failed to parse checkpoint: %s", err) + return false } - if cp.Size == b.inputLogCpSize { - klog.V(1).Infof("No update needed: checkpoint size is still %d", b.servingSize) - return nil - } - b.inputLogCpSize = cp.Size - b.inputLogCp = rawCp - klog.Infof("Building map to log size of %d", b.inputLogCpSize) - - eg, cctx := errgroup.WithContext(ctx) - eg.Go(func() error { return b.syncFromInputLog(cctx) }) - eg.Go(func() error { return b.buildMap(cctx) }) - - err = eg.Wait() - b.servingSize = b.inputLogCpSize - - return err + // TODO(mhutchinson): remove this and replace with disk persistence? + defer func() { + m.inputLogCpSize = cp.Size + }() + return cp.Size > m.inputLogCpSize } // syncFromInputLog reads the latest checkpoint from the input log, and ensures that the WAL @@ -194,12 +150,12 @@ func (b *VerifiableIndex) Update(ctx context.Context) error { // leaves correspond to the checkpoint root hash. This was reasonable while it was based on the // cloneDB, which performed this validation. Implementing this will require the index to store some // state alongside the WAL which contains a compact range of its current progress. 
-func (b *VerifiableIndex) syncFromInputLog(ctx context.Context) error { - if b.inputLogCpSize > b.nextIndex { +func (m *inputLogMapper) syncFromInputLog(ctx context.Context) error { + if m.inputLogCpSize > m.nextIndex { ctx, done := context.WithCancel(ctx) defer done() - for l, err := range b.inputLog.Leaves(ctx, b.nextIndex, b.inputLogCpSize) { - idx := b.nextIndex + for l, err := range m.inputLog.Leaves(ctx, m.nextIndex, m.inputLogCpSize) { + idx := m.nextIndex if err != nil { return fmt.Errorf("failed to read leaf at index %d: %v", idx, err) } @@ -214,20 +170,20 @@ func (b *VerifiableIndex) syncFromInputLog(ctx context.Context) error { mapErr = fmt.Errorf("panic detected mapping index %d: %s", idx, r) } }() - hashes = b.mapFn(l) + hashes = m.mapFn(l) }() if mapErr != nil { return mapErr } - b.nextIndex++ - if len(hashes) == 0 && idx < b.inputLogCpSize-1 { + m.nextIndex++ + if len(hashes) == 0 && idx < m.inputLogCpSize-1 { // We can skip writing out values with no hashes, as long as we're // not at the end of the log. // If we are at the end of the log, we need to write out a value as a sentinel // even if there are no hashes. continue } - if err := b.walWriter.append(idx, hashes); err != nil { + if err := m.walWriter.append(idx, hashes); err != nil { return fmt.Errorf("failed to add index to entry for leaf %d: %v", idx, err) } } @@ -235,12 +191,76 @@ func (b *VerifiableIndex) syncFromInputLog(ctx context.Context) error { return nil } +// VerifiableIndex manages reading from the input log, mapping leaves, updating the WAL, +// reading the WAL, and keeping the state of the in-memory index updated from the WAL. +type VerifiableIndex struct { + mapper *inputLogMapper + walReader *walReader + + indexMu sync.RWMutex // covers vindex and data + vindex mpt.Tree + data map[[32]byte][]uint64 + + // servingSize is the size of the input log we are serving for. 
+ // This is a temporary workaround for not having an output log, which we will eventually read to get + // the checkpoint size. + servingSize uint64 +} + +// Close ensures that any open connections are closed before returning. +func (b *VerifiableIndex) Close() error { + return b.mapper.close() +} + +// Lookup returns the values stored for the given key. +// TODO(mhutchinson): This needs to return verifiable stuff +func (b *VerifiableIndex) Lookup(key [sha256.Size]byte) (indices []uint64, size uint64) { + // Scope the lock to be as minimal as possible + lookupLocked := func(key [sha256.Size]byte) []uint64 { + b.indexMu.RLock() + defer b.indexMu.RUnlock() + return b.data[key] + } + + // TODO(mhutchinson): this should come from the latest map root in the (witnessed) output log. + // This map root, the witnessed output log checkpoint, and all proofs should also be served here. + size = b.servingSize + + allIndices := lookupLocked(key) + for i, idx := range allIndices { + if idx >= size { + // If we have indices past the current size we are serving, drop them. + // Doing this allows us to update b.data with new indices while still serving from it. + return allIndices[:i], size + } + } + return allIndices, size +} + +// Update checks the input log for a new Checkpoint, and ensures that the Verifiable Index +// is updated to the corresponding size. +func (b *VerifiableIndex) Update(ctx context.Context) error { + if !b.mapper.available(ctx) { + return nil + } + + newSize := b.mapper.inputLogCpSize + eg, cctx := errgroup.WithContext(ctx) + eg.Go(func() error { return b.mapper.syncFromInputLog(cctx) }) + eg.Go(func() error { return b.buildMap(cctx, newSize) }) + + err := eg.Wait() + + return err +} + // buildMap reads from the WAL until the file has been consumed and the map has been -// built up the WAL size. -func (b *VerifiableIndex) buildMap(ctx context.Context) error { +// built up to the provided size.
+func (b *VerifiableIndex) buildMap(ctx context.Context, toSize uint64) error { startWal := time.Now() updatedKeys := make(map[[32]byte]bool) // Allows us to efficiently update vindex after first init - for b.mapSize < b.inputLogCpSize { + + for i := b.servingSize; i < toSize; { select { case <-ctx.Done(): return ctx.Err() @@ -262,7 +282,7 @@ func (b *VerifiableIndex) buildMap(ctx context.Context) error { func() { b.indexMu.Lock() defer b.indexMu.Unlock() - b.mapSize = idx + 1 + i = idx + 1 for _, h := range hashes { klog.V(2).Infof("Read from WAL: index %d: %x", idx, h) // Add the data to the key/value map @@ -309,6 +329,7 @@ func (b *VerifiableIndex) buildMap(ctx context.Context) error { durationVIndex := time.Since(startVIndex) durationTotal := time.Since(startWal) + b.servingSize = toSize klog.Infof("buildMap: total=%s (wal=%s, vindex=%s)", durationTotal, durationWal, durationVIndex) return nil }