diff --git a/ants.go b/ants.go index d3306c4..560c8e5 100644 --- a/ants.go +++ b/ants.go @@ -1 +1,19 @@ package ants + +import "math" + +const DEFAULT_POOL_SIZE = math.MaxInt32 + +var defaultPool = NewPool(DEFAULT_POOL_SIZE) + +func Push(task f) error { + return defaultPool.Push(task) +} + +func Size() int { + return int(defaultPool.Size()) +} + +func Cap() int { + return int(defaultPool.Cap()) +} diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..f5c7807 --- /dev/null +++ b/pool.go @@ -0,0 +1,103 @@ +package ants + +import ( + "runtime" + "sync/atomic" + "math" + "sync" +) + +type sig struct{} + +type f func() + +type Pool struct { + capacity int32 + length int32 + tasks chan f + workers chan *Worker + destroy chan sig + m sync.Mutex +} + +func NewPool(size int) *Pool { + p := &Pool{ + capacity: int32(size), + tasks: make(chan f, math.MaxInt32), + //workers: &sync.Pool{New: func() interface{} { return &Worker{} }}, + workers: make(chan *Worker, size), + destroy: make(chan sig, runtime.GOMAXPROCS(-1)), + } + p.loop() + return p +} + + +//------------------------------------------------------------------------- + +func (p *Pool) loop() { + for i := 0; i < runtime.GOMAXPROCS(-1); i++ { + go func() { + for { + select { + case task := <-p.tasks: + p.getWorker().sendTask(task) + case <-p.destroy: + return + } + } + }() + } +} + +func (p *Pool) Push(task f) error { + if len(p.destroy) > 0 { + return nil + } + p.tasks <- task + return nil +} +func (p *Pool) Size() int32 { + return atomic.LoadInt32(&p.length) +} + +func (p *Pool) Cap() int32 { + return atomic.LoadInt32(&p.capacity) +} + +func (p *Pool) Destroy() error { + p.m.Lock() + defer p.m.Unlock() + for i := 0; i < runtime.GOMAXPROCS(-1) + 1; i++ { + p.destroy <- sig{} + } + return nil +} + + +//------------------------------------------------------------------------- + +func (p *Pool) reachLimit() bool { + return p.Size() >= p.Cap() +} + +func (p *Pool) newWorker() *Worker { + worker := &Worker{ + pool: p, + task: make(chan f), + exit: make(chan sig), + } + worker.run() + atomic.AddInt32(&p.length, 1) + return worker +} + +func (p *Pool) getWorker() *Worker { + var worker *Worker + if p.reachLimit() { + worker = <-p.workers + } + worker = p.newWorker() + return worker +} + diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..d380ade --- /dev/null +++ b/worker.go @@ -0,0 +1,31 @@ +package ants + +import "sync/atomic" + +type Worker struct { + pool *Pool + task chan f + exit chan sig +} + +func (w *Worker) run() { + go func() { + for { + select { + case f := <-w.task: + f() + case <-w.exit: + atomic.AddInt32(&w.pool.length, -1) + return + } + } + }() +} + +func (w *Worker) stop() { + w.exit <- sig{} +} + +func (w *Worker) sendTask(task f) { + w.task <- task +}