diff --git a/go.mod b/go.mod index 9bccebe..0bcdf9b 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,10 @@ go 1.24.1 require ( filippo.io/torchwood v0.5.1-0.20250605130057-fa65d721a6ce + github.com/google/go-cmp v0.7.0 github.com/gorilla/mux v1.8.1 - github.com/transparency-dev/formats v0.0.0-20250616090723-6ce2fd29df16 + github.com/transparency-dev/formats v0.0.0-20250723101439-be3b1008ec3a + github.com/transparency-dev/merkle v0.0.2 github.com/transparency-dev/tessera v0.2.1-0.20250722085756-7303218c6614 golang.org/x/mod v0.26.0 golang.org/x/sync v0.16.0 @@ -17,7 +19,6 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect - github.com/transparency-dev/merkle v0.0.2 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel v1.37.0 // indirect go.opentelemetry.io/otel/metric v1.37.0 // indirect diff --git a/go.sum b/go.sum index e8c69c2..7d91588 100644 --- a/go.sum +++ b/go.sum @@ -21,8 +21,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/transparency-dev/formats v0.0.0-20250616090723-6ce2fd29df16 h1:4yn2lO94tSuoT8fPm4NJZL5cRMHKOpGwmH/KM9f/ULI= -github.com/transparency-dev/formats v0.0.0-20250616090723-6ce2fd29df16/go.mod h1:v+kgcd91U14WBv5EshoX1nooq4SIZTZzYpQDBq6N55U= +github.com/transparency-dev/formats v0.0.0-20250723101439-be3b1008ec3a h1:l1RrmDw9xrVN/lbW/rzPJhjQ+dmsqOyItES9Ku/njFA= +github.com/transparency-dev/formats v0.0.0-20250723101439-be3b1008ec3a/go.mod h1:A4VaaPFBMEuwtuihpGY8wUOqRBy5plQww4NqfjK5E7c= github.com/transparency-dev/merkle v0.0.2 h1:Q9nBoQcZcgPamMkGn7ghV8XiTZ/kRxn1yCG81+twTK4= github.com/transparency-dev/merkle 
v0.0.2/go.mod h1:pqSy+OXefQ1EDUVmAJ8MUhHB9TXGuzVAT58PqBoHz1A= github.com/transparency-dev/tessera v0.2.1-0.20250722085756-7303218c6614 h1:EfxzWae/zdnfVA44jFIKohvd2JWlcanRzRYb1A/uiDU= diff --git a/vindex/README.md b/vindex/README.md index cafa73d..7a06989 100644 --- a/vindex/README.md +++ b/vindex/README.md @@ -195,7 +195,7 @@ LOG_PRIVATE_KEY=PRIVATE+KEY+logandmap+38581672+AXJ0FKWOcO2ch6WC8kP705Ed3Gxu7pVtZ ``` Running the above will run a web server hosting the following URLs: - - `/inputlog/` - the [tlog-tiles][] + - `/inputlog/` - the [tlog-tiles][] base URL for the input log - `/vindex/lookup` - the provisional [vindex lookup API](./api/api.go) - `/outputlog/` - TODO(mhutchinson): this is where the output log will be hosted diff --git a/vindex/cmd/logandmap/main.go b/vindex/cmd/logandmap/main.go index 012144c..640bb13 100644 --- a/vindex/cmd/logandmap/main.go +++ b/vindex/cmd/logandmap/main.go @@ -23,9 +23,7 @@ package main import ( "context" - "crypto/ed25519" "crypto/sha256" - "encoding/base64" "encoding/json" "errors" "flag" @@ -35,12 +33,13 @@ import ( "net/http" "os" "os/signal" - "strings" + "path" "syscall" "time" "github.com/gorilla/mux" "github.com/transparency-dev/formats/log" + fnote "github.com/transparency-dev/formats/note" "github.com/transparency-dev/incubator/vindex" "github.com/transparency-dev/tessera" "github.com/transparency-dev/tessera/api" @@ -51,10 +50,9 @@ import ( ) var ( - privKeyFile = flag.String("private_key", "", "Location of private key file. If unset, uses the contents of the LOG_PRIVATE_KEY environment variable.") - inputLogDir = flag.String("input_log_dir", "", "Root directory in which to store the log for the POSIX-based Input Log") - walPath = flag.String("walPath", "", "Path to use for the Write Ahead Log. 
If empty, a temporary file will be used.") - listen = flag.String("listen", ":8088", "Address to set up HTTP server listening on") + inputLogPrivKeyFile = flag.String("input_log_private_key", "", "Location of private key file. If unset, uses the contents of the INPUT_LOG_PRIVATE_KEY environment variable.") + storageDir = flag.String("storage_dir", "", "Root directory in which to store the data for the demo. This will create subdirectories for the Input Log, Output Log, and allocate space to store the verifiable map persistence.") + listen = flag.String("listen", ":8088", "Address to set up HTTP server listening on") ) func main() { @@ -76,53 +74,58 @@ type LogEntry struct { } func run(ctx context.Context) error { - if *inputLogDir == "" { - return errors.New("input_log_dir must be set") + if *storageDir == "" { + return errors.New("storage_dir must be set") + } + inputLogDir := path.Join(*storageDir, "inputlog") + walPath := path.Join(*storageDir, "index.wal") + + if err := os.MkdirAll(inputLogDir, 0o755); err != nil { + return fmt.Errorf("failed to create input log directory: %v", err) } // Gather the info needed for reading/writing checkpoints - s, v := getSignerVerifierOrDie() + ils, ilv := getInputLogSignerVerifierOrDie() // Set up a Tessera POSIX log - driver, err := posix.New(ctx, posix.Config{Path: *inputLogDir}) + ild, err := posix.New(ctx, posix.Config{Path: inputLogDir}) if err != nil { return fmt.Errorf("failed to create new log: %v", err) } - // Get a Tessera appender - appender, shutdown, reader, err := tessera.NewAppender(ctx, driver, tessera.NewAppendOptions(). - WithCheckpointSigner(s). + inputAppender, inputShutdown, inputReader, err := tessera.NewAppender(ctx, ild, tessera.NewAppendOptions(). + WithCheckpointSigner(ils). WithCheckpointInterval(10*time.Second). 
WithBatching(256, time.Second)) if err != nil { return fmt.Errorf("failed to get appender: %v", err) } defer func() { - _ = shutdown(ctx) + _ = inputShutdown(ctx) }() // Create the verifiable index connected to the LogReader. inputLog := logReaderSource{ - r: reader, + r: inputReader, } - logCpParseFn := func(cpRaw []byte) (*log.Checkpoint, error) { + inputLogCpParseFn := func(cpRaw []byte) (*log.Checkpoint, error) { // No witnesses required yet - cp, _, _, err := log.ParseCheckpoint(cpRaw, v.Name(), v) + cp, _, _, err := log.ParseCheckpoint(cpRaw, ilv.Name(), ilv) return cp, err } - vi, err := vindex.NewVerifiableIndex(ctx, inputLog, logCpParseFn, mapFnFromFlags(), walPathFromFlags()) + vi, err := vindex.NewVerifiableIndex(ctx, inputLog, inputLogCpParseFn, mapFnFromFlags(), walPath) if err != nil { return fmt.Errorf("failed to create vindex: %v", err) } // Submits new entries to the log in the background. - go submitEntries(ctx, appender) + go submitEntries(ctx, inputAppender) // Keeps the map synced with the latest published log state. go maintainMap(ctx, vi) // Run a web server to handle queries over the verifiable index. 
- go runWebServer(vi) + go runWebServer(vi, inputLogDir) <-ctx.Done() return nil } @@ -132,11 +135,11 @@ type logReaderSource struct { r tessera.LogReader } -func (s logReaderSource) GetCheckpoint(ctx context.Context) (checkpoint []byte, err error) { +func (s logReaderSource) Checkpoint(ctx context.Context) (checkpoint []byte, err error) { return s.r.ReadCheckpoint(ctx) } -func (s logReaderSource) StreamLeaves(ctx context.Context, start, end uint64) iter.Seq2[[]byte, error] { +func (s logReaderSource) Leaves(ctx context.Context, start, end uint64) iter.Seq2[[]byte, error] { bi := client.EntryBundles(ctx, 2, s.r.IntegratedSize, s.r.ReadEntryBundle, start, end-start) unbundleFn := func(bundle []byte) ([][]byte, error) { eb := &api.EntryBundle{} @@ -168,13 +171,13 @@ func maintainMap(ctx context.Context, vi *vindex.VerifiableIndex) { defer ticker.Stop() for { + if err := vi.Update(ctx); err != nil { + klog.Warning(err) + } select { case <-ctx.Done(): return case <-ticker.C: - if err := vi.Update(ctx); err != nil { - klog.Warning(err) - } } } } @@ -217,7 +220,7 @@ func submitEntries(ctx context.Context, appender *tessera.Appender) { } } -func runWebServer(vi *vindex.VerifiableIndex) { +func runWebServer(vi *vindex.VerifiableIndex, ild string) { web := NewServer(func(h [sha256.Size]byte) ([]uint64, error) { idxes, size := vi.Lookup(h) if size == 0 { @@ -226,7 +229,7 @@ func runWebServer(vi *vindex.VerifiableIndex) { return idxes, nil }) - ilfs := http.FileServer(http.Dir(*inputLogDir)) + ilfs := http.FileServer(http.Dir(ild)) r := mux.NewRouter() r.PathPrefix("/inputlog/").Handler(http.StripPrefix("/inputlog/", ilfs)) web.registerHandlers(r) @@ -240,69 +243,29 @@ func runWebServer(vi *vindex.VerifiableIndex) { klog.Infof("Started HTTP server listening on %s", *listen) } -// Read log private key from file or environment variable and generate the +// Read input log private key from file or environment variable and generate the // note Signer and Verifier pair for it. 
-func getSignerVerifierOrDie() (note.Signer, note.Verifier) { +func getInputLogSignerVerifierOrDie() (note.Signer, note.Verifier) { var privKey string var err error - if len(*privKeyFile) > 0 { - privKey, err = getKeyFile(*privKeyFile) + if len(*inputLogPrivKeyFile) > 0 { + privKey, err = getKeyFile(*inputLogPrivKeyFile) if err != nil { klog.Exitf("Unable to get private key: %v", err) } } else { - privKey = os.Getenv("LOG_PRIVATE_KEY") + privKey = os.Getenv("INPUT_LOG_PRIVATE_KEY") if len(privKey) == 0 { - klog.Exit("Supply private key file path using --private_key or set LOG_PRIVATE_KEY environment variable") + klog.Exit("Supply private key file path using --input_log_private_key or set INPUT_LOG_PRIVATE_KEY environment variable") } } - s, v, err := signerVerifierFromSkey(privKey) + s, v, err := fnote.NewEd25519SignerVerifier(privKey) if err != nil { klog.Exitf("Failed to get signer/verifier: %v", err) } return s, v } -// TODO(mhutchinson): move this into t-dev/formats. -func signerVerifierFromSkey(skey string) (note.Signer, note.Verifier, error) { - const algEd25519 = 1 - s, err := note.NewSigner(skey) - if err != nil { - return nil, nil, err - } - _, skey, _ = strings.Cut(skey, "+") - _, skey, _ = strings.Cut(skey, "+") - _, skey, _ = strings.Cut(skey, "+") - _, key64, _ := strings.Cut(skey, "+") - key, err := base64.StdEncoding.DecodeString(key64) - if err != nil { - return nil, nil, fmt.Errorf("failed to decode base64: %v", err) - } - - alg, key := key[0], key[1:] - switch alg { - default: - return nil, nil, errors.New("unsupported algorithm") - - case algEd25519: - if len(key) != ed25519.SeedSize { - return nil, nil, fmt.Errorf("expected key seed of size %d but got %d", ed25519.SeedSize, len(key)) - } - key := ed25519.NewKeyFromSeed(key) - publicKey := key.Public().(ed25519.PublicKey) - vkey, err := note.NewEd25519VerifierKey(s.Name(), publicKey) - if err != nil { - return nil, nil, fmt.Errorf("failed to generate verifier from key: %v", err) - - } - v, err := 
note.NewVerifier(vkey) - if err != nil { - return nil, nil, fmt.Errorf("failed to create verifier from vkey: %v", err) - } - return s, v, err - } -} - func getKeyFile(path string) (string, error) { k, err := os.ReadFile(path) if err != nil { @@ -326,15 +289,3 @@ func mapFnFromFlags() vindex.MapFn { } return mapFn } - -func walPathFromFlags() string { - if len(*walPath) > 0 { - return *walPath - } - f, err := os.CreateTemp("", "walPath") - if err != nil { - klog.Exitf("Failed to create temporary path for WAL: %s", err) - } - klog.Infof("Created temporary WAL at %s", f.Name()) - return f.Name() -} diff --git a/vindex/map.go b/vindex/map.go index bac7cdc..1b2b39a 100644 --- a/vindex/map.go +++ b/vindex/map.go @@ -53,12 +53,11 @@ type MapFn func([]byte) [][32]byte // InputLog represents a connection to the input log from which map data will be built. // This can be a local or remote data source. type InputLog interface { - // GetCheckpoint returns the latest checkpoint committing to the input log state. - GetCheckpoint(ctx context.Context) (checkpoint []byte, err error) - // StreamLeaves returns all the leaves in the range [start, end), outputting them via - // the out channel. - // TODO(mhutchinson): This out channel would be better as a returned iterator. - StreamLeaves(ctx context.Context, start, end uint64) iter.Seq2[[]byte, error] + // Checkpoint returns the latest checkpoint committing to the input log state. + Checkpoint(ctx context.Context) (checkpoint []byte, err error) + // Leaves returns all the leaves in the range [start, end), outputting them via + // the returned iterator. + Leaves(ctx context.Context, start, end uint64) iter.Seq2[[]byte, error] } // OpenCheckpointFn is a function that parses a checkpoint, validating it, and returns a parsed @@ -72,15 +71,15 @@ type OpenCheckpointFn func(cpRaw []byte) (*log.Checkpoint, error) // path. // Note that only one IndexBuilder should exist for any given walPath at any time. 
The behaviour is unspecified, // but likely broken, if multiple processes are writing to the same file at any given time. -func NewVerifiableIndex(ctx context.Context, inputLog InputLog, logParseFn OpenCheckpointFn, mapFn MapFn, walPath string) (*VerifiableIndex, error) { - wal := &writeAheadLog{ +func NewVerifiableIndex(ctx context.Context, inputLog InputLog, inputLogParseFn OpenCheckpointFn, mapFn MapFn, walPath string) (*VerifiableIndex, error) { + wal := &walWriter{ walPath: walPath, } ws, err := wal.init() if err != nil { return nil, err } - reader, err := newLogReader(walPath) + reader, err := newWalReader(walPath) if err != nil { return nil, err } @@ -88,37 +87,120 @@ func NewVerifiableIndex(ctx context.Context, inputLog InputLog, logParseFn OpenC if err := mpt.InitStorage(sha256.Sum256, vtreeStorage); err != nil { return nil, fmt.Errorf("InitStorage: %s", err) } + mapper := &inputLogMapper{ + inputLog: inputLog, + inputLogParseFn: inputLogParseFn, + mapFn: mapFn, + walWriter: wal, + nextIndex: ws, + } b := &VerifiableIndex{ - inputLog: inputLog, - logParseFn: logParseFn, - mapFn: mapFn, - wal: wal, - reader: reader, - vindex: *mpt.NewTree(sha256.Sum256, vtreeStorage), - data: map[[32]byte][]uint64{}, - nextIndex: ws, + mapper: mapper, + walReader: reader, + vindex: *mpt.NewTree(sha256.Sum256, vtreeStorage), + data: map[[32]byte][]uint64{}, + } + if err := b.buildMap(ctx, ws); err != nil { + return nil, fmt.Errorf("failed to build map: %v", err) } return b, nil } +// inputLogMapper reads the Input Log, checking that the data matches the commitments, +// and updates the WAL and DB with the resulting information. +type inputLogMapper struct { + inputLog InputLog + inputLogParseFn OpenCheckpointFn + mapFn MapFn + walWriter *walWriter + + nextIndex uint64 // nextIndex is the next index in the log to consume + inputLogCpSize uint64 // inputLogCpSize is the tree size of the last input log checkpoint parsed. Used to sync on WAL.
+} + +func (m *inputLogMapper) close() error { + return m.walWriter.close() +} + +// available returns whether there is work to do. +// TODO(mhutchinson): this can probably be deleted +func (m *inputLogMapper) available(ctx context.Context) bool { + rawCp, err := m.inputLog.Checkpoint(ctx) + if err != nil { + klog.Warningf("Failed to get latest checkpoint from DB: %s", err) + return false + } + cp, err := m.inputLogParseFn(rawCp) + if err != nil { + klog.Warningf("Failed to parse checkpoint: %s", err) + return false + } + + // TODO(mhutchinson): remove this and replace with disk persistence? + defer func() { + m.inputLogCpSize = cp.Size + }() + return cp.Size > m.inputLogCpSize +} + +// syncFromInputLog reads the latest checkpoint from the input log, and ensures that the WAL +// contains a corresponding entry for every index committed to by that checkpoint. +// +// TODO(mhutchinson): this doesn't perform any validation on the input log to check the +// leaves correspond to the checkpoint root hash. This was reasonable while it was based on the +// cloneDB, which performed this validation. Implementing this will require the index to store some +// state alongside the WAL which contains a compact range of its current progress. +func (m *inputLogMapper) syncFromInputLog(ctx context.Context) error { + if m.inputLogCpSize > m.nextIndex { + ctx, done := context.WithCancel(ctx) + defer done() + for l, err := range m.inputLog.Leaves(ctx, m.nextIndex, m.inputLogCpSize) { + idx := m.nextIndex + if err != nil { + return fmt.Errorf("failed to read leaf at index %d: %v", idx, err) + } + + // Apply the MapFn in as safe a way as possible. This involves trapping any panics + // and failing gracefully.
+ var hashes [][32]byte + var mapErr error + func() { + defer func() { + if r := recover(); r != nil { + mapErr = fmt.Errorf("panic detected mapping index %d: %s", idx, r) + } + }() + hashes = m.mapFn(l) + }() + if mapErr != nil { + return mapErr + } + m.nextIndex++ + if len(hashes) == 0 && idx < m.inputLogCpSize-1 { + // We can skip writing out values with no hashes, as long as we're + // not at the end of the log. + // If we are at the end of the log, we need to write out a value as a sentinel + // even if there are no hashes. + continue + } + if err := m.walWriter.append(idx, hashes); err != nil { + return fmt.Errorf("failed to add index to entry for leaf %d: %v", idx, err) + } + } + } + return nil +} + // VerifiableIndex manages reading from the input log, mapping leaves, updating the WAL, // reading the WAL, and keeping the state of the in-memory index updated from the WAL. type VerifiableIndex struct { - inputLog InputLog - logParseFn OpenCheckpointFn - mapFn MapFn - wal *writeAheadLog - reader *logReader + mapper *inputLogMapper + walReader *walReader indexMu sync.RWMutex // covers vindex and data vindex mpt.Tree data map[[32]byte][]uint64 - nextIndex uint64 // nextIndex is the next index in the log to consume - rawCp []byte // rawCp is the last checkpoint we started syncing to - cpSize uint64 // cpSize is the tree size of rawCp - mapSize uint64 // mapSize is the total number of leaves processed into the map - // servingSize is the size of the input log we are serving for. // This a temporary workaround not having an output log, which we will eventually read to get // the checkpoint size. @@ -127,7 +209,7 @@ type VerifiableIndex struct { // Close ensures that any open connections are closed before returning. func (b *VerifiableIndex) Close() error { - return b.wal.close() + return b.mapper.close() } // Lookup returns the values stored for the given key. 
@@ -158,93 +240,33 @@ func (b *VerifiableIndex) Lookup(key [sha256.Size]byte) (indices []uint64, size // Update checks the input log for a new Checkpoint, and ensures that the Verifiable Index // is updated to the corresponding size. func (b *VerifiableIndex) Update(ctx context.Context) error { - rawCp, err := b.inputLog.GetCheckpoint(ctx) - if err != nil { - return fmt.Errorf("failed to get latest checkpoint from DB: %s", err) - } - cp, err := b.logParseFn(rawCp) - if err != nil { - return fmt.Errorf("failed to parse checkpoint: %s", err) - } - - if cp.Size == b.cpSize { - klog.V(1).Infof("No update needed: checkpoint size is still %d", b.servingSize) + if !b.mapper.available(ctx) { return nil } - b.cpSize = cp.Size - b.rawCp = rawCp - klog.Infof("Building map to log size of %d", b.cpSize) + newSize := b.mapper.inputLogCpSize eg, cctx := errgroup.WithContext(ctx) - eg.Go(func() error { return b.syncFromInputLog(cctx) }) - eg.Go(func() error { return b.buildMap(cctx) }) + eg.Go(func() error { return b.mapper.syncFromInputLog(cctx) }) + eg.Go(func() error { return b.buildMap(cctx, newSize) }) - err = eg.Wait() - b.servingSize = b.cpSize + err := eg.Wait() return err } -// syncFromInputLog reads the latest checkpoint from the input log, and ensures that the WAL -// contains a corresponding entry for every index committed to by that checkpoint. -// -// TODO(mhutchinson): this doesn't perform any validation on the input log to check the -// leaves correspond to the checkpoint root hash. This was reasonable while it was based on the -// cloneDB, which performed this validation. Implementing this will require the index to store some -// state alongside the WAL which contains a compact range of its current progress. 
-func (b *VerifiableIndex) syncFromInputLog(ctx context.Context) error { - if b.cpSize > b.nextIndex { - ctx, done := context.WithCancel(ctx) - defer done() - for l, err := range b.inputLog.StreamLeaves(ctx, b.nextIndex, b.cpSize) { - idx := b.nextIndex - if err != nil { - return fmt.Errorf("failed to read leaf at index %d: %v", idx, err) - } - - // Apply the MapFn in as safe a way as possible. This involves trapping any panics - // and failing gracefully. - var hashes [][32]byte - var mapErr error - func() { - defer func() { - if r := recover(); r != nil { - mapErr = fmt.Errorf("panic detected mapping index %d: %s", idx, r) - } - }() - hashes = b.mapFn(l) - }() - if mapErr != nil { - return mapErr - } - b.nextIndex++ - if len(hashes) == 0 && idx < b.cpSize-1 { - // We can skip writing out values with no hashes, as long as we're - // not at the end of the log. - // If we are at the end of the log, we need to write out a value as a sentinel - // even if there are no hashes. - continue - } - if err := b.wal.append(idx, hashes); err != nil { - return fmt.Errorf("failed to add index to entry for leaf %d: %v", idx, err) - } - } - } - return nil -} - // buildMap reads from the WAL until the file has been consumed and the map has been -// built up the WAL size. -func (b *VerifiableIndex) buildMap(ctx context.Context) error { +// built up to the provided size.
+func (b *VerifiableIndex) buildMap(ctx context.Context, toSize uint64) error { startWal := time.Now() updatedKeys := make(map[[32]byte]bool) // Allows us to efficiently update vindex after first init - for b.mapSize < b.cpSize { + + for i := b.servingSize; i < toSize; { select { case <-ctx.Done(): return ctx.Err() default: } - idx, hashes, err := b.reader.next() + idx, hashes, err := b.walReader.next() if err != nil { if err != io.EOF { return err @@ -260,7 +282,7 @@ func (b *VerifiableIndex) buildMap(ctx context.Context) error { func() { b.indexMu.Lock() defer b.indexMu.Unlock() - b.mapSize = idx + 1 + i = idx + 1 for _, h := range hashes { klog.V(2).Infof("Read from WAL: index %d: %x", idx, h) // Add the data to the key/value map @@ -307,11 +329,15 @@ func (b *VerifiableIndex) buildMap(ctx context.Context) error { durationVIndex := time.Since(startVIndex) durationTotal := time.Since(startWal) + b.servingSize = toSize klog.Infof("buildMap: total=%s (wal=%s, vindex=%s)", durationTotal, durationWal, durationVIndex) return nil } -type writeAheadLog struct { +// walWriter provides the methods needed by the processor of the Input Log when interacting +// with the WAL. init provides the index that this processor should start from, and append +// allows new mapped entries to be added to the WAL. +type walWriter struct { walPath string f *os.File } @@ -321,7 +347,7 @@ type writeAheadLog struct { // // Note that it returns the next expected index to avoid awkwardness with the meaning of 0, // which could mean 0 was successfully read from a previous run, or that there was no log. 
-func (l *writeAheadLog) init() (uint64, error) { +func (l *walWriter) init() (uint64, error) { ffs := os.O_WRONLY | os.O_APPEND idx, err := l.validate() @@ -342,14 +368,14 @@ func (l *writeAheadLog) init() (uint64, error) { return idx, err } -func (l *writeAheadLog) close() error { +func (l *walWriter) close() error { return l.f.Close() } // validate reads the file and determines what the last mapped log index was, and returns it. // The assumption is that all lines ending with a newline were written correctly. // If there are any errors in the file then this throws an error. -func (l *writeAheadLog) validate() (uint64, error) { +func (l *walWriter) validate() (uint64, error) { f, err := os.Open(l.walPath) if err != nil { return 0, err @@ -417,7 +443,7 @@ func (l *writeAheadLog) validate() (uint64, error) { return idx, err } -func (l *writeAheadLog) append(idx uint64, hashes [][32]byte) error { +func (l *walWriter) append(idx uint64, hashes [][32]byte) error { e, err := marshalWalEntry(idx, hashes) if err != nil { return fmt.Errorf("failed to marshal entry: %v", err) @@ -426,18 +452,18 @@ func (l *writeAheadLog) append(idx uint64, hashes [][32]byte) error { return err } -func newLogReader(path string) (*logReader, error) { +func newWalReader(path string) (*walReader, error) { f, err := os.Open(path) if err != nil { return nil, err } - return &logReader{ + return &walReader{ f: f, r: bufio.NewReader(f), }, nil } -type logReader struct { +type walReader struct { f *os.File r *bufio.Reader partial string @@ -447,7 +473,7 @@ type logReader struct { // TODO(mhutchinson): change this as it's inconvenient with EOF handling, // which should be common when reader hits the end of the file but more is // to be written. 
-func (r *logReader) next() (uint64, [][32]byte, error) { +func (r *walReader) next() (uint64, [][32]byte, error) { line, err := r.r.ReadString('\n') if err != nil { if err == io.EOF { @@ -462,7 +488,7 @@ func (r *logReader) next() (uint64, [][32]byte, error) { return unmarshalWalEntry(line) } -func (r *logReader) close() error { +func (r *walReader) close() error { return r.f.Close() } diff --git a/vindex/map_test.go b/vindex/map_test.go index 49e92d8..f31330e 100644 --- a/vindex/map_test.go +++ b/vindex/map_test.go @@ -17,17 +17,31 @@ package vindex import ( + "bytes" + "context" "crypto/sha256" "encoding/hex" "fmt" "io" + "iter" "os" "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/transparency-dev/formats/log" + fnote "github.com/transparency-dev/formats/note" + "github.com/transparency-dev/merkle/rfc6962" + "github.com/transparency-dev/merkle/testonly" + "golang.org/x/mod/sumdb/note" "golang.org/x/sync/errgroup" ) +const ( + skey = "PRIVATE+KEY+logandmap+38581672+AXJ0FKWOcO2ch6WC8kP705Ed3Gxu7pVtZLhfHAQwp+FE" + vkey = "logandmap+38581672+Ab/PCr1eCclRPRMBqw/r5An1xO71MCnImLiospEq6b4l" +) + func TestWriteAheadLog_init(t *testing.T) { testCases := []struct { desc string @@ -81,7 +95,7 @@ func TestWriteAheadLog_init(t *testing.T) { if err := f.Close(); err != nil { t.Fatal(err) } - wal := &writeAheadLog{ + wal := &walWriter{ walPath: f.Name(), } idx, err := wal.init() @@ -113,7 +127,7 @@ func TestWriteAheadLog_roundtrip(t *testing.T) { t.Fatal(err) } - wal := &writeAheadLog{ + wal := &walWriter{ walPath: f.Name(), } idx, err := wal.init() @@ -160,7 +174,7 @@ func TestWriteAndWriteLog(t *testing.T) { t.Fatal(err) } - wal := &writeAheadLog{ + wal := &walWriter{ walPath: f.Name(), } idx, err := wal.init() @@ -171,7 +185,7 @@ func TestWriteAndWriteLog(t *testing.T) { t.Fatalf("expected index %d, got %d", want, got) } - reader, err := newLogReader(f.Name()) + reader, err := newWalReader(f.Name()) if err != nil { t.Fatal(err) } @@ -263,6 +277,108 @@ func 
TestUnmarshal(t *testing.T) { } } +func TestVerifiableIndex(t *testing.T) { + ctx := context.Background() + s, v, err := fnote.NewEd25519SignerVerifier(skey) + if err != nil { + t.Fatal(err) + } + inputLog := &inMemoryTreeSource{ + t: testonly.New(rfc6962.DefaultHasher), + leaves: make([][]byte, 0), + s: s, + } + for _, str := range []string{"foo: 2", "bar: 5", "bar: 10", "foo: 8"} { + inputLog.Append(str) + } + + inputLogCpParseFn := func(cpRaw []byte) (*log.Checkpoint, error) { + cp, _, _, err := log.ParseCheckpoint(cpRaw, v.Name(), v) + return cp, err + } + mapFn := func(leaf []byte) [][32]byte { + key, _, found := bytes.Cut(leaf, []byte(":")) + if !found { + panic("colon not found") + } + return [][32]byte{sha256.Sum256(key)} + } + f, err := os.CreateTemp("", "testWal") + if err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + vi, err := NewVerifiableIndex(ctx, inputLog, inputLogCpParseFn, mapFn, f.Name()) + if err != nil { + t.Fatal(err) + } + + if err := vi.Update(ctx); err != nil { + t.Fatal(err) + } + + idxes, size := vi.Lookup(sha256.Sum256([]byte("foo"))) + if size != 4 { + t.Errorf("expected size 4 but got %d", size) + } + if want := []uint64{0, 3}; !cmp.Equal(idxes, want) { + t.Errorf("expected %v but got %v", want, idxes) + } + + idxes, size = vi.Lookup(sha256.Sum256([]byte("bar"))) + if size != 4 { + t.Errorf("expected size 4 but got %d", size) + } + if want := []uint64{1, 2}; !cmp.Equal(idxes, want) { + t.Errorf("expected %v but got %v", want, idxes) + } + + idxes, size = vi.Lookup(sha256.Sum256([]byte("banana"))) + if size != 4 { + t.Errorf("expected size 4 but got %d", size) + } + if idxes != nil { + t.Errorf("expected no results but got %+v", idxes) + } +} + +type inMemoryTreeSource struct { + t *testonly.Tree + leaves [][]byte + s note.Signer +} + +func (s *inMemoryTreeSource) Checkpoint(ctx context.Context) (checkpoint []byte, err error) { + rootHash := s.t.Hash() + size := uint64(len(s.leaves)) + + cp := 
log.Checkpoint{ + Origin: s.s.Name(), + Size: size, + Hash: rootHash, + } + n := &note.Note{Text: string(cp.Marshal())} + return note.Sign(n, s.s) +} + +func (s *inMemoryTreeSource) Leaves(ctx context.Context, start, end uint64) iter.Seq2[[]byte, error] { + return func(yield func([]byte, error) bool) { + for _, entry := range s.leaves { + if !yield(entry, nil) { + return + } + } + } +} + +func (s *inMemoryTreeSource) Append(leafStr string) { + leaf := []byte(leafStr) + s.leaves = append(s.leaves, leaf) + s.t.Append(leaf) +} + func mustHashEncode(data string) string { h := sha256.Sum256([]byte(data)) return hex.EncodeToString(h[:])