diff --git a/Makefile b/Makefile index 4a25586..aef3574 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: all build test lint bench run-http run-batch help clean +.PHONY: all build test lint bench run-autoconfig run-http run-batch help clean # Go parameters GOCMD=go @@ -19,6 +19,7 @@ help: @echo " test Run unit tests with race detector and coverage" @echo " lint Run golangci-lint" @echo " bench Run all benchmarks" + @echo " run-autoconfig Run the auto-configuration example" @echo " run-http Run the HTTP server example" @echo " run-batch Run the batch processor example" @echo " clean Clean build artifacts and coverage files" @@ -41,6 +42,10 @@ bench: @echo "Running benchmarks..." $(GOTEST) -bench=. -benchmem ./... +run-autoconfig: + @echo "Running auto-configuration example..." + $(GOCMD) run examples/autoconfig/main.go + run-http: @echo "Running HTTP server example..." $(GOCMD) run examples/http_server/main.go diff --git a/README.md b/README.md index c69ef35..e29e55a 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,51 @@ func main() { ## Configuration Options +### Auto-Configuration (Recommended) + +The pool provides intelligent auto-configuration based on system resources and workload profiles: + +```go +// Simple profile-based configuration +pool, err := adaptivepool.New( + adaptivepool.WithAutoConfig(adaptivepool.ProfileAPIServer), +) + +// Advanced system-aware configuration +pool, err := adaptivepool.New( + adaptivepool.WithSystemAwareConfig(adaptivepool.SystemAwareConfig{ + WorkloadType: adaptivepool.IOBound, + TargetLatencyMs: 500, + AvgJobMemoryBytes: 50 * 1024, // 50KB per job + MemoryLimitPercent: 0.2, // Use 20% of available memory + }), +) + +// Get configuration suggestions +config := adaptivepool.SuggestConfig(adaptivepool.IOBound) +fmt.Printf("Suggested min workers: %d\n", config.MinWorkers()) +fmt.Printf("Suggested max workers: %d\n", config.MaxWorkers()) +fmt.Printf("Suggested queue size: %d\n", config.QueueSize()) +``` + +**Available Workload Profiles:** + +| Profile | Use Case | Min Workers | Max Workers | Queue Size | +|---------|----------|-------------|-------------|------------| +| `ProfileAPIServer` | I/O bound API servers | 2x CPU | 10x CPU | 20x Max Workers | +| `ProfileCPUIntensive` | CPU-bound tasks | 1x CPU | 2x CPU | 5x Max Workers | +| `ProfileBatchProcessor` | Batch processing | 2x CPU | 4x CPU | 50x Max Workers | + +**Workload Types for System-Aware Config:** + +- `IOBound` - Tasks that spend most time waiting on I/O (network, disk, databases) +- `CPUBound` - Compute-intensive tasks that max out CPU +- `Mixed` - Workloads with both I/O and CPU characteristics + +### Manual Configuration + +For fine-grained control, configure parameters manually: + ```go pool, err := adaptivepool.New( // Minimum workers (default: 1) @@ -112,6 +157,18 @@ pool, err := adaptivepool.New( ) ``` +### Combining Auto-Config with Manual Overrides + +You can start with a profile and override specific settings: + +```go +pool, err := adaptivepool.New( + adaptivepool.WithAutoConfig(adaptivepool.ProfileAPIServer), + adaptivepool.WithMinWorkers(8), // Override min workers + adaptivepool.WithQueueSize(10000), // Override queue size +) +``` + ## Backpressure Handling The pool enforces backpressure when the queue is full: @@ -169,6 +226,19 @@ Shutdown behavior: ## Examples +### Auto-Configuration + +See [examples/autoconfig](examples/autoconfig/main.go) for demonstrations of: +- Profile-based auto-configuration +- System-aware configuration with memory constraints +- Configuration suggestions for different workload types +- Combining auto-config with manual overrides + +```bash +cd examples/autoconfig +go run main.go +``` + ### HTTP Server with Backpressure See [examples/http_server](examples/http_server/main.go) for a complete HTTP server that: diff --git a/autoconfig.go b/autoconfig.go new file mode 100644 index 0000000..2d18a30 --- /dev/null +++ b/autoconfig.go @@ -0,0 +1,166 @@ +package adaptivepool + +import ( + "runtime" +) + +// WorkloadProfile represents predefined workload types +type WorkloadProfile int + +const ( + // ProfileAPIServer is optimized for I/O bound API servers + ProfileAPIServer WorkloadProfile = iota + // ProfileCPUIntensive is optimized for CPU-bound tasks + ProfileCPUIntensive + // ProfileBatchProcessor is optimized for batch processing workloads + ProfileBatchProcessor +) + +// WorkloadType represents the nature of the workload +type WorkloadType int + +const ( + // IOBound workloads spend most time waiting on I/O + IOBound WorkloadType = iota + // CPUBound workloads are compute-intensive + CPUBound + // Mixed workloads have both I/O and CPU characteristics + Mixed +) + +// SystemAwareConfig provides advanced auto-configuration based on system resources +type SystemAwareConfig struct { + // WorkloadType specifies the nature of the workload + WorkloadType WorkloadType + // TargetLatencyMs is the target latency in milliseconds + TargetLatencyMs int + // AvgJobMemoryBytes is the average memory consumed per job + AvgJobMemoryBytes int64 + // MemoryLimitPercent is the percentage of available memory to use (0.0 to 1.0) + MemoryLimitPercent float64 +} + +// systemResources holds detected system resource information +type systemResources struct { + numCPU int + totalMemory uint64 + availableMemory uint64 +} + +// detectSystemResources detects available system resources +func detectSystemResources() systemResources { + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + + return systemResources{ + numCPU: runtime.NumCPU(), + totalMemory: memStats.Sys, + availableMemory: memStats.Sys - memStats.Alloc, + } +} + +// applyProfile applies a predefined workload profile to the config +func applyProfile(config *Config, profile WorkloadProfile) { + resources := detectSystemResources() + numCPU := resources.numCPU + + switch profile { + case ProfileAPIServer: + // I/O bound: more workers to handle concurrent requests + config.minWorkers = numCPU * 2 + config.maxWorkers = numCPU * 10 + config.queueSize = config.maxWorkers * 20 + config.scaleUpThreshold = 0.6 + + case ProfileCPUIntensive: + // CPU bound: workers close to CPU count + config.minWorkers = numCPU + config.maxWorkers = numCPU * 2 + config.queueSize = config.maxWorkers * 5 + config.scaleUpThreshold = 0.8 + + case ProfileBatchProcessor: + // Batch processing: moderate workers, large queue + config.minWorkers = numCPU * 2 + config.maxWorkers = numCPU * 4 + config.queueSize = config.maxWorkers * 50 + config.scaleUpThreshold = 0.7 + } +} + +// applySystemAwareConfig applies system-aware configuration +func applySystemAwareConfig(config *Config, sysConfig SystemAwareConfig) { + resources := detectSystemResources() + numCPU := resources.numCPU + + // Set defaults if not provided + if sysConfig.MemoryLimitPercent <= 0 || sysConfig.MemoryLimitPercent > 1.0 { + sysConfig.MemoryLimitPercent = 0.2 // Default to 20% of available memory + } + + // Calculate worker counts based on workload type + switch sysConfig.WorkloadType { + case IOBound: + config.minWorkers = numCPU * 2 + config.maxWorkers = numCPU * 8 + config.scaleUpThreshold = 0.6 + + case CPUBound: + config.minWorkers = numCPU + config.maxWorkers = numCPU * 2 + config.scaleUpThreshold = 0.8 + + case Mixed: + config.minWorkers = numCPU + config.maxWorkers = numCPU * 4 + config.scaleUpThreshold = 0.7 + } + + // Calculate queue size based on memory constraints + if sysConfig.AvgJobMemoryBytes > 0 { + availableForQueue := float64(resources.availableMemory) * sysConfig.MemoryLimitPercent + maxQueueByMemory := int(availableForQueue / float64(sysConfig.AvgJobMemoryBytes)) + + // Use memory-based calculation but ensure reasonable bounds + minQueueSize := config.maxWorkers * 5 + maxQueueSize := config.maxWorkers * 100 + + if maxQueueByMemory < minQueueSize { + config.queueSize = minQueueSize + } else if maxQueueByMemory > maxQueueSize { + config.queueSize = maxQueueSize + } else { + config.queueSize = maxQueueByMemory + } + } else { + // Default queue sizing based on workload type + switch sysConfig.WorkloadType { + case IOBound: + config.queueSize = config.maxWorkers * 20 + case CPUBound: + config.queueSize = config.maxWorkers * 5 + case Mixed: + config.queueSize = config.maxWorkers * 10 + } + } + + // Adjust scaling behavior based on target latency + if sysConfig.TargetLatencyMs > 0 { + // Lower latency targets need more aggressive scaling + if sysConfig.TargetLatencyMs < 100 { + config.scaleUpThreshold = 0.5 + } else if sysConfig.TargetLatencyMs < 500 { + config.scaleUpThreshold = 0.6 + } + } +} + +// SuggestConfig analyzes the system and returns suggested configuration values +func SuggestConfig(workloadType WorkloadType) *Config { + config := defaultConfig() + applySystemAwareConfig(config, SystemAwareConfig{ + WorkloadType: workloadType, + MemoryLimitPercent: 0.2, + }) + return config +} diff --git a/autoconfig_test.go b/autoconfig_test.go new file mode 100644 index 0000000..3f249a2 --- /dev/null +++ b/autoconfig_test.go @@ -0,0 +1,337 @@ +package adaptivepool + +import ( + "context" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDetectSystemResources(t *testing.T) { + resources := detectSystemResources() + + assert.Greater(t, resources.numCPU, 0, "should detect at least 1 CPU") + assert.Greater(t, resources.totalMemory, uint64(0), "should detect total memory") + assert.GreaterOrEqual(t, resources.totalMemory, resources.availableMemory, "total memory should be >= available memory") +} + +func TestApplyProfile_APIServer(t *testing.T) { + config := defaultConfig() + applyProfile(config, ProfileAPIServer) + + numCPU := runtime.NumCPU() + + assert.Equal(t, numCPU*2, config.minWorkers, "API server should have 2x CPU min workers") + assert.Equal(t, numCPU*10, config.maxWorkers, "API server should have 10x CPU max workers") + assert.Equal(t, config.maxWorkers*20, config.queueSize, "API server should have 20x max workers queue size") + assert.Equal(t, 0.6, config.scaleUpThreshold, "API server should have 0.6 scale up threshold") + + err := config.validate() + assert.NoError(t, err, "profile config should be valid") +} + +func TestApplyProfile_CPUIntensive(t *testing.T) { + config := defaultConfig() + applyProfile(config, ProfileCPUIntensive) + + numCPU := runtime.NumCPU() + + assert.Equal(t, numCPU, config.minWorkers, "CPU intensive should have 1x CPU min workers") + assert.Equal(t, numCPU*2, config.maxWorkers, "CPU intensive should have 2x CPU max workers") + assert.Equal(t, config.maxWorkers*5, config.queueSize, "CPU intensive should have 5x max workers queue size") + assert.Equal(t, 0.8, config.scaleUpThreshold, "CPU intensive should have 0.8 scale up threshold") + + err := config.validate() + assert.NoError(t, err, "profile config should be valid") +} + +func TestApplyProfile_BatchProcessor(t *testing.T) { + config := defaultConfig() + applyProfile(config, ProfileBatchProcessor) + + numCPU := runtime.NumCPU() + + assert.Equal(t, numCPU*2, config.minWorkers, "Batch processor should have 2x CPU min workers") + assert.Equal(t, numCPU*4, config.maxWorkers, "Batch processor should have 4x CPU max workers") + assert.Equal(t, config.maxWorkers*50, config.queueSize, "Batch processor should have 50x max workers queue size") + assert.Equal(t, 0.7, config.scaleUpThreshold, "Batch processor should have 0.7 scale up threshold") + + err := config.validate() + assert.NoError(t, err, "profile config should be valid") +} + +func TestApplySystemAwareConfig_IOBound(t *testing.T) { + config := defaultConfig() + sysConfig := SystemAwareConfig{ + WorkloadType: IOBound, + TargetLatencyMs: 500, + AvgJobMemoryBytes: 50 * 1024, // 50KB + MemoryLimitPercent: 0.2, + } + + applySystemAwareConfig(config, sysConfig) + + numCPU := runtime.NumCPU() + + assert.Equal(t, numCPU*2, config.minWorkers, "IO bound should have 2x CPU min workers") + assert.Equal(t, numCPU*8, config.maxWorkers, "IO bound should have 8x CPU max workers") + assert.Equal(t, 0.6, config.scaleUpThreshold, "IO bound should have 0.6 scale up threshold") + assert.Greater(t, config.queueSize, 0, "queue size should be positive") + + err := config.validate() + assert.NoError(t, err, "system aware config should be valid") +} + +func TestApplySystemAwareConfig_CPUBound(t *testing.T) { + config := defaultConfig() + sysConfig := SystemAwareConfig{ + WorkloadType: CPUBound, + TargetLatencyMs: 1000, + MemoryLimitPercent: 0.3, + } + + applySystemAwareConfig(config, sysConfig) + + numCPU := runtime.NumCPU() + + assert.Equal(t, numCPU, config.minWorkers, "CPU bound should have 1x CPU min workers") + assert.Equal(t, numCPU*2, config.maxWorkers, "CPU bound should have 2x CPU max workers") + assert.Equal(t, 0.8, config.scaleUpThreshold, "CPU bound should have 0.8 scale up threshold") + assert.Equal(t, config.maxWorkers*5, config.queueSize, "CPU bound should have 5x max workers queue size") + + err := config.validate() + assert.NoError(t, err, "system aware config should be valid") +} + +func TestApplySystemAwareConfig_Mixed(t *testing.T) { + config := defaultConfig() + sysConfig := SystemAwareConfig{ + WorkloadType: Mixed, + MemoryLimitPercent: 0.25, + } + + applySystemAwareConfig(config, sysConfig) + + numCPU := runtime.NumCPU() + + assert.Equal(t, numCPU, config.minWorkers, "Mixed should have 1x CPU min workers") + assert.Equal(t, numCPU*4, config.maxWorkers, "Mixed should have 4x CPU max workers") + assert.Equal(t, 0.7, config.scaleUpThreshold, "Mixed should have 0.7 scale up threshold") + assert.Equal(t, config.maxWorkers*10, config.queueSize, "Mixed should have 10x max workers queue size") + + err := config.validate() + assert.NoError(t, err, "system aware config should be valid") +} + +func TestApplySystemAwareConfig_MemoryConstraints(t *testing.T) { + config := defaultConfig() + sysConfig := SystemAwareConfig{ + WorkloadType: IOBound, + AvgJobMemoryBytes: 1024 * 1024, // 1MB per job + MemoryLimitPercent: 0.1, + } + + applySystemAwareConfig(config, sysConfig) + + // Queue size should be calculated based on memory + assert.Greater(t, config.queueSize, 0, "queue size should be positive") + assert.GreaterOrEqual(t, config.queueSize, config.maxWorkers*5, "queue size should be at least 5x max workers") + + err := config.validate() + assert.NoError(t, err, "memory-constrained config should be valid") +} + +func TestApplySystemAwareConfig_LowLatencyTarget(t *testing.T) { + config := defaultConfig() + sysConfig := SystemAwareConfig{ + WorkloadType: IOBound, + TargetLatencyMs: 50, // Low latency target + MemoryLimitPercent: 0.2, + } + + applySystemAwareConfig(config, sysConfig) + + assert.Equal(t, 0.5, config.scaleUpThreshold, "low latency should have aggressive 0.5 scale up threshold") + + err := config.validate() + assert.NoError(t, err, "low latency config should be valid") +} + +func TestApplySystemAwareConfig_MediumLatencyTarget(t *testing.T) { + config := defaultConfig() + sysConfig := SystemAwareConfig{ + WorkloadType: IOBound, + TargetLatencyMs: 200, // Medium latency target + MemoryLimitPercent: 0.2, + } + + applySystemAwareConfig(config, sysConfig) + + assert.Equal(t, 0.6, config.scaleUpThreshold, "medium latency should have 0.6 scale up threshold") + + err := config.validate() + assert.NoError(t, err, "medium latency config should be valid") +} + +func TestApplySystemAwareConfig_DefaultMemoryLimit(t *testing.T) { + config := defaultConfig() + sysConfig := SystemAwareConfig{ + WorkloadType: IOBound, + MemoryLimitPercent: 0, // Should use default + } + + applySystemAwareConfig(config, sysConfig) + + // Should not panic and should produce valid config + err := config.validate() + assert.NoError(t, err, "config with default memory limit should be valid") +} + +func TestSuggestConfig(t *testing.T) { + tests := []struct { + name string + workloadType WorkloadType + }{ + {"IOBound", IOBound}, + {"CPUBound", CPUBound}, + {"Mixed", Mixed}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := SuggestConfig(tt.workloadType) + + require.NotNil(t, config, "suggested config should not be nil") + assert.Greater(t, config.minWorkers, 0, "min workers should be positive") + assert.Greater(t, config.maxWorkers, 0, "max workers should be positive") + assert.Greater(t, config.queueSize, 0, "queue size should be positive") + assert.GreaterOrEqual(t, config.maxWorkers, config.minWorkers, "max workers should be >= min workers") + + err := config.validate() + assert.NoError(t, err, "suggested config should be valid") + }) + } +} + +func TestWithAutoConfig(t *testing.T) { + tests := []struct { + name string + profile WorkloadProfile + }{ + {"APIServer", ProfileAPIServer}, + {"CPUIntensive", ProfileCPUIntensive}, + {"BatchProcessor", ProfileBatchProcessor}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pool, err := New(WithAutoConfig(tt.profile)) + + require.NoError(t, err, "should create pool with auto config") + require.NotNil(t, pool, "pool should not be nil") + + metrics := pool.Metrics() + assert.Greater(t, metrics.ActiveWorkers(), 0, "should have workers") + + // Cleanup + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = pool.Shutdown(ctx) + assert.NoError(t, err, "should shutdown cleanly") + }) + } +} + +func TestWithSystemAwareConfig(t *testing.T) { + sysConfig := SystemAwareConfig{ + WorkloadType: IOBound, + TargetLatencyMs: 500, + AvgJobMemoryBytes: 50 * 1024, + MemoryLimitPercent: 0.2, + } + + pool, err := New(WithSystemAwareConfig(sysConfig)) + + require.NoError(t, err, "should create pool with system aware config") + require.NotNil(t, pool, "pool should not be nil") + + metrics := pool.Metrics() + assert.Greater(t, metrics.ActiveWorkers(), 0, "should have workers") + + // Cleanup + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = pool.Shutdown(ctx) + assert.NoError(t, err, "should shutdown cleanly") +} + +func TestAutoConfig_CanOverrideWithOtherOptions(t *testing.T) { + // Auto config followed by manual override + pool, err := New( + WithAutoConfig(ProfileAPIServer), + WithMinWorkers(5), // Override min workers + ) + + require.NoError(t, err, "should create pool with overridden config") + require.NotNil(t, pool, "pool should not be nil") + + metrics := pool.Metrics() + assert.Equal(t, 5, metrics.ActiveWorkers(), "should respect manual override") + + // Cleanup + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = pool.Shutdown(ctx) + assert.NoError(t, err, "should shutdown cleanly") +} + +func TestSystemAwareConfig_QueueSizeBounds(t *testing.T) { + config := defaultConfig() + + // Test with very large memory per job (should hit minimum bound) + sysConfig := SystemAwareConfig{ + WorkloadType: IOBound, + AvgJobMemoryBytes: 100 * 1024 * 1024, // 100MB per job + MemoryLimitPercent: 0.1, + } + + applySystemAwareConfig(config, sysConfig) + + // Should be at minimum bound + minBound := config.maxWorkers * 5 + assert.GreaterOrEqual(t, config.queueSize, minBound, "should respect minimum queue size bound") + + err := config.validate() + assert.NoError(t, err, "config should be valid") +} + +func BenchmarkApplyProfile(b *testing.B) { + for i := 0; i < b.N; i++ { + config := defaultConfig() + applyProfile(config, ProfileAPIServer) + } +} + +func BenchmarkApplySystemAwareConfig(b *testing.B) { + sysConfig := SystemAwareConfig{ + WorkloadType: IOBound, + TargetLatencyMs: 500, + AvgJobMemoryBytes: 50 * 1024, + MemoryLimitPercent: 0.2, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + config := defaultConfig() + applySystemAwareConfig(config, sysConfig) + } +} + +func BenchmarkDetectSystemResources(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = detectSystemResources() + } +} diff --git a/examples/README.md b/examples/README.md index 17a908c..0c54350 100644 --- a/examples/README.md +++ b/examples/README.md @@ -2,18 +2,25 @@ This folder has some practical ways to use the adaptive pool. -### 1. HTTP Server (/http_server) +### 1. Auto-Configuration (/autoconfig) +Demonstrates intelligent auto-configuration based on system resources. +- Profile-based configuration for common workload types (API server, CPU intensive, batch processing). +- System-aware configuration with memory constraints and latency targets. +- Configuration suggestions for different workload types. +- Combining auto-config with manual overrides. + +### 2. HTTP Server (/http_server) Shows how to handle background jobs in a web server. - Uses backpressure to return a 503 instead of crashing when overloaded. - Includes a /metrics endpoint for Prometheus. - Handles graceful shutdown so no jobs are lost. -### 2. Batch Processor (/batch_processor) +### 3. Batch Processor (/batch_processor) Good for data processing scripts or background workers. - Processes 100,000 tasks using adaptive scaling. - Prints out throughput and latency as it goes. -### 3. stress test simulator (/one_million_simulator) +### 4. Stress Test Simulator (/one_million_simulator) The heavy-duty simulation used for the 1M RPS benchmarks. - with-pool: runs with protection enabled. - without-pool: runs the naive version (caution: uses lots of RAM). @@ -25,7 +32,17 @@ The heavy-duty simulation used for the 1M RPS benchmarks. You can use the Makefile in the root directory to run these: ```bash +make run-autoconfig make run-http make run-batch make run-comparison +``` + +Or run them directly: + +```bash +cd autoconfig && go run main.go +cd http_server && go run main.go +cd batch_processor && go run main.go +cd one_million_simulator/with_pool && go run main.go ``` \ No newline at end of file diff --git a/examples/autoconfig/main.go b/examples/autoconfig/main.go new file mode 100644 index 0000000..5a8d619 --- /dev/null +++ b/examples/autoconfig/main.go @@ -0,0 +1,180 @@ +package main + +import ( + "context" + "fmt" + "log" + "runtime" + "time" + + adaptivepool "github.com/iyashjayesh/go-adaptive-pool" +) + +func main() { + fmt.Println("=== Auto-Configuration Examples ===") + + // Example 1: Simple profile-based configuration + example1ProfileBased() + + // Example 2: System-aware configuration + example2SystemAware() + + // Example 3: Suggest configuration + example3SuggestConfig() + + // Example 4: Manual override with auto-config + example4ManualOverride() +} + +func example1ProfileBased() { + fmt.Println("Example 1: Profile-Based Auto-Configuration") + fmt.Println("-------------------------------------------") + + // Create pool with API Server profile + pool, err := adaptivepool.New( + adaptivepool.WithAutoConfig(adaptivepool.ProfileAPIServer), + ) + if err != nil { + log.Fatalf("Failed to create pool: %v", err) + } + + metrics := pool.Metrics() + fmt.Printf("Profile: API Server (I/O Bound)\n") + fmt.Printf("Initial Workers: %d\n", metrics.ActiveWorkers()) + fmt.Printf("Queue Capacity: %d\n", metrics.QueueCapacity()) + fmt.Printf("System CPUs: %d\n\n", runtime.NumCPU()) + + // Submit some jobs + ctx := context.Background() + for i := 0; i < 10; i++ { + err := pool.Submit(ctx, func(_ context.Context) error { + time.Sleep(100 * time.Millisecond) + fmt.Printf(" Job %d completed\n", i) + return nil + }) + if err != nil { + log.Printf("Failed to submit job: %v", err) + } + } + + // Wait for jobs to complete + time.Sleep(500 * time.Millisecond) + + // Shutdown + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := pool.Shutdown(shutdownCtx); err != nil { + log.Printf("Shutdown error: %v", err) + } + + fmt.Printf("Jobs Completed: %d\n", metrics.JobsProcessed()) + fmt.Println() +} + +func example2SystemAware() { + fmt.Println("Example 2: System-Aware Configuration") + fmt.Println("--------------------------------------") + + // Create pool with system-aware configuration + pool, err := adaptivepool.New( + adaptivepool.WithSystemAwareConfig(adaptivepool.SystemAwareConfig{ + WorkloadType: adaptivepool.IOBound, + TargetLatencyMs: 500, + AvgJobMemoryBytes: 50 * 1024, // 50KB per job + MemoryLimitPercent: 0.2, // Use 20% of available memory + }), + ) + if err != nil { + log.Fatalf("Failed to create pool: %v", err) + } + + metrics := pool.Metrics() + fmt.Printf("Workload Type: I/O Bound\n") + fmt.Printf("Target Latency: 500ms\n") + fmt.Printf("Initial Workers: %d\n", metrics.ActiveWorkers()) + fmt.Printf("Queue Capacity: %d\n", metrics.QueueCapacity()) + fmt.Printf("System CPUs: %d\n\n", runtime.NumCPU()) + + // Submit jobs + ctx := context.Background() + for i := 0; i < 20; i++ { + err := pool.Submit(ctx, func(_ context.Context) error { + time.Sleep(50 * time.Millisecond) + return nil + }) + if err != nil { + log.Printf("Failed to submit job: %v", err) + } + } + + // Wait and observe scaling + time.Sleep(1 * time.Second) + fmt.Printf("Workers after load: %d\n", metrics.ActiveWorkers()) + + // Shutdown + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := pool.Shutdown(shutdownCtx); err != nil { + log.Printf("Shutdown error: %v", err) + } + + fmt.Printf("Jobs Completed: %d\n", metrics.JobsProcessed()) + fmt.Println() +} + +func example3SuggestConfig() { + fmt.Println("Example 3: Configuration Suggestions") + fmt.Println("-------------------------------------") + + workloadTypes := []struct { + name string + wt adaptivepool.WorkloadType + }{ + {"I/O Bound", adaptivepool.IOBound}, + {"CPU Bound", adaptivepool.CPUBound}, + {"Mixed", adaptivepool.Mixed}, + } + + numCPU := runtime.NumCPU() + fmt.Printf("System CPUs: %d\n\n", numCPU) + + for _, wt := range workloadTypes { + config := adaptivepool.SuggestConfig(wt.wt) + fmt.Printf("Suggested config for %s:\n", wt.name) + fmt.Printf(" Min Workers: %d (%.1fx CPU)\n", config.MinWorkers(), float64(config.MinWorkers())/float64(numCPU)) + fmt.Printf(" Max Workers: %d (%.1fx CPU)\n", config.MaxWorkers(), float64(config.MaxWorkers())/float64(numCPU)) + fmt.Printf(" Queue Size: %d (%.1fx Max Workers)\n", config.QueueSize(), float64(config.QueueSize())/float64(config.MaxWorkers())) + fmt.Println() + } +} + +func example4ManualOverride() { + fmt.Println("Example 4: Manual Override with Auto-Config") + fmt.Println("--------------------------------------------") + + // Start with CPU intensive profile, then override specific settings + pool, err := adaptivepool.New( + adaptivepool.WithAutoConfig(adaptivepool.ProfileCPUIntensive), + adaptivepool.WithMinWorkers(4), // Override min workers + adaptivepool.WithQueueSize(500), // Override queue size + ) + if err != nil { + log.Fatalf("Failed to create pool: %v", err) + } + + metrics := pool.Metrics() + fmt.Printf("Base Profile: CPU Intensive\n") + fmt.Printf("Overridden Min Workers: %d\n", metrics.ActiveWorkers()) + fmt.Printf("Overridden Queue Capacity: %d\n", metrics.QueueCapacity()) + fmt.Printf("System CPUs: %d\n\n", runtime.NumCPU()) + + // Shutdown + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := pool.Shutdown(shutdownCtx); err != nil { + log.Printf("Shutdown error: %v", err) + } + + fmt.Println("Pool shutdown successfully") + fmt.Println() +} diff --git a/metrics.go b/metrics.go index cc65cc2..4a201d5 100644 --- a/metrics.go +++ b/metrics.go @@ -12,6 +12,9 @@ type Metrics interface { // QueueDepth returns the current number of jobs in the queue QueueDepth() int + // QueueCapacity returns the maximum capacity of the queue + QueueCapacity() int + // ActiveWorkers returns the current number of active workers ActiveWorkers() int @@ -93,6 +96,11 @@ func (m *metrics) QueueDepth() int { return m.state.getQueueDepth() } +// QueueCapacity returns the maximum capacity of the queue +func (m *metrics) QueueCapacity() int { + return m.state.getQueueCapacity() +} + // ActiveWorkers returns the current number of active workers func (m *metrics) ActiveWorkers() int { return m.state.getWorkerCount() diff --git a/options.go b/options.go index 67e0e21..e33b5c6 100644 --- a/options.go +++ b/options.go @@ -75,6 +75,22 @@ func WithScaleCooldown(d time.Duration) Option { } } +// WithAutoConfig applies a predefined workload profile for automatic configuration +// This configures MinWorkers, MaxWorkers, and QueueSize based on system resources +func WithAutoConfig(profile WorkloadProfile) Option { + return func(c *Config) { + applyProfile(c, profile) + } +} + +// WithSystemAwareConfig applies advanced system-aware configuration +// This analyzes system resources and workload characteristics to optimize pool parameters +func WithSystemAwareConfig(sysConfig SystemAwareConfig) Option { + return func(c *Config) { + applySystemAwareConfig(c, sysConfig) + } +} + // defaultConfig returns the default configuration func defaultConfig() *Config { return &Config{ @@ -110,3 +126,38 @@ func (c *Config) validate() error { } return nil } + +// MinWorkers returns the minimum number of workers +func (c *Config) MinWorkers() int { + return c.minWorkers +} + +// MaxWorkers returns the maximum number of workers +func (c *Config) MaxWorkers() int { + return c.maxWorkers +} + +// QueueSize returns the queue size +func (c *Config) QueueSize() int { + return c.queueSize +} + +// ScaleUpThreshold returns the scale up threshold +func (c *Config) ScaleUpThreshold() float64 { + return c.scaleUpThreshold +} + +// ScaleDownIdleDuration returns the scale down idle duration +func (c *Config) ScaleDownIdleDuration() time.Duration { + return c.scaleDownIdleDuration +} + +// MetricsEnabled returns whether metrics are enabled +func (c *Config) MetricsEnabled() bool { + return c.metricsEnabled +} + +// ScaleCooldown returns the scale cooldown duration +func (c *Config) ScaleCooldown() time.Duration { + return c.scaleCooldown +}