Add a new method -- ReleaseTimeout() for waiting all workers to exit

Fixes #212
This commit is contained in:
Andy Pan 2022-03-08 15:30:24 +08:00
parent 134f354e8e
commit 96d074234a
4 changed files with 106 additions and 11 deletions

View File

@ -66,6 +66,9 @@ var (
// ErrInvalidPreAllocSize will be returned when trying to set up a negative capacity under PreAlloc mode. // ErrInvalidPreAllocSize will be returned when trying to set up a negative capacity under PreAlloc mode.
ErrInvalidPreAllocSize = errors.New("can not set up a negative capacity under PreAlloc mode") ErrInvalidPreAllocSize = errors.New("can not set up a negative capacity under PreAlloc mode")
// ErrTimeout will be returned after the operations timed out.
ErrTimeout = errors.New("operation timed out")
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
// workerChanCap determines whether the channel of a worker should be a buffered channel // workerChanCap determines whether the channel of a worker should be a buffered channel

View File

@ -822,3 +822,27 @@ func TestPoolTuneScaleUp(t *testing.T) {
close(c) close(c)
pf.Release() pf.Release()
} }
func TestReleaseTimeout(t *testing.T) {
p, _ := NewPool(10)
for i := 0; i < 5; i++ {
_ = p.Submit(func() {
time.Sleep(time.Second)
})
}
assert.NotZero(t, p.Running())
err := p.ReleaseTimeout(2 * time.Second)
assert.NoError(t, err)
var pf *PoolWithFunc
pf, _ = NewPoolWithFunc(10, func(i interface{}) {
dur := i.(time.Duration)
time.Sleep(dur)
})
for i := 0; i < 5; i++ {
_ = pf.Invoke(time.Second)
}
assert.NotZero(t, pf.Running())
err = pf.ReleaseTimeout(2 * time.Second)
assert.NoError(t, err)
}

44
pool.go
View File

@ -23,6 +23,7 @@
package ants package ants
import ( import (
"errors"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -58,6 +59,8 @@ type Pool struct {
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
blockingNum int blockingNum int
stopHeartbeat chan struct{}
options *Options options *Options
} }
@ -66,7 +69,14 @@ func (p *Pool) purgePeriodically() {
heartbeat := time.NewTicker(p.options.ExpiryDuration) heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop() defer heartbeat.Stop()
for range heartbeat.C { for {
select {
case <-heartbeat.C:
case <-p.stopHeartbeat:
p.stopHeartbeat <- struct{}{}
return
}
if p.IsClosed() { if p.IsClosed() {
break break
} }
@ -112,9 +122,10 @@ func NewPool(size int, options ...Option) (*Pool, error) {
} }
p := &Pool{ p := &Pool{
capacity: int32(size), capacity: int32(size),
lock: internal.NewSpinLock(), lock: internal.NewSpinLock(),
options: opts, stopHeartbeat: make(chan struct{}, 1),
options: opts,
} }
p.workerCache.New = func() interface{} { p.workerCache.New = func() interface{} {
return &goWorker{ return &goWorker{
@ -201,7 +212,9 @@ func (p *Pool) IsClosed() bool {
// Release closes this pool and releases the worker queue. // Release closes this pool and releases the worker queue.
func (p *Pool) Release() { func (p *Pool) Release() {
atomic.StoreInt32(&p.state, CLOSED) if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
return
}
p.lock.Lock() p.lock.Lock()
p.workers.reset() p.workers.reset()
p.lock.Unlock() p.lock.Unlock()
@ -210,6 +223,27 @@ func (p *Pool) Release() {
p.cond.Broadcast() p.cond.Broadcast()
} }
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() {
return errors.New("pool is already closed")
}
select {
case p.stopHeartbeat <- struct{}{}:
<-p.stopHeartbeat
default:
}
p.Release()
endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 {
return nil
}
time.Sleep(10 * time.Millisecond)
}
return ErrTimeout
}
// Reboot reboots a closed pool. // Reboot reboots a closed pool.
func (p *Pool) Reboot() { func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {

View File

@ -23,6 +23,7 @@
package ants package ants
import ( import (
"errors"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -60,6 +61,8 @@ type PoolWithFunc struct {
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
blockingNum int blockingNum int
stopHeartbeat chan struct{}
options *Options options *Options
} }
@ -69,7 +72,14 @@ func (p *PoolWithFunc) purgePeriodically() {
defer heartbeat.Stop() defer heartbeat.Stop()
var expiredWorkers []*goWorkerWithFunc var expiredWorkers []*goWorkerWithFunc
for range heartbeat.C { for {
select {
case <-heartbeat.C:
case <-p.stopHeartbeat:
p.stopHeartbeat <- struct{}{}
return
}
if p.IsClosed() { if p.IsClosed() {
break break
} }
@ -131,10 +141,11 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
} }
p := &PoolWithFunc{ p := &PoolWithFunc{
capacity: int32(size), capacity: int32(size),
poolFunc: pf, poolFunc: pf,
lock: internal.NewSpinLock(), lock: internal.NewSpinLock(),
options: opts, stopHeartbeat: make(chan struct{}, 1),
options: opts,
} }
p.workerCache.New = func() interface{} { p.workerCache.New = func() interface{} {
return &goWorkerWithFunc{ return &goWorkerWithFunc{
@ -218,7 +229,9 @@ func (p *PoolWithFunc) IsClosed() bool {
// Release closes this pool and releases the worker queue. // Release closes this pool and releases the worker queue.
func (p *PoolWithFunc) Release() { func (p *PoolWithFunc) Release() {
atomic.StoreInt32(&p.state, CLOSED) if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
return
}
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
for _, w := range idleWorkers { for _, w := range idleWorkers {
@ -231,6 +244,27 @@ func (p *PoolWithFunc) Release() {
p.cond.Broadcast() p.cond.Broadcast()
} }
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() {
return errors.New("pool is already closed")
}
select {
case p.stopHeartbeat <- struct{}{}:
<-p.stopHeartbeat
default:
}
p.Release()
endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 {
return nil
}
time.Sleep(10 * time.Millisecond)
}
return ErrTimeout
}
// Reboot reboots a closed pool. // Reboot reboots a closed pool.
func (p *PoolWithFunc) Reboot() { func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {