Skip to content
95 changes: 92 additions & 3 deletions ch/chschema/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chschema

import (
"bytes"
"database/sql"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -111,17 +112,77 @@ func (c ColumnOf[T]) Slice(s, e int) any {
return c.Column[s:e]
}

func (c *ColumnOf[T]) appendSliceColumn(v reflect.Value) {
colElemType := reflect.TypeOf(c.Column).Elem().Elem()
col := reflect.MakeSlice(reflect.SliceOf(colElemType), 0, v.Len())
for i := 0; i < v.Len(); i++ {
col = reflect.Append(col, v.Index(i).Convert(colElemType))
}
c.Column = append(c.Column, col.Interface().(T))
}

func (c *ColumnOf[T]) AppendValue(v reflect.Value) {
if reflect.TypeOf(c.Column).Elem().Kind() == reflect.Slice && v.Kind() == reflect.Slice {
c.appendSliceColumn(v)
return
}
c.Column = append(c.Column, v.Interface().(T))
}

func (c *ColumnOf[T]) convertSliceColumn(slice T, destSlice reflect.Value) {
sliceValue := reflect.ValueOf(slice)
elemType := destSlice.Type().Elem()

ret := reflect.MakeSlice(reflect.SliceOf(elemType), 0, sliceValue.Len())
for i := 0; i < sliceValue.Len(); i++ {
ret = reflect.Append(ret, sliceValue.Index(i).Convert(elemType))
}

destSlice.Set(ret)
}

func (c *ColumnOf[T]) ConvertAssign(idx int, dest reflect.Value) error {
if reflect.TypeOf(c.Column).Elem().Kind() == reflect.Slice && dest.Kind() == reflect.Slice {
c.convertSliceColumn(c.Column[idx], dest)
return nil
}
dest.Set(reflect.ValueOf(c.Column[idx]))
return nil
}

//------------------------------------------------------------------------------

func convertAssignDriverValue(col any, v reflect.Value) bool {
var ret reflect.Value

if _, ok := v.Interface().(sql.Scanner); !ok {
if !v.CanAddr() {
return false
}
if _, ok := v.Addr().Interface().(sql.Scanner); !ok {
return false
}

ret = reflect.New(v.Type())
if err := ret.Interface().(sql.Scanner).Scan(col); err != nil {
return false
}

v.Set(ret.Elem())
return true
}

ret = reflect.New(v.Type().Elem())
if err := ret.Interface().(sql.Scanner).Scan(col); err != nil {
return false
}

v.Set(ret)
return true
}

//------------------------------------------------------------------------------

type NumericColumnOf[T constraints.Integer | constraints.Float] struct {
ColumnOf[T]
}
Expand All @@ -135,7 +196,11 @@ func (c NumericColumnOf[T]) ConvertAssign(idx int, v reflect.Value) error {
case reflect.Float32, reflect.Float64:
v.SetFloat(float64(c.Column[idx]))
default:
v.Set(reflect.ValueOf(c.Column[idx]))
if !convertAssignDriverValue(c.Column[idx], v) {
v.Set(reflect.ValueOf(c.Column[idx]))
return nil
}
return nil
}
return nil
}
Expand All @@ -156,7 +221,7 @@ func (c StringColumn) ConvertAssign(idx int, v reflect.Value) error {
v.SetString(c.Column[idx])
return nil
case reflect.Slice:
if v.Type() == bytesType {
if v.Type() == bytesType || v.Type() == reflect.TypeOf((*json.RawMessage)(nil)).Elem() {
v.SetBytes(internal.Bytes(c.Column[idx]))
return nil
}
Expand All @@ -165,7 +230,31 @@ func (c StringColumn) ConvertAssign(idx int, v reflect.Value) error {
dec.UseNumber()
return dec.Decode(v.Addr().Interface())
default:
v.Set(reflect.ValueOf(c.Column[idx]))
if !convertAssignDriverValue(c.Column[idx], v) {
// v.Set(reflect.ValueOf(c.Column[idx]))
return nil
}
return nil
}
return fmt.Errorf("ch: can't scan %s into %s", "string", v.Type())
}

//------------------------------------------------------------------------------

func (c *UInt256Column) ConvertAssign(idx int, v reflect.Value) error {
switch v.Kind() {
case reflect.String:
v.SetString(c.Column[idx].String())
return nil
case reflect.Slice:
if v.Type() == bytesType {
v.SetBytes(internal.Bytes(string(c.Column[idx].Bytes())))
return nil
}
default:
if !convertAssignDriverValue(c.Column[idx].String(), v) {
return fmt.Errorf("ch: convertAssign UInt256 %x", c.Column[idx])
}
return nil
}
return fmt.Errorf("ch: can't scan %s into %s", "string", v.Type())
Expand Down
173 changes: 171 additions & 2 deletions ch/chschema/column_gen.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,42 @@
package chschema

import (
"database/sql/driver"
"encoding/json"
"fmt"
"math/big"
"reflect"
"strconv"
"time"

"github.com/uptrace/go-clickhouse/ch/chproto"
)

func getDriverValue(v reflect.Value) driver.Value {
if v.Kind() == reflect.Pointer && v.IsNil() {
return nil
}

dv, ok := v.Interface().(driver.Valuer)
if !ok {
if !v.CanAddr() {
return nil
}
if dv, ok = v.Addr().Interface().(driver.Valuer); !ok {
return nil
}
}

value, err := dv.Value()
if err != nil {
return nil
}

return value
}

//------------------------------------------------------------------------------

type Int8Column struct {
NumericColumnOf[int8]
}
Expand Down Expand Up @@ -1396,7 +1426,116 @@ func (c *UInt64Column) Type() reflect.Type {
}

func (c *UInt64Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, uint64(v.Uint()))
switch v.Kind() {
case reflect.Uint, reflect.Uintptr, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
c.Column = append(c.Column, v.Uint())
case reflect.String:
i, err := strconv.ParseInt(v.String(), 10, 64)
if err != nil {
return
}
c.Column = append(c.Column, uint64(i))
case reflect.Pointer:
if v.IsNil() {
c.Column = append(c.Column, 0)
return
}
c.AppendValue(v.Elem())
default:
value := getDriverValue(v)
if value != nil {
c.AppendValue(reflect.ValueOf(value))
return
}
c.Column = append(c.Column, v.Uint())
}
}

//------------------------------------------------------------------------------

type UInt256Column struct {
ColumnOf[*big.Int]
}

var _ Columnar = (*UInt256Column)(nil)

func NewUInt256Column() Columnar {
return new(UInt256Column)
}

var _UInt256Type = reflect.TypeOf((*big.Int)(nil)).Elem()

const u256sz = 256 / 8

func (c *UInt256Column) Type() reflect.Type {
return _UInt256Type
}

func (c *UInt256Column) AppendValue(v reflect.Value) {
switch v.Kind() {
case reflect.Uint, reflect.Uintptr, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
c.Column = append(c.Column, new(big.Int).SetUint64(v.Uint()))
case reflect.String:
i, ok := new(big.Int).SetString(v.String(), 10)
if !ok {
return
}
c.Column = append(c.Column, i)
case reflect.Slice:
ba, ok := v.Interface().([]byte)
if !ok {
return
}
c.Column = append(c.Column, new(big.Int).SetBytes(ba))
case reflect.Pointer:
if v.IsNil() {
c.Column = append(c.Column, new(big.Int))
return
}
c.AppendValue(v.Elem())
default:
value := getDriverValue(v)
if value != nil {
c.AppendValue(reflect.ValueOf(value))
return
}
}
}

func (c *UInt256Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.AllocForReading(numRow)

for it := range c.Column {
b := make([]byte, u256sz, u256sz)
n, err := rd.Read(b)
if err != nil {
return err
}
if n != u256sz {
return fmt.Errorf("expected read bytes %d, got %d", u256sz, n)
}
for i := 0; i < len(b)/2; i++ {
b[i], b[len(b)-i-1] = b[len(b)-i-1], b[i]
}
c.Column[it] = new(big.Int).SetBytes(b)
}

return nil
}

func (c *UInt256Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
if n.BitLen() > 256 {
return fmt.Errorf("%s has bigger len %d than 256", n, n.BitLen())
}
b := make([]byte, u256sz, u256sz)
b = n.FillBytes(b)
for i := 0; i < len(b)/2; i++ {
b[i], b[len(b)-i-1] = b[len(b)-i-1], b[i]
}
wr.Write(b)
}
return nil
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -2201,7 +2340,37 @@ func (c *StringColumn) Type() reflect.Type {
}

func (c *StringColumn) AppendValue(v reflect.Value) {
c.Column = append(c.Column, string(v.String()))
if v.Kind() == reflect.Pointer && v.IsNil() {
c.Column = append(c.Column, "")
return
}

switch vi := v.Interface().(type) {
case string:
c.Column = append(c.Column, vi)
case []byte:
c.Column = append(c.Column, string(vi))
case json.RawMessage:
c.Column = append(c.Column, string(vi))
default:
value := getDriverValue(v)
if value != nil {
c.AppendValue(reflect.ValueOf(value))
return
}
if v.Kind() == reflect.String {
c.Column = append(c.Column, v.String())
return
}
j, err := json.Marshal(vi)
if err != nil {
panic(err)
}
if string(j) == "null" {
j = nil
}
c.Column = append(c.Column, string(j))
}
}

func (c *StringColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
Expand Down
2 changes: 2 additions & 0 deletions ch/chschema/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func ColumnFactory(chType string, typ reflect.Type) NewColumnFunc {
return NewUInt32Column
case chtype.UInt64:
return NewUInt64Column
case chtype.UInt256:
return NewUInt256Column
case chtype.Float32:
return NewFloat32Column
case chtype.Float64:
Expand Down
1 change: 1 addition & 0 deletions ch/chtype/chtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
UInt16 = "UInt16"
UInt32 = "UInt32"
UInt64 = "UInt64"
UInt256 = "UInt256"
Float32 = "Float32"
Float64 = "Float64"
DateTime = "DateTime"
Expand Down