Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 172 additions & 16 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,46 @@ package duckdb
import (
"database/sql/driver"
"errors"
"fmt"
"runtime"
"sync"

"github.com/duckdb/duckdb-go/mapping"
"golang.org/x/sync/errgroup"
)

// Appender wraps functionality around the DuckDB appender.
// It enables efficient bulk transformations.
type Appender struct {
// The raw sql.Conn's driver connection.
conn *Conn
// The DuckDB appender.
appender mapping.Appender
// True, if the appender has been closed.
closed bool
type (
appenderSource[T any] struct {
Source T
}

// The chunk to append to.
chunk DataChunk
// The column types of the table to append to.
types []mapping.LogicalType
// The number of appended rows.
rowCount int
}
rowAppenderSource = appenderSource[RowTableSource]
parallelRowAppenderSource = appenderSource[ParallelRowTableSource]
chunkAppenderSource = appenderSource[ChunkTableSource]
parallelChunkAppenderSource = appenderSource[ParallelChunkTableSource]

AppenderSource interface {
_secret()
}

// Appender wraps functionality around the DuckDB appender.
// It enables efficient bulk transformations.
Appender struct {
// The raw sql.Conn's driver connection.
conn *Conn
// The DuckDB appender.
appender mapping.Appender
// True, if the appender has been closed.
closed bool

// The chunk to append to.
chunk DataChunk
// The column types of the table to append to.
types []mapping.LogicalType
// The number of appended rows.
rowCount int
}
)

// NewAppenderFromConn returns a new Appender for the default catalog.
// The Appender batches rows via AppendRow. Upon reaching the auto-flush threshold or
Expand Down Expand Up @@ -207,6 +226,48 @@ func (a *Appender) initAppenderChunk() (*Appender, error) {
return a, nil
}

// AppendTableSource appends an AppenderSource to the appender. Repeatedly calls `FillRow` or
// `FillChunk` depending on the AppenderSource type. If an error is returned, any data
// inserted by that call to `FillRow` or `FillChunk` is ignored. Any previous calls to either
// of these functions will be processed and appended.
func (a *Appender) AppendTableSource(s AppenderSource) error {
var runParallel = func(maxThreads int, worker func() error) error {
g := errgroup.Group{}
for range min(maxThreads, runtime.GOMAXPROCS(-1)) {
g.Go(worker)
}
return g.Wait()
}

lock := &sync.Mutex{}
// projection is not used in chunk, so we must keep it a 1-1 mapping
columnCount := mapping.AppenderColumnCount(a.appender)
projection := make([]int, 0, columnCount)
for i := range columnCount {
projection = append(projection, int(i))
}
switch s := s.(type) {
case rowAppenderSource:
s.Source.Init()
return appenderRowThread(&parallelRowTSWrapper{s.Source}, lock, a.types, a.appender, projection)
case parallelRowAppenderSource:
info := s.Source.Init()
return runParallel(info.MaxThreads, func() error {
return appenderRowThread(s.Source, lock, a.types, a.appender, projection)
})
case chunkAppenderSource:
s.Source.Init()
return appenderChunkThread(&parallelChunkTSWrapper{s.Source}, lock, a.types, a.appender)
case parallelChunkAppenderSource:
info := s.Source.Init()
return runParallel(info.MaxThreads, func() error {
return appenderChunkThread(s.Source, lock, a.types, a.appender)
})
default:
return fmt.Errorf("unknown AppenderSource type: %T. Must be created with NewAppenderRowSource, NewAppenderParallelRowSource, NewAppenderChunkSource, or NewAppenderParallelChunkSource", s)
}
Comment thread
JAicewizard marked this conversation as resolved.
}

func (a *Appender) appendRowSlice(args []driver.Value) error {
// Early-out, if the number of args does not match the column count.
if len(args) != len(a.types) {
Expand Down Expand Up @@ -249,3 +310,98 @@ func (a *Appender) appendDataChunk() error {

return nil
}

func appenderRowThread(s ParallelRowTableSource, lock *sync.Mutex, types []mapping.LogicalType, duckdbAppender mapping.Appender, projection []int) error {
maxSize := GetDataChunkCapacity()
lstate := s.NewLocalState()
var chunk DataChunk
err := chunk.initFromTypes(types, true)
if err != nil {
return err
}
defer chunk.close()
chunk.projection = projection

for {
row := Row{
chunk: &chunk,
r: 0,
}
var next bool
var err error
for ; row.r < mapping.IdxT(maxSize); row.r++ {
next, err = s.FillRow(lstate, row)
if err != nil || !next {
break
}
}

mapping.DataChunkSetSize(chunk.chunk, row.r)

lock.Lock()
state := mapping.AppendDataChunk(duckdbAppender, chunk.chunk)
if state == mapping.StateError {
err := getDuckDBError(mapping.AppenderError(duckdbAppender))
lock.Unlock()
return err
}
lock.Unlock()
if err != nil {
return err
}
if !next {
break
}
chunk.reset(true)
}
return nil
}

func appenderChunkThread(s ParallelChunkTableSource, lock *sync.Mutex, types []mapping.LogicalType, duckdbAppender mapping.Appender) error {
lstate := s.NewLocalState()
var chunk DataChunk
err := chunk.initFromTypes(types, true)
if err != nil {
return err
}
defer chunk.close()
for {
err = s.FillChunk(lstate, chunk)
if err != nil {
Comment thread
JAicewizard marked this conversation as resolved.
return err
}
if chunk.GetSize() == 0 {
break
}

lock.Lock()
state := mapping.AppendDataChunk(duckdbAppender, chunk.chunk)
if state == mapping.StateError {
err := getDuckDBError(mapping.AppenderError(duckdbAppender))
lock.Unlock()
return err
}
lock.Unlock()
chunk.reset(true)
}

return nil
}

func (a appenderSource[T]) _secret() {}

func NewAppenderRowSource(source RowTableSource) AppenderSource {
return rowAppenderSource{Source: source}
}

func NewAppenderParallelRowSource(source ParallelRowTableSource) AppenderSource {
return parallelRowAppenderSource{Source: source}
}

func NewAppenderChunkSource(source ChunkTableSource) AppenderSource {
return chunkAppenderSource{Source: source}
}

func NewAppenderParallelChunkSource(source ParallelChunkTableSource) AppenderSource {
return parallelChunkAppenderSource{Source: source}
}
Loading