opt: streamline pool implementation to reduce duplicated code (#350)

Also, bump the minimum required Go version from 1.16 to 1.18.
Andy Pan 2025-01-12 13:55:38 +08:00 committed by GitHub
parent 4f33c6ef27
commit 9a1446b823
17 changed files with 481 additions and 782 deletions
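
In outline, the streamlining collapses the duplicated pool plumbing into a single unexported poolCommon that both exported pool types embed. The condensed sketch below (field list abridged, not the full definitions from the diff) shows the resulting shape, and why `any`, the Go 1.18 alias for interface{}, drives the version bump.

package ants

// poolCommon holds the state and behavior shared by every pool variant
// (capacity tracking, worker queue, purge/ticktock goroutines, Release/Reboot).
// Field list abridged for illustration.
type poolCommon struct {
	capacity int32
	running  int32
	// ... lock, workers, cond, options, etc.
}

// Pool submits arbitrary func() tasks and now just embeds *poolCommon.
type Pool struct {
	*poolCommon
}

// PoolWithFunc runs one preset function; its parameter type is `any`,
// the interface{} alias introduced in Go 1.18, hence the new minimum version.
type PoolWithFunc struct {
	*poolCommon
	poolFunc func(any)
}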


@@ -55,7 +55,7 @@ jobs:
strategy:
fail-fast: false
matrix:
-go: [1.16, 1.23]
+go: [1.18, 1.23]
os: [ubuntu-latest, macos-latest, windows-latest]
name: Go ${{ matrix.go }} @ ${{ matrix.os }}
runs-on: ${{ matrix.os}}


@@ -7,7 +7,7 @@
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a>
<a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a>
<br/>
-<a title="Minimum Go Version" target="_blank" href="https://github.com/panjf2000/gnet"><img src="https://img.shields.io/badge/go-%3E%3D1.16-30dff3?style=flat-square&logo=go" /></a>
+<a title="Minimum Go Version" target="_blank" href="https://github.com/panjf2000/gnet"><img src="https://img.shields.io/badge/go-%3E%3D1.18-30dff3?style=flat-square&logo=go" /></a>
<a title="Go Report Card" target="_blank" href="https://goreportcard.com/report/github.com/panjf2000/ants"><img src="https://goreportcard.com/badge/github.com/panjf2000/ants?style=flat-square" /></a>
<a title="Doc for ants" target="_blank" href="https://pkg.go.dev/github.com/panjf2000/ants/v2?tab=doc"><img src="https://img.shields.io/badge/go.dev-doc-007d9c?style=flat-square&logo=read-the-docs" /></a>
<a title="Mentioned in Awesome Go" target="_blank" href="https://github.com/avelino/awesome-go#goroutines"><img src="https://awesome.re/mentioned-badge-flat.svg" /></a>
@@ -78,7 +78,7 @@ import (
var sum int32
-func myFunc(i interface{}) {
+func myFunc(i any) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
@@ -110,7 +110,7 @@ func main() {
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
-p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
+p, _ := ants.NewPoolWithFunc(10, func(i any) {
myFunc(i)
wg.Done()
})
@@ -141,7 +141,7 @@ func main() {
fmt.Printf("finish all tasks.\n")
// Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
-mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
+mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i any) {
myFunc(i)
wg.Done()
}, ants.LeastTasks)
@@ -186,7 +186,7 @@ type Options struct {
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
-PanicHandler func(interface{})
+PanicHandler func(any)
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
@@ -229,7 +229,7 @@ func WithNonblocking(nonblocking bool) Option {
}
// WithPanicHandler sets up panic handler.
-func WithPanicHandler(panicHandler func(interface{})) Option {
+func WithPanicHandler(panicHandler func(any)) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
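
Pieced together from the README fragments above, a self-contained version of the function-pool example under the new `any` signatures looks roughly like this (run count and capacity are arbitrary):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i any) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("run with %d\n", n)
}

func main() {
	defer ants.Release()

	runTimes := 100
	var wg sync.WaitGroup

	// A function pool with capacity 10; the task parameter is `any` (Go 1.18+).
	p, _ := ants.NewPoolWithFunc(10, func(i any) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()

	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
	fmt.Printf("finish all tasks, sum is %d\n", sum)
}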


@@ -7,7 +7,7 @@
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a>
<a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a>
<br/>
-<a title="Minimum Go Version" target="_blank" href="https://github.com/panjf2000/gnet"><img src="https://img.shields.io/badge/go-%3E%3D1.16-30dff3?style=flat-square&logo=go" /></a>
+<a title="Minimum Go Version" target="_blank" href="https://github.com/panjf2000/gnet"><img src="https://img.shields.io/badge/go-%3E%3D1.18-30dff3?style=flat-square&logo=go" /></a>
<a title="Go Report Card" target="_blank" href="https://goreportcard.com/report/github.com/panjf2000/ants"><img src="https://goreportcard.com/badge/github.com/panjf2000/ants?style=flat-square" /></a>
<a title="Doc for ants" target="_blank" href="https://pkg.go.dev/github.com/panjf2000/ants/v2?tab=doc"><img src="https://img.shields.io/badge/go.dev-doc-007d9c?style=flat-square&logo=read-the-docs" /></a>
<a title="Mentioned in Awesome Go" target="_blank" href="https://github.com/avelino/awesome-go#goroutines"><img src="https://awesome.re/mentioned-badge-flat.svg" /></a>
@@ -78,7 +78,7 @@ import (
var sum int32
-func myFunc(i interface{}) {
+func myFunc(i any) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
@@ -110,7 +110,7 @@ func main() {
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
-p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
+p, _ := ants.NewPoolWithFunc(10, func(i any) {
myFunc(i)
wg.Done()
})
@@ -141,7 +141,7 @@ func main() {
fmt.Printf("finish all tasks.\n")
// Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
-mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
+mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i any) {
myFunc(i)
wg.Done()
}, ants.LeastTasks)
@@ -186,7 +186,7 @@ type Options struct {
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
-PanicHandler func(interface{})
+PanicHandler func(any)
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
@@ -229,7 +229,7 @@ func WithNonblocking(nonblocking bool) Option {
}
// WithPanicHandler sets up panic handler.
-func WithPanicHandler(panicHandler func(interface{})) Option {
+func WithPanicHandler(panicHandler func(any)) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}

ants.go

@@ -30,12 +30,17 @@
package ants
import (
+"context"
"errors"
"log"
"math"
"os"
"runtime"
+"sync"
+"sync/atomic"
"time"
+syncx "github.com/panjf2000/ants/v2/pkg/sync"
)
const (
@@ -101,14 +106,6 @@ var (
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
)
-const nowTimeUpdateInterval = 500 * time.Millisecond
-// Logger is used for logging formatted messages.
-type Logger interface {
-// Printf must have the same semantics as log.Printf.
-Printf(format string, args ...interface{})
-}
// Submit submits a task to pool.
func Submit(task func()) error {
return defaultAntsPool.Submit(task)
@@ -143,3 +140,382 @@ func ReleaseTimeout(timeout time.Duration) error {
func Reboot() {
defaultAntsPool.Reboot()
}
// Logger is used for logging formatted messages.
type Logger interface {
// Printf must have the same semantics as log.Printf.
Printf(format string, args ...any)
}
// poolCommon contains all common fields for other sophisticated pools.
type poolCommon struct {
// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
// which submits a new task to the same pool.
capacity int32
// running is the number of the currently running goroutines.
running int32
// lock for protecting the worker queue.
lock sync.Locker
// workers is a slice that store the available workers.
workers workerQueue
// state is used to notice the pool to closed itself.
state int32
// cond for waiting to get an idle worker.
cond *sync.Cond
// done is used to indicate that all workers are done.
allDone chan struct{}
// once is used to make sure the pool is closed just once.
once *sync.Once
// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
workerCache sync.Pool
// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
waiting int32
purgeDone int32
purgeCtx context.Context
stopPurge context.CancelFunc
ticktockDone int32
ticktockCtx context.Context
stopTicktock context.CancelFunc
now atomic.Value
options *Options
}
func newPool(size int, options ...Option) (*poolCommon, error) {
if size <= 0 {
size = -1
}
opts := loadOptions(options...)
if !opts.DisablePurge {
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
}
if opts.Logger == nil {
opts.Logger = defaultLogger
}
p := &poolCommon{
capacity: int32(size),
allDone: make(chan struct{}),
lock: syncx.NewSpinLock(),
once: &sync.Once{},
options: opts,
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerQueue(queueTypeLoopQueue, size)
} else {
p.workers = newWorkerQueue(queueTypeStack, 0)
}
p.cond = sync.NewCond(p.lock)
p.goPurge()
p.goTicktock()
return p, nil
}
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *poolCommon) purgeStaleWorkers() {
ticker := time.NewTicker(p.options.ExpiryDuration)
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.purgeDone, 1)
}()
purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
for {
select {
case <-purgeCtx.Done():
return
case <-ticker.C:
}
if p.IsClosed() {
break
}
var isDormant bool
p.lock.Lock()
staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
n := p.Running()
isDormant = n == 0 || n == len(staleWorkers)
p.lock.Unlock()
// Clean up the stale workers.
for i := range staleWorkers {
staleWorkers[i].finish()
staleWorkers[i] = nil
}
// There might be a situation where all workers have been cleaned up (no worker is running),
// while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
if isDormant && p.Waiting() > 0 {
p.cond.Broadcast()
}
}
}
const nowTimeUpdateInterval = 500 * time.Millisecond
// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *poolCommon) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.ticktockDone, 1)
}()
ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot()
for {
select {
case <-ticktockCtx.Done():
return
case <-ticker.C:
}
if p.IsClosed() {
break
}
p.now.Store(time.Now())
}
}
func (p *poolCommon) goPurge() {
if p.options.DisablePurge {
return
}
// Start a goroutine to clean up expired workers periodically.
p.purgeCtx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers()
}
func (p *poolCommon) goTicktock() {
p.now.Store(time.Now())
p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock()
}
func (p *poolCommon) nowTime() time.Time {
return p.now.Load().(time.Time)
}
// Running returns the number of workers currently running.
func (p *poolCommon) Running() int {
return int(atomic.LoadInt32(&p.running))
}
// Free returns the number of available workers, -1 indicates this pool is unlimited.
func (p *poolCommon) Free() int {
c := p.Cap()
if c < 0 {
return -1
}
return c - p.Running()
}
// Waiting returns the number of tasks waiting to be executed.
func (p *poolCommon) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}
// Cap returns the capacity of this pool.
func (p *poolCommon) Cap() int {
return int(atomic.LoadInt32(&p.capacity))
}
// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool.
func (p *poolCommon) Tune(size int) {
capacity := p.Cap()
if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
return
}
atomic.StoreInt32(&p.capacity, int32(size))
if size > capacity {
if size-capacity == 1 {
p.cond.Signal()
return
}
p.cond.Broadcast()
}
}
// IsClosed indicates whether the pool is closed.
func (p *poolCommon) IsClosed() bool {
return atomic.LoadInt32(&p.state) == CLOSED
}
// Release closes this pool and releases the worker queue.
func (p *poolCommon) Release() {
if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
return
}
if p.stopPurge != nil {
p.stopPurge()
p.stopPurge = nil
}
if p.stopTicktock != nil {
p.stopTicktock()
p.stopTicktock = nil
}
p.lock.Lock()
p.workers.reset()
p.lock.Unlock()
// There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent
// those callers blocking infinitely.
p.cond.Broadcast()
}
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *poolCommon) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
return ErrPoolClosed
}
p.Release()
var purgeCh <-chan struct{}
if !p.options.DisablePurge {
purgeCh = p.purgeCtx.Done()
} else {
purgeCh = p.allDone
}
if p.Running() == 0 {
p.once.Do(func() {
close(p.allDone)
})
}
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
return ErrTimeout
case <-p.allDone:
<-purgeCh
<-p.ticktockCtx.Done()
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
}
}
}
// Reboot reboots a closed pool, it does nothing if the pool is not closed.
// If you intend to reboot a closed pool, use ReleaseTimeout() instead of
// Release() to ensure that all workers are stopped and resource are released
// before rebooting, otherwise you may run into data race.
func (p *poolCommon) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.purgeDone, 0)
p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0)
p.goTicktock()
p.allDone = make(chan struct{})
p.once = &sync.Once{}
}
}
func (p *poolCommon) addRunning(delta int) int {
return int(atomic.AddInt32(&p.running, int32(delta)))
}
func (p *poolCommon) addWaiting(delta int) {
atomic.AddInt32(&p.waiting, int32(delta))
}
// retrieveWorker returns an available worker to run the tasks.
func (p *poolCommon) retrieveWorker() (w worker, err error) {
p.lock.Lock()
retry:
// First try to fetch the worker from the queue.
if w = p.workers.detach(); w != nil {
p.lock.Unlock()
return
}
// If the worker queue is empty, and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
w = p.workerCache.Get().(worker)
w.run()
return
}
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
p.lock.Unlock()
return nil, ErrPoolOverload
}
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker
p.addWaiting(-1)
if p.IsClosed() {
p.lock.Unlock()
return nil, ErrPoolClosed
}
goto retry
}
// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *poolCommon) revertWorker(worker worker) bool {
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
p.cond.Broadcast()
return false
}
worker.setLastUsedTime(p.nowTime())
p.lock.Lock()
// To avoid memory leaks, add a double check in the lock scope.
// Issue: https://github.com/panjf2000/ants/issues/113
if p.IsClosed() {
p.lock.Unlock()
return false
}
if err := p.workers.insert(worker); err != nil {
p.lock.Unlock()
return false
}
// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
p.cond.Signal()
p.lock.Unlock()
return true
}
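
As a quick usage sketch of the lifecycle that these shared poolCommon methods provide (standard ants API; the sizes and timeouts below are arbitrary): a pool closed with ReleaseTimeout waits for the purge and ticktock goroutines plus all workers to wind down, and can then be safely reused via Reboot.

package main

import (
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	p, _ := ants.NewPool(8)

	_ = p.Submit(func() { time.Sleep(100 * time.Millisecond) })

	// ReleaseTimeout closes the pool and waits (up to the timeout) for the
	// purge and ticktock goroutines plus all running workers to exit.
	if err := p.ReleaseTimeout(time.Second); err != nil {
		// ErrTimeout is returned if workers are still running when the timer fires.
		_ = err
	}

	// A pool released via ReleaseTimeout can be reused after Reboot.
	p.Reboot()
	_ = p.Submit(func() {})
	p.Release()
}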


@@ -43,7 +43,7 @@ func demoFunc() {
time.Sleep(time.Duration(BenchParam) * time.Millisecond)
}
-func demoPoolFunc(args interface{}) {
+func demoPoolFunc(args any) {
n := args.(int)
time.Sleep(time.Duration(n) * time.Millisecond)
}
@@ -58,7 +58,7 @@ func longRunningFunc() {
var stopLongRunningPoolFunc int32
-func longRunningPoolFunc(arg interface{}) {
+func longRunningPoolFunc(arg any) {
if ch, ok := arg.(chan struct{}); ok {
<-ch
return


@@ -93,7 +93,7 @@ func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) {
// TestAntsPoolWithFuncWaitToGetWorker is used to test waiting to get worker.
func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
var wg sync.WaitGroup
-p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) {
+p, _ := NewPoolWithFunc(AntsSize, func(i any) {
demoPoolFunc(i)
wg.Done()
})
@@ -113,7 +113,7 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {
var wg sync.WaitGroup
-p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) {
+p, _ := NewPoolWithFunc(AntsSize, func(i any) {
demoPoolFunc(i)
wg.Done()
}, WithPreAlloc(true))
@@ -227,7 +227,7 @@ func TestAntsPool(t *testing.T) {
func TestPanicHandler(t *testing.T) {
var panicCounter int64
var wg sync.WaitGroup
-p0, err := NewPool(10, WithPanicHandler(func(p interface{}) {
+p0, err := NewPool(10, WithPanicHandler(func(p any) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
t.Logf("catch panic with PanicHandler: %v", p)
@@ -242,7 +242,7 @@ func TestPanicHandler(t *testing.T) {
c := atomic.LoadInt64(&panicCounter)
assert.EqualValuesf(t, 1, c, "panic handler didn't work, panicCounter: %d", c)
assert.EqualValues(t, 0, p0.Running(), "pool should be empty after panic")
-p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(_ interface{}) {
+p1, err := NewPoolWithFunc(10, func(p any) { panic(p) }, WithPanicHandler(func(_ any) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
}))
@@ -259,7 +259,7 @@ func TestPanicHandler(t *testing.T) {
func TestPanicHandlerPreMalloc(t *testing.T) {
var panicCounter int64
var wg sync.WaitGroup
-p0, err := NewPool(10, WithPreAlloc(true), WithPanicHandler(func(p interface{}) {
+p0, err := NewPool(10, WithPreAlloc(true), WithPanicHandler(func(p any) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
t.Logf("catch panic with PanicHandler: %v", p)
@@ -274,7 +274,7 @@ func TestPanicHandlerPreMalloc(t *testing.T) {
c := atomic.LoadInt64(&panicCounter)
assert.EqualValuesf(t, 1, c, "panic handler didn't work, panicCounter: %d", c)
assert.EqualValues(t, 0, p0.Running(), "pool should be empty after panic")
-p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(_ interface{}) {
+p1, err := NewPoolWithFunc(10, func(p any) { panic(p) }, WithPanicHandler(func(_ any) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
}))
@@ -296,7 +296,7 @@ func TestPoolPanicWithoutHandler(t *testing.T) {
panic("Oops!")
})
-p1, err := NewPoolWithFunc(10, func(p interface{}) {
+p1, err := NewPoolWithFunc(10, func(p any) {
panic(p)
})
assert.NoErrorf(t, err, "create new pool with func failed: %v", err)
@@ -312,7 +312,7 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
panic("Oops!")
})
-p1, err := NewPoolWithFunc(10, func(p interface{}) {
+p1, err := NewPoolWithFunc(10, func(p any) {
panic(p)
})
@@ -345,7 +345,7 @@ func TestPurgePool(t *testing.T) {
assert.Equalf(t, 0, p.Running(), "pool should be empty after purge, but got %d", p.Running())
ch = make(chan struct{})
-f := func(i interface{}) {
+f := func(i any) {
<-ch
d := i.(int) % 100
time.Sleep(time.Duration(d) * time.Millisecond)
@@ -445,7 +445,7 @@ func TestMaxBlockingSubmit(t *testing.T) {
func TestNonblockingSubmitWithFunc(t *testing.T) {
poolSize := 10
var wg sync.WaitGroup
-p, err := NewPoolWithFunc(poolSize, func(i interface{}) {
+p, err := NewPoolWithFunc(poolSize, func(i any) {
longRunningPoolFunc(i)
wg.Done()
}, WithNonblocking(true))
@@ -537,7 +537,7 @@ func TestRebootNewPool(t *testing.T) {
assert.NoError(t, p.Submit(func() { wg.Done() }), "pool should be rebooted")
wg.Wait()
-p1, err := NewPoolWithFunc(10, func(i interface{}) {
+p1, err := NewPoolWithFunc(10, func(i any) {
demoPoolFunc(i)
wg.Done()
})
@@ -667,7 +667,7 @@ func TestWithDisablePurgePoolFunc(t *testing.T) {
var wg1, wg2 sync.WaitGroup
wg1.Add(numWorker)
wg2.Add(numWorker)
-p, _ := NewPoolWithFunc(numWorker, func(_ interface{}) {
+p, _ := NewPoolWithFunc(numWorker, func(_ any) {
wg1.Done()
<-sig
wg2.Done()
@@ -682,7 +682,7 @@ func TestWithDisablePurgeAndWithExpirationPoolFunc(t *testing.T) {
wg1.Add(numWorker)
wg2.Add(numWorker)
expiredDuration := time.Millisecond * 100
-p, _ := NewPoolWithFunc(numWorker, func(_ interface{}) {
+p, _ := NewPoolWithFunc(numWorker, func(_ any) {
wg1.Done()
<-sig
wg2.Done()
@@ -692,7 +692,7 @@ func TestWithDisablePurgeAndWithExpirationPoolFunc(t *testing.T) {
func TestInfinitePoolWithFunc(t *testing.T) {
c := make(chan struct{})
-p, _ := NewPoolWithFunc(-1, func(i interface{}) {
+p, _ := NewPoolWithFunc(-1, func(i any) {
demoPoolFunc(i)
<-c
})
@@ -759,7 +759,7 @@ func TestReleaseWhenRunningPool(t *testing.T) {
func TestReleaseWhenRunningPoolWithFunc(t *testing.T) {
var wg sync.WaitGroup
-p, _ := NewPoolWithFunc(1, func(i interface{}) {
+p, _ := NewPoolWithFunc(1, func(i any) {
t.Log("do task", i)
time.Sleep(1 * time.Second)
})
@@ -914,7 +914,7 @@ func TestPoolTuneScaleUp(t *testing.T) {
p.Release()
// test PoolWithFunc
-pf, _ := NewPoolWithFunc(2, func(_ interface{}) {
+pf, _ := NewPoolWithFunc(2, func(_ any) {
<-c
})
for i := 0; i < 2; i++ {
@@ -962,7 +962,7 @@ func TestReleaseTimeout(t *testing.T) {
assert.NoError(t, err)
var pf *PoolWithFunc
-pf, _ = NewPoolWithFunc(10, func(i interface{}) {
+pf, _ = NewPoolWithFunc(10, func(i any) {
dur := i.(time.Duration)
time.Sleep(dur)
})


@@ -33,7 +33,7 @@ import (
var sum int32
-func myFunc(i interface{}) {
+func myFunc(i any) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
@@ -65,7 +65,7 @@ func main() {
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
-p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
+p, _ := ants.NewPoolWithFunc(10, func(i any) {
myFunc(i)
wg.Done()
})
@@ -96,7 +96,7 @@ func main() {
fmt.Printf("finish all tasks.\n")
// Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
-mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
+mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i any) {
myFunc(i)
wg.Done()
}, ants.LeastTasks)

go.mod

@@ -1,8 +1,14 @@
module github.com/panjf2000/ants/v2
-go 1.16
+go 1.18
require (
github.com/stretchr/testify v1.8.2
golang.org/x/sync v0.3.0
)
+require (
+github.com/davecgh/go-spew v1.1.1 // indirect
+github.com/pmezard/go-difflib v1.0.0 // indirect
+gopkg.in/yaml.v3 v3.0.1 // indirect
+)


@@ -46,7 +46,7 @@ type MultiPoolWithFunc struct {
// NewMultiPoolWithFunc instantiates a MultiPoolWithFunc with a size of the pool list and a size
// per pool, and the load-balancing strategy.
-func NewMultiPoolWithFunc(size, sizePerPool int, fn func(interface{}), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) {
+func NewMultiPoolWithFunc(size, sizePerPool int, fn func(any), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) {
if lbs != RoundRobin && lbs != LeastTasks {
return nil, ErrInvalidLoadBalancingStrategy
}
@@ -82,7 +82,7 @@ func (mp *MultiPoolWithFunc) next(lbs LoadBalancingStrategy) (idx int) {
}
// Invoke submits a task to a pool selected by the load-balancing strategy.
-func (mp *MultiPoolWithFunc) Invoke(args interface{}) (err error) {
+func (mp *MultiPoolWithFunc) Invoke(args any) (err error) {
if mp.IsClosed() {
return ErrPoolClosed
}
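
For context, a minimal use of the updated signature might look like the sketch below (the pool counts and the LeastTasks strategy are arbitrary choices; error handling elided):

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	var wg sync.WaitGroup

	// 10 sub-pools with 5 goroutines each; every Invoke is routed to the
	// sub-pool with the fewest pending tasks.
	mpf, _ := ants.NewMultiPoolWithFunc(10, 5, func(i any) {
		defer wg.Done()
		fmt.Println("processed", i)
	}, ants.LeastTasks)

	for i := 0; i < 50; i++ {
		wg.Add(1)
		_ = mpf.Invoke(i)
	}
	wg.Wait()

	_ = mpf.ReleaseTimeout(time.Second)
}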


@@ -34,7 +34,7 @@ type Options struct {
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
-PanicHandler func(interface{})
+PanicHandler func(any)
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
@@ -80,7 +80,7 @@ func WithNonblocking(nonblocking bool) Option {
}
// WithPanicHandler sets up panic handler.
-func WithPanicHandler(panicHandler func(interface{})) Option {
+func WithPanicHandler(panicHandler func(any)) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
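
A short sketch of this option in use with the new signature (handler body and pool size are arbitrary):

package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	var wg sync.WaitGroup

	// The handler now receives the recovered value as `any`; with no handler
	// set, the panic is re-thrown from the worker goroutine.
	p, _ := ants.NewPool(4, ants.WithPanicHandler(func(v any) {
		defer wg.Done()
		fmt.Println("recovered from task panic:", v)
	}))
	defer p.Release()

	wg.Add(1)
	_ = p.Submit(func() { panic("Oops!") })
	wg.Wait()
}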

pool.go

@@ -22,203 +22,13 @@
package ants
-import (
-"context"
-"sync"
-"sync/atomic"
-"time"
-syncx "github.com/panjf2000/ants/v2/pkg/sync"
-)
type poolCommon struct {
// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
// which submits a new task to the same pool.
capacity int32
// running is the number of the currently running goroutines.
running int32
// lock for protecting the worker queue.
lock sync.Locker
// workers is a slice that store the available workers.
workers workerQueue
// state is used to notice the pool to closed itself.
state int32
// cond for waiting to get an idle worker.
cond *sync.Cond
// done is used to indicate that all workers are done.
allDone chan struct{}
// once is used to make sure the pool is closed just once.
once *sync.Once
// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
workerCache sync.Pool
// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
waiting int32
purgeDone int32
purgeCtx context.Context
stopPurge context.CancelFunc
ticktockDone int32
ticktockCtx context.Context
stopTicktock context.CancelFunc
now atomic.Value
options *Options
}
-// Pool accepts the tasks and process them concurrently,
-// it limits the total of goroutines to a given number by recycling goroutines.
+// Pool is a goroutine pool that limits and recycles a mass of goroutines.
+// The pool capacity can be fixed or unlimited.
type Pool struct {
-poolCommon
+*poolCommon
}
-// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *Pool) purgeStaleWorkers() {
ticker := time.NewTicker(p.options.ExpiryDuration)
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.purgeDone, 1)
}()
purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
for {
select {
case <-purgeCtx.Done():
return
case <-ticker.C:
}
if p.IsClosed() {
break
}
var isDormant bool
p.lock.Lock()
staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
n := p.Running()
isDormant = n == 0 || n == len(staleWorkers)
p.lock.Unlock()
// Clean up the stale workers.
for i := range staleWorkers {
staleWorkers[i].finish()
staleWorkers[i] = nil
}
// There might be a situation where all workers have been cleaned up (no worker is running),
// while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
if isDormant && p.Waiting() > 0 {
p.cond.Broadcast()
}
}
}
// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *Pool) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.ticktockDone, 1)
}()
ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot()
for {
select {
case <-ticktockCtx.Done():
return
case <-ticker.C:
}
if p.IsClosed() {
break
}
p.now.Store(time.Now())
}
}
func (p *Pool) goPurge() {
if p.options.DisablePurge {
return
}
// Start a goroutine to clean up expired workers periodically.
p.purgeCtx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers()
}
func (p *Pool) goTicktock() {
p.now.Store(time.Now())
p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock()
}
func (p *Pool) nowTime() time.Time {
return p.now.Load().(time.Time)
}
// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {
if size <= 0 {
size = -1
}
opts := loadOptions(options...)
if !opts.DisablePurge {
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
}
if opts.Logger == nil {
opts.Logger = defaultLogger
}
p := &Pool{poolCommon: poolCommon{
capacity: int32(size),
allDone: make(chan struct{}),
lock: syncx.NewSpinLock(),
once: &sync.Once{},
options: opts,
}}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerQueue(queueTypeLoopQueue, size)
} else {
p.workers = newWorkerQueue(queueTypeStack, 0)
}
p.cond = sync.NewCond(p.lock)
p.goPurge()
p.goTicktock()
return p, nil
}
-// Submit submits a task to this pool.
+// Submit submits a task to the pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
@@ -236,198 +46,20 @@ func (p *Pool) Submit(task func()) error {
return err
}
-// Running returns the number of workers currently running.
-func (p *Pool) Running() int {
-return int(atomic.LoadInt32(&p.running))
-}
-// Free returns the number of available workers, -1 indicates this pool is unlimited.
-func (p *Pool) Free() int {
-c := p.Cap()
-if c < 0 {
-return -1
-}
-return c - p.Running()
-}
-// Waiting returns the number of tasks waiting to be executed.
-func (p *Pool) Waiting() int {
-return int(atomic.LoadInt32(&p.waiting))
-}
-// Cap returns the capacity of this pool.
-func (p *Pool) Cap() int {
-return int(atomic.LoadInt32(&p.capacity))
-}
-// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool.
-func (p *Pool) Tune(size int) {
-capacity := p.Cap()
-if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
-return
-}
-atomic.StoreInt32(&p.capacity, int32(size))
-if size > capacity {
-if size-capacity == 1 {
-p.cond.Signal()
-return
-}
-p.cond.Broadcast()
-}
-}
+// NewPool instantiates a Pool with customized options.
+func NewPool(size int, options ...Option) (*Pool, error) {
+pc, err := newPool(size, options...)
+if err != nil {
+return nil, err
+}
+pool := &Pool{poolCommon: pc}
+pool.workerCache.New = func() any {
+return &goWorker{
+pool: pool,
+task: make(chan func(), workerChanCap),
+}
+}
+return pool, nil
+}
-// IsClosed indicates whether the pool is closed.
func (p *Pool) IsClosed() bool {
return atomic.LoadInt32(&p.state) == CLOSED
}
// Release closes this pool and releases the worker queue.
func (p *Pool) Release() {
if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
return
}
if p.stopPurge != nil {
p.stopPurge()
p.stopPurge = nil
}
if p.stopTicktock != nil {
p.stopTicktock()
p.stopTicktock = nil
}
p.lock.Lock()
p.workers.reset()
p.lock.Unlock()
// There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent
// those callers blocking infinitely.
p.cond.Broadcast()
}
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
return ErrPoolClosed
}
p.Release()
var purgeCh <-chan struct{}
if !p.options.DisablePurge {
purgeCh = p.purgeCtx.Done()
} else {
purgeCh = p.allDone
}
if p.Running() == 0 {
p.once.Do(func() {
close(p.allDone)
})
}
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
return ErrTimeout
case <-p.allDone:
<-purgeCh
<-p.ticktockCtx.Done()
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
}
}
}
// Reboot reboots a closed pool, it does nothing if the pool is not closed.
// If you intend to reboot a closed pool, use ReleaseTimeout() instead of
// Release() to ensure that all workers are stopped and resource are released
// before rebooting, otherwise you may run into data race.
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.purgeDone, 0)
p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0)
p.goTicktock()
p.allDone = make(chan struct{})
p.once = &sync.Once{}
}
}
func (p *Pool) addRunning(delta int) int {
return int(atomic.AddInt32(&p.running, int32(delta)))
}
func (p *Pool) addWaiting(delta int) {
atomic.AddInt32(&p.waiting, int32(delta))
}
// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w worker, err error) {
p.lock.Lock()
retry:
// First try to fetch the worker from the queue.
if w = p.workers.detach(); w != nil {
p.lock.Unlock()
return
}
// If the worker queue is empty, and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
w = p.workerCache.Get().(*goWorker)
w.run()
return
}
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
p.lock.Unlock()
return nil, ErrPoolOverload
}
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker
p.addWaiting(-1)
if p.IsClosed() {
p.lock.Unlock()
return nil, ErrPoolClosed
}
goto retry
}
// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *goWorker) bool {
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
p.cond.Broadcast()
return false
}
worker.lastUsed = p.nowTime()
p.lock.Lock()
// To avoid memory leaks, add a double check in the lock scope.
// Issue: https://github.com/panjf2000/ants/issues/113
if p.IsClosed() {
p.lock.Unlock()
return false
}
if err := p.workers.insert(worker); err != nil {
p.lock.Unlock()
return false
}
// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
p.cond.Signal()
p.lock.Unlock()
return true
}
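
The Submit doc comment above warns that nested Submit calls can block once capacity is exhausted unless the pool is created with ants.WithNonblocking(true). A small sketch of that nonblocking path (capacity and sleep chosen arbitrarily):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// With Nonblocking(true), a saturated pool rejects new tasks with
	// ErrPoolOverload instead of parking the caller in retrieveWorker.
	p, _ := ants.NewPool(1, ants.WithNonblocking(true))
	defer p.Release()

	_ = p.Submit(func() { time.Sleep(time.Second) })

	if err := p.Submit(func() {}); errors.Is(err, ants.ErrPoolOverload) {
		fmt.Println("pool is full, task rejected immediately")
	}
}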


@@ -22,173 +22,21 @@
package ants
-import (
-"context"
-"sync"
-"sync/atomic"
-"time"
-syncx "github.com/panjf2000/ants/v2/pkg/sync"
-)
-// PoolWithFunc accepts the tasks and process them concurrently,
-// it limits the total of goroutines to a given number by recycling goroutines.
+// PoolWithFunc is like Pool but accepts a unified function for all goroutines to execute.
type PoolWithFunc struct {
-poolCommon
-// poolFunc is the function for processing tasks.
-poolFunc func(interface{})
+*poolCommon
+// poolFunc is the unified function for processing tasks.
+poolFunc func(any)
}
-// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgeStaleWorkers() {
ticker := time.NewTicker(p.options.ExpiryDuration)
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.purgeDone, 1)
}()
purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
for {
select {
case <-purgeCtx.Done():
return
case <-ticker.C:
}
if p.IsClosed() {
break
}
var isDormant bool
p.lock.Lock()
staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
n := p.Running()
isDormant = n == 0 || n == len(staleWorkers)
p.lock.Unlock()
// Clean up the stale workers.
for i := range staleWorkers {
staleWorkers[i].finish()
staleWorkers[i] = nil
}
// There might be a situation where all workers have been cleaned up (no worker is running),
// while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
if isDormant && p.Waiting() > 0 {
p.cond.Broadcast()
}
}
}
// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *PoolWithFunc) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.ticktockDone, 1)
}()
ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot()
for {
select {
case <-ticktockCtx.Done():
return
case <-ticker.C:
}
if p.IsClosed() {
break
}
p.now.Store(time.Now())
}
}
func (p *PoolWithFunc) goPurge() {
if p.options.DisablePurge {
return
}
// Start a goroutine to clean up expired workers periodically.
p.purgeCtx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers()
}
func (p *PoolWithFunc) goTicktock() {
p.now.Store(time.Now())
p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock()
}
func (p *PoolWithFunc) nowTime() time.Time {
return p.now.Load().(time.Time)
}
// NewPoolWithFunc instantiates a PoolWithFunc with customized options.
func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
if size <= 0 {
size = -1
}
if pf == nil {
return nil, ErrLackPoolFunc
}
opts := loadOptions(options...)
if !opts.DisablePurge {
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
}
if opts.Logger == nil {
opts.Logger = defaultLogger
}
p := &PoolWithFunc{
poolCommon: poolCommon{
capacity: int32(size),
allDone: make(chan struct{}),
lock: syncx.NewSpinLock(),
once: &sync.Once{},
options: opts,
},
poolFunc: pf,
}
p.workerCache.New = func() interface{} {
return &goWorkerWithFunc{
pool: p,
args: make(chan interface{}, workerChanCap),
}
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerQueue(queueTypeLoopQueue, size)
} else {
p.workers = newWorkerQueue(queueTypeStack, 0)
}
p.cond = sync.NewCond(p.lock)
p.goPurge()
p.goTicktock()
return p, nil
}
-// Invoke submits a task to pool.
+// Invoke passes arguments to the pool.
//
// Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a PoolWithFunc with ants.WithNonblocking(true).
-func (p *PoolWithFunc) Invoke(args interface{}) error {
+func (p *PoolWithFunc) Invoke(args any) error {
if p.IsClosed() {
return ErrPoolClosed
}
@@ -200,198 +48,28 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
return err
}
-// Running returns the number of workers currently running.
-func (p *PoolWithFunc) Running() int {
-return int(atomic.LoadInt32(&p.running))
-}
-// Free returns the number of available workers, -1 indicates this pool is unlimited.
-func (p *PoolWithFunc) Free() int {
-c := p.Cap()
-if c < 0 {
-return -1
-}
-return c - p.Running()
-}
-// Waiting returns the number of tasks waiting to be executed.
-func (p *PoolWithFunc) Waiting() int {
-return int(atomic.LoadInt32(&p.waiting))
-}
-// Cap returns the capacity of this pool.
-func (p *PoolWithFunc) Cap() int {
-return int(atomic.LoadInt32(&p.capacity))
-}
-// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool.
-func (p *PoolWithFunc) Tune(size int) {
-capacity := p.Cap()
-if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
-return
-}
-atomic.StoreInt32(&p.capacity, int32(size))
-if size > capacity {
-if size-capacity == 1 {
-p.cond.Signal()
-return
-}
-p.cond.Broadcast()
-}
-}
+// NewPoolWithFunc instantiates a PoolWithFunc with customized options.
+func NewPoolWithFunc(size int, pf func(any), options ...Option) (*PoolWithFunc, error) {
+if pf == nil {
+return nil, ErrLackPoolFunc
+}
+pc, err := newPool(size, options...)
+if err != nil {
+return nil, err
+}
+pool := &PoolWithFunc{
+poolCommon: pc,
+poolFunc: pf,
+}
+pool.workerCache.New = func() any {
+return &goWorkerWithFunc{
+pool: pool,
+args: make(chan any, workerChanCap),
+}
+}
+return pool, nil
+}
-// IsClosed indicates whether the pool is closed.
func (p *PoolWithFunc) IsClosed() bool {
return atomic.LoadInt32(&p.state) == CLOSED
}
// Release closes this pool and releases the worker queue.
func (p *PoolWithFunc) Release() {
if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
return
}
if p.stopPurge != nil {
p.stopPurge()
p.stopPurge = nil
}
if p.stopTicktock != nil {
p.stopTicktock()
p.stopTicktock = nil
}
p.lock.Lock()
p.workers.reset()
p.lock.Unlock()
// There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent
// those callers blocking infinitely.
p.cond.Broadcast()
}
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
return ErrPoolClosed
}
p.Release()
var purgeCh <-chan struct{}
if !p.options.DisablePurge {
purgeCh = p.purgeCtx.Done()
} else {
purgeCh = p.allDone
}
if p.Running() == 0 {
p.once.Do(func() {
close(p.allDone)
})
}
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
return ErrTimeout
case <-p.allDone:
<-purgeCh
<-p.ticktockCtx.Done()
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
}
}
}
// Reboot reboots a closed pool, it does nothing if the pool is not closed.
// If you intend to reboot a closed pool, use ReleaseTimeout() instead of
// Release() to ensure that all workers are stopped and resource are released
// before rebooting, otherwise you may run into data race.
func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.purgeDone, 0)
p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0)
p.goTicktock()
p.allDone = make(chan struct{})
p.once = &sync.Once{}
}
}
func (p *PoolWithFunc) addRunning(delta int) int {
return int(atomic.AddInt32(&p.running, int32(delta)))
}
func (p *PoolWithFunc) addWaiting(delta int) {
atomic.AddInt32(&p.waiting, int32(delta))
}
// retrieveWorker returns an available worker to run the tasks.
func (p *PoolWithFunc) retrieveWorker() (w worker, err error) {
p.lock.Lock()
retry:
// First try to fetch the worker from the queue.
if w = p.workers.detach(); w != nil {
p.lock.Unlock()
return
}
// If the worker queue is empty, and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
w = p.workerCache.Get().(*goWorkerWithFunc)
w.run()
return
}
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
p.lock.Unlock()
return nil, ErrPoolOverload
}
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker
p.addWaiting(-1)
if p.IsClosed() {
p.lock.Unlock()
return nil, ErrPoolClosed
}
goto retry
}
// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
p.cond.Broadcast()
return false
}
worker.lastUsed = p.nowTime()
p.lock.Lock()
// To avoid memory leaks, add a double check in the lock scope.
// Issue: https://github.com/panjf2000/ants/issues/113
if p.IsClosed() {
p.lock.Unlock()
return false
}
if err := p.workers.insert(worker); err != nil {
p.lock.Unlock()
return false
}
// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
p.cond.Signal()
p.lock.Unlock()
return true
}


@@ -84,10 +84,14 @@ func (w *goWorker) lastUsedTime() time.Time {
return w.lastUsed
}
+func (w *goWorker) setLastUsedTime(t time.Time) {
+w.lastUsed = t
+}
func (w *goWorker) inputFunc(fn func()) {
w.task <- fn
}
-func (w *goWorker) inputParam(interface{}) {
+func (w *goWorker) inputParam(any) {
panic("unreachable")
}


@@ -35,7 +35,7 @@ type goWorkerWithFunc struct {
pool *PoolWithFunc
// args is a job should be done.
-args chan interface{}
+args chan any
// lastUsed will be updated when putting a worker back into queue.
lastUsed time.Time
@@ -84,10 +84,14 @@ func (w *goWorkerWithFunc) lastUsedTime() time.Time {
return w.lastUsed
}
+func (w *goWorkerWithFunc) setLastUsedTime(t time.Time) {
+w.lastUsed = t
+}
func (w *goWorkerWithFunc) inputFunc(func()) {
panic("unreachable")
}
-func (w *goWorkerWithFunc) inputParam(arg interface{}) {
+func (w *goWorkerWithFunc) inputParam(arg any) {
w.args <- arg
}


@@ -1,5 +1,4 @@
//go:build !windows
-// +build !windows
package ants


@@ -17,8 +17,9 @@ type worker interface {
run()
finish()
lastUsedTime() time.Time
+setLastUsedTime(t time.Time)
inputFunc(func())
-inputParam(interface{})
+inputParam(any)
}
type workerQueue interface {


@@ -1,5 +1,4 @@
//go:build !windows
-// +build !windows
package ants