diff --git a/ch/chschema/column.go b/ch/chschema/column.go index 735540d..fb68d67 100644 --- a/ch/chschema/column.go +++ b/ch/chschema/column.go @@ -2,6 +2,7 @@ package chschema import ( "bytes" + "database/sql" "encoding/json" "fmt" "io" @@ -111,17 +112,77 @@ func (c ColumnOf[T]) Slice(s, e int) any { return c.Column[s:e] } +func (c *ColumnOf[T]) appendSliceColumn(v reflect.Value) { + colElemType := reflect.TypeOf(c.Column).Elem().Elem() + col := reflect.MakeSlice(reflect.SliceOf(colElemType), 0, v.Len()) + for i := 0; i < v.Len(); i++ { + col = reflect.Append(col, v.Index(i).Convert(colElemType)) + } + c.Column = append(c.Column, col.Interface().(T)) +} + func (c *ColumnOf[T]) AppendValue(v reflect.Value) { + if reflect.TypeOf(c.Column).Elem().Kind() == reflect.Slice && v.Kind() == reflect.Slice { + c.appendSliceColumn(v) + return + } c.Column = append(c.Column, v.Interface().(T)) } +func (c *ColumnOf[T]) convertSliceColumn(slice T, destSlice reflect.Value) { + sliceValue := reflect.ValueOf(slice) + elemType := destSlice.Type().Elem() + + ret := reflect.MakeSlice(reflect.SliceOf(elemType), 0, sliceValue.Len()) + for i := 0; i < sliceValue.Len(); i++ { + ret = reflect.Append(ret, sliceValue.Index(i).Convert(elemType)) + } + + destSlice.Set(ret) +} + func (c *ColumnOf[T]) ConvertAssign(idx int, dest reflect.Value) error { + if reflect.TypeOf(c.Column).Elem().Kind() == reflect.Slice && dest.Kind() == reflect.Slice { + c.convertSliceColumn(c.Column[idx], dest) + return nil + } dest.Set(reflect.ValueOf(c.Column[idx])) return nil } //------------------------------------------------------------------------------ +func convertAssignDriverValue(col any, v reflect.Value) bool { + var ret reflect.Value + + if _, ok := v.Interface().(sql.Scanner); !ok { + if !v.CanAddr() { + return false + } + if _, ok := v.Addr().Interface().(sql.Scanner); !ok { + return false + } + + ret = reflect.New(v.Type()) + if err := ret.Interface().(sql.Scanner).Scan(col); err != nil { + return false + } + + v.Set(ret.Elem()) + return true + } + + ret = reflect.New(v.Type().Elem()) + if err := ret.Interface().(sql.Scanner).Scan(col); err != nil { + return false + } + + v.Set(ret) + return true +} + +//------------------------------------------------------------------------------ + type NumericColumnOf[T constraints.Integer | constraints.Float] struct { ColumnOf[T] } @@ -135,7 +196,11 @@ func (c NumericColumnOf[T]) ConvertAssign(idx int, v reflect.Value) error { case reflect.Float32, reflect.Float64: v.SetFloat(float64(c.Column[idx])) default: - v.Set(reflect.ValueOf(c.Column[idx])) + if !convertAssignDriverValue(c.Column[idx], v) { + v.Set(reflect.ValueOf(c.Column[idx])) + return nil + } + return nil } return nil } @@ -156,7 +221,7 @@ func (c StringColumn) ConvertAssign(idx int, v reflect.Value) error { v.SetString(c.Column[idx]) return nil case reflect.Slice: - if v.Type() == bytesType { + if v.Type() == bytesType || v.Type() == reflect.TypeOf((*json.RawMessage)(nil)).Elem() { v.SetBytes(internal.Bytes(c.Column[idx])) return nil } @@ -165,7 +230,31 @@ func (c StringColumn) ConvertAssign(idx int, v reflect.Value) error { dec.UseNumber() return dec.Decode(v.Addr().Interface()) default: - v.Set(reflect.ValueOf(c.Column[idx])) + if !convertAssignDriverValue(c.Column[idx], v) { + // v.Set(reflect.ValueOf(c.Column[idx])) + return nil + } + return nil + } + return fmt.Errorf("ch: can't scan %s into %s", "string", v.Type()) +} + +//------------------------------------------------------------------------------ + +func (c *UInt256Column) ConvertAssign(idx int, v reflect.Value) error { + switch v.Kind() { + case reflect.String: + v.SetString(c.Column[idx].String()) + return nil + case reflect.Slice: + if v.Type() == bytesType { + v.SetBytes(internal.Bytes(string(c.Column[idx].Bytes()))) + return nil + } + default: + if !convertAssignDriverValue(c.Column[idx].String(), v) { + return fmt.Errorf("ch: convertAssign UInt256 %x", c.Column[idx]) + } return nil } return fmt.Errorf("ch: can't scan %s into %s", "string", v.Type()) diff --git a/ch/chschema/column_gen.go b/ch/chschema/column_gen.go index 5180b68..c1c7249 100644 --- a/ch/chschema/column_gen.go +++ b/ch/chschema/column_gen.go @@ -1,12 +1,42 @@ package chschema import ( + "database/sql/driver" + "encoding/json" + "fmt" + "math/big" "reflect" + "strconv" "time" "github.com/uptrace/go-clickhouse/ch/chproto" ) +func getDriverValue(v reflect.Value) driver.Value { + if v.Kind() == reflect.Pointer && v.IsNil() { + return nil + } + + dv, ok := v.Interface().(driver.Valuer) + if !ok { + if !v.CanAddr() { + return nil + } + if dv, ok = v.Addr().Interface().(driver.Valuer); !ok { + return nil + } + } + + value, err := dv.Value() + if err != nil { + return nil + } + + return value +} + +//------------------------------------------------------------------------------ + type Int8Column struct { NumericColumnOf[int8] } @@ -1396,7 +1426,116 @@ func (c *UInt64Column) Type() reflect.Type { } func (c *UInt64Column) AppendValue(v reflect.Value) { - c.Column = append(c.Column, uint64(v.Uint())) + switch v.Kind() { + case reflect.Uint, reflect.Uintptr, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + c.Column = append(c.Column, v.Uint()) + case reflect.String: + i, err := strconv.ParseInt(v.String(), 10, 64) + if err != nil { + return + } + c.Column = append(c.Column, uint64(i)) + case reflect.Pointer: + if v.IsNil() { + c.Column = append(c.Column, 0) + return + } + c.AppendValue(v.Elem()) + default: + value := getDriverValue(v) + if value != nil { + c.AppendValue(reflect.ValueOf(value)) + return + } + c.Column = append(c.Column, v.Uint()) + } +} + +//------------------------------------------------------------------------------ + +type UInt256Column struct { + ColumnOf[*big.Int] +} + +var _ Columnar = (*UInt256Column)(nil) + +func NewUInt256Column() Columnar { + return new(UInt256Column) +} + +var _UInt256Type = reflect.TypeOf((*big.Int)(nil)).Elem() + +const u256sz = 256 / 8 + +func (c *UInt256Column) Type() reflect.Type { + return _UInt256Type +} + +func (c *UInt256Column) AppendValue(v reflect.Value) { + switch v.Kind() { + case reflect.Uint, reflect.Uintptr, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + c.Column = append(c.Column, new(big.Int).SetUint64(v.Uint())) + case reflect.String: + i, ok := new(big.Int).SetString(v.String(), 10) + if !ok { + return + } + c.Column = append(c.Column, i) + case reflect.Slice: + ba, ok := v.Interface().([]byte) + if !ok { + return + } + c.Column = append(c.Column, new(big.Int).SetBytes(ba)) + case reflect.Pointer: + if v.IsNil() { + c.Column = append(c.Column, new(big.Int)) + return + } + c.AppendValue(v.Elem()) + default: + value := getDriverValue(v) + if value != nil { + c.AppendValue(reflect.ValueOf(value)) + return + } + } +} + +func (c *UInt256Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.AllocForReading(numRow) + + for it := range c.Column { + b := make([]byte, u256sz, u256sz) + n, err := rd.Read(b) + if err != nil { + return err + } + if n != u256sz { + return fmt.Errorf("expected read bytes %d, got %d", u256sz, n) + } + for i := 0; i < len(b)/2; i++ { + b[i], b[len(b)-i-1] = b[len(b)-i-1], b[i] + } + c.Column[it] = new(big.Int).SetBytes(b) + } + + return nil +} + +func (c *UInt256Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + if n.BitLen() > 256 { + return fmt.Errorf("%s has bigger len %d than 256", n, n.BitLen()) + } + b := make([]byte, u256sz, u256sz) + b = n.FillBytes(b) + for i := 0; i < len(b)/2; i++ { + b[i], b[len(b)-i-1] = b[len(b)-i-1], b[i] + } + wr.Write(b) + } + return nil } //------------------------------------------------------------------------------ @@ -2201,7 +2340,37 @@ func (c *StringColumn) Type() reflect.Type { } func (c *StringColumn) AppendValue(v reflect.Value) { - c.Column = append(c.Column, string(v.String())) + if v.Kind() == reflect.Pointer && v.IsNil() { + c.Column = append(c.Column, "") + return + } + + switch vi := v.Interface().(type) { + case string: + c.Column = append(c.Column, vi) + case []byte: + c.Column = append(c.Column, string(vi)) + case json.RawMessage: + c.Column = append(c.Column, string(vi)) + default: + value := getDriverValue(v) + if value != nil { + c.AppendValue(reflect.ValueOf(value)) + return + } + if v.Kind() == reflect.String { + c.Column = append(c.Column, v.String()) + return + } + j, err := json.Marshal(vi) + if err != nil { + panic(err) + } + if string(j) == "null" { + j = nil + } + c.Column = append(c.Column, string(j)) + } } func (c *StringColumn) ReadFrom(rd *chproto.Reader, numRow int) error { diff --git a/ch/chschema/types.go b/ch/chschema/types.go index f0e737b..970ea03 100644 --- a/ch/chschema/types.go +++ b/ch/chschema/types.go @@ -93,6 +93,8 @@ func ColumnFactory(chType string, typ reflect.Type) NewColumnFunc { return NewUInt32Column case chtype.UInt64: return NewUInt64Column + case chtype.UInt256: + return NewUInt256Column case chtype.Float32: return NewFloat32Column case chtype.Float64: diff --git a/ch/chtype/chtype.go b/ch/chtype/chtype.go index 8d60ca5..24079e8 100644 --- a/ch/chtype/chtype.go +++ b/ch/chtype/chtype.go @@ -13,6 +13,7 @@ const ( UInt16 = "UInt16" UInt32 = "UInt32" UInt64 = "UInt64" + UInt256 = "UInt256" Float32 = "Float32" Float64 = "Float64" DateTime = "DateTime"