go/writer: improve parquet write performance #4471
Conversation
mdibaiee left a comment:
Amazing results!!
A few small comments, otherwise LGTM
One note: I think we can now remove WithDisableDictionaryEncoding here:
connectors/go/writer/parquet.go, lines 171 to 175 in 8da94b3:

}

// Close finalizes any open row group, writes the parquet footer (file metadata, footer length,
// and trailing magic), and marks the writer closed. The underlying sink is not itself closed.
Why is the underlying sink not closed?
}

// SetNumRows records the row count for this row group; required before Close. The same value
// must also describe every column written into the row group.
Not sure what this means: "The same value must also describe every column written into the row group".
parent   *mergeWriter
metadata *metadata.RowGroupMetaDataBuilder
ordinal  int16
nextCol  int
nit: The name of this property threw me off a bit... if I understand correctly, this is essentially the colOrdinal of the column being written by NextColumn.
Also, maybe we can store it as int16 since it seems to be used as such.
// at construction time will fail again immediately on the first real write.
sinkWriter, err := newMergeWriter(cwc, schemaRoot, props, kvmeta)
if err != nil {
	panic(fmt.Sprintf("creating sink writer: %s", err))
Would we get better error presentation for users if we bubble up the error, or do we need the panic stack trace?
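A hypothetical sketch of the two alternatives being discussed, not code from this PR: newSink, the constructor names, and all types are invented placeholders. It only contrasts panicking at construction time with returning a wrapped error that the caller can present normally.

```go
package main

import (
	"errors"
	"fmt"
)

// newSink stands in for newMergeWriter; name and signature are placeholders.
func newSink() (int, error) {
	return 0, errors.New("bad codec config")
}

// Current shape (as in the diff above): panic if construction fails.
func mustNewWriter() int {
	w, err := newSink()
	if err != nil {
		panic(fmt.Sprintf("creating sink writer: %s", err))
	}
	return w
}

// Alternative being asked about: bubble the error up so the caller can wrap
// it and surface it through normal error reporting instead of a stack trace.
func newWriter() (int, error) {
	w, err := newSink()
	if err != nil {
		return 0, fmt.Errorf("creating sink writer: %w", err)
	}
	return w, nil
}

func main() {
	if _, err := newWriter(); err != nil {
		fmt.Println("caller sees:", err)
	}
	_ = mustNewWriter // kept referenced so the sketch compiles as one file
}
```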
compressed := cw.pageWriter.Compress(&buf, page.Data())
compressedData := make([]byte, len(compressed))
copy(compressedData, compressed)
newBuf := memory.NewBufferBytes(compressedData)
Do you mind adding a comment as to why the copy is necessary?
@jacobmarble it seems like tests are timing out on this branch while they work on

Just commenting directly here as per the Slack thread. Let's hold off on this change until after we switch to the Snowpipe streaming SDK.
Description:
Improves parquet write performance per a new benchmark:
Two-part refactor of ParquetWriter's buffer-to-sink pipeline:

Column-oriented buffer (b55c7158d): changes the write buffer from [][]any to []any of typed slice pointers (*[]int64, *[]parquet.ByteArray, etc.) with a parallel []int16 definition-level slice. Values are converted to their Parquet physical types at Write time instead of during the flush loop, eliminating the per-flush transposition pass and reducing GC scanning pressure. (A minimal sketch of this buffer layout follows the benchmark numbers below.)

Page-copy (cf5e9e76d): transferColumnValues previously decoded every column value from the scratch file and re-encoded it into the sink. Now the scratch writer uses the same codec as the sink and disables dictionary encoding, so compressed data pages are copied byte-for-byte through a new mergeWriter type that accepts pre-encoded pages and tracks row counts explicitly.

Benchmark results (A = baseline, B = column-oriented buffer, C = page-copy):
Step 1 (A→B): cpu +7%, mem −25%, allocs ~unchanged.
Step 2 (B→C): cpu −9–25%, mem −6–33%, allocs −75%.
Net: −15% CPU, −41% memory, −75% allocations (all p=0.000).
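As a mental model for step 1, here is a small self-contained sketch of the column-oriented buffer idea. The function name and the column element types are stand-ins; the PR's actual makeColumnBuffer/appendVal additionally handle Parquet physical types, definition levels, and a getValFn[T] converter.

```go
package main

import "fmt"

// appendVal mirrors the role of the tiny generic described in the notes for
// reviewers; the real one also records a definition level per value.
func appendVal[T any](cols []any, col int, v T) {
	buf := cols[col].(*[]T) // typed slice pointer, boxed into the []any only once
	*buf = append(*buf, v)
}

func main() {
	// One typed slice pointer per column. Storing *[]T instead of []T means
	// each append mutates the slice through the pointer rather than re-boxing
	// a 3-word slice header into the []any on every write.
	cols := []any{new([]int64), new([]string)}

	appendVal[int64](cols, 0, 7)
	appendVal[int64](cols, 0, 8)
	appendVal[string](cols, 1, "a")

	fmt.Println(*cols[0].(*[]int64), *cols[1].(*[]string)) // [7 8] [a]
}
```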
Workflow steps:
No user-visible change.
Documentation links affected:
None.
Notes for reviewers:
makeColumnBuffer stores a pointer-to-slice in []any to avoid boxing a 3-word slice header on every append.
appendVal[T parquetValue] is a 5-line generic the compiler inlines at each call site, allowing devirtualization of the getValFn[T] parameter.
mergeWriter (merge_writer.go) is the new sink type; it accepts WriteDataPage calls and tracks row counts via SetNumRows. (A simplified, hypothetical sketch of this flow follows below.)
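For orientation only, a much-simplified, hypothetical sketch of that flow: the method names WriteDataPage, SetNumRows, and Close come from the notes above, while every type, field, and behaviour here is invented for illustration and will differ from the real merge_writer.go.

```go
package main

import (
	"errors"
	"fmt"
)

// rowGroupSketch is an invented stand-in for the row-group side of mergeWriter.
type rowGroupSketch struct {
	pages   [][]byte // pre-encoded, compressed data pages, kept byte-for-byte
	numRows int64
	rowsSet bool
}

// WriteDataPage appends an already-encoded page without decoding or re-encoding it.
func (rg *rowGroupSketch) WriteDataPage(p []byte) {
	rg.pages = append(rg.pages, p)
}

// SetNumRows records the row count for the row group; required before Close.
func (rg *rowGroupSketch) SetNumRows(n int64) {
	rg.numRows, rg.rowsSet = n, true
}

// Close finalizes the row group; it fails if the row count was never recorded.
func (rg *rowGroupSketch) Close() error {
	if !rg.rowsSet {
		return errors.New("SetNumRows must be called before Close")
	}
	return nil
}

func main() {
	rg := &rowGroupSketch{}
	rg.WriteDataPage([]byte{0x15, 0x04}) // placeholder page bytes
	rg.SetNumRows(1)
	fmt.Println("close:", rg.Close()) // close: <nil>
}
```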