The generic connection pool and task pool for Golang.
- WIP, v1.1.0, the first final release
- v1.0.x preview versions
- v0.9.x pre-released
For more information pls refer to examples/connpooldemo/main.go:
import "github.com/hedzr/pools/connpool"
pool := connpool.New(*poolSize,
connpool.WithWorkerDialer(newWorkerWithOpts(WithClientKeepAliveTimeout(*keepAliveTimeout))),
connpool.WithKeepAliveInterval(*keepAliveTimeout),
connpool.WithBlockIfCantBorrow(true),
)
defer pool.Close()
for i := 0; i < 10; i++ {
if c,ok:=pool.Borrow().(*clientSample); ok {
c.LongOper()
}
}A Dialer function feed to pool make it can be initialized implicitly.
connpool.Pool will hold the connections.
As a sample:
func newWorker() (w connpool.Worker, err error) {
w, err = newWorkerWithOpts()()
return
}
func newWorkerWithOpts(opts ...ClientSampleOpt) connpool.Dialer {
return func() (w connpool.Worker, err error) {
c := &clientSample{
keepAliveTimeout: 15 * time.Second,
sendCh: make(chan string),
doneCh: make(chan struct{}),
}
err = c.open()
if err == nil {
for _, opt := range opts {
opt(c)
}
}
w = c
return
}
}To keep the connection alive, your worker could implement connpool.KeepAliveTicker interface.
connpool.Pool will tick the workers periodically.
A good form is:
func (c *clientSample) Tick(tick time.Time) (err error) {
c.sendCh <- "PING"
return
}Generally the pool might return nil for Borrow() if all connections in pool had been borrowed.
But also WithBlockIfCantBorrow(true) can block at Borrow() till any connection returned by Return().
For more information pls refer to examples/jobsdemo/main.go:
package test
import (
"fmt"
"github.com/hedzr/pools/jobs"
"math/rand"
"time"
)
func testEntry(){
pool := jobs.New(30, jobs.WithOnEndCallback(func(result jobs.Result, err error, job jobs.Job, args ...interface{}) {
// onEndCallback here
return
}))
defer pool.Close()
for i := 0; i < 100; i++ {
pool.Schedule(newJob(i), i+1, i+2, i+3)
si := 1 + rand.Intn(10)
pool.ScheduleN(newJob2(i, si), si, i+1, i+2, i+3)
}
// t.Logf("pool size: %v", pool.Cap())
// pool.Pause()
// pool.Resume()
pool.WaitForIdle()
}
func newJob(i int) jobs.Job {
return &job1{taskIndex: i}
}
func newJob2(i, si int) jobs.Job {
return &job1{taskIndex: i, taskSubIndex: si}
}
type job1 struct {
taskIndex int
taskSubIndex int
}
func (j *job1) Run(workerIndex int, args ...interface{}) (res jobs.Result, err error) {
fmt.Printf("Task #%v [worker #%v]: args = %v\n", j.taskIndex, workerIndex, args)
time.Sleep(time.Duration(2+rand.Intn(2)) * time.Second)
return
}The above codes can be simplified:
package test
import "github.com/hedzr/pools/jobs"
func testEntry(){
pool := jobs.New(32, jobs.WithOnEndCallback(jobs.DummyOnEndCallback))
defer pool.CloseAndWait()
pool.Schedule(jobs.NewJobBuilder(func(workerIndex, subIndex int, args ...interface{}) (res jobs.Result, err error){
return
}), 1,2,3)
}Work-pool is a jobs scheduler but using a generator to feed the tasks.
For example:
package test
import "github.com/hedzr/pools/jobs"
func testWorkPool() {
pool := jobs.NewWorkPool(10)
defer pool.Wait()
generator := func(args ...interface{}) chan *jobs.Task {
ch := make(chan *jobs.Task)
count := args[0].(int)
go func() {
for i := 0; i < count; i++ {
job := newJob(i)
fmt.Printf(" -> new job #%d put\n", i)
ch <- jobs.ToTask(job, i+1, i+2, i+3)
}
close(ch)
}()
return ch
}
pool.OnComplete(func(numProcessed int) {
fmt.Printf("processed %d tasks\n", numProcessed)
})
pool.Run(generator, 30)
}For more information pls refer to examples/jobsdemo/main.go.
MIT