Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ module go.kenn.io/kit
go 1.26.3

require (
github.com/asg017/sqlite-vec-go-bindings v0.1.6
github.com/gofrs/flock v0.13.0
github.com/mattn/go-sqlite3 v1.14.44
github.com/posthog/posthog-go v1.12.6
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.43.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/asg017/sqlite-vec-go-bindings v0.1.6 h1:Nx0jAzyS38XpkKznJ9xQjFXz2X9tI7KqjwVxV8RNoww=
github.com/asg017/sqlite-vec-go-bindings v0.1.6/go.mod h1:A8+cTt/nKFsYCQF6OgzSNpKZrzNo5gQsXBTfsXHXY0Q=
github.com/bitfield/gotestdox v0.2.2 h1:x6RcPAbBbErKLnapz1QeAlf3ospg8efBsedU93CDsnE=
github.com/bitfield/gotestdox v0.2.2/go.mod h1:D+gwtS0urjBrzguAkTM2wodsTQYFHdpx8eqRJ3N+9pY=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -38,6 +40,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.44 h1:3VSe+xafpbzsLbdr2AWlAZk9yRHiBhTBakioXaCKTF8=
github.com/mattn/go-sqlite3 v1.14.44/go.mod h1:pjEuOr8IwzLJP2MfGeTb0A35jauH+C2kbHKBr7yXKVQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posthog/posthog-go v1.12.6 h1:N+FrKWY6DOuDhV2OMgvtKAKDYGTdtS9/nuvr0BTyBp0=
Expand Down
41 changes: 41 additions & 0 deletions vector/AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# vector package invariants

`go.kenn.io/kit/vector` owns the backend-neutral parts of an embedding
pipeline. Preserve these invariants when changing it.

## The storage boundary is the point of this package

- The core `vector` package must not import `database/sql`, a driver, or
any backend client, and must not construct backend SQL. The `Fill` and
`Search` flows reach storage only through the `Store[K, G]` interface.
- Persistence is a function of the caller's source system. Backends live
in their own subpackages (e.g. `vector/sqlitevec`) so a caller wiring
one backend never pulls another backend's driver. New backends
(pgvector, duckdb) go in sibling subpackages, not into the core.
- Backends own query construction. The differences between sqlite-vec
`vec0 MATCH`, pgvector `<=>`, and duckdb `array_distance` belong behind
`QueryGeneration`, never in the core flows.

## Keys and generations are opaque

- Document identity is the caller's type `K` and generation identity its
type `G`. msgvault uses `int64`; kata uses UUIDs. Compare them for
equality only; never assume a type, a single id namespace, or an
ordering. Backends additionally require `K`/`G` to be types
`database/sql` can bind and scan.

## Merge semantics

- `Merge` takes per-generation lists in descending preference and keeps
the earliest list's hit on overlap (prefer the newer generation during
a migration). Coverage is a union — never drop a document that only one
generation covers, and never emit duplicates.
- Cross-generation scores are not comparable. Default to
`MergeNormalizedScore`; raw-score merging is opt-in.

## Generations during migration

- The mid-migration union exists because new documents land only in the
building generation while the active generation still serves the bulk.
`Search` must keep querying every generation `LiveGenerations` returns,
in the order it returns them.
48 changes: 48 additions & 0 deletions vector/chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package vector

// Chunk is a window of text encoded as a single vector. Index is the
// chunk's position within the source content, starting at zero.
type Chunk struct {
Index int
Text string
}

// SplitOptions controls how Split windows content into chunks.
type SplitOptions struct {
// MaxRunes bounds the number of runes in each chunk. Values <= 0
// disable splitting and return the content as a single chunk.
MaxRunes int
// Overlap is the number of runes shared between consecutive chunks.
// It is clamped to the range [0, MaxRunes-1].
Overlap int
}

// Split windows content into overlapping chunks of at most MaxRunes runes.
// It splits on runes rather than bytes so multi-byte characters are never
// torn apart. Empty content yields no chunks.
//
// Split measures size in runes, not model tokens. Callers that budget by
// tokens should convert their token budget to an approximate rune count.
func Split(content string, o SplitOptions) []Chunk {
if content == "" {
return nil
}
runes := []rune(content)
if o.MaxRunes <= 0 || len(runes) <= o.MaxRunes {
return []Chunk{{Index: 0, Text: content}}
}

overlap := min(max(o.Overlap, 0), o.MaxRunes-1)
stride := o.MaxRunes - overlap

var chunks []Chunk
for start, idx := 0, 0; start < len(runes); start, idx = start+stride, idx+1 {
end := start + o.MaxRunes
if end >= len(runes) {
chunks = append(chunks, Chunk{Index: idx, Text: string(runes[start:])})
break
}
chunks = append(chunks, Chunk{Index: idx, Text: string(runes[start:end])})
}
return chunks
}
84 changes: 84 additions & 0 deletions vector/chunk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package vector_test

import (
"testing"

"github.com/stretchr/testify/assert"

"go.kenn.io/kit/vector"
)

func TestSplit(t *testing.T) {
tests := []struct {
name string
content string
opts vector.SplitOptions
want []vector.Chunk
}{
{
name: "empty yields no chunks",
content: "",
opts: vector.SplitOptions{MaxRunes: 4},
want: nil,
},
{
name: "non-positive max returns single chunk",
content: "hello world",
opts: vector.SplitOptions{MaxRunes: 0},
want: []vector.Chunk{{Index: 0, Text: "hello world"}},
},
{
name: "content shorter than max is one chunk",
content: "abcd",
opts: vector.SplitOptions{MaxRunes: 8},
want: []vector.Chunk{{Index: 0, Text: "abcd"}},
},
{
name: "windows without overlap",
content: "abcdefghij",
opts: vector.SplitOptions{MaxRunes: 5},
want: []vector.Chunk{
{Index: 0, Text: "abcde"},
{Index: 1, Text: "fghij"},
},
},
{
name: "windows with overlap",
content: "abcdefghij",
opts: vector.SplitOptions{MaxRunes: 4, Overlap: 1},
want: []vector.Chunk{
{Index: 0, Text: "abcd"},
{Index: 1, Text: "defg"},
{Index: 2, Text: "ghij"},
},
},
{
name: "overlap at or above max clamps to max-1",
content: "abcdef",
opts: vector.SplitOptions{MaxRunes: 3, Overlap: 9},
want: []vector.Chunk{
{Index: 0, Text: "abc"},
{Index: 1, Text: "bcd"},
{Index: 2, Text: "cde"},
{Index: 3, Text: "def"},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, vector.Split(tt.content, tt.opts))
})
}
}

func TestSplitDoesNotTearMultiByteRunes(t *testing.T) {
assert := assert.New(t)
// Each emoji is multiple bytes but one rune.
chunks := vector.Split("😀😁😂🤣", vector.SplitOptions{MaxRunes: 2})

assert.Equal([]vector.Chunk{
{Index: 0, Text: "😀😁"},
{Index: 1, Text: "😂🤣"},
}, chunks)
}
24 changes: 24 additions & 0 deletions vector/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Package vector provides backend-neutral building blocks for embedding
// content and searching the resulting vectors.
//
// It is organized in three layers:
//
// - Transforms and value types: Split windows content into chunks,
// Generation identifies an embedding model, EncodeBatched batches
// encode calls, and RollupByDocument and Merge reduce and combine
// search results across generations. These are pure functions.
//
// - The Store contract: Store[K, G] is the persistence interface the
// flows depend on. Implementations are a function of the caller's
// source system and own all backend SQL and query construction; see
// the sqlitevec subpackage for a worked example.
//
// - Flows: Fill runs the scan-and-fill embedding loop and Search runs
// the cross-generation query-and-merge, both over a Store.
//
// Nothing in this package opens a database, holds an index, or constructs
// backend SQL — the flows delegate every storage operation to the Store.
// Document identity is the caller's own key type K, and generation
// identity its type G; the package compares both for equality but never
// interprets them.
package vector
111 changes: 111 additions & 0 deletions vector/encode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package vector

import (
"context"
"fmt"
"sync"
)

// Vector is a single embedding.
type Vector []float32

// EncodeFunc turns a batch of texts into one vector each, in the same
// order. Implementations own the model or API client and any retry or
// backoff policy, since retryability is provider-specific.
type EncodeFunc func(ctx context.Context, texts []string) ([][]float32, error)

// BatchOptions controls how EncodeBatched groups and parallelizes calls.
type BatchOptions struct {
// BatchSize is the maximum number of chunks passed to EncodeFunc in a
// single call. Values <= 0 send every chunk in one call.
BatchSize int
// Concurrency bounds how many EncodeFunc calls run at once. Values
// <= 0 mean one call at a time.
Concurrency int
}

// EncodeBatched splits chunks into batches, invokes enc with bounded
// concurrency, and returns one Vector per input chunk in input order. It
// stops launching work at the first error or when ctx is cancelled, and
// reports the first error encountered.
func EncodeBatched(ctx context.Context, enc EncodeFunc, chunks []Chunk, o BatchOptions) ([]Vector, error) {
if enc == nil {
return nil, fmt.Errorf("encode func is nil")
}
if len(chunks) == 0 {
return nil, nil
}

batchSize := o.BatchSize
if batchSize <= 0 {
batchSize = len(chunks)
}
concurrency := o.Concurrency
if concurrency <= 0 {
concurrency = 1
}

out := make([]Vector, len(chunks))
sem := make(chan struct{}, concurrency)
var (
wg sync.WaitGroup
mu sync.Mutex
firstErr error
)
failed := func() bool {
mu.Lock()
defer mu.Unlock()
return firstErr != nil
}
setErr := func(err error) {
mu.Lock()
if firstErr == nil {
firstErr = err
}
mu.Unlock()
}

for start := 0; start < len(chunks); start += batchSize {
if ctx.Err() != nil {
setErr(ctx.Err())
break
}
if failed() {
break
}

end := min(start+batchSize, len(chunks))
texts := make([]string, end-start)
for i, c := range chunks[start:end] {
texts[i] = c.Text
}

sem <- struct{}{}
wg.Add(1)
go func(start int, texts []string) {
defer wg.Done()
defer func() { <-sem }()

vecs, err := enc(ctx, texts)
if err != nil {
setErr(fmt.Errorf("encode batch at %d: %w", start, err))
return
}
if len(vecs) != len(texts) {
setErr(fmt.Errorf("encode batch at %d: got %d vectors for %d texts", start, len(vecs), len(texts)))
return
}
// Each batch owns a disjoint index range, so writes to out
// never overlap across goroutines.
for i, v := range vecs {
out[start+i] = Vector(v)
}
}(start, texts)
}

wg.Wait()
if firstErr != nil {
return nil, firstErr
}
return out, nil
}
Loading
Loading