From 7af4ff862ccb7f715eeba38f1a76c460aeb72c9b Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Tue, 11 Jul 2023 21:31:48 +0700 Subject: [PATCH 1/4] Add WriteOut method for saving encoded streams straight to files --- stream/stream.go | 34 ++++++++++++++++++++ stream/stream_test.go | 74 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+) diff --git a/stream/stream.go b/stream/stream.go index e10e288..0f7f1e4 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -3,9 +3,13 @@ package stream import ( "bytes" "crypto/sha512" + "fmt" "hash" "io" + "io/ioutil" "math" + "os" + "path" "github.com/cockroachdb/errors" ) @@ -196,6 +200,36 @@ func (e *Encoder) Stream() (Stream, error) { return s, nil } +// WriteOut splits the source into blobs and writes them to disk +func (e *Encoder) WriteOut(dstPath string) ([]string, error) { + manifest := []string{} + + for { + blob, err := e.Next() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, err + } + err = ioutil.WriteFile(path.Join(dstPath, blob.HashHex()), blob, os.ModePerm) + if err != nil { + return nil, fmt.Errorf("cannot write blob: %w", err) + } + manifest = append(manifest, blob.HashHex()) + } + + sdb := e.SDBlob().ToBlob() + h := sdb.HashHex() + err := ioutil.WriteFile(path.Join(dstPath, h), sdb, os.ModePerm) + if err != nil { + return nil, fmt.Errorf("cannot write SD blob: %w", err) + } + manifest = append([]string{h}, manifest...) + + return manifest, nil +} + // SDBlob returns the sd blob so far func (e *Encoder) SDBlob() *SDBlob { e.sd.updateStreamHash() diff --git a/stream/stream_test.go b/stream/stream_test.go index acbee8f..d9f9d15 100644 --- a/stream/stream_test.go +++ b/stream/stream_test.go @@ -7,6 +7,8 @@ import ( "crypto/sha512" "encoding/hex" "io" + "io/ioutil" + "path" "testing" "github.com/cockroachdb/errors" @@ -55,6 +57,9 @@ func TestStreamToFile(t *testing.T) { enc := NewEncoderFromSD(bytes.NewBuffer(data), sdBlob) newStream, err := enc.Stream() + if err != nil { + t.Fatal(err) + } if len(newStream) != len(testdataBlobHashes) { t.Fatalf("stream length mismatch. got %d blobs, expected %d", len(newStream), len(testdataBlobHashes)) @@ -138,6 +143,75 @@ func TestMakeStream(t *testing.T) { } } +func TestWriteOut(t *testing.T) { + blobsToRead := 3 + totalBlobs := blobsToRead + 3 + + data := make([]byte, ((totalBlobs-1)*maxBlobDataSize)+1000) // last blob is partial + _, err := rand.Read(data) + if err != nil { + t.Fatal(err) + } + + buf := bytes.NewBuffer(data) + + enc := NewEncoder(buf) + + stream := make(Stream, blobsToRead+1) // +1 for sd blob + for i := 1; i < blobsToRead+1; i++ { // start at 1 to skip sd blob + stream[i], err = enc.Next() + if err != nil { + t.Fatal(err) + } + } + + sdBlob := enc.SDBlob() + + if len(sdBlob.BlobInfos) != blobsToRead { + t.Errorf("expected %d blobs in partial sdblob, got %d", blobsToRead, len(sdBlob.BlobInfos)) + } + if enc.SourceLen() != maxBlobDataSize*blobsToRead { + t.Errorf("expected length of %d , got %d", maxBlobDataSize*blobsToRead, enc.SourceLen()) + } + + // now finish the stream, reusing key and IVs + buf = bytes.NewBuffer(data) // rewind to the beginning of the data + + enc = NewEncoderFromSD(buf, sdBlob) + + outPath := t.TempDir() + writtenManifest, err := enc.WriteOut(outPath) + if err != nil { + t.Fatal(err) + } + + if len(writtenManifest) != totalBlobs+1 { // +1 for the terminating blob at the end + t.Errorf("expected %d blobs in stream, got %d", totalBlobs+1, len(writtenManifest)) + } + if enc.SourceLen() != len(data) { + t.Errorf("expected length of %d , got %d", len(data), enc.SourceLen()) + } + + sdb, err := ioutil.ReadFile(path.Join(outPath, writtenManifest[0])) + if err != nil { + t.Fatal(err) + } + osdb := enc.SDBlob().ToBlob() + + if !bytes.Equal(osdb, sdb) { + t.Errorf("written sd blob does not match original sd blob") + } + for i := 1; i < len(stream); i++ { // start at 1 to skip sd blob + b, err := ioutil.ReadFile(path.Join(outPath, writtenManifest[i])) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(stream[i], b) { + t.Errorf("blob %d of reconstructed stream does not match original stream", i) + } + } +} + func TestEmptyStream(t *testing.T) { enc := NewEncoder(bytes.NewBuffer(nil)) _, err := enc.Next() From 114dd028376bb1f4450b20cd7c61849bd091d590 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Wed, 12 Jul 2023 01:10:11 +0700 Subject: [PATCH 2/4] Implement a more generic encoding method --- stream/stream.go | 17 ++++++++--------- stream/stream_test.go | 8 ++++++-- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/stream/stream.go b/stream/stream.go index 0f7f1e4..8e17f6b 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -6,10 +6,7 @@ import ( "fmt" "hash" "io" - "io/ioutil" "math" - "os" - "path" "github.com/cockroachdb/errors" ) @@ -174,6 +171,7 @@ func (e *Encoder) Next() (Blob, error) { } // Stream creates the whole stream in one call +// TODO: Can be refactored to use Encode method func (e *Encoder) Stream() (Stream, error) { s := make(Stream, 1, 1+int(math.Ceil(float64(e.srcSizeHint)/maxBlobDataSize))) // len starts at 1 and cap is +1 to leave room for sd blob @@ -200,8 +198,8 @@ func (e *Encoder) Stream() (Stream, error) { return s, nil } -// WriteOut splits the source into blobs and writes them to disk -func (e *Encoder) WriteOut(dstPath string) ([]string, error) { +// Encode splits the source into blobs and feeds them into handler function +func (e *Encoder) Encode(handler func(string, []byte) error) ([]string, error) { manifest := []string{} for { @@ -212,18 +210,19 @@ func (e *Encoder) WriteOut(dstPath string) ([]string, error) { } return nil, err } - err = ioutil.WriteFile(path.Join(dstPath, blob.HashHex()), blob, os.ModePerm) + + err = handler(blob.HashHex(), blob) if err != nil { - return nil, fmt.Errorf("cannot write blob: %w", err) + return nil, fmt.Errorf("cannot process blob: %w", err) } manifest = append(manifest, blob.HashHex()) } sdb := e.SDBlob().ToBlob() h := sdb.HashHex() - err := ioutil.WriteFile(path.Join(dstPath, h), sdb, os.ModePerm) + err := handler(h, sdb) if err != nil { - return nil, fmt.Errorf("cannot write SD blob: %w", err) + return nil, fmt.Errorf("cannot handle SD blob: %w", err) } manifest = append([]string{h}, manifest...) diff --git a/stream/stream_test.go b/stream/stream_test.go index d9f9d15..ed58e10 100644 --- a/stream/stream_test.go +++ b/stream/stream_test.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "io" "io/ioutil" + "os" "path" "testing" @@ -143,7 +144,7 @@ func TestMakeStream(t *testing.T) { } } -func TestWriteOut(t *testing.T) { +func TestEncode(t *testing.T) { blobsToRead := 3 totalBlobs := blobsToRead + 3 @@ -180,7 +181,10 @@ func TestWriteOut(t *testing.T) { enc = NewEncoderFromSD(buf, sdBlob) outPath := t.TempDir() - writtenManifest, err := enc.WriteOut(outPath) + handler := func(h string, b []byte) error { + return ioutil.WriteFile(path.Join(outPath, h), b, os.ModePerm) + } + writtenManifest, err := enc.Encode(handler) if err != nil { t.Fatal(err) } From 96e10c74e118edf1f6a7753c71ba0a707df261c7 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Sat, 15 Jul 2023 02:07:50 +0700 Subject: [PATCH 3/4] Fix and improve incorrect filename tests --- stream/stream.go | 6 ++++++ stream/stream_test.go | 11 ++++++++++- .../testdata/new encoder from file.whatever... | 0 3 files changed, 16 insertions(+), 1 deletion(-) rename "stream/testdata/new \"encoder\" from file.whatever" => stream/testdata/new encoder from file.whatever... (100%) diff --git a/stream/stream.go b/stream/stream.go index a5db987..9771997 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -151,6 +151,12 @@ func NewEncoderFromFile(file *os.File) *Encoder { return e } +// SetFilename saves provided source filename into stream metadata +func (e *Encoder) SetFilename(filename string) { + e.sd.StreamName = filename + e.sd.SuggestedFileName = sanitizeFilename(filename) +} + // WithIVs sets preset cryptographic material for encoding func (e *Encoder) WithIVs(key []byte, ivs [][]byte) *Encoder { e.sd.Key = key diff --git a/stream/stream_test.go b/stream/stream_test.go index d5c8c55..0411b6e 100644 --- a/stream/stream_test.go +++ b/stream/stream_test.go @@ -14,6 +14,7 @@ import ( "testing" "github.com/cockroachdb/errors" + "gotest.tools/assert" ) var testdataBlobHashes = []string{ @@ -281,7 +282,7 @@ func TestNew(t *testing.T) { } func TestNewEncoderFromFile(t *testing.T) { - f, err := os.Open(filepath.Join("testdata", `new "encoder" from file.whatever`)) + f, err := os.Open(filepath.Join("testdata", "new encoder from file.whatever...")) if err != nil { t.Error(err) return @@ -293,3 +294,11 @@ func TestNewEncoderFromFile(t *testing.T) { t.Error("wrong or missing suggested_file_name in sd blob") } } + +func TestSetFilename(t *testing.T) { + enc := NewEncoder(bytes.NewBuffer(nil)) + enc.SetFilename(`filename "sketchy" string`) + + assert.Equal(t, "filename sketchy string", enc.sd.SuggestedFileName) + assert.Equal(t, `filename "sketchy" string`, enc.sd.StreamName) +} diff --git "a/stream/testdata/new \"encoder\" from file.whatever" b/stream/testdata/new encoder from file.whatever... similarity index 100% rename from "stream/testdata/new \"encoder\" from file.whatever" rename to stream/testdata/new encoder from file.whatever... From 8d3a4b20afbad05ac1c909a69866305b834c53ec Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Sat, 15 Jul 2023 02:12:17 +0700 Subject: [PATCH 4/4] Remove problematic testdata file (the name does not work with go packager) --- stream/stream_test.go | 16 +++++++++------- .../testdata/new encoder from file.whatever... | 0 2 files changed, 9 insertions(+), 7 deletions(-) delete mode 100644 stream/testdata/new encoder from file.whatever... diff --git a/stream/stream_test.go b/stream/stream_test.go index 0411b6e..4ff7127 100644 --- a/stream/stream_test.go +++ b/stream/stream_test.go @@ -14,6 +14,7 @@ import ( "testing" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" "gotest.tools/assert" ) @@ -282,13 +283,14 @@ func TestNew(t *testing.T) { } func TestNewEncoderFromFile(t *testing.T) { - f, err := os.Open(filepath.Join("testdata", "new encoder from file.whatever...")) - if err != nil { - t.Error(err) - return - } - - e := NewEncoderFromFile(f) + sketchyFile := filepath.Join(t.TempDir(), `new "encoder" from file.whatever...`) + file, err := os.OpenFile(sketchyFile, os.O_RDONLY|os.O_CREATE, 0644) + require.NoError(t, err) + file.Close() + file, err = os.Open(sketchyFile) + require.NoError(t, err) + + e := NewEncoderFromFile(file) if e.sd.SuggestedFileName != "new encoder from file.whatever" { t.Error("wrong or missing suggested_file_name in sd blob") diff --git a/stream/testdata/new encoder from file.whatever... b/stream/testdata/new encoder from file.whatever... deleted file mode 100644 index e69de29..0000000