diff --git a/benchmarker/cmd/ann_benchmark.go b/benchmarker/cmd/ann_benchmark.go index f19476f..085efc3 100644 --- a/benchmarker/cmd/ann_benchmark.go +++ b/benchmarker/cmd/ann_benchmark.go @@ -1,54 +1,18 @@ package cmd import ( - "context" "encoding/binary" - "encoding/json" - "fmt" "math" - "math/rand" - "os" - "path/filepath" "runtime" - "slices" - "strconv" - "strings" "time" - "github.com/hashicorp/go-retryablehttp" - log "github.com/sirupsen/logrus" - "golang.org/x/exp/constraints" - "github.com/google/uuid" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "github.com/weaviate/weaviate-go-client/v4/weaviate" - "github.com/weaviate/weaviate-go-client/v4/weaviate/auth" - "github.com/weaviate/weaviate-go-client/v4/weaviate/fault" - "github.com/weaviate/weaviate/entities/models" - weaviategrpc "github.com/weaviate/weaviate/grpc/generated/protocol/v1" - "github.com/weaviate/weaviate/usecases/byteops" - "google.golang.org/grpc/metadata" - "google.golang.org/protobuf/types/known/structpb" -) - -type CompressionType byte - -const ( - CompressionTypePQ CompressionType = 0 - CompressionTypeSQ CompressionType = 1 - CompressionTypeRQ CompressionType = 2 - CompressionTypeUncompressed CompressionType = 255 ) -// Batch of vectors and offset for writing to Weaviate -type Batch struct { - Vectors [][]float32 - Offset int - Filters []int -} - -// Weaviate https://github.com/weaviate/weaviate-chaos-engineering/tree/main/apps/ann-benchmarks style format -// mixed camel / snake case for compatibility +// ResultsJSONBenchmark is the ann-benchmarks.com compatible output format. +// Mixed camel/snake case for compatibility with downstream tooling. type ResultsJSONBenchmark struct { Api string `json:"api"` Ef int `json:"ef,omitempty"` @@ -74,7 +38,7 @@ type ResultsJSONBenchmark struct { Timestamp string `json:"timestamp"` } -// Convert an int to a uuid formatted string +// uuidFromInt converts an integer to a UUID-formatted string. func uuidFromInt(val int) string { bytes := make([]byte, 16) binary.BigEndian.PutUint64(bytes[8:], uint64(val)) @@ -86,7 +50,7 @@ func uuidFromInt(val int) string { return id.String() } -// Convert a uuid formatted string to an int +// intFromUUID converts a UUID-formatted string back to an integer. func intFromUUID(uuidStr string) int { id, err := uuid.Parse(uuidStr) if err != nil { @@ -96,841 +60,6 @@ func intFromUUID(uuidStr string) int { return int(val) } -// Writes a single batch of vectors to Weaviate using gRPC -func writeChunk(chunk *Batch, client *weaviategrpc.WeaviateClient, cfg *Config) { - objects := make([]*weaviategrpc.BatchObject, len(chunk.Vectors)) - - for i, vector := range chunk.Vectors { - objects[i] = &weaviategrpc.BatchObject{ - Uuid: uuidFromInt(i + chunk.Offset + cfg.Offset), - Collection: cfg.ClassName, - } - if cfg.Tenant != "" { - objects[i].Tenant = cfg.Tenant - } - if cfg.MultiVectorDimensions > 0 { - if len(vector)%cfg.MultiVectorDimensions != 0 { - log.Fatalf("Vector length %d is not a multiple of dimensions %d", - len(vector), cfg.MultiVectorDimensions) - } - rows := len(vector) / cfg.MultiVectorDimensions - - multiVec := make([][]float32, rows) - for i := 0; i < rows; i++ { - start := i * cfg.MultiVectorDimensions - end := start + cfg.MultiVectorDimensions - multiVec[i] = vector[start:end] - } - objects[i].Vectors = []*weaviategrpc.Vectors{{ - Name: "multivector", - VectorBytes: byteops.Fp32SliceOfSlicesToBytes(multiVec), - Type: weaviategrpc.Vectors_VECTOR_TYPE_MULTI_FP32, - }} - } else { - objects[i].VectorBytes = encodeVector(vector) - } - if cfg.NamedVector != "" { - vectors := make([]*weaviategrpc.Vectors, 1) - vectors[0] = &weaviategrpc.Vectors{ - VectorBytes: encodeVector(vector), - Name: cfg.NamedVector, - } - objects[i].Vectors = vectors - } - if cfg.Filter { - nonRefProperties, err := structpb.NewStruct(map[string]interface{}{ - "category": strconv.Itoa(chunk.Filters[i]), - }) - if err != nil { - log.Fatalf("Error creating filtered struct: %v", err) - } - objects[i].Properties = &weaviategrpc.BatchObject_Properties{ - NonRefProperties: nonRefProperties, - } - } - } - - batchRequest := &weaviategrpc.BatchObjectsRequest{ - Objects: objects, - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) - defer cancel() - - if cfg.HttpAuth != "" { - md := metadata.Pairs( - "Authorization", fmt.Sprintf("Bearer %s", cfg.HttpAuth), - ) - ctx = metadata.NewOutgoingContext(ctx, md) - } - - response, err := (*client).BatchObjects(ctx, batchRequest) - if err != nil { - log.Fatalf("could not send batch: %v", err) - } - - for _, result := range response.GetErrors() { - if result.Error != "" { - log.Printf("Error for index %d: %s", result.Index, result.Error) - } else { - log.Printf("Successfully processed object at index %d", result.Index) - } - } -} - -func createClient(cfg *Config) *weaviate.Client { - retryClient := retryablehttp.NewClient() - retryClient.RetryMax = 10 - - wcfg := weaviate.Config{ - Host: cfg.HttpOrigin, - Scheme: cfg.HttpScheme, - ConnectionClient: retryClient.HTTPClient, - StartupTimeout: 60 * time.Second, - } - if cfg.HttpAuth != "" { - wcfg.AuthConfig = auth.ApiKey{Value: cfg.HttpAuth} - wcfg.ConnectionClient = nil - } - client, err := weaviate.NewClient(wcfg) - if err != nil { - log.Fatalf("Error creating client: %v", err) - } - return client -} - -// Re/create Weaviate schema -func createSchema(cfg *Config, client *weaviate.Client) { - err := client.Schema().ClassDeleter().WithClassName(cfg.ClassName).Do(context.Background()) - if err != nil { - log.Fatalf("Error deleting class: %v", err) - } - - multiTenancyEnabled := false - if cfg.NumTenants > 0 { - multiTenancyEnabled = true - } - - classObj := &models.Class{ - Class: cfg.ClassName, - Description: fmt.Sprintf("Created by the Weaviate Benchmarker at %s", time.Now().String()), - MultiTenancyConfig: &models.MultiTenancyConfig{ - Enabled: multiTenancyEnabled, - }, - } - - if cfg.Shards > 1 { - classObj.ShardingConfig = map[string]interface{}{ - "desiredCount": cfg.Shards, - } - } - - var vectorIndexConfig map[string]interface{} - - if cfg.IndexType == "hnsw" { - vectorIndexConfig = map[string]interface{}{ - "distance": cfg.DistanceMetric, - "efConstruction": float64(cfg.EfConstruction), - "maxConnections": float64(cfg.MaxConnections), - "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, - "flatSearchCutoff": cfg.FlatSearchCutoff, - } - if cfg.PQ == "auto" { - pqConfig := map[string]interface{}{ - "enabled": true, - "segments": cfg.PQSegments, - "trainingLimit": cfg.TrainingLimit, - } - if cfg.RescoreLimit > -1 { - pqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["pq"] = pqConfig - } else if cfg.BQ { - bqConfig := map[string]interface{}{ - "enabled": true, - } - if cfg.RescoreLimit > -1 { - bqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["bq"] = bqConfig - } else if cfg.SQ == "auto" { - vectorIndexConfig = map[string]interface{}{ - "distance": cfg.DistanceMetric, - "efConstruction": float64(cfg.EfConstruction), - "maxConnections": float64(cfg.MaxConnections), - "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, - "sq": map[string]interface{}{ - "enabled": true, - "trainingLimit": cfg.TrainingLimit, - }, - } - } else if cfg.RQ == "auto" { - rqConfig := map[string]interface{}{ - "enabled": true, - "bits": cfg.RQBits, - } - if cfg.RescoreLimit > -1 { - rqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig = map[string]interface{}{ - "distance": cfg.DistanceMetric, - "efConstruction": float64(cfg.EfConstruction), - "maxConnections": float64(cfg.MaxConnections), - "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, - "rq": rqConfig, - } - } - } else if cfg.IndexType == "flat" { - // Validate that BQ and RQ are not both enabled - if cfg.BQ && cfg.RQ == "auto" { - log.Fatalf("Cannot enable both BQ and RQ on flat index type") - } - - vectorIndexConfig = map[string]interface{}{ - "distance": cfg.DistanceMetric, - } - if cfg.BQ { - bqConfig := map[string]interface{}{ - "enabled": true, - "cache": cfg.Cache, - } - if cfg.RescoreLimit > -1 { - bqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["bq"] = bqConfig - } else if cfg.RQ == "auto" { - rqConfig := map[string]interface{}{ - "enabled": true, - "bits": cfg.RQBits, - "cache": cfg.Cache, - } - if cfg.RescoreLimit > -1 { - rqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["rq"] = rqConfig - log.WithFields(log.Fields{"bits": cfg.RQBits, "indexType": "flat"}).Printf("Enabled RQ on flat index type") - } - } else if cfg.IndexType == "dynamic" { - log.WithFields(log.Fields{"threshold": cfg.DynamicThreshold}).Info("Building dynamic vector index") - vectorIndexConfig = map[string]interface{}{ - "distance": cfg.DistanceMetric, - "threshold": cfg.DynamicThreshold, - "hnsw": map[string]interface{}{ - "efConstruction": float64(cfg.EfConstruction), - "maxConnections": float64(cfg.MaxConnections), - "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, - "flatSearchCutoff": cfg.FlatSearchCutoff, - }, - "flat": map[string]interface{}{}, - } - if cfg.PQ == "auto" { - pqConfig := map[string]interface{}{ - "enabled": true, - "segments": cfg.PQSegments, - "trainingLimit": cfg.TrainingLimit, - } - if cfg.RescoreLimit > -1 { - pqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["hnsw"].(map[string]interface{})["pq"] = pqConfig - } else if cfg.BQ { - bqConfig := map[string]interface{}{ - "enabled": true, - "cache": true, - } - if cfg.RescoreLimit > -1 { - bqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["hnsw"].(map[string]interface{})["bq"] = bqConfig - } else if cfg.RQ == "auto" { - vectorIndexConfig["flat"].(map[string]interface{})["rq"] = map[string]interface{}{ - "enabled": true, - "bits": cfg.RQBits, - } - vectorIndexConfig["hnsw"].(map[string]interface{})["rq"] = map[string]interface{}{ - "enabled": true, - "bits": cfg.RQBits, - } - } - } else if cfg.IndexType == "hfresh" { - vectorIndexConfig = map[string]interface{}{ - "distance": cfg.DistanceMetric, - "maxPostingSizeKB": cfg.MaxPostingSizeKB, - "replicas": cfg.Replicas, - "rngFactor": cfg.RngFactor, - "rq": map[string]interface{}{ - "rescoreLimit": cfg.RescoreLimit, - }, - } - } else { - log.Fatalf("Unknown index type %s", cfg.IndexType) - } - - vectorIndexConfig["filterStrategy"] = cfg.FilterStrategy - - if cfg.NamedVector != "" { - vectorConfig := make(map[string]models.VectorConfig) - vectorConfig[cfg.NamedVector] = models.VectorConfig{ - Vectorizer: map[string]interface{}{"none": nil}, - VectorIndexType: cfg.IndexType, - VectorIndexConfig: vectorIndexConfig, - } - classObj.VectorConfig = vectorConfig - } else { - if cfg.MultiVectorDimensions > 0 { - vectorIndexConfig = map[string]interface{}{} - if cfg.PQ == "auto" { - pqConfig := map[string]interface{}{ - "enabled": true, - "segments": cfg.PQSegments, - "trainingLimit": cfg.TrainingLimit, - } - if cfg.RescoreLimit > -1 { - pqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["pq"] = pqConfig - } else if cfg.BQ { - bqConfig := map[string]interface{}{ - "enabled": true, - "cache": true, - } - if cfg.RescoreLimit > -1 { - bqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["bq"] = bqConfig - } else if cfg.SQ == "auto" { - vectorIndexConfig = map[string]interface{}{ - "distance": cfg.DistanceMetric, - "efConstruction": float64(cfg.EfConstruction), - "maxConnections": float64(cfg.MaxConnections), - "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, - "sq": map[string]interface{}{ - "enabled": true, - "trainingLimit": cfg.TrainingLimit, - }, - } - } else if cfg.RQ == "auto" { - rqConfig := map[string]interface{}{ - "enabled": true, - "bits": cfg.RQBits, - } - if cfg.RescoreLimit > -1 { - rqConfig["rescoreLimit"] = cfg.RescoreLimit - } - - vectorIndexConfig = map[string]interface{}{ - "distance": cfg.DistanceMetric, - "efConstruction": float64(cfg.EfConstruction), - "maxConnections": float64(cfg.MaxConnections), - "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, - "rq": rqConfig, - } - } - vectorIndexConfig["multivector"] = map[string]interface{}{ - "enabled": true, - "muvera": map[string]interface{}{ - "enabled": cfg.MuveraEnabled, - "ksim": cfg.MuveraKSim, - "dprojections": cfg.MuveraDProjections, - "repetition": cfg.MuveraRepetition, - }, - } - - classObj.VectorConfig = map[string]models.VectorConfig{ - "multivector": { - Vectorizer: map[string]interface{}{ - "none": map[string]interface{}{}, - }, - VectorIndexConfig: vectorIndexConfig, - VectorIndexType: cfg.IndexType, - }, - } - } else { - classObj.VectorIndexType = cfg.IndexType - classObj.VectorIndexConfig = vectorIndexConfig - } - } - - if cfg.ReplicationFactor > 1 || cfg.AsyncReplicationEnabled { - classObj.ReplicationConfig = &models.ReplicationConfig{ - Factor: int64(cfg.ReplicationFactor), - AsyncEnabled: cfg.AsyncReplicationEnabled, - } - } - - err = client.Schema().ClassCreator().WithClass(classObj).Do(context.Background()) - if err != nil { - panic(err) - } - log.Printf("Created class %s", cfg.ClassName) -} - -func deleteChunk(chunk *Batch, client *weaviate.Client, cfg *Config) { - log.Debugf("Deleting chunk of %d vectors index %d", len(chunk.Vectors), chunk.Offset) - for i := range chunk.Vectors { - uuid := uuidFromInt(i + chunk.Offset + cfg.Offset) - err := client.Data().Deleter().WithClassName(cfg.ClassName).WithID(uuid).Do(context.Background()) - if err != nil { - log.Fatalf("Error deleting object: %v", err) - } - } -} - -func deleteUuidSlice(cfg *Config, client *weaviate.Client, slice []int) { - log.WithFields(log.Fields{"length": len(slice), "class": cfg.ClassName}).Printf("Deleting objects to trigger tombstone operations") - for _, i := range slice { - err := client.Data().Deleter().WithClassName(cfg.ClassName).WithID(uuidFromInt(i)).Do(context.Background()) - if err != nil { - log.Fatalf("Error deleting object: %v", err) - } - } - log.WithFields(log.Fields{"length": len(slice), "class": cfg.ClassName}).Printf("Completed deletes") -} - -func deleteUuidRange(cfg *Config, client *weaviate.Client, start int, end int) { - var slice []int - for i := start; i < end; i++ { - slice = append(slice, i) - } - deleteUuidSlice(cfg, client, slice) -} - -func addTenantIfNeeded(cfg *Config, client *weaviate.Client) { - if cfg.Tenant == "" { - return - } - err := client.Schema().TenantsCreator(). - WithClassName(cfg.ClassName). - WithTenants(models.Tenant{Name: cfg.Tenant}). - Do(context.Background()) - if err != nil { - log.Printf("Error adding tenant retrying in 1 second %v", err) - time.Sleep(1 * time.Second) - addTenantIfNeeded(cfg, client) - } -} - -// Update ef parameter on the Weaviate schema -func updateEf(ef int, cfg *Config, client *weaviate.Client) { - classConfig, err := client.Schema().ClassGetter().WithClassName(cfg.ClassName).Do(context.Background()) - if err != nil { - panic(err) - } - - var vectorIndexConfig map[string]interface{} - - if cfg.NamedVector != "" { - vectorIndexConfig = classConfig.VectorConfig[cfg.NamedVector].VectorIndexConfig.(map[string]interface{}) - } else if cfg.MultiVectorDimensions > 0 { - vectorIndexConfig = classConfig.VectorConfig["multivector"].VectorIndexConfig.(map[string]interface{}) - } else { - vectorIndexConfig = classConfig.VectorIndexConfig.(map[string]interface{}) - } - - switch cfg.IndexType { - case "hnsw": - vectorIndexConfig["ef"] = ef - case "flat": - if bq, exists := vectorIndexConfig["bq"]; exists && cfg.BQ { - bqConfig := bq.(map[string]interface{}) - bqConfig["rescoreLimit"] = ef - } else if rq, exists := vectorIndexConfig["rq"]; exists { - rqConfig := rq.(map[string]interface{}) - rqConfig["rescoreLimit"] = ef - } - case "dynamic": - hnswConfig := vectorIndexConfig["hnsw"].(map[string]interface{}) - hnswConfig["ef"] = ef - flatConfig := vectorIndexConfig["flat"].(map[string]interface{}) - if bq, exists := flatConfig["bq"]; exists && cfg.BQ { - bqConfig := bq.(map[string]interface{}) - bqConfig["rescoreLimit"] = ef - } else if rq, exists := flatConfig["rq"]; exists { - rqConfig := rq.(map[string]interface{}) - rqConfig["rescoreLimit"] = ef - } - case "hfresh": - vectorIndexConfig["searchProbe"] = ef - } - - if cfg.NamedVector != "" { - vectorConfig := classConfig.VectorConfig[cfg.NamedVector] - vectorConfig.VectorIndexConfig = vectorIndexConfig - classConfig.VectorConfig[cfg.NamedVector] = vectorConfig - } else if cfg.MultiVectorDimensions > 0 { - vectorConfig := classConfig.VectorConfig["multivector"] - vectorConfig.VectorIndexConfig = vectorIndexConfig - classConfig.VectorConfig["multivector"] = vectorConfig - } else { - classConfig.VectorIndexConfig = vectorIndexConfig - } - - err = client.Schema().ClassUpdater().WithClass(classConfig).Do(context.Background()) - if err != nil { - panic(err) - } -} - -func waitReady(cfg *Config, client *weaviate.Client, indexStart time.Time, maxDuration time.Duration, minQueueSize int64) time.Time { - start := time.Now() - current := time.Now() - - log.Infof("Waiting for queue to be empty\n") - for current.Sub(start) < maxDuration { - nodesStatus, err := client.Cluster().NodesStatusGetter().WithOutput("verbose").Do(context.Background()) - if err != nil { - panic(err) - } - totalShardQueue := int64(0) - for _, n := range nodesStatus.Nodes { - for _, s := range n.Shards { - if s.Class == cfg.ClassName && s.VectorQueueLength > 0 { - totalShardQueue += s.VectorQueueLength - } - } - } - if totalShardQueue < minQueueSize { - log.WithFields(log.Fields{"duration": current.Sub(start)}).Printf("Queue ready\n") - log.WithFields(log.Fields{"duration": current.Sub(indexStart)}).Printf("Total load and queue ready\n") - return current - } - time.Sleep(2 * time.Second) - current = time.Now() - } - log.Fatalf("Queue wasn't ready in %s\n", maxDuration) - return current -} - -// Update ef parameter on the Weaviate schema -func enableCompression(cfg *Config, client *weaviate.Client, dimensions uint, compressionType CompressionType) { - classConfig, err := client.Schema().ClassGetter().WithClassName(cfg.ClassName).Do(context.Background()) - if err != nil { - panic(err) - } - - var segments uint - var vectorIndexConfig map[string]interface{} - - if cfg.MultiVectorDimensions > 0 { - vectorIndexConfig = classConfig.VectorConfig["multivector"].VectorIndexConfig.(map[string]interface{}) - } else { - if cfg.NamedVector == "" { - vectorIndexConfig = classConfig.VectorIndexConfig.(map[string]interface{}) - } else { - vectorIndexConfig = classConfig.VectorConfig[cfg.NamedVector].VectorIndexConfig.(map[string]interface{}) - classConfig.Vectorizer = "" - } - } - - switch compressionType { - case CompressionTypePQ: - if dimensions%cfg.PQRatio != 0 { - log.Fatalf("PQ ratio of %d and dimensions of %d incompatible", cfg.PQRatio, dimensions) - } - if !cfg.MuveraEnabled { - segments = dimensions / cfg.PQRatio - } else { - segments = uint(math.Pow(2, float64(cfg.MuveraKSim))*float64(cfg.MuveraDProjections)*float64(cfg.MuveraRepetition)) / cfg.PQRatio - } - - pqConfig := map[string]interface{}{ - "enabled": true, - "segments": segments, - "trainingLimit": cfg.TrainingLimit, - } - if cfg.RescoreLimit > -1 { - pqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["pq"] = pqConfig - case CompressionTypeSQ: - sqConfig := map[string]interface{}{ - "enabled": true, - "trainingLimit": cfg.TrainingLimit, - } - if cfg.RescoreLimit > -1 { - sqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["sq"] = sqConfig - case CompressionTypeRQ: - rqConfig := map[string]interface{}{ - "enabled": true, - "bits": cfg.RQBits, - } - if cfg.RescoreLimit > -1 { - rqConfig["rescoreLimit"] = cfg.RescoreLimit - } - vectorIndexConfig["rq"] = rqConfig - } - - if cfg.MultiVectorDimensions > 0 { - vectorConfig := classConfig.VectorConfig["multivector"] - vectorConfig.VectorIndexConfig = vectorIndexConfig - classConfig.VectorConfig["multivector"] = vectorConfig - } else { - if cfg.NamedVector == "" { - classConfig.VectorIndexConfig = vectorIndexConfig - } else { - vectorConfig := classConfig.VectorConfig[cfg.NamedVector] - vectorConfig.VectorIndexConfig = vectorIndexConfig - classConfig.VectorConfig[cfg.NamedVector] = vectorConfig - } - } - - err = client.Schema().ClassUpdater().WithClass(classConfig).Do(context.Background()) - if err != nil { - panic(err) - } - switch compressionType { - case CompressionTypePQ: - log.WithFields(log.Fields{"segments": segments, "dimensions": dimensions}).Printf("Enabled PQ. Waiting for shard ready.\n") - case CompressionTypeSQ: - log.Printf("Enabled SQ. Waiting for shard ready.\n") - } - - start := time.Now() - - for { - time.Sleep(3 * time.Second) - diff := time.Since(start) - if diff.Minutes() > 50 { - log.Fatalf("Shard still not ready after 50 minutes, exiting..\n") - } - shards, err := client.Schema().ShardsGetter().WithClassName(cfg.ClassName).Do(context.Background()) - if err != nil || len(shards) == 0 { - if weaviateErr, ok := err.(*fault.WeaviateClientError); ok { - log.Warnf("Error getting schema: %v", weaviateErr.DerivedFromError) - } else { - log.Warnf("Error getting schema: %v", err) - } - continue - } - ready := true - for _, shard := range shards { - if shard.Status != "READY" { - ready = false - } - } - if ready { - break - } - } - - endTime := time.Now() - switch compressionType { - case CompressionTypePQ: - log.WithFields(log.Fields{"segments": segments, "dimensions": dimensions}).Printf("PQ Completed in %v\n", endTime.Sub(start)) - case CompressionTypeSQ: - log.Printf("SQ Completed in %v\n", endTime.Sub(start)) - case CompressionTypeRQ: - log.Printf("RQ Completed in %v\n", endTime.Sub(start)) - } -} - -func parseEfValues(s string) ([]int, error) { - strs := strings.Split(s, ",") - nums := make([]int, len(strs)) - for i, str := range strs { - num, err := strconv.Atoi(str) - if err != nil { - return nil, fmt.Errorf("error converting efArray '%s' to integer: %v", str, err) - } - nums[i] = num - } - return nums, nil -} - -func runQueries(cfg *Config, importTime time.Duration, testData [][]float32, neighbors [][]int, filters []int) { - runID := strconv.FormatInt(time.Now().Unix(), 10) - - efCandidates, err := parseEfValues(cfg.EfArray) - if err != nil { - log.Fatalf("Error parsing efArray, expected commas separated format \"16,32,64\" but:%v\n", err) - } - - // Read once at this point (after import and compaction delay) to get accurate memory stats - memstats := &Memstats{} - if !cfg.SkipMemoryStats { - memstats, err = readMemoryMetrics(cfg) - if err != nil { - log.Warnf("Error reading memory stats: %v", err) - memstats = &Memstats{} - } - } - - client := createClient(cfg) - os.Mkdir("./results", 0o755) - - iteration := 0 - for { - shouldStop := shouldStopRunQueries(iteration, cfg) - if cfg.WaitForBackground && shouldStop { - // todo - break - } - - iteration++ - iterationRunID := fmt.Sprintf("%d", iteration) - isFinalIteration := !cfg.WaitForBackground || shouldStop - - benchmarkResultsMap := make([]map[string]interface{}, 0, len(efCandidates)) - for _, ef := range efCandidates { - updateEf(ef, cfg, client) - - var result Results - - if cfg.QueryDuration > 0 { - result = benchmarkANNDuration(*cfg, testData, neighbors, filters) - } else { - result = benchmarkANN(*cfg, testData, neighbors, filters) - } - - if cfg.IndexType == "hnsw" || cfg.IndexType == "dynamic" { - log.WithFields(log.Fields{ - "mean": result.Mean, "qps": result.QueriesPerSecond, "recall": result.Recall, "ndcg": result.NDCG, - "parallel": cfg.Parallel, "limit": cfg.Limit, - "api": cfg.API, "ef": ef, "count": result.Total, "failed": result.Failed, - }).Info("Benchmark result") - } else if cfg.IndexType == "hfresh" { - log.WithFields(log.Fields{ - "mean": result.Mean, "qps": result.QueriesPerSecond, "recall": result.Recall, "ndcg": result.NDCG, - "parallel": cfg.Parallel, "limit": cfg.Limit, - "api": cfg.API, "searchProbe": ef, "count": result.Total, "failed": result.Failed, - }).Info("Benchmark result") - } else { - log.WithFields(log.Fields{ - "mean": result.Mean, "qps": result.QueriesPerSecond, "recall": result.Recall, "ndcg": result.NDCG, - "parallel": cfg.Parallel, "limit": cfg.Limit, - "api": cfg.API, "rescoreLimit": ef, "count": result.Total, "failed": result.Failed, - }).Info("Benchmark result") - } - - dataset := filepath.Base(cfg.BenchmarkFile) - - var resultMap map[string]interface{} - - benchResult := ResultsJSONBenchmark{ - Api: cfg.API, - EfConstruction: cfg.EfConstruction, - MaxConnections: cfg.MaxConnections, - Mean: result.Mean.Seconds(), - P99Latency: result.Percentiles[len(result.Percentiles)-1].Seconds(), - QueriesPerSecond: result.QueriesPerSecond, - Shards: cfg.Shards, - Parallelization: cfg.Parallel, - Limit: cfg.Limit, - ImportTime: importTime.Seconds(), - RunID: runID, - IterationRunID: iterationRunID, - Dataset: dataset, - NDCG: result.NDCG, - Recall: result.Recall, - HeapAllocBytes: memstats.HeapAllocBytes, - HeapInuseBytes: memstats.HeapInuseBytes, - HeapSysBytes: memstats.HeapSysBytes, - Timestamp: time.Now().Format(time.RFC3339), - } - switch cfg.IndexType { - case "flat": - benchResult.RescoreLimit = ef - case "hnsw", "dynamic": - benchResult.Ef = ef - case "hfresh": - benchResult.SearchProbe = ef - } - - jsonData, err := json.Marshal(benchResult) - if err != nil { - log.Fatalf("Error converting result to json") - } - - if err := json.Unmarshal(jsonData, &resultMap); err != nil { - log.Fatalf("Error converting json to map") - } - - if cfg.LabelMap != nil { - for key, value := range cfg.LabelMap { - resultMap[key] = value - } - } - - if isFinalIteration { - resultMap["finalIteration"] = true - } - - benchmarkResultsMap = append(benchmarkResultsMap, resultMap) - - } - - data, err := json.MarshalIndent(benchmarkResultsMap, "", " ") - if err != nil { - log.Fatalf("Error marshaling benchmark results: %v", err) - } - - err = os.WriteFile(fmt.Sprintf("./results/%s.json", runID), data, 0o644) - if err != nil { - log.Fatalf("Error writing benchmark results to file: %v", err) - } - - if !cfg.WaitForBackground { - break - } - - } -} - -func shouldStopRunQueries(iteration int, cfg *Config) bool { - if cfg.IndexType != "hfresh" { - return true - } - if iteration == 0 { // we want to trigger merge operations - return false - } - - metrics, err := readHFreshMetrics(cfg) - if err != nil { - log.WithError(err).Warn("Failed to read HFresh pending operations metrics") - return false - } - - noPendingOps := metrics.PendingSplitOperations == 0 && - metrics.PendingMergeOperations == 0 && - metrics.PendingReassignOperations == 0 - - if noPendingOps { - log.WithFields(log.Fields{ - "iteration": iteration, - }).Info("All HFresh background operations complete") - return true - } - - secs := 30 - - for { - metrics, err := readHFreshMetrics(cfg) - if err != nil { - log.WithError(err).Warn("Failed to read HFresh pending operations metrics") - return false - } - - log.WithFields(log.Fields{ - "iteration": iteration, - "pendingSplitOperations": metrics.PendingSplitOperations, - "pendingMergeOperations": metrics.PendingMergeOperations, - "pendingReassignOperations": metrics.PendingReassignOperations, - }).Info("HFresh background operations still running, checking again in ", secs, " seconds") - noPendingOps := metrics.PendingSplitOperations == 0 && - metrics.PendingMergeOperations == 0 && - metrics.PendingReassignOperations == 0 - - if noPendingOps { - break - } - time.Sleep(time.Duration(secs) * time.Second) - } - - return false -} - var annBenchmarkCommand = &cobra.Command{ Use: "ann-benchmark", Short: "Benchmark ANN Benchmark style datasets", @@ -952,7 +81,6 @@ var annBenchmarkCommand = &cobra.Command{ var dataset Dataset if len(cfg.DatasetRepo) > 0 { dataset = NewParquetDataset(cfg.DatasetRepo, cfg.Dataset, cfg.MultiVectorDimensions, cfg.Filter) - // dataset = NewParquetDataset("tobias-weaviate/ann-datasets", "dbpedia-openai-ada002-1536-float32-angular-100k", cfg.MultiVectorDimensions, cfg.Filter) } else { dataset = NewHdf5Dataset(cfg.BenchmarkFile, cfg.MultiVectorDimensions, cfg.Filter) } @@ -1174,100 +302,3 @@ func initAnnBenchmark() { annBenchmarkCommand.PersistentFlags().Float64Var(&globalConfig.RngFactor, "rngFactor", 10.0, "RNG factor for HFresh index (default 10.0)") } - -func benchmarkANN(cfg Config, queries Queries, neighbors Neighbors, filters []int) Results { - cfg.Queries = len(queries) - - i := 0 - return benchmark(cfg, func(className string) QueryWithNeighbors { - defer func() { i++ }() - - tenant := "" - if cfg.NumTenants > 0 { - tenant = fmt.Sprint(rand.Intn(cfg.NumTenants)) - } - filter := -1 - if cfg.Filter { - filter = filters[i] - } - - return QueryWithNeighbors{ - Query: nearVectorQueryGrpc(&cfg, queries[i], tenant, filter), - Neighbors: neighbors[i], - } - }) -} - -type Number interface { - constraints.Float | constraints.Integer -} - -func median[T Number](data []T) float64 { - dataCopy := make([]T, len(data)) - copy(dataCopy, data) - - slices.Sort(dataCopy) - - var median float64 - l := len(dataCopy) - if l == 0 { - return 0 - } else if l%2 == 0 { - median = float64((dataCopy[l/2-1] + dataCopy[l/2]) / 2.0) - } else { - median = float64(dataCopy[l/2]) - } - - return median -} - -type sampledResults struct { - Min []time.Duration - Max []time.Duration - Mean []time.Duration - Took []time.Duration - QueriesPerSecond []float64 - Recall []float64 - NDCG []float64 - Results []Results -} - -func benchmarkANNDuration(cfg Config, queries Queries, neighbors Neighbors, filters []int) Results { - cfg.Queries = len(queries) - - var samples sampledResults - - startTime := time.Now() - - var results Results - - for time.Since(startTime) < time.Duration(cfg.QueryDuration)*time.Second { - results = benchmarkANN(cfg, queries, neighbors, filters) - samples.Min = append(samples.Min, results.Min) - samples.Max = append(samples.Max, results.Max) - samples.Mean = append(samples.Mean, results.Mean) - samples.Took = append(samples.Took, results.Took) - samples.QueriesPerSecond = append(samples.QueriesPerSecond, results.QueriesPerSecond) - samples.NDCG = append(samples.NDCG, results.NDCG) - samples.Recall = append(samples.Recall, results.Recall) - samples.Results = append(samples.Results, results) - } - - var medianResult Results - - medianResult.Min = time.Duration(median(samples.Min)) - medianResult.Max = time.Duration(median(samples.Max)) - medianResult.Mean = time.Duration(median(samples.Mean)) - medianResult.Took = time.Duration(median(samples.Took)) - medianResult.QueriesPerSecond = median(samples.QueriesPerSecond) - medianResult.Percentiles = results.Percentiles - medianResult.PercentilesLabels = results.PercentilesLabels - medianResult.Total = results.Total - medianResult.Successful = results.Successful - medianResult.Failed = results.Failed - medianResult.Parallelization = cfg.Parallel - medianResult.Recall = median(samples.Recall) - medianResult.NDCG = median(samples.NDCG) - - return medianResult -} diff --git a/benchmarker/cmd/ann_ingest.go b/benchmarker/cmd/ann_ingest.go new file mode 100644 index 0000000..286c279 --- /dev/null +++ b/benchmarker/cmd/ann_ingest.go @@ -0,0 +1,294 @@ +package cmd + +import ( + "context" + "fmt" + "math" + "strconv" + "time" + + "github.com/hashicorp/go-retryablehttp" + log "github.com/sirupsen/logrus" + "github.com/weaviate/weaviate-go-client/v4/weaviate" + "github.com/weaviate/weaviate-go-client/v4/weaviate/auth" + "github.com/weaviate/weaviate-go-client/v4/weaviate/fault" + weaviategrpc "github.com/weaviate/weaviate/grpc/generated/protocol/v1" + "github.com/weaviate/weaviate/usecases/byteops" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/structpb" +) + +type CompressionType byte + +const ( + CompressionTypePQ CompressionType = 0 + CompressionTypeSQ CompressionType = 1 + CompressionTypeRQ CompressionType = 2 + CompressionTypeUncompressed CompressionType = 255 +) + +// Batch of vectors and offset for writing to Weaviate +type Batch struct { + Vectors [][]float32 + Offset int + Filters []int +} + +func createClient(cfg *Config) *weaviate.Client { + retryClient := retryablehttp.NewClient() + retryClient.RetryMax = 10 + + wcfg := weaviate.Config{ + Host: cfg.HttpOrigin, + Scheme: cfg.HttpScheme, + ConnectionClient: retryClient.HTTPClient, + StartupTimeout: 60 * time.Second, + } + if cfg.HttpAuth != "" { + wcfg.AuthConfig = auth.ApiKey{Value: cfg.HttpAuth} + wcfg.ConnectionClient = nil + } + client, err := weaviate.NewClient(wcfg) + if err != nil { + log.Fatalf("Error creating client: %v", err) + } + return client +} + +// writeChunk writes a single batch of vectors to Weaviate using gRPC. +func writeChunk(chunk *Batch, client *weaviategrpc.WeaviateClient, cfg *Config) { + objects := make([]*weaviategrpc.BatchObject, len(chunk.Vectors)) + + for i, vector := range chunk.Vectors { + objects[i] = &weaviategrpc.BatchObject{ + Uuid: uuidFromInt(i + chunk.Offset + cfg.Offset), + Collection: cfg.ClassName, + } + if cfg.Tenant != "" { + objects[i].Tenant = cfg.Tenant + } + if cfg.MultiVectorDimensions > 0 { + if len(vector)%cfg.MultiVectorDimensions != 0 { + log.Fatalf("Vector length %d is not a multiple of dimensions %d", + len(vector), cfg.MultiVectorDimensions) + } + rows := len(vector) / cfg.MultiVectorDimensions + + multiVec := make([][]float32, rows) + for i := 0; i < rows; i++ { + start := i * cfg.MultiVectorDimensions + end := start + cfg.MultiVectorDimensions + multiVec[i] = vector[start:end] + } + objects[i].Vectors = []*weaviategrpc.Vectors{{ + Name: "multivector", + VectorBytes: byteops.Fp32SliceOfSlicesToBytes(multiVec), + Type: weaviategrpc.Vectors_VECTOR_TYPE_MULTI_FP32, + }} + } else { + objects[i].VectorBytes = encodeVector(vector) + } + if cfg.NamedVector != "" { + vectors := make([]*weaviategrpc.Vectors, 1) + vectors[0] = &weaviategrpc.Vectors{ + VectorBytes: encodeVector(vector), + Name: cfg.NamedVector, + } + objects[i].Vectors = vectors + } + if cfg.Filter { + nonRefProperties, err := structpb.NewStruct(map[string]interface{}{ + "category": strconv.Itoa(chunk.Filters[i]), + }) + if err != nil { + log.Fatalf("Error creating filtered struct: %v", err) + } + objects[i].Properties = &weaviategrpc.BatchObject_Properties{ + NonRefProperties: nonRefProperties, + } + } + } + + batchRequest := &weaviategrpc.BatchObjectsRequest{ + Objects: objects, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) + defer cancel() + + if cfg.HttpAuth != "" { + md := metadata.Pairs( + "Authorization", fmt.Sprintf("Bearer %s", cfg.HttpAuth), + ) + ctx = metadata.NewOutgoingContext(ctx, md) + } + + response, err := (*client).BatchObjects(ctx, batchRequest) + if err != nil { + log.Fatalf("could not send batch: %v", err) + } + + for _, result := range response.GetErrors() { + if result.Error != "" { + log.Printf("Error for index %d: %s", result.Index, result.Error) + } else { + log.Printf("Successfully processed object at index %d", result.Index) + } + } +} + +func waitReady(cfg *Config, client *weaviate.Client, indexStart time.Time, maxDuration time.Duration, minQueueSize int64) time.Time { + start := time.Now() + current := time.Now() + + log.Infof("Waiting for queue to be empty\n") + for current.Sub(start) < maxDuration { + nodesStatus, err := client.Cluster().NodesStatusGetter().WithOutput("verbose").Do(context.Background()) + if err != nil { + panic(err) + } + totalShardQueue := int64(0) + for _, n := range nodesStatus.Nodes { + for _, s := range n.Shards { + if s.Class == cfg.ClassName && s.VectorQueueLength > 0 { + totalShardQueue += s.VectorQueueLength + } + } + } + if totalShardQueue < minQueueSize { + log.WithFields(log.Fields{"duration": current.Sub(start)}).Printf("Queue ready\n") + log.WithFields(log.Fields{"duration": current.Sub(indexStart)}).Printf("Total load and queue ready\n") + return current + } + time.Sleep(2 * time.Second) + current = time.Now() + } + + log.Fatalf("Queue wasn't ready in %s\n", maxDuration) + return current +} + +// enableCompression enables the specified compression type on an already-loaded collection. +func enableCompression(cfg *Config, client *weaviate.Client, dimensions uint, compressionType CompressionType) { + classConfig, err := client.Schema().ClassGetter().WithClassName(cfg.ClassName).Do(context.Background()) + if err != nil { + panic(err) + } + + var segments uint + var vectorIndexConfig map[string]interface{} + + if cfg.MultiVectorDimensions > 0 { + vectorIndexConfig = classConfig.VectorConfig["multivector"].VectorIndexConfig.(map[string]interface{}) + } else { + if cfg.NamedVector == "" { + vectorIndexConfig = classConfig.VectorIndexConfig.(map[string]interface{}) + } else { + vectorIndexConfig = classConfig.VectorConfig[cfg.NamedVector].VectorIndexConfig.(map[string]interface{}) + classConfig.Vectorizer = "" + } + } + + switch compressionType { + case CompressionTypePQ: + if dimensions%cfg.PQRatio != 0 { + log.Fatalf("PQ ratio of %d and dimensions of %d incompatible", cfg.PQRatio, dimensions) + } + if !cfg.MuveraEnabled { + segments = dimensions / cfg.PQRatio + } else { + segments = uint(math.Pow(2, float64(cfg.MuveraKSim))*float64(cfg.MuveraDProjections)*float64(cfg.MuveraRepetition)) / cfg.PQRatio + } + + pqConfig := map[string]interface{}{ + "enabled": true, + "segments": segments, + "trainingLimit": cfg.TrainingLimit, + } + if cfg.RescoreLimit > -1 { + pqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["pq"] = pqConfig + case CompressionTypeSQ: + sqConfig := map[string]interface{}{ + "enabled": true, + "trainingLimit": cfg.TrainingLimit, + } + if cfg.RescoreLimit > -1 { + sqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["sq"] = sqConfig + case CompressionTypeRQ: + rqConfig := map[string]interface{}{ + "enabled": true, + "bits": cfg.RQBits, + } + if cfg.RescoreLimit > -1 { + rqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["rq"] = rqConfig + } + + if cfg.MultiVectorDimensions > 0 { + vectorConfig := classConfig.VectorConfig["multivector"] + vectorConfig.VectorIndexConfig = vectorIndexConfig + classConfig.VectorConfig["multivector"] = vectorConfig + } else { + if cfg.NamedVector == "" { + classConfig.VectorIndexConfig = vectorIndexConfig + } else { + vectorConfig := classConfig.VectorConfig[cfg.NamedVector] + vectorConfig.VectorIndexConfig = vectorIndexConfig + classConfig.VectorConfig[cfg.NamedVector] = vectorConfig + } + } + + err = client.Schema().ClassUpdater().WithClass(classConfig).Do(context.Background()) + if err != nil { + panic(err) + } + switch compressionType { + case CompressionTypePQ: + log.WithFields(log.Fields{"segments": segments, "dimensions": dimensions}).Printf("Enabled PQ. Waiting for shard ready.\n") + case CompressionTypeSQ: + log.Printf("Enabled SQ. Waiting for shard ready.\n") + } + + start := time.Now() + + for { + time.Sleep(3 * time.Second) + diff := time.Since(start) + if diff.Minutes() > 50 { + log.Fatalf("Shard still not ready after 50 minutes, exiting..\n") + } + shards, err := client.Schema().ShardsGetter().WithClassName(cfg.ClassName).Do(context.Background()) + if err != nil || len(shards) == 0 { + if weaviateErr, ok := err.(*fault.WeaviateClientError); ok { + log.Warnf("Error getting schema: %v", weaviateErr.DerivedFromError) + } else { + log.Warnf("Error getting schema: %v", err) + } + continue + } + ready := true + for _, shard := range shards { + if shard.Status != "READY" { + ready = false + } + } + if ready { + break + } + } + + endTime := time.Now() + switch compressionType { + case CompressionTypePQ: + log.WithFields(log.Fields{"segments": segments, "dimensions": dimensions}).Printf("PQ Completed in %v\n", endTime.Sub(start)) + case CompressionTypeSQ: + log.Printf("SQ Completed in %v\n", endTime.Sub(start)) + case CompressionTypeRQ: + log.Printf("RQ Completed in %v\n", endTime.Sub(start)) + } +} diff --git a/benchmarker/cmd/ann_query.go b/benchmarker/cmd/ann_query.go new file mode 100644 index 0000000..9861c02 --- /dev/null +++ b/benchmarker/cmd/ann_query.go @@ -0,0 +1,318 @@ +package cmd + +import ( + "encoding/json" + "fmt" + "math/rand" + "os" + "path/filepath" + "slices" + "strconv" + "strings" + "time" + + log "github.com/sirupsen/logrus" + "golang.org/x/exp/constraints" +) + +func parseEfValues(s string) ([]int, error) { + strs := strings.Split(s, ",") + nums := make([]int, len(strs)) + for i, str := range strs { + num, err := strconv.Atoi(str) + if err != nil { + return nil, fmt.Errorf("error converting efArray '%s' to integer: %v", str, err) + } + nums[i] = num + } + return nums, nil +} + +func runQueries(cfg *Config, importTime time.Duration, testData [][]float32, neighbors [][]int, filters []int) { + runID := strconv.FormatInt(time.Now().Unix(), 10) + + efCandidates, err := parseEfValues(cfg.EfArray) + if err != nil { + log.Fatalf("Error parsing efArray, expected commas separated format \"16,32,64\" but:%v\n", err) + } + + // Read once at this point (after import and compaction delay) to get accurate memory stats + memstats := &Memstats{} + if !cfg.SkipMemoryStats { + memstats, err = readMemoryMetrics(cfg) + if err != nil { + log.Warnf("Error reading memory stats: %v", err) + memstats = &Memstats{} + } + } + + client := createClient(cfg) + os.Mkdir("./results", 0o755) + + iteration := 0 + for { + shouldStop := shouldStopRunQueries(iteration, cfg) + if cfg.WaitForBackground && shouldStop { + // todo + break + } + + iteration++ + iterationRunID := fmt.Sprintf("%d", iteration) + isFinalIteration := !cfg.WaitForBackground || shouldStop + + benchmarkResultsMap := make([]map[string]interface{}, 0, len(efCandidates)) + for _, ef := range efCandidates { + updateEf(ef, cfg, client) + + var result Results + + if cfg.QueryDuration > 0 { + result = benchmarkANNDuration(*cfg, testData, neighbors, filters) + } else { + result = benchmarkANN(*cfg, testData, neighbors, filters) + } + + if cfg.IndexType == "hnsw" || cfg.IndexType == "dynamic" { + log.WithFields(log.Fields{ + "mean": result.Mean, "qps": result.QueriesPerSecond, "recall": result.Recall, "ndcg": result.NDCG, + "parallel": cfg.Parallel, "limit": cfg.Limit, + "api": cfg.API, "ef": ef, "count": result.Total, "failed": result.Failed, + }).Info("Benchmark result") + } else if cfg.IndexType == "hfresh" { + log.WithFields(log.Fields{ + "mean": result.Mean, "qps": result.QueriesPerSecond, "recall": result.Recall, "ndcg": result.NDCG, + "parallel": cfg.Parallel, "limit": cfg.Limit, + "api": cfg.API, "searchProbe": ef, "count": result.Total, "failed": result.Failed, + }).Info("Benchmark result") + } else { + log.WithFields(log.Fields{ + "mean": result.Mean, "qps": result.QueriesPerSecond, "recall": result.Recall, "ndcg": result.NDCG, + "parallel": cfg.Parallel, "limit": cfg.Limit, + "api": cfg.API, "rescoreLimit": ef, "count": result.Total, "failed": result.Failed, + }).Info("Benchmark result") + } + + dataset := filepath.Base(cfg.BenchmarkFile) + + var resultMap map[string]interface{} + + benchResult := ResultsJSONBenchmark{ + Api: cfg.API, + EfConstruction: cfg.EfConstruction, + MaxConnections: cfg.MaxConnections, + Mean: result.Mean.Seconds(), + P99Latency: result.Percentiles[len(result.Percentiles)-1].Seconds(), + QueriesPerSecond: result.QueriesPerSecond, + Shards: cfg.Shards, + Parallelization: cfg.Parallel, + Limit: cfg.Limit, + ImportTime: importTime.Seconds(), + RunID: runID, + IterationRunID: iterationRunID, + Dataset: dataset, + NDCG: result.NDCG, + Recall: result.Recall, + HeapAllocBytes: memstats.HeapAllocBytes, + HeapInuseBytes: memstats.HeapInuseBytes, + HeapSysBytes: memstats.HeapSysBytes, + Timestamp: time.Now().Format(time.RFC3339), + } + switch cfg.IndexType { + case "flat": + benchResult.RescoreLimit = ef + case "hnsw", "dynamic": + benchResult.Ef = ef + case "hfresh": + benchResult.SearchProbe = ef + } + + jsonData, err := json.Marshal(benchResult) + if err != nil { + log.Fatalf("Error converting result to json") + } + + if err := json.Unmarshal(jsonData, &resultMap); err != nil { + log.Fatalf("Error converting json to map") + } + + if cfg.LabelMap != nil { + for key, value := range cfg.LabelMap { + resultMap[key] = value + } + } + + if isFinalIteration { + resultMap["finalIteration"] = true + } + + benchmarkResultsMap = append(benchmarkResultsMap, resultMap) + + } + + data, err := json.MarshalIndent(benchmarkResultsMap, "", " ") + if err != nil { + log.Fatalf("Error marshaling benchmark results: %v", err) + } + + err = os.WriteFile(fmt.Sprintf("./results/%s.json", runID), data, 0o644) + if err != nil { + log.Fatalf("Error writing benchmark results to file: %v", err) + } + + if !cfg.WaitForBackground { + break + } + + } +} + +func shouldStopRunQueries(iteration int, cfg *Config) bool { + if cfg.IndexType != "hfresh" { + return true + } + if iteration == 0 { // we want to trigger merge operations + return false + } + + metrics, err := readHFreshMetrics(cfg) + if err != nil { + log.WithError(err).Warn("Failed to read HFresh pending operations metrics") + return false + } + + noPendingOps := metrics.PendingSplitOperations == 0 && + metrics.PendingMergeOperations == 0 && + metrics.PendingReassignOperations == 0 + + if noPendingOps { + log.WithFields(log.Fields{ + "iteration": iteration, + }).Info("All HFresh background operations complete") + return true + } + + secs := 30 + + for { + metrics, err := readHFreshMetrics(cfg) + if err != nil { + log.WithError(err).Warn("Failed to read HFresh pending operations metrics") + return false + } + + log.WithFields(log.Fields{ + "iteration": iteration, + "pendingSplitOperations": metrics.PendingSplitOperations, + "pendingMergeOperations": metrics.PendingMergeOperations, + "pendingReassignOperations": metrics.PendingReassignOperations, + }).Info("HFresh background operations still running, checking again in ", secs, " seconds") + noPendingOps := metrics.PendingSplitOperations == 0 && + metrics.PendingMergeOperations == 0 && + metrics.PendingReassignOperations == 0 + + if noPendingOps { + break + } + time.Sleep(time.Duration(secs) * time.Second) + } + + return false +} + +func benchmarkANN(cfg Config, queries Queries, neighbors Neighbors, filters []int) Results { + cfg.Queries = len(queries) + + i := 0 + return benchmark(cfg, func(className string) QueryWithNeighbors { + defer func() { i++ }() + + tenant := "" + if cfg.NumTenants > 0 { + tenant = fmt.Sprint(rand.Intn(cfg.NumTenants)) + } + filter := -1 + if cfg.Filter { + filter = filters[i] + } + + return QueryWithNeighbors{ + Query: nearVectorQueryGrpc(&cfg, queries[i], tenant, filter), + Neighbors: neighbors[i], + } + }) +} + +type Number interface { + constraints.Float | constraints.Integer +} + +func median[T Number](data []T) float64 { + dataCopy := make([]T, len(data)) + copy(dataCopy, data) + + slices.Sort(dataCopy) + + var median float64 + l := len(dataCopy) + if l == 0 { + return 0 + } else if l%2 == 0 { + median = float64((dataCopy[l/2-1] + dataCopy[l/2]) / 2.0) + } else { + median = float64(dataCopy[l/2]) + } + + return median +} + +type sampledResults struct { + Min []time.Duration + Max []time.Duration + Mean []time.Duration + Took []time.Duration + QueriesPerSecond []float64 + Recall []float64 + NDCG []float64 + Results []Results +} + +func benchmarkANNDuration(cfg Config, queries Queries, neighbors Neighbors, filters []int) Results { + cfg.Queries = len(queries) + + var samples sampledResults + + startTime := time.Now() + + var results Results + + for time.Since(startTime) < time.Duration(cfg.QueryDuration)*time.Second { + results = benchmarkANN(cfg, queries, neighbors, filters) + samples.Min = append(samples.Min, results.Min) + samples.Max = append(samples.Max, results.Max) + samples.Mean = append(samples.Mean, results.Mean) + samples.Took = append(samples.Took, results.Took) + samples.QueriesPerSecond = append(samples.QueriesPerSecond, results.QueriesPerSecond) + samples.NDCG = append(samples.NDCG, results.NDCG) + samples.Recall = append(samples.Recall, results.Recall) + samples.Results = append(samples.Results, results) + } + + var medianResult Results + + medianResult.Min = time.Duration(median(samples.Min)) + medianResult.Max = time.Duration(median(samples.Max)) + medianResult.Mean = time.Duration(median(samples.Mean)) + medianResult.Took = time.Duration(median(samples.Took)) + medianResult.QueriesPerSecond = median(samples.QueriesPerSecond) + medianResult.Percentiles = results.Percentiles + medianResult.PercentilesLabels = results.PercentilesLabels + medianResult.Total = results.Total + medianResult.Successful = results.Successful + medianResult.Failed = results.Failed + medianResult.Parallelization = cfg.Parallel + medianResult.Recall = median(samples.Recall) + medianResult.NDCG = median(samples.NDCG) + + return medianResult +} diff --git a/benchmarker/cmd/ann_schema.go b/benchmarker/cmd/ann_schema.go new file mode 100644 index 0000000..ca3db31 --- /dev/null +++ b/benchmarker/cmd/ann_schema.go @@ -0,0 +1,383 @@ +package cmd + +import ( + "context" + "fmt" + "time" + + log "github.com/sirupsen/logrus" + "github.com/weaviate/weaviate-go-client/v4/weaviate" + "github.com/weaviate/weaviate/entities/models" +) + +// Re/create Weaviate schema +func createSchema(cfg *Config, client *weaviate.Client) { + err := client.Schema().ClassDeleter().WithClassName(cfg.ClassName).Do(context.Background()) + if err != nil { + log.Fatalf("Error deleting class: %v", err) + } + + multiTenancyEnabled := false + if cfg.NumTenants > 0 { + multiTenancyEnabled = true + } + + classObj := &models.Class{ + Class: cfg.ClassName, + Description: fmt.Sprintf("Created by the Weaviate Benchmarker at %s", time.Now().String()), + MultiTenancyConfig: &models.MultiTenancyConfig{ + Enabled: multiTenancyEnabled, + }, + } + + if cfg.Shards > 1 { + classObj.ShardingConfig = map[string]interface{}{ + "desiredCount": cfg.Shards, + } + } + + var vectorIndexConfig map[string]interface{} + + if cfg.IndexType == "hnsw" { + vectorIndexConfig = map[string]interface{}{ + "distance": cfg.DistanceMetric, + "efConstruction": float64(cfg.EfConstruction), + "maxConnections": float64(cfg.MaxConnections), + "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, + "flatSearchCutoff": cfg.FlatSearchCutoff, + } + if cfg.PQ == "auto" { + pqConfig := map[string]interface{}{ + "enabled": true, + "segments": cfg.PQSegments, + "trainingLimit": cfg.TrainingLimit, + } + if cfg.RescoreLimit > -1 { + pqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["pq"] = pqConfig + } else if cfg.BQ { + bqConfig := map[string]interface{}{ + "enabled": true, + } + if cfg.RescoreLimit > -1 { + bqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["bq"] = bqConfig + } else if cfg.SQ == "auto" { + vectorIndexConfig = map[string]interface{}{ + "distance": cfg.DistanceMetric, + "efConstruction": float64(cfg.EfConstruction), + "maxConnections": float64(cfg.MaxConnections), + "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, + "sq": map[string]interface{}{ + "enabled": true, + "trainingLimit": cfg.TrainingLimit, + }, + } + } else if cfg.RQ == "auto" { + rqConfig := map[string]interface{}{ + "enabled": true, + "bits": cfg.RQBits, + } + if cfg.RescoreLimit > -1 { + rqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig = map[string]interface{}{ + "distance": cfg.DistanceMetric, + "efConstruction": float64(cfg.EfConstruction), + "maxConnections": float64(cfg.MaxConnections), + "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, + "rq": rqConfig, + } + } + } else if cfg.IndexType == "flat" { + // Validate that BQ and RQ are not both enabled + if cfg.BQ && cfg.RQ == "auto" { + log.Fatalf("Cannot enable both BQ and RQ on flat index type") + } + + vectorIndexConfig = map[string]interface{}{ + "distance": cfg.DistanceMetric, + } + if cfg.BQ { + bqConfig := map[string]interface{}{ + "enabled": true, + "cache": cfg.Cache, + } + if cfg.RescoreLimit > -1 { + bqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["bq"] = bqConfig + } else if cfg.RQ == "auto" { + rqConfig := map[string]interface{}{ + "enabled": true, + "bits": cfg.RQBits, + "cache": cfg.Cache, + } + if cfg.RescoreLimit > -1 { + rqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["rq"] = rqConfig + log.WithFields(log.Fields{"bits": cfg.RQBits, "indexType": "flat"}).Printf("Enabled RQ on flat index type") + } + } else if cfg.IndexType == "dynamic" { + log.WithFields(log.Fields{"threshold": cfg.DynamicThreshold}).Info("Building dynamic vector index") + vectorIndexConfig = map[string]interface{}{ + "distance": cfg.DistanceMetric, + "threshold": cfg.DynamicThreshold, + "hnsw": map[string]interface{}{ + "efConstruction": float64(cfg.EfConstruction), + "maxConnections": float64(cfg.MaxConnections), + "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, + "flatSearchCutoff": cfg.FlatSearchCutoff, + }, + "flat": map[string]interface{}{}, + } + if cfg.PQ == "auto" { + pqConfig := map[string]interface{}{ + "enabled": true, + "segments": cfg.PQSegments, + "trainingLimit": cfg.TrainingLimit, + } + if cfg.RescoreLimit > -1 { + pqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["hnsw"].(map[string]interface{})["pq"] = pqConfig + } else if cfg.BQ { + bqConfig := map[string]interface{}{ + "enabled": true, + "cache": true, + } + if cfg.RescoreLimit > -1 { + bqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["hnsw"].(map[string]interface{})["bq"] = bqConfig + } else if cfg.RQ == "auto" { + vectorIndexConfig["flat"].(map[string]interface{})["rq"] = map[string]interface{}{ + "enabled": true, + "bits": cfg.RQBits, + } + vectorIndexConfig["hnsw"].(map[string]interface{})["rq"] = map[string]interface{}{ + "enabled": true, + "bits": cfg.RQBits, + } + } + } else if cfg.IndexType == "hfresh" { + vectorIndexConfig = map[string]interface{}{ + "distance": cfg.DistanceMetric, + "maxPostingSizeKB": cfg.MaxPostingSizeKB, + "replicas": cfg.Replicas, + "rngFactor": cfg.RngFactor, + "rq": map[string]interface{}{ + "rescoreLimit": cfg.RescoreLimit, + }, + } + } else { + log.Fatalf("Unknown index type %s", cfg.IndexType) + } + + vectorIndexConfig["filterStrategy"] = cfg.FilterStrategy + + if cfg.NamedVector != "" { + vectorConfig := make(map[string]models.VectorConfig) + vectorConfig[cfg.NamedVector] = models.VectorConfig{ + Vectorizer: map[string]interface{}{"none": nil}, + VectorIndexType: cfg.IndexType, + VectorIndexConfig: vectorIndexConfig, + } + classObj.VectorConfig = vectorConfig + } else { + if cfg.MultiVectorDimensions > 0 { + vectorIndexConfig = map[string]interface{}{} + if cfg.PQ == "auto" { + pqConfig := map[string]interface{}{ + "enabled": true, + "segments": cfg.PQSegments, + "trainingLimit": cfg.TrainingLimit, + } + if cfg.RescoreLimit > -1 { + pqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["pq"] = pqConfig + } else if cfg.BQ { + bqConfig := map[string]interface{}{ + "enabled": true, + "cache": true, + } + if cfg.RescoreLimit > -1 { + bqConfig["rescoreLimit"] = cfg.RescoreLimit + } + vectorIndexConfig["bq"] = bqConfig + } else if cfg.SQ == "auto" { + vectorIndexConfig = map[string]interface{}{ + "distance": cfg.DistanceMetric, + "efConstruction": float64(cfg.EfConstruction), + "maxConnections": float64(cfg.MaxConnections), + "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, + "sq": map[string]interface{}{ + "enabled": true, + "trainingLimit": cfg.TrainingLimit, + }, + } + } else if cfg.RQ == "auto" { + rqConfig := map[string]interface{}{ + "enabled": true, + "bits": cfg.RQBits, + } + if cfg.RescoreLimit > -1 { + rqConfig["rescoreLimit"] = cfg.RescoreLimit + } + + vectorIndexConfig = map[string]interface{}{ + "distance": cfg.DistanceMetric, + "efConstruction": float64(cfg.EfConstruction), + "maxConnections": float64(cfg.MaxConnections), + "cleanupIntervalSeconds": cfg.CleanupIntervalSeconds, + "rq": rqConfig, + } + } + vectorIndexConfig["multivector"] = map[string]interface{}{ + "enabled": true, + "muvera": map[string]interface{}{ + "enabled": cfg.MuveraEnabled, + "ksim": cfg.MuveraKSim, + "dprojections": cfg.MuveraDProjections, + "repetition": cfg.MuveraRepetition, + }, + } + + classObj.VectorConfig = map[string]models.VectorConfig{ + "multivector": { + Vectorizer: map[string]interface{}{ + "none": map[string]interface{}{}, + }, + VectorIndexConfig: vectorIndexConfig, + VectorIndexType: cfg.IndexType, + }, + } + } else { + classObj.VectorIndexType = cfg.IndexType + classObj.VectorIndexConfig = vectorIndexConfig + } + } + + if cfg.ReplicationFactor > 1 || cfg.AsyncReplicationEnabled { + classObj.ReplicationConfig = &models.ReplicationConfig{ + Factor: int64(cfg.ReplicationFactor), + AsyncEnabled: cfg.AsyncReplicationEnabled, + } + } + + err = client.Schema().ClassCreator().WithClass(classObj).Do(context.Background()) + if err != nil { + panic(err) + } + log.Printf("Created class %s", cfg.ClassName) +} + +func deleteChunk(chunk *Batch, client *weaviate.Client, cfg *Config) { + log.Debugf("Deleting chunk of %d vectors index %d", len(chunk.Vectors), chunk.Offset) + for i := range chunk.Vectors { + uuid := uuidFromInt(i + chunk.Offset + cfg.Offset) + err := client.Data().Deleter().WithClassName(cfg.ClassName).WithID(uuid).Do(context.Background()) + if err != nil { + log.Fatalf("Error deleting object: %v", err) + } + } +} + +func deleteUuidSlice(cfg *Config, client *weaviate.Client, slice []int) { + log.WithFields(log.Fields{"length": len(slice), "class": cfg.ClassName}).Printf("Deleting objects to trigger tombstone operations") + for _, i := range slice { + err := client.Data().Deleter().WithClassName(cfg.ClassName).WithID(uuidFromInt(i)).Do(context.Background()) + if err != nil { + log.Fatalf("Error deleting object: %v", err) + } + } + log.WithFields(log.Fields{"length": len(slice), "class": cfg.ClassName}).Printf("Completed deletes") +} + +func deleteUuidRange(cfg *Config, client *weaviate.Client, start int, end int) { + var slice []int + for i := start; i < end; i++ { + slice = append(slice, i) + } + deleteUuidSlice(cfg, client, slice) +} + +func addTenantIfNeeded(cfg *Config, client *weaviate.Client) { + if cfg.Tenant == "" { + return + } + err := client.Schema().TenantsCreator(). + WithClassName(cfg.ClassName). + WithTenants(models.Tenant{Name: cfg.Tenant}). + Do(context.Background()) + if err != nil { + log.Printf("Error adding tenant retrying in 1 second %v", err) + time.Sleep(1 * time.Second) + addTenantIfNeeded(cfg, client) + } +} + +// updateEf updates the ef (or rescore limit / search probe) parameter on the Weaviate schema. +func updateEf(ef int, cfg *Config, client *weaviate.Client) { + classConfig, err := client.Schema().ClassGetter().WithClassName(cfg.ClassName).Do(context.Background()) + if err != nil { + panic(err) + } + + var vectorIndexConfig map[string]interface{} + + if cfg.NamedVector != "" { + vectorIndexConfig = classConfig.VectorConfig[cfg.NamedVector].VectorIndexConfig.(map[string]interface{}) + } else if cfg.MultiVectorDimensions > 0 { + vectorIndexConfig = classConfig.VectorConfig["multivector"].VectorIndexConfig.(map[string]interface{}) + } else { + vectorIndexConfig = classConfig.VectorIndexConfig.(map[string]interface{}) + } + + switch cfg.IndexType { + case "hnsw": + vectorIndexConfig["ef"] = ef + case "flat": + if bq, exists := vectorIndexConfig["bq"]; exists && cfg.BQ { + bqConfig := bq.(map[string]interface{}) + bqConfig["rescoreLimit"] = ef + } else if rq, exists := vectorIndexConfig["rq"]; exists { + rqConfig := rq.(map[string]interface{}) + rqConfig["rescoreLimit"] = ef + } + case "dynamic": + hnswConfig := vectorIndexConfig["hnsw"].(map[string]interface{}) + hnswConfig["ef"] = ef + flatConfig := vectorIndexConfig["flat"].(map[string]interface{}) + if bq, exists := flatConfig["bq"]; exists && cfg.BQ { + bqConfig := bq.(map[string]interface{}) + bqConfig["rescoreLimit"] = ef + } else if rq, exists := flatConfig["rq"]; exists { + rqConfig := rq.(map[string]interface{}) + rqConfig["rescoreLimit"] = ef + } + case "hfresh": + vectorIndexConfig["searchProbe"] = ef + } + + if cfg.NamedVector != "" { + vectorConfig := classConfig.VectorConfig[cfg.NamedVector] + vectorConfig.VectorIndexConfig = vectorIndexConfig + classConfig.VectorConfig[cfg.NamedVector] = vectorConfig + } else if cfg.MultiVectorDimensions > 0 { + vectorConfig := classConfig.VectorConfig["multivector"] + vectorConfig.VectorIndexConfig = vectorIndexConfig + classConfig.VectorConfig["multivector"] = vectorConfig + } else { + classConfig.VectorIndexConfig = vectorIndexConfig + } + + err = client.Schema().ClassUpdater().WithClass(classConfig).Do(context.Background()) + if err != nil { + panic(err) + } +} diff --git a/benchmarker/cmd/integration_test.go b/benchmarker/cmd/integration_test.go index 66182de..d87b47d 100644 --- a/benchmarker/cmd/integration_test.go +++ b/benchmarker/cmd/integration_test.go @@ -115,6 +115,34 @@ func setupTestCollection(t *testing.T, cfg *Config, vectors [][]float32) { time.Sleep(3 * time.Second) } +// setupTestCollectionFromSchema creates a fresh collection using createSchema(), +// inserts vectors through gRPC, and registers cleanup. +func setupTestCollectionFromSchema(t *testing.T, cfg *Config, vectors [][]float32) { + t.Helper() + client := createClient(cfg) + if cfg.FilterStrategy == "" { + cfg.FilterStrategy = "sweeping" + } + + createSchema(cfg, client) + + t.Cleanup(func() { + _ = client.Schema().ClassDeleter().WithClassName(cfg.ClassName).Do(context.Background()) + }) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, cfg.Origin, grpc.WithInsecure()) //nolint:staticcheck + require.NoError(t, err) + defer conn.Close() + grpcClient := weaviategrpc.NewWeaviateClient(conn) + + writeChunk(&Batch{Vectors: vectors, Offset: 0}, &grpcClient, cfg) + + // Give indexers time to finish indexing before querying. + time.Sleep(3 * time.Second) +} + // TestIntegration_QueriesSucceed runs a full insert→query cycle with random // query vectors and asserts that no queries fail and throughput is positive. func TestIntegration_QueriesSucceed(t *testing.T) { @@ -196,3 +224,89 @@ func TestIntegration_ResultsJSON(t *testing.T) { assert.Contains(t, out, `"successful"`) assert.Contains(t, out, `"recall"`) } + +func TestIntegration_SmokePerIndexType(t *testing.T) { + base := integrationCfg() + skipIfWeaviateUnavailable(t, base.Origin) + + vectors := generateVectors(integrationVectorCount, integrationDimensions, 314159) + + testCases := []struct { + name string + indexType string + className string + configure func(*Config) + queryParam int + }{ + { + name: "hnsw", + indexType: "hnsw", + className: "BenchmarkIntTestHNSW", + configure: func(cfg *Config) {}, + queryParam: 32, + }, + { + name: "flat-rq", + indexType: "flat", + className: "BenchmarkIntTestFlatRQ", + configure: func(cfg *Config) { + cfg.RQ = "auto" + cfg.RQBits = 8 + cfg.Cache = true + cfg.RescoreLimit = 32 + }, + queryParam: 24, + }, + { + name: "dynamic", + indexType: "dynamic", + className: "BenchmarkIntTestDynamic", + configure: func(cfg *Config) { + cfg.DynamicThreshold = 100 + }, + queryParam: 32, + }, + { + name: "hfresh", + indexType: "hfresh", + className: "BenchmarkIntTestHFresh", + configure: func(cfg *Config) { + cfg.MaxPostingSizeKB = 64 + cfg.Replicas = 1 + cfg.RngFactor = 10 + cfg.RescoreLimit = 32 + }, + queryParam: 16, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + cfg := integrationCfg() + cfg.IndexType = tc.indexType + cfg.ClassName = tc.className + cfg.Queries = 20 + cfg.Limit = 10 + cfg.FilterStrategy = "sweeping" + + tc.configure(&cfg) + + setupTestCollectionFromSchema(t, &cfg, vectors) + + client := createClient(&cfg) + updateEf(tc.queryParam, &cfg, client) + + results := benchmark(cfg, func(_ string) QueryWithNeighbors { + return QueryWithNeighbors{ + Query: nearVectorQueryGrpc(&cfg, randomVector(cfg.Dimensions), "", -1), + } + }) + + assert.Equal(t, 0, results.Failed, "expected zero failed queries for %s", tc.indexType) + assert.Equal(t, cfg.Queries, results.Successful, "all queries should succeed for %s", tc.indexType) + assert.Greater(t, results.QueriesPerSecond, 0.0, "QPS should be positive for %s", tc.indexType) + assert.Greater(t, int64(results.Mean), int64(0), "mean latency should be positive for %s", tc.indexType) + }) + } +}