diff --git a/chunk_rows.go b/chunk_rows.go
new file mode 100644
index 00000000..48cdfc50
--- /dev/null
+++ b/chunk_rows.go
@@ -0,0 +1,149 @@
+package duckdb
+
+import (
+	"context"
+	"database/sql/driver"
+	"errors"
+	"io"
+
+	"github.com/duckdb/duckdb-go/v2/mapping"
+)
+
+// ChunkRows streams query results one DuckDB data chunk at a time.
+// The chunk returned by NextChunk is invalidated by the next call to
+// NextChunk or Close.
+type ChunkRows struct {
+	stmt *Stmt
+	res  mapping.Result
+
+	// chunk is reused for every fetched DuckDB chunk; closeChunk records
+	// whether it currently holds a chunk that still has to be released.
+	chunk      DataChunk
+	closeChunk bool
+
+	// closed is set by Close so that later calls never touch the destroyed result.
+	closed bool
+}
+
+// newChunkRowsWithStmt wraps res for chunk-wise iteration and records the
+// result's column names. stmt is the statement that produced res.
+func newChunkRowsWithStmt(res mapping.Result, stmt *Stmt) *ChunkRows {
+	columnCount := uint64(mapping.ColumnCount(&res))
+	r := &ChunkRows{res: res, stmt: stmt}
+
+	// The column count is known up front, so size the name slice once.
+	r.chunk.columnNames = make([]string, 0, columnCount)
+	for i := range columnCount {
+		r.chunk.columnNames = append(r.chunk.columnNames, mapping.ColumnName(&res, mapping.IdxT(i)))
+	}
+
+	return r
+}
+
+// Columns returns the result column names.
+func (r *ChunkRows) Columns() []string {
+	return r.chunk.columnNames
+}
+
+// NextChunk advances to the next result chunk.
+// It returns io.EOF once the result is exhausted and an error when called
+// after Close. The returned chunk is reused by the following call.
+func (r *ChunkRows) NextChunk() (*DataChunk, error) {
+	if r.closed {
+		return nil, getError(errAPI, errors.New("chunk rows are closed"))
+	}
+
+	// Release the chunk handed out by the previous call before fetching.
+	if r.closeChunk {
+		r.chunk.close()
+		r.closeChunk = false
+	}
+
+	chunk := mapping.FetchChunk(r.res)
+	if chunk.Ptr == nil {
+		return nil, io.EOF
+	}
+
+	r.closeChunk = true
+	if err := r.chunk.initFromDuckDataChunk(chunk, false); err != nil {
+		return nil, getError(err, nil)
+	}
+
+	return &r.chunk, nil
+}
+
+// Close releases the current chunk and the underlying result, and closes the
+// statement when it is flagged closeOnRowsClose. Calling Close again is a no-op.
+func (r *ChunkRows) Close() error {
+	if r.closed {
+		return nil
+	}
+	r.closed = true
+
+	if r.closeChunk {
+		r.chunk.close()
+		r.closeChunk = false
+	}
+	mapping.DestroyResult(&r.res)
+
+	if r.stmt == nil {
+		return nil
+	}
+	stmt := r.stmt
+	r.stmt = nil
+	stmt.rows = false
+	if stmt.closeOnRowsClose {
+		return stmt.Close()
+	}
+	return nil
+}
+
+// QueryChunksContext executes a query and returns results chunk-by-chunk.
+func (conn *Conn) QueryChunksContext(ctx context.Context, query string, args []driver.NamedValue) (*ChunkRows, error) {
+	cleanupCtx := conn.setContext(ctx)
+	defer cleanupCtx()
+
+	var rows *ChunkRows
+	err := runWithCtxInterrupt(ctx, conn.conn, func(wctx context.Context) error {
+		prepared, err := conn.prepareStmts(wctx, query)
+		if err != nil {
+			return err
+		}
+
+		r, err := prepared.QueryChunksContext(wctx, args)
+		if err != nil {
+			errClose := prepared.Close()
+			if errClose != nil {
+				return errors.Join(err, errClose)
+			}
+			return err
+		}
+
+		prepared.closeOnRowsClose = true
+		rows = r
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return rows, nil
+}
+
+// QueryChunksContext executes a prepared query and returns results chunk-by-chunk.
+func (s *Stmt) QueryChunksContext(ctx context.Context, nargs []driver.NamedValue) (*ChunkRows, error) {
+	cleanupCtx := s.conn.setContext(ctx)
+	defer cleanupCtx()
+
+	var res *mapping.Result
+	if err := runWithCtxInterrupt(ctx, s.conn.conn, func(wctx context.Context) error {
+		var executeErr error
+		res, executeErr = s.execute(wctx, nargs)
+		return executeErr
+	}); err != nil {
+		return nil, err
+	}
+
+	s.rows = true
+	return newChunkRowsWithStmt(*res, s), nil
+}
diff --git a/chunk_rows_benchmark_test.go b/chunk_rows_benchmark_test.go
new file mode 100644
index 00000000..25a4f1b5
--- /dev/null
+++ b/chunk_rows_benchmark_test.go
@@ -0,0 +1,421 @@
+package duckdb
+
+import (
+	"context"
+	"database/sql"
+	"database/sql/driver"
+	"errors"
+	"fmt"
+	"io"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// Benchmark fixture parameters: the size of benchmark_rows and the query that
+// reads only its numeric columns.
+const (
+	benchmarkChunkRowsTotalRows = 32 * 2048
+	queryNumericBenchmark       = `SELECT i, f FROM benchmark_rows ORDER BY i`
+)
+
+// benchmarkChunkRowsSink receives each benchmark's results so the measured
+// loops have an observable effect.
+var benchmarkChunkRowsSink struct {
+	sumInt   int64
+	sumFloat float64
+	sumLen   int
+	rowCount int
+}
+
+// BenchmarkQueryContextRowsScan reads all three columns row by row through the driver.Rows API.
+func BenchmarkQueryContextRowsScan(b *testing.B) {
+	db, conn, query := prepareChunkRowsBenchmark(b)
+	defer closeConnWrapper(b, conn)
+	defer closeDbWrapper(b, db)
+
+	ctx := context.Background()
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for b.Loop() {
+		var sumInt int64
+		var sumFloat float64
+		var sumLen int
+
+		err := conn.Raw(func(driverConn any) error {
+			duckConn := driverConn.(*Conn)
+
+			rows, err := duckConn.QueryContext(ctx, query, nil)
+			require.NoError(b, err)
+			defer func() {
+				require.NoError(b, rows.Close())
+			}()
+
+			values := make([]driver.Value, 3)
+			for {
+				err := rows.Next(values)
+				if err != nil {
+					if errors.Is(err, io.EOF) {
+						return nil
+					}
+					return err
+				}
+
+				sumInt += values[0].(int64)
+				sumFloat += values[1].(float64)
+				sumLen += len(values[2].(string))
+			}
+		})
+		require.NoError(b, err)
+
+		benchmarkChunkRowsSink.sumInt = sumInt
+		benchmarkChunkRowsSink.sumFloat = sumFloat
+		benchmarkChunkRowsSink.sumLen = sumLen
+	}
+}
+
+// BenchmarkQueryContextRowsCount counts rows through the driver.Rows API without using the values.
+func BenchmarkQueryContextRowsCount(b *testing.B) {
+	db, conn, query := prepareChunkRowsBenchmark(b)
+	defer closeConnWrapper(b, conn)
+	defer closeDbWrapper(b, db)
+
+	ctx := context.Background()
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for b.Loop() {
+		rowCount := 0
+
+		err := conn.Raw(func(driverConn any) error {
+			duckConn := driverConn.(*Conn)
+
+			rows, err := duckConn.QueryContext(ctx, query, nil)
+			require.NoError(b, err)
+			defer func() {
+				require.NoError(b, rows.Close())
+			}()
+
+			values := make([]driver.Value, 3)
+			for {
+				err := rows.Next(values)
+				if err != nil {
+					if errors.Is(err, io.EOF) {
+						return nil
+					}
+					return err
+				}
+				rowCount++
+			}
+		})
+		require.NoError(b, err)
+
+		benchmarkChunkRowsSink.rowCount = rowCount
+	}
+}
+
+// BenchmarkQueryChunksContextScan reads all three columns via DataChunk.GetValue.
+func BenchmarkQueryChunksContextScan(b *testing.B) {
+	db, conn, query := prepareChunkRowsBenchmark(b)
+	defer closeConnWrapper(b, conn)
+	defer closeDbWrapper(b, db)
+
+	ctx := context.Background()
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for b.Loop() {
+		var sumInt int64
+		var sumFloat float64
+		var sumLen int
+
+		err := conn.Raw(func(driverConn any) error {
+			duckConn := driverConn.(*Conn)
+
+			rows, err := duckConn.QueryChunksContext(ctx, query, nil)
+			require.NoError(b, err)
+			defer func() {
+				require.NoError(b, rows.Close())
+			}()
+
+			for {
+				chunk, err := rows.NextChunk()
+				if err != nil {
+					if errors.Is(err, io.EOF) {
+						return nil
+					}
+					return err
+				}
+
+				for rowIdx := range chunk.GetSize() {
+					intVal, err := chunk.GetValue(0, rowIdx)
+					require.NoError(b, err)
+					floatVal, err := chunk.GetValue(1, rowIdx)
+					require.NoError(b, err)
+					stringVal, err := chunk.GetValue(2, rowIdx)
+					require.NoError(b, err)
+
+					sumInt += intVal.(int64)
+					sumFloat += floatVal.(float64)
+					sumLen += len(stringVal.(string))
+				}
+			}
+		})
+		require.NoError(b, err)
+
+		benchmarkChunkRowsSink.sumInt = sumInt
+		benchmarkChunkRowsSink.sumFloat = sumFloat
+		benchmarkChunkRowsSink.sumLen = sumLen
+	}
+}
+
+// BenchmarkQueryChunksContextGetValueNumericScan reads the numeric columns via DataChunk.GetValue.
+func BenchmarkQueryChunksContextGetValueNumericScan(b *testing.B) {
+	db, conn, _ := prepareChunkRowsBenchmark(b)
+	defer closeConnWrapper(b, conn)
+	defer closeDbWrapper(b, db)
+
+	ctx := context.Background()
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for b.Loop() {
+		var sumInt int64
+		var sumFloat float64
+
+		err := conn.Raw(func(driverConn any) error {
+			duckConn := driverConn.(*Conn)
+
+			rows, err := duckConn.QueryChunksContext(ctx, queryNumericBenchmark, nil)
+			require.NoError(b, err)
+			defer func() {
+				require.NoError(b, rows.Close())
+			}()
+
+			for {
+				chunk, err := rows.NextChunk()
+				if err != nil {
+					if errors.Is(err, io.EOF) {
+						return nil
+					}
+					return err
+				}
+
+				for rowIdx := range chunk.GetSize() {
+					intVal, err := chunk.GetValue(0, rowIdx)
+					require.NoError(b, err)
+					floatVal, err := chunk.GetValue(1, rowIdx)
+					require.NoError(b, err)
+
+					sumInt += intVal.(int64)
+					sumFloat += floatVal.(float64)
+				}
+			}
+		})
+		require.NoError(b, err)
+
+		benchmarkChunkRowsSink.sumInt = sumInt
+		benchmarkChunkRowsSink.sumFloat = sumFloat
+	}
+}
+
+// BenchmarkQueryChunksContextTypedNumericScan reads the numeric columns via the typed slice accessors.
+func BenchmarkQueryChunksContextTypedNumericScan(b *testing.B) {
+	db, conn, _ := prepareChunkRowsBenchmark(b)
+	defer closeConnWrapper(b, conn)
+	defer closeDbWrapper(b, db)
+
+	ctx := context.Background()
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for b.Loop() {
+		var sumInt int64
+		var sumFloat float64
+
+		err := conn.Raw(func(driverConn any) error {
+			duckConn := driverConn.(*Conn)
+
+			rows, err := duckConn.QueryChunksContext(ctx, queryNumericBenchmark, nil)
+			require.NoError(b, err)
+			defer func() {
+				require.NoError(b, rows.Close())
+			}()
+
+			for {
+				chunk, err := rows.NextChunk()
+				if err != nil {
+					if errors.Is(err, io.EOF) {
+						return nil
+					}
+					return err
+				}
+
+				ints, err := chunk.Int64Slice(0)
+				require.NoError(b, err)
+				floats, err := chunk.Float64Slice(1)
+				require.NoError(b, err)
+
+				for rowIdx := range ints {
+					isNull, err := chunk.IsNull(0, rowIdx)
+					if err != nil {
+						return err
+					}
+					if !isNull {
+						sumInt += ints[rowIdx]
+					}
+
+					isNull, err = chunk.IsNull(1, rowIdx)
+					if err != nil {
+						return err
+					}
+					if !isNull {
+						sumFloat += floats[rowIdx]
+					}
+				}
+			}
+		})
+		require.NoError(b, err)
+
+		benchmarkChunkRowsSink.sumInt = sumInt
+		benchmarkChunkRowsSink.sumFloat = sumFloat
+	}
+}
+
+// BenchmarkQueryChunksContextTypedStringScan reads all three columns via the typed slice and StringRefs accessors.
+func BenchmarkQueryChunksContextTypedStringScan(b *testing.B) {
+	db, conn, query := prepareChunkRowsBenchmark(b)
+	defer closeConnWrapper(b, conn)
+	defer closeDbWrapper(b, db)
+
+	ctx := context.Background()
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for b.Loop() {
+		var sumInt int64
+		var sumFloat float64
+		var sumLen int
+
+		err := conn.Raw(func(driverConn any) error {
+			duckConn := driverConn.(*Conn)
+
+			rows, err := duckConn.QueryChunksContext(ctx, query, nil)
+			require.NoError(b, err)
+			defer func() {
+				require.NoError(b, rows.Close())
+			}()
+
+			for {
+				chunk, err := rows.NextChunk()
+				if err != nil {
+					if errors.Is(err, io.EOF) {
+						return nil
+					}
+					return err
+				}
+
+				ints, err := chunk.Int64Slice(0)
+				require.NoError(b, err)
+				floats, err := chunk.Float64Slice(1)
+				require.NoError(b, err)
+				strings, err := chunk.StringRefs(2)
+				require.NoError(b, err)
+
+				for rowIdx := range ints {
+					isNull, err := chunk.IsNull(0, rowIdx)
+					if err != nil {
+						return err
+					}
+					if !isNull {
+						sumInt += ints[rowIdx]
+					}
+
+					isNull, err = chunk.IsNull(1, rowIdx)
+					if err != nil {
+						return err
+					}
+					if !isNull {
+						sumFloat += floats[rowIdx]
+					}
+
+					isNull, err = chunk.IsNull(2, rowIdx)
+					if err != nil {
+						return err
+					}
+					if !isNull {
+						sumLen += strings[rowIdx].Len()
+					}
+				}
+			}
+		})
+		require.NoError(b, err)
+
+		benchmarkChunkRowsSink.sumInt = sumInt
+		benchmarkChunkRowsSink.sumFloat = sumFloat
+		benchmarkChunkRowsSink.sumLen = sumLen
+	}
+}
+
+// BenchmarkQueryChunksContextCount counts rows chunk by chunk without reading values.
+func BenchmarkQueryChunksContextCount(b *testing.B) {
+	db, conn, query := prepareChunkRowsBenchmark(b)
+	defer closeConnWrapper(b, conn)
+	defer closeDbWrapper(b, db)
+
+	ctx := context.Background()
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for b.Loop() {
+		rowCount := 0
+
+		err := conn.Raw(func(driverConn any) error {
+			duckConn := driverConn.(*Conn)
+
+			rows, err := duckConn.QueryChunksContext(ctx, query, nil)
+			require.NoError(b, err)
+			defer func() {
+				require.NoError(b, rows.Close())
+			}()
+
+			for {
+				chunk, err := rows.NextChunk()
+				if err != nil {
+					if errors.Is(err, io.EOF) {
+						return nil
+					}
+					return err
+				}
+
+				rowCount += chunk.GetSize()
+			}
+		})
+		require.NoError(b, err)
+
+		benchmarkChunkRowsSink.rowCount = rowCount
+	}
+}
+
+// prepareChunkRowsBenchmark creates the benchmark_rows table and returns the
+// database, a dedicated connection, and the query selecting all three columns.
+func prepareChunkRowsBenchmark(b *testing.B) (*sql.DB, *sql.Conn, string) {
+	b.Helper()
+
+	db := openDbWrapper(b, `?access_mode=READ_WRITE`)
+	setupQuery := fmt.Sprintf(`
+CREATE TABLE benchmark_rows AS
+SELECT
+	i::BIGINT AS i,
+	i::DOUBLE * 0.5 AS f,
+	('value-' || i::VARCHAR) AS s
+FROM range(%d) AS t(i)`,
+		benchmarkChunkRowsTotalRows,
+	)
+	_, err := db.Exec(setupQuery)
+	require.NoError(b, err)
+
+	conn := openConnWrapper(b, db, context.Background())
+	return db, conn, `SELECT i, f, s FROM benchmark_rows ORDER BY i`
+}
diff --git a/chunk_rows_test.go b/chunk_rows_test.go
new file mode 100644
index 00000000..afc95db2
--- /dev/null
+++ b/chunk_rows_test.go
@@ -0,0 +1,347 @@
+package duckdb
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"fmt"
+	"io"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestQueryChunksContextStreamsChunkValues checks that chunk values can be read through GetValue.
+func TestQueryChunksContextStreamsChunkValues(t *testing.T) {
+	connector := newConnectorWrapper(t, "", nil)
+	defer closeConnectorWrapper(t, connector)
+
+	db := sql.OpenDB(connector)
+	defer closeDbWrapper(t, db)
+
+	ctx := context.Background()
+	createTable(t, db, `CREATE TABLE numbers (i INTEGER)`)
+
+	_, err := db.ExecContext(ctx, `INSERT INTO numbers VALUES (1), (2), (3)`)
+	require.NoError(t, err)
+
+	conn := openConnWrapper(t, db, ctx)
+	defer closeConnWrapper(t, conn)
+
+	var got []int32
+	err = conn.Raw(func(driverConn any) error {
+		duckConn := driverConn.(*Conn)
+
+		rows, err := duckConn.QueryChunksContext(ctx, `SELECT i FROM numbers ORDER BY i`, nil)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, rows.Close())
+		}()
+
+		for {
+			chunk, err := rows.NextChunk()
+			if err != nil {
+				if errors.Is(err, io.EOF) {
+					return nil
+				}
+				return err
+			}
+
+			for rowIdx := range chunk.GetSize() {
+				value, err := chunk.GetValue(0, rowIdx)
+				require.NoError(t, err)
+				got = append(got, value.(int32))
+			}
+		}
+	})
+	require.NoError(t, err)
+	require.Equal(t, []int32{1, 2, 3}, got)
+}
+
+// TestQueryChunksContextAdvancesAcrossMultipleChunks checks iteration over a result larger than one chunk.
+func TestQueryChunksContextAdvancesAcrossMultipleChunks(t *testing.T) {
+	connector := newConnectorWrapper(t, "", nil)
+	defer closeConnectorWrapper(t, connector)
+
+	db := sql.OpenDB(connector)
+	defer closeDbWrapper(t, db)
+
+	ctx := context.Background()
+	createTable(t, db, `CREATE TABLE numbers (i INTEGER)`)
+
+	totalRows := GetDataChunkCapacity() + 17
+	var insertSQL strings.Builder
+	insertSQL.WriteString(`INSERT INTO numbers VALUES `)
+	for i := range totalRows {
+		if i > 0 {
+			insertSQL.WriteString(", ")
+		}
+		fmt.Fprintf(&insertSQL, "(%d)", i)
+	}
+
+	_, err := db.ExecContext(ctx, insertSQL.String())
+	require.NoError(t, err)
+
+	conn := openConnWrapper(t, db, ctx)
+	defer closeConnWrapper(t, conn)
+
+	var (
+		chunkCount int
+		rowCount   int
+	)
+	err = conn.Raw(func(driverConn any) error {
+		duckConn := driverConn.(*Conn)
+
+		rows, err := duckConn.QueryChunksContext(ctx, `SELECT i FROM numbers ORDER BY i`, nil)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, rows.Close())
+		}()
+
+		for {
+			chunk, err := rows.NextChunk()
+			if err != nil {
+				if errors.Is(err, io.EOF) {
+					return nil
+				}
+				return err
+			}
+
+			chunkCount++
+			rowCount += chunk.GetSize()
+		}
+	})
+	require.NoError(t, err)
+	require.GreaterOrEqual(t, chunkCount, 2)
+	require.Equal(t, totalRows, rowCount)
+}
+
+// TestQueryChunksContextTypedSlices checks the Int32Slice, Float64Slice, and Int64Slice accessors.
+func TestQueryChunksContextTypedSlices(t *testing.T) {
+	connector := newConnectorWrapper(t, "", nil)
+	defer closeConnectorWrapper(t, connector)
+
+	db := sql.OpenDB(connector)
+	defer closeDbWrapper(t, db)
+
+	ctx := context.Background()
+	createTable(t, db, `CREATE TABLE metrics (i INTEGER, f DOUBLE, ts BIGINT)`)
+
+	_, err := db.ExecContext(ctx, `INSERT INTO metrics VALUES (1, 1.5, 10), (2, 2.5, 20), (3, 3.5, 30)`)
+	require.NoError(t, err)
+
+	conn := openConnWrapper(t, db, ctx)
+	defer closeConnWrapper(t, conn)
+
+	err = conn.Raw(func(driverConn any) error {
+		duckConn := driverConn.(*Conn)
+
+		rows, err := duckConn.QueryChunksContext(ctx, `SELECT i, f, ts FROM metrics ORDER BY i`, nil)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, rows.Close())
+		}()
+
+		chunk, err := rows.NextChunk()
+		require.NoError(t, err)
+
+		ints, err := chunk.Int32Slice(0)
+		require.NoError(t, err)
+		floats, err := chunk.Float64Slice(1)
+		require.NoError(t, err)
+		timestamps, err := chunk.Int64Slice(2)
+		require.NoError(t, err)
+
+		require.Equal(t, []int32{1, 2, 3}, ints)
+		require.Equal(t, []float64{1.5, 2.5, 3.5}, floats)
+		require.Equal(t, []int64{10, 20, 30}, timestamps)
+
+		return nil
+	})
+	require.NoError(t, err)
+}
+
+// TestQueryChunksContextTypedSliceDetectsNulls checks that IsNull identifies NULL rows of a typed slice.
+func TestQueryChunksContextTypedSliceDetectsNulls(t *testing.T) {
+	connector := newConnectorWrapper(t, "", nil)
+	defer closeConnectorWrapper(t, connector)
+
+	db := sql.OpenDB(connector)
+	defer closeDbWrapper(t, db)
+
+	ctx := context.Background()
+	createTable(t, db, `CREATE TABLE metrics (i INTEGER)`)
+
+	_, err := db.ExecContext(ctx, `INSERT INTO metrics VALUES (1), (NULL), (3)`)
+	require.NoError(t, err)
+
+	conn := openConnWrapper(t, db, ctx)
+	defer closeConnWrapper(t, conn)
+
+	err = conn.Raw(func(driverConn any) error {
+		duckConn := driverConn.(*Conn)
+
+		// NOTE(review): no ORDER BY here; the assertions assume rows come back in insertion order.
+		rows, err := duckConn.QueryChunksContext(ctx, `SELECT i FROM metrics`, nil)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, rows.Close())
+		}()
+
+		chunk, err := rows.NextChunk()
+		require.NoError(t, err)
+
+		ints, err := chunk.Int32Slice(0)
+		require.NoError(t, err)
+		require.Len(t, ints, 3)
+		require.Equal(t, int32(1), ints[0])
+		require.Equal(t, int32(3), ints[2])
+
+		isNull, err := chunk.IsNull(0, 1)
+		require.NoError(t, err)
+		require.True(t, isNull)
+
+		isNull, err = chunk.IsNull(0, 0)
+		require.NoError(t, err)
+		require.False(t, isNull)
+
+		return nil
+	})
+	require.NoError(t, err)
+}
+
+// TestQueryChunksContextTypedSliceRejectsWrongType checks the error for a mismatched column type.
+func TestQueryChunksContextTypedSliceRejectsWrongType(t *testing.T) {
+	connector := newConnectorWrapper(t, "", nil)
+	defer closeConnectorWrapper(t, connector)
+
+	db := sql.OpenDB(connector)
+	defer closeDbWrapper(t, db)
+
+	ctx := context.Background()
+	conn := openConnWrapper(t, db, ctx)
+	defer closeConnWrapper(t, conn)
+
+	err := conn.Raw(func(driverConn any) error {
+		duckConn := driverConn.(*Conn)
+
+		rows, err := duckConn.QueryChunksContext(ctx, `SELECT 1.5::DOUBLE AS f`, nil)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, rows.Close())
+		}()
+
+		chunk, err := rows.NextChunk()
+		require.NoError(t, err)
+
+		_, err = chunk.Int32Slice(0)
+		require.ErrorContains(t, err, "expected INTEGER")
+		require.ErrorContains(t, err, "got DOUBLE")
+
+		return nil
+	})
+	require.NoError(t, err)
+}
+
+// TestQueryChunksContextStringRefs checks StringRefs for empty, NULL, short, and long strings.
+func TestQueryChunksContextStringRefs(t *testing.T) {
+	connector := newConnectorWrapper(t, "", nil)
+	defer closeConnectorWrapper(t, connector)
+
+	db := sql.OpenDB(connector)
+	defer closeDbWrapper(t, db)
+
+	ctx := context.Background()
+	createTable(t, db, `CREATE TABLE labels (id INTEGER, s VARCHAR)`)
+
+	const longLabel = "abcdefghijklmnopqrstuvwxyz"
+
+	_, err := db.ExecContext(ctx, `INSERT INTO labels VALUES (1, ''), (2, NULL), (3, 'alpha'), (4, ?)`, longLabel)
+	require.NoError(t, err)
+
+	conn := openConnWrapper(t, db, ctx)
+	defer closeConnWrapper(t, conn)
+
+	err = conn.Raw(func(driverConn any) error {
+		duckConn := driverConn.(*Conn)
+
+		rows, err := duckConn.QueryChunksContext(ctx, `SELECT s FROM labels ORDER BY id`, nil)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, rows.Close())
+		}()
+
+		chunk, err := rows.NextChunk()
+		require.NoError(t, err)
+
+		refs, err := chunk.StringRefs(0)
+		require.NoError(t, err)
+		require.Len(t, refs, 4)
+
+		isNull, err := chunk.IsNull(0, 0)
+		require.NoError(t, err)
+		require.False(t, isNull)
+		require.Equal(t, "", refs[0].UnsafeString())
+		require.Equal(t, 0, refs[0].Len())
+
+		isNull, err = chunk.IsNull(0, 1)
+		require.NoError(t, err)
+		require.True(t, isNull)
+		require.Equal(t, "", refs[1].UnsafeString())
+		require.Equal(t, 0, refs[1].Len())
+
+		isNull, err = chunk.IsNull(0, 2)
+		require.NoError(t, err)
+		require.False(t, isNull)
+		require.Equal(t, "alpha", refs[2].UnsafeString())
+		require.Equal(t, 5, refs[2].Len())
+
+		isNull, err = chunk.IsNull(0, 3)
+		require.NoError(t, err)
+		require.False(t, isNull)
+		require.Equal(t, longLabel, refs[3].UnsafeString())
+		require.Equal(t, len(longLabel), refs[3].Len())
+
+		return nil
+	})
+	require.NoError(t, err)
+}
+
+// TestQueryChunksContextTimestampMicrosSlice checks TIMESTAMP values are returned as Unix microseconds.
+func TestQueryChunksContextTimestampMicrosSlice(t *testing.T) {
+	connector := newConnectorWrapper(t, "", nil)
+	defer closeConnectorWrapper(t, connector)
+
+	db := sql.OpenDB(connector)
+	defer closeDbWrapper(t, db)
+
+	ctx := context.Background()
+	createTable(t, db, `CREATE TABLE events (ts TIMESTAMP)`)
+
+	_, err := db.ExecContext(ctx, `INSERT INTO events VALUES (TIMESTAMP '2024-02-03 01:02:03.123456'), (TIMESTAMP '2024-02-03 01:02:04.654321')`)
+	require.NoError(t, err)
+
+	conn := openConnWrapper(t, db, ctx)
+	defer closeConnWrapper(t, conn)
+
+	err = conn.Raw(func(driverConn any) error {
+		duckConn := driverConn.(*Conn)
+
+		rows, err := duckConn.QueryChunksContext(ctx, `SELECT ts FROM events ORDER BY ts`, nil)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, rows.Close())
+		}()
+
+		chunk, err := rows.NextChunk()
+		require.NoError(t, err)
+
+		micros, err := chunk.TimestampMicrosSlice(0)
+		require.NoError(t, err)
+		require.Equal(t, []int64{1706922123123456, 1706922124654321}, micros)
+
+		return nil
+	})
+	require.NoError(t, err)
+}
diff --git a/data_chunk_string.go b/data_chunk_string.go
new file mode 100644
index 00000000..5c70c52c
--- /dev/null
+++ b/data_chunk_string.go
@@ -0,0 +1,82 @@
+package duckdb
+
+import (
+	"unsafe"
+
+	"github.com/duckdb/duckdb-go/v2/mapping"
+)
+
+// StringRef points at a DuckDB VARCHAR value owned by the current chunk.
+// The referenced memory is invalidated by the next NextChunk or Close call.
+type StringRef struct {
+	ptr unsafe.Pointer
+	len int
+}
+
+// duckStringPointer is the layout newStringRef assumes for a non-inlined
+// mapping.StringT: length, a 4-byte prefix, and a pointer to the bytes.
+type duckStringPointer struct {
+	length uint32
+	prefix [4]byte
+	ptr    *byte
+}
+
+// duckStringInlined is the layout newStringRef assumes for an inlined
+// mapping.StringT: length followed by up to 12 bytes stored in place.
+type duckStringInlined struct {
+	length  uint32
+	inlined [12]byte
+}
+
+// Len returns the string length in bytes.
+func (ref StringRef) Len() int {
+	return ref.len
+}
+
+// UnsafeString returns a zero-copy Go string view over the DuckDB string memory.
+// The returned string is invalidated by the next NextChunk or Close call.
+func (ref StringRef) UnsafeString() string {
+	if ref.ptr == nil || ref.len == 0 {
+		return ""
+	}
+
+	return unsafe.String((*byte)(ref.ptr), ref.len)
+}
+
+// StringRefs returns zero-copy views over a VARCHAR column.
+// The returned refs are only valid until the chunk is invalidated.
+// For NULL rows, use IsNull to check whether the corresponding element is valid.
+func (chunk *DataChunk) StringRefs(colIdx int) ([]StringRef, error) {
+	values, err := typedChunkSlice[mapping.StringT](chunk, colIdx, TYPE_VARCHAR)
+	if err != nil {
+		return nil, err
+	}
+	// Resolve the column again so its validity mask can be consulted below.
+	colIdx, err = chunk.verifyAndRewriteColIdx(colIdx)
+	if err != nil {
+		return nil, getError(errAPI, err)
+	}
+	column := &chunk.columns[colIdx]
+	refs := make([]StringRef, len(values))
+	for idx := range values {
+		// NULL rows carry unspecified string data, so their ref stays empty.
+		if !column.getNull(mapping.IdxT(idx)) {
+			refs[idx] = newStringRef(&values[idx])
+		}
+	}
+	return refs, nil
+}
+
+// newStringRef builds a StringRef over the inlined or out-of-line bytes of strT.
+func newStringRef(strT *mapping.StringT) StringRef {
+	length := int(mapping.StringTLength(*strT))
+	if length == 0 {
+		return StringRef{}
+	}
+	if mapping.StringIsInlined(*strT) {
+		inlined := (*duckStringInlined)(unsafe.Pointer(strT))
+		return StringRef{ptr: unsafe.Pointer(&inlined.inlined[0]), len: length}
+	}
+	pointer := (*duckStringPointer)(unsafe.Pointer(strT))
+	return StringRef{ptr: unsafe.Pointer(pointer.ptr), len: length}
+}
diff --git a/data_chunk_typed.go b/data_chunk_typed.go
new file mode 100644
index 00000000..2b457d53
--- /dev/null
+++ b/data_chunk_typed.go
@@ -0,0 +1,80 @@
+package duckdb
+
+import (
+	"unsafe"
+
+	"github.com/duckdb/duckdb-go/v2/mapping"
+)
+
+// IsNull reports whether the value at colIdx/rowIdx is NULL.
+// rowIdx is not bounds-checked; callers must keep it within [0, GetSize()).
+func (chunk *DataChunk) IsNull(colIdx, rowIdx int) (bool, error) {
+	colIdx, err := chunk.verifyAndRewriteColIdx(colIdx)
+	if err != nil {
+		return false, getError(errAPI, err)
+	}
+
+	return chunk.columns[colIdx].getNull(mapping.IdxT(rowIdx)), nil
+}
+
+// Int32Slice returns a zero-copy view over an INTEGER column.
+// The returned slice is only valid until the chunk is invalidated.
+// For NULL rows, use IsNull to check whether the corresponding element is valid.
+func (chunk *DataChunk) Int32Slice(colIdx int) ([]int32, error) {
+	return typedChunkSlice[int32](chunk, colIdx, TYPE_INTEGER)
+}
+
+// Int64Slice returns a zero-copy view over a BIGINT column.
+// The returned slice is only valid until the chunk is invalidated.
+// For NULL rows, use IsNull to check whether the corresponding element is valid.
+func (chunk *DataChunk) Int64Slice(colIdx int) ([]int64, error) {
+	return typedChunkSlice[int64](chunk, colIdx, TYPE_BIGINT)
+}
+
+// Float64Slice returns a zero-copy view over a DOUBLE column.
+// The returned slice is only valid until the chunk is invalidated.
+// For NULL rows, use IsNull to check whether the corresponding element is valid.
+func (chunk *DataChunk) Float64Slice(colIdx int) ([]float64, error) {
+	return typedChunkSlice[float64](chunk, colIdx, TYPE_DOUBLE)
+}
+
+// TimestampMicrosSlice returns TIMESTAMP values as Unix microseconds.
+// Unlike the zero-copy slice accessors, the result is a freshly allocated copy
+// and therefore stays valid after the chunk is invalidated.
+// For NULL rows, use IsNull to check whether the corresponding element is valid.
+func (chunk *DataChunk) TimestampMicrosSlice(colIdx int) ([]int64, error) {
+	values, err := typedChunkSlice[mapping.Timestamp](chunk, colIdx, TYPE_TIMESTAMP)
+	if err != nil {
+		return nil, err
+	}
+
+	micros := make([]int64, len(values))
+	for idx := range values {
+		micros[idx] = mapping.TimestampMembers(&values[idx])
+	}
+
+	return micros, nil
+}
+
+// typedChunkSlice reinterprets the data buffer of column colIdx as a []T of
+// GetSize elements after checking that the column has the expected type.
+// The caller must pick a T whose memory layout matches expected; the slice
+// aliases chunk memory and is not copied.
+func typedChunkSlice[T any](chunk *DataChunk, colIdx int, expected Type) ([]T, error) {
+	colIdx, err := chunk.verifyAndRewriteColIdx(colIdx)
+	if err != nil {
+		return nil, getError(errAPI, err)
+	}
+
+	column := &chunk.columns[colIdx]
+	if column.Type != expected {
+		return nil, getError(errAPI, invalidInputError(typeToStringMap[column.Type], typeToStringMap[expected]))
+	}
+
+	size := chunk.GetSize()
+	if size == 0 {
+		return []T{}, nil
+	}
+
+	return unsafe.Slice((*T)(column.dataPtr), size), nil
+}