Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: ['1.21', '1.22', '1.23']
go-version: ['1.25']

steps:
- name: Checkout code
Expand Down Expand Up @@ -55,12 +55,12 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.23'
go-version: '1.25'

- name: Run golangci-lint
uses: golangci/golangci-lint-action@v8
with:
version: v2.2
version: latest
args: --timeout=5m
only-new-issues: true

Expand All @@ -75,7 +75,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.23'
go-version: '1.25'

- name: Run benchmarks
run: go test -bench=. -benchmem -benchtime=5s ./... | tee benchmark.txt
Expand Down
71 changes: 62 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,39 @@
![Visitors](https://api.visitorbadge.io/api/visitors?path=iyashjayesh%2Fgo-adaptive-pool%20&countColor=%23263759&style=flat)
![GitHub last commit]( https://img.shields.io/github/last-commit/iyashjayesh/go-adaptive-pool)

A resilient adaptive worker pool for Go that handles dynamic scaling, backpressure, metrics and safe shutdown under load. It's built to keep your system stable when traffic spikes by not letting goroutines grow out of control.
> `go-adaptive-pool` is a bounded worker pool for Go with an adaptive worker lifecycle and explicit backpressure, designed to keep systems stable under bursty load.

## Features
> The goal is not to maximize throughput at all costs, but to prevent unbounded goroutine growth, avoid OOMs, and force overload to be handled explicitly instead of crashing later.

- **Bounded Concurrency**: Fixed queue size prevents unbounded memory growth
- **Explicit Backpressure**: Context-aware blocking when queue is full
- **Adaptive Scaling**: Workers scale up/down based on queue utilization
- **Safe Shutdown**: Graceful draining with deterministic worker cleanup
- **Prometheus Metrics**: Built-in observability for queue depth, throughput, and latency
- **Zero Global State**: Multiple pool instances with isolated metrics
- **Production Ready**: Comprehensive tests with race detector and goroutine leak detection
## What this library does

This pool focuses on controlling concurrency and memory usage when job submission can outpace processing.

It provides:

- Bounded concurrency via a fixed-size queue
- Adaptive worker lifecycle. Workers scale up and down based on queue pressure
- Explicit backpressure. When the queue is full, submissions block or fail fast
- Observability via built-in Prometheus metrics
- Safe shutdown with graceful draining and no goroutine leaks

The `adaptive` behavior here is worker lifecycle adaptation, not request-level concurrency control.

## Key features

- *Bounded queue*
- Fixed queue size to prevent unbounded memory growth.
- *Adaptive worker lifecycle*
- Workers scale up and down based on queue utilization, within configured limits.
- *Explicit backpressure*
- When overloaded, submissions block or are rejected. The caller must handle it.
- *Observability*
- Built-in Prometheus metrics:
- queue depth
- throughput
- latency
- *Safe shutdown*
- Graceful draining of queued jobs and clean worker shutdown with no leaks.

## Installation

Expand Down Expand Up @@ -172,6 +194,37 @@ cd examples/batch_processor
go run main.go
```

## Scope and non-goals

This library is intentionally narrow in scope.

It does NOT:

- Perform latency- or error-driven adaptive concurrency control
- Implement AIMD-style or feedback-loop–based limiters
- Replace adaptive limiters like failsafe-go or go-adaptive-limiter

It DOES:

- Enforce hard limits on concurrency and memory usage
- Prevent goroutine explosions under burst load
- Apply backpressure when the system is overloaded
- Make overload visible and explicit instead of failing implicitly

If you already have a well-tuned adaptive limiter controlling request concurrency, a fixed-size worker pool may be sufficient.

## Why this exists

In many real systems, goroutines are cheap individually but unbounded submission is not.

Under traffic spikes, naive patterns often lead to:
- Runaway goroutine creation
- Unbounded queues
- Memory pressure and OOMs
- Failure modes that only appear under load

This pool enforces limits and predictable behavior by design.

## 1 Million RPS Stress Test

We performed an extreme pressure test (1M RPS target for 30s with 500KB tasks) to compare the adaptive pool against naive goroutine spawning.
Expand Down
2 changes: 1 addition & 1 deletion comparison_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"
"time"

"github.com/iyashjayesh/go-adaptive-pool"
adaptivepool "github.com/iyashjayesh/go-adaptive-pool"
)

// BenchmarkNaiveGoroutines benchmarks spawning a goroutine for each job
Expand Down
2 changes: 1 addition & 1 deletion examples/batch_processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func main() {

// Submit tasks
SubmitStart := time.Now()
for i := 0; i < numTasks; i++ {
for i := range numTasks {
if ctx.Err() != nil {
break
}
Expand Down
4 changes: 2 additions & 2 deletions examples/comparison/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func parseOutput(output string) Stats {

// Fallback: If no summary (crashed), parse the last periodic log line
if s.Completed == "" && lastLogLine != "" {
parts := strings.Split(lastLogLine, "|")
for _, p := range parts {
parts := strings.SplitSeq(lastLogLine, "|")
for p := range parts {
switch {
case strings.Contains(p, "Goro:"):
s.PeakGoro = strings.TrimSpace(strings.Split(p, ":")[1]) + " (Crashed)"
Expand Down
2 changes: 1 addition & 1 deletion examples/http_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func handleMetrics(w http.ResponseWriter, _ *http.Request) {
metrics := pool.Metrics()

w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]interface{}{
_ = json.NewEncoder(w).Encode(map[string]any{
"queue_depth": metrics.QueueDepth(),
"active_workers": metrics.ActiveWorkers(),
"jobs_processed": metrics.JobsProcessed(),
Expand Down
2 changes: 1 addition & 1 deletion examples/one_million_simulator/naive/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func runSubmission(ctx context.Context, s *stats) {
var wg sync.WaitGroup
wg.Add(generators)

for i := 0; i < generators; i++ {
for range generators {
go SubmitLoop(ctx, s, &wg)
}

Expand Down
2 changes: 1 addition & 1 deletion examples/one_million_simulator/with_pool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func main() {
var wg sync.WaitGroup
wg.Add(numGenerators)

for i := 0; i < numGenerators; i++ {
for range numGenerators {
go func() {
defer wg.Done()
for {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/iyashjayesh/go-adaptive-pool

go 1.23.0
go 1.25.0

require (
github.com/prometheus/client_golang v1.23.2
Expand Down
2 changes: 1 addition & 1 deletion shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (p *pool) Shutdown(ctx context.Context) error {

// Signal all workers to terminate
currentWorkers := p.state.getWorkerCount()
for i := 0; i < currentWorkers; i++ {
for range currentWorkers {
select {
case p.state.workerShutdown <- struct{}{}:
default:
Expand Down
Loading