From 38bfdb900e22f0f6f85fcdc3ce2850433cfd644e Mon Sep 17 00:00:00 2001 From: David Lavieri Date: Wed, 29 Oct 2025 22:02:33 +0100 Subject: [PATCH 1/2] breaking change: remove obsolete Config.TimeSleep --- .github/workflows/build.yaml | 2 +- examples/basic/main.go | 3 +-- krun.go | 4 +--- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index ac1c985..238fc04 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -24,7 +24,7 @@ jobs: run: go mod download - name: "Run tests" - run: go test -count 10 -v ./... + run: go test -race -count 10 -v ./... - name: "Build" run: go build -o ./bin/krun . diff --git a/examples/basic/main.go b/examples/basic/main.go index 4e79489..f3a5f0f 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -11,8 +11,7 @@ import ( func main() { queue := krun.New(&krun.Config{ - Size: 5, // number of workers - WaitSleep: time.Microsecond, + Size: 5, // number of workers }) job := func(ctx context.Context) (interface{}, error) { diff --git a/krun.go b/krun.go index ff27375..5c9b30b 100644 --- a/krun.go +++ b/krun.go @@ -4,7 +4,6 @@ import ( "context" "errors" "sync" - "time" ) // ErrPoolClosed it is closed (hahah) @@ -38,8 +37,7 @@ type worker struct { } type Config struct { - Size int - WaitSleep time.Duration + Size int } func New(cfg *Config) Krun { From d139704bfe4ff0aae142ad9b919a2047e749385d Mon Sep 17 00:00:00 2001 From: David Lavieri Date: Sat, 1 Nov 2025 11:29:29 +0100 Subject: [PATCH 2/2] feat: Enhance Krun tests for handling edge cases and improve docs --- README.md | 3 +- krun.go | 97 +++++++++++++++++++++++-------- krun_test.go | 161 ++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 233 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 10152c5..0ce23f8 100644 --- a/README.md +++ b/README.md @@ -30,8 +30,7 @@ import ( func main() { queue := krun.New(&krun.Config{ - Size: 5, // number of workers - WaitSleep: time.Microsecond, + Size: 5, // number of workers }) job := func(ctx context.Context) (interface{}, error) { diff --git a/krun.go b/krun.go index 5c9b30b..37f2ea8 100644 --- a/krun.go +++ b/krun.go @@ -6,19 +6,29 @@ import ( "sync" ) -// ErrPoolClosed it is closed (hahah) +// ErrPoolClosed is returned when operations are attempted on a closed pool. var ErrPoolClosed = errors.New("pool's closed") +// Result represents the result of a job execution. type Result struct { Data interface{} Error error } + +// Job represents a function that can be executed by the worker pool. +// It receives a context and returns a result and an error. type Job func(ctx context.Context) (interface{}, error) +// Krun is the interface for a worker pool that can execute jobs concurrently. type Krun interface { + // Run executes a job and returns a channel that will receive the result. Run(ctx context.Context, f Job) <-chan *Result + // Wait blocks until all running jobs complete or the context is cancelled. Wait(ctx context.Context) + // Size returns the number of workers in the pool. Size() int + // Close shuts down the pool, waiting for all running jobs to complete. + // Returns ErrPoolClosed if called multiple times. Close() error } @@ -36,21 +46,28 @@ type worker struct { result chan *Result } +// Config configures a new Krun instance. type Config struct { Size int } +// New creates a new Krun worker pool with the given configuration. func New(cfg *Config) Krun { + size := 1 + if cfg != nil && cfg.Size > 0 { + size = cfg.Size + } + k := &krun{ - poolSize: cfg.Size, + poolSize: size, closed: false, - workers: make(chan *worker, cfg.Size), + workers: make(chan *worker, size), wg: sync.WaitGroup{}, mu: sync.RWMutex{}, } - for i := 0; i < cfg.Size; i++ { + for i := 0; i < size; i++ { k.push(&worker{}) } @@ -65,18 +82,46 @@ func (k *krun) Size() int { } func (k *krun) Run(ctx context.Context, f Job) <-chan *Result { - // get worker from the channel - w := k.pop() - k.wg.Add(1) - - // assign Job to the worker and Run it cr := make(chan *Result, 1) - w.job = f - w.result = cr - go k.work(ctx, w) - // return channel to the caller - return cr + // Check if context is already cancelled before trying to get a worker + if ctx.Err() != nil { + cr <- &Result{Error: ctx.Err()} + return cr + } + + // get worker from the channel + select { + case <-ctx.Done(): + cr <- &Result{Error: ctx.Err()} + return cr + case w, ok := <-k.workers: + if !ok { + // Channel was closed + cr <- &Result{Error: ErrPoolClosed} + return cr + } + + // Check if pool was closed after getting worker + k.mu.RLock() + closed := k.closed + k.mu.RUnlock() + + if closed { + // Pool was closed, discard worker + cr <- &Result{Error: ErrPoolClosed} + return cr + } + + k.wg.Add(1) + + // assign Job to the worker and Run it + w.job = f + w.result = cr + go k.work(ctx, w) + + return cr + } } func (k *krun) Wait(ctx context.Context) { @@ -104,12 +149,12 @@ func (k *krun) Close() error { k.closed = true k.mu.Unlock() + // Close worker channel first to unblock any waiting Run() calls + close(k.workers) + // Wait for all work to complete k.wg.Wait() - // Close worker channel - close(k.workers) - return nil } @@ -119,15 +164,21 @@ func (k *krun) work(ctx context.Context, w *worker) { // send Result into the caller channel w.result <- &Result{d, err} + k.wg.Done() - // return worker to Krun + // return worker to Krun if pool is still open k.push(w) - k.wg.Done() } + func (k *krun) push(w *worker) { - k.workers <- w -} + k.mu.RLock() + closed := k.closed + k.mu.RUnlock() + + if closed { + // Pool is closed, discard worker + return + } -func (k *krun) pop() *worker { - return <-k.workers + k.workers <- w } diff --git a/krun_test.go b/krun_test.go index 3fe07e6..2d17514 100644 --- a/krun_test.go +++ b/krun_test.go @@ -21,6 +21,36 @@ func TestNew(t *testing.T) { t.Fatalf("Expected *krun, got %T", v) } }) + + t.Run("handles nil Config", func(t *testing.T) { + k := New(nil) + if k == nil { + t.Fatalf("Expected Krun, got nil") + } + if k.Size() != 1 { + t.Fatalf("Expected default size 1, got %d", k.Size()) + } + }) + + t.Run("handles Size 0", func(t *testing.T) { + k := New(&Config{Size: 0}) + if k == nil { + t.Fatalf("Expected Krun, got nil") + } + if k.Size() != 1 { + t.Fatalf("Expected default size 1, got %d", k.Size()) + } + }) + + t.Run("handles negative Size", func(t *testing.T) { + k := New(&Config{Size: -5}) + if k == nil { + t.Fatalf("Expected Krun, got nil") + } + if k.Size() != 1 { + t.Fatalf("Expected default size 1, got %d", k.Size()) + } + }) } func TestKrun_Size(t *testing.T) { @@ -68,8 +98,6 @@ func TestRun(t *testing.T) { if tp != "my-string" { t.Fatalf("expected \"my-string\", received: %s", tp) } - - break default: t.Fatalf("expected string, got %t", tp) } @@ -111,7 +139,7 @@ func TestRun(t *testing.T) { select { case e := <-errChan: - t.Fatalf(e.Error()) + t.Fatalf("Expected nil, got %v", e) case <-time.After(time.Millisecond): return } @@ -135,6 +163,96 @@ func TestRun(t *testing.T) { t.Fatalf("Expected nil, got %v", d.Data) } }) + + t.Run("returns error if context already cancelled", func(t *testing.T) { + k := New(&Config{Size: 1}) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + r := <-k.Run(ctx, func(ctx context.Context) (interface{}, error) { + return "should not run", nil + }) + + if r.Error == nil { + t.Fatalf("Expected context cancelled error, got nil") + } + if r.Error != ctx.Err() { + t.Fatalf("Expected context cancelled error, got %v", r.Error) + } + }) + + t.Run("returns error if pool is closed", func(t *testing.T) { + k := New(&Config{Size: 1}) + + // Close the pool + if err := k.Close(); err != nil { + t.Fatalf("Expected nil on first close, got %v", err) + } + + // Try to run a job after close + r := <-k.Run(context.Background(), func(ctx context.Context) (interface{}, error) { + return "should not run", nil + }) + + if r.Error == nil { + t.Fatalf("Expected ErrPoolClosed, got nil") + } + if !errors.Is(r.Error, ErrPoolClosed) { + t.Fatalf("Expected ErrPoolClosed, got %v", r.Error) + } + }) + + t.Run("handles Run() after Close() while job waiting", func(t *testing.T) { + // t.SkipNow() + // return + k := New(&Config{Size: 1}) + + ctx := context.Background() + + // Start a job that takes time + started := make(chan struct{}) + jobDone := make(chan struct{}) + _ = k.Run(ctx, func(ctx context.Context) (interface{}, error) { + started <- struct{}{} + <-jobDone + return "done", nil + }) + + // Wait for job to start + <-started + + // Close in another goroutine + closeDone := make(chan error) + go func() { + closeDone <- k.Close() + }() + + // Try to run another job while closing + r := <-k.Run(ctx, func(ctx context.Context) (interface{}, error) { + return "should not run", nil + }) + + if r.Error == nil { + t.Fatalf("Expected ErrPoolClosed, got nil") + } + if !errors.Is(r.Error, ErrPoolClosed) { + t.Fatalf("Expected ErrPoolClosed, got %v", r.Error) + } + + // Finish the running job + close(jobDone) + + // Wait for close to complete + select { + case err := <-closeDone: + if err != nil { + t.Fatalf("Expected nil on close, got %v", err) + } + case <-time.After(50 * time.Millisecond): + t.Fatalf("Close did not complete in time") + } + }) } func TestKrun_Wait(t *testing.T) { @@ -296,4 +414,41 @@ func TestKrun_Close(t *testing.T) { t.Fatalf("Close did not complete in time") } }) + + t.Run("concurrent Close() calls", func(t *testing.T) { + k := New(&Config{Size: 5}) + + results := make(chan error, 10) + for i := 0; i < 10; i++ { + go func() { + results <- k.Close() + }() + } + + // One should succeed, rest should get ErrPoolClosed + successCount := 0 + errorCount := 0 + + for i := 0; i < 10; i++ { + select { + case err := <-results: + if err == nil { + successCount++ + } else if errors.Is(err, ErrPoolClosed) { + errorCount++ + } else { + t.Fatalf("Unexpected error: %v", err) + } + case <-time.After(100 * time.Millisecond): + t.Fatalf("Close did not complete in time") + } + } + + if successCount != 1 { + t.Fatalf("Expected exactly 1 successful close, got %d", successCount) + } + if errorCount != 9 { + t.Fatalf("Expected 9 ErrPoolClosed, got %d", errorCount) + } + }) }