From 96d074234a612a15078f25cf2f156f833ff3182f Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 8 Mar 2022 15:30:24 +0800 Subject: [PATCH] Add a new method -- ReleaseTimeout() for waiting all workers to exit Fixes #212 --- ants.go | 3 +++ ants_test.go | 24 ++++++++++++++++++++++++ pool.go | 44 +++++++++++++++++++++++++++++++++++++++----- pool_func.go | 46 ++++++++++++++++++++++++++++++++++++++++------ 4 files changed, 106 insertions(+), 11 deletions(-) diff --git a/ants.go b/ants.go index 4538625..3439c48 100644 --- a/ants.go +++ b/ants.go @@ -66,6 +66,9 @@ var ( // ErrInvalidPreAllocSize will be returned when trying to set up a negative capacity under PreAlloc mode. ErrInvalidPreAllocSize = errors.New("can not set up a negative capacity under PreAlloc mode") + // ErrTimeout will be returned after the operations timed out. + ErrTimeout = errors.New("operation timed out") + //--------------------------------------------------------------------------- // workerChanCap determines whether the channel of a worker should be a buffered channel diff --git a/ants_test.go b/ants_test.go index 7e34578..41b1566 100644 --- a/ants_test.go +++ b/ants_test.go @@ -822,3 +822,27 @@ func TestPoolTuneScaleUp(t *testing.T) { close(c) pf.Release() } + +func TestReleaseTimeout(t *testing.T) { + p, _ := NewPool(10) + for i := 0; i < 5; i++ { + _ = p.Submit(func() { + time.Sleep(time.Second) + }) + } + assert.NotZero(t, p.Running()) + err := p.ReleaseTimeout(2 * time.Second) + assert.NoError(t, err) + + var pf *PoolWithFunc + pf, _ = NewPoolWithFunc(10, func(i interface{}) { + dur := i.(time.Duration) + time.Sleep(dur) + }) + for i := 0; i < 5; i++ { + _ = pf.Invoke(time.Second) + } + assert.NotZero(t, pf.Running()) + err = pf.ReleaseTimeout(2 * time.Second) + assert.NoError(t, err) +} diff --git a/pool.go b/pool.go index d3056a3..e8c4335 100644 --- a/pool.go +++ b/pool.go @@ -23,6 +23,7 @@ package ants import ( + "errors" "sync" "sync/atomic" "time" @@ -58,6 +59,8 @@ type Pool struct { // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock blockingNum int + stopHeartbeat chan struct{} + options *Options } @@ -66,7 +69,14 @@ func (p *Pool) purgePeriodically() { heartbeat := time.NewTicker(p.options.ExpiryDuration) defer heartbeat.Stop() - for range heartbeat.C { + for { + select { + case <-heartbeat.C: + case <-p.stopHeartbeat: + p.stopHeartbeat <- struct{}{} + return + } + if p.IsClosed() { break } @@ -112,9 +122,10 @@ func NewPool(size int, options ...Option) (*Pool, error) { } p := &Pool{ - capacity: int32(size), - lock: internal.NewSpinLock(), - options: opts, + capacity: int32(size), + lock: internal.NewSpinLock(), + stopHeartbeat: make(chan struct{}, 1), + options: opts, } p.workerCache.New = func() interface{} { return &goWorker{ @@ -201,7 +212,9 @@ func (p *Pool) IsClosed() bool { // Release closes this pool and releases the worker queue. func (p *Pool) Release() { - atomic.StoreInt32(&p.state, CLOSED) + if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { + return + } p.lock.Lock() p.workers.reset() p.lock.Unlock() @@ -210,6 +223,27 @@ func (p *Pool) Release() { p.cond.Broadcast() } +// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. +func (p *Pool) ReleaseTimeout(timeout time.Duration) error { + if p.IsClosed() { + return errors.New("pool is already closed") + } + select { + case p.stopHeartbeat <- struct{}{}: + <-p.stopHeartbeat + default: + } + p.Release() + endTime := time.Now().Add(timeout) + for time.Now().Before(endTime) { + if p.Running() == 0 { + return nil + } + time.Sleep(10 * time.Millisecond) + } + return ErrTimeout +} + // Reboot reboots a closed pool. func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { diff --git a/pool_func.go b/pool_func.go index 1a03b16..f9accb2 100644 --- a/pool_func.go +++ b/pool_func.go @@ -23,6 +23,7 @@ package ants import ( + "errors" "sync" "sync/atomic" "time" @@ -60,6 +61,8 @@ type PoolWithFunc struct { // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock blockingNum int + stopHeartbeat chan struct{} + options *Options } @@ -69,7 +72,14 @@ func (p *PoolWithFunc) purgePeriodically() { defer heartbeat.Stop() var expiredWorkers []*goWorkerWithFunc - for range heartbeat.C { + for { + select { + case <-heartbeat.C: + case <-p.stopHeartbeat: + p.stopHeartbeat <- struct{}{} + return + } + if p.IsClosed() { break } @@ -131,10 +141,11 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi } p := &PoolWithFunc{ - capacity: int32(size), - poolFunc: pf, - lock: internal.NewSpinLock(), - options: opts, + capacity: int32(size), + poolFunc: pf, + lock: internal.NewSpinLock(), + stopHeartbeat: make(chan struct{}, 1), + options: opts, } p.workerCache.New = func() interface{} { return &goWorkerWithFunc{ @@ -218,7 +229,9 @@ func (p *PoolWithFunc) IsClosed() bool { // Release closes this pool and releases the worker queue. func (p *PoolWithFunc) Release() { - atomic.StoreInt32(&p.state, CLOSED) + if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { + return + } p.lock.Lock() idleWorkers := p.workers for _, w := range idleWorkers { @@ -231,6 +244,27 @@ func (p *PoolWithFunc) Release() { p.cond.Broadcast() } +// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. +func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { + if p.IsClosed() { + return errors.New("pool is already closed") + } + select { + case p.stopHeartbeat <- struct{}{}: + <-p.stopHeartbeat + default: + } + p.Release() + endTime := time.Now().Add(timeout) + for time.Now().Before(endTime) { + if p.Running() == 0 { + return nil + } + time.Sleep(10 * time.Millisecond) + } + return ErrTimeout +} + // Reboot reboots a closed pool. func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {