diff --git a/ants.go b/ants.go index df77ace..c8248f5 100644 --- a/ants.go +++ b/ants.go @@ -88,6 +88,8 @@ var ( defaultAntsPool, _ = NewPool(DefaultAntsPoolSize) ) +const nowTimeUpdateInterval = 500 * time.Millisecond + // Logger is used for logging formatted messages. type Logger interface { // Printf must have the same semantics as log.Printf. diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 0dfd8ee..c8364e5 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -25,6 +25,7 @@ package ants import ( "runtime" "sync" + "sync/atomic" "testing" "time" ) @@ -144,3 +145,44 @@ func BenchmarkAntsPoolThroughput(b *testing.B) { } b.StopTimer() } + +func BenchmarkTimeNow(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = time.Now() + } +} + +func BenchmarkTimeNowCache(b *testing.B) { + var ( + now atomic.Value + offset int32 + ) + + now.Store(time.Now()) + go func() { + for range time.Tick(500 * time.Millisecond) { + now.Store(time.Now()) + atomic.StoreInt32(&offset, 0) + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = now.Load().(time.Time).Add(time.Duration(atomic.AddInt32(&offset, 1))) + } +} + +func BenchmarkTimeNowCache1(b *testing.B) { + var now atomic.Value + now.Store(time.Now()) + go func() { + for range time.Tick(500 * time.Millisecond) { + now.Store(time.Now()) + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = now.Load().(time.Time) + } +} diff --git a/ants_test.go b/ants_test.go index 7342542..93bbbea 100644 --- a/ants_test.go +++ b/ants_test.go @@ -322,22 +322,50 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) { _ = p1.Invoke("Oops!") } -func TestPurge(t *testing.T) { - p, err := NewPool(10) +func TestPurgePool(t *testing.T) { + size := 500 + ch := make(chan struct{}) + + p, err := NewPool(size) assert.NoErrorf(t, err, "create TimingPool failed: %v", err) defer p.Release() - _ = p.Submit(demoFunc) - time.Sleep(3 * DefaultCleanIntervalTime) - assert.EqualValues(t, 0, p.Running(), "all p should be purged") - p1, err := NewPoolWithFunc(10, demoPoolFunc) + + for i := 0; i < size; i++ { + j := i + 1 + _ = p.Submit(func() { + <-ch + d := j % 100 + time.Sleep(time.Duration(d) * time.Millisecond) + }) + } + assert.Equalf(t, size, p.Running(), "pool should be full, expected: %d, but got: %d", size, p.Running()) + + close(ch) + time.Sleep(5 * DefaultCleanIntervalTime) + assert.Equalf(t, 0, p.Running(), "pool should be empty after purge, but got %d", p.Running()) + + ch = make(chan struct{}) + f := func(i interface{}) { + <-ch + d := i.(int) % 100 + time.Sleep(time.Duration(d) * time.Millisecond) + } + + p1, err := NewPoolWithFunc(size, f) assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err) defer p1.Release() - _ = p1.Invoke(1) - time.Sleep(3 * DefaultCleanIntervalTime) - assert.EqualValues(t, 0, p.Running(), "all p should be purged") + + for i := 0; i < size; i++ { + _ = p1.Invoke(i) + } + assert.Equalf(t, size, p1.Running(), "pool should be full, expected: %d, but got: %d", size, p1.Running()) + + close(ch) + time.Sleep(5 * DefaultCleanIntervalTime) + assert.Equalf(t, 0, p1.Running(), "pool should be empty after purge, but got %d", p1.Running()) } -func TestPurgePreMalloc(t *testing.T) { +func TestPurgePreMallocPool(t *testing.T) { p, err := NewPool(10, WithPreAlloc(true)) assert.NoErrorf(t, err, "create TimingPool failed: %v", err) defer p.Release() @@ -547,9 +575,7 @@ func TestInfinitePool(t *testing.T) { } var err error _, err = NewPool(-1, WithPreAlloc(true)) - if err != ErrInvalidPreAllocSize { - t.Errorf("expect ErrInvalidPreAllocSize but got %v", err) - } + assert.EqualErrorf(t, err, ErrInvalidPreAllocSize.Error(), "") } func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int, waitForPurge time.Duration) { diff --git a/pool.go b/pool.go index d5e7b7b..95909e9 100644 --- a/pool.go +++ b/pool.go @@ -62,11 +62,13 @@ type Pool struct { heartbeatDone int32 stopHeartbeat context.CancelFunc + now atomic.Value + options *Options } -// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger. -func (p *Pool) purgePeriodically(ctx context.Context) { +// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. +func (p *Pool) purgeStaleWorkers(ctx context.Context) { heartbeat := time.NewTicker(p.options.ExpiryDuration) defer func() { @@ -76,9 +78,9 @@ func (p *Pool) purgePeriodically(ctx context.Context) { for { select { - case <-heartbeat.C: case <-ctx.Done(): return + case <-heartbeat.C: } if p.IsClosed() { @@ -108,6 +110,20 @@ func (p *Pool) purgePeriodically(ctx context.Context) { } } +// ticktock is a goroutine that updates the current time in the pool regularly. +func (p *Pool) ticktock() { + ticker := time.NewTicker(nowTimeUpdateInterval) + defer ticker.Stop() + + for range ticker.C { + p.now.Store(time.Now()) + } +} + +func (p *Pool) nowTime() time.Time { + return p.now.Load().(time.Time) +} + // NewPool generates an instance of ants pool. func NewPool(size int, options ...Option) (*Pool, error) { opts := loadOptions(options...) @@ -154,8 +170,12 @@ func NewPool(size int, options ...Option) (*Pool, error) { var ctx context.Context ctx, p.stopHeartbeat = context.WithCancel(context.Background()) if !p.options.DisablePurge { - go p.purgePeriodically(ctx) + go p.purgeStaleWorkers(ctx) } + + p.now.Store(time.Now()) + go p.ticktock() + return p, nil } @@ -264,7 +284,7 @@ func (p *Pool) Reboot() { var ctx context.Context ctx, p.stopHeartbeat = context.WithCancel(context.Background()) if !p.options.DisablePurge { - go p.purgePeriodically(ctx) + go p.purgeStaleWorkers(ctx) } } } @@ -340,7 +360,7 @@ func (p *Pool) revertWorker(worker *goWorker) bool { p.cond.Broadcast() return false } - worker.recycleTime = time.Now() + worker.recycleTime = p.nowTime() p.lock.Lock() // To avoid memory leaks, add a double check in the lock scope. diff --git a/pool_func.go b/pool_func.go index c72cf34..6bca66e 100644 --- a/pool_func.go +++ b/pool_func.go @@ -64,11 +64,13 @@ type PoolWithFunc struct { heartbeatDone int32 stopHeartbeat context.CancelFunc + now atomic.Value + options *Options } -// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger. -func (p *PoolWithFunc) purgePeriodically(ctx context.Context) { +// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. +func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { heartbeat := time.NewTicker(p.options.ExpiryDuration) defer func() { heartbeat.Stop() @@ -78,9 +80,9 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) { var expiredWorkers []*goWorkerWithFunc for { select { - case <-heartbeat.C: case <-ctx.Done(): return + case <-heartbeat.C: } if p.IsClosed() { @@ -123,6 +125,20 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) { } } +// ticktock is a goroutine that updates the current time in the pool regularly. +func (p *PoolWithFunc) ticktock() { + ticker := time.NewTicker(nowTimeUpdateInterval) + defer ticker.Stop() + + for range ticker.C { + p.now.Store(time.Now()) + } +} + +func (p *PoolWithFunc) nowTime() time.Time { + return p.now.Load().(time.Time) +} + // NewPoolWithFunc generates an instance of ants pool with a specific function. func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) { if size <= 0 { @@ -171,8 +187,12 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi var ctx context.Context ctx, p.stopHeartbeat = context.WithCancel(context.Background()) if !p.options.DisablePurge { - go p.purgePeriodically(ctx) + go p.purgeStaleWorkers(ctx) } + + p.now.Store(time.Now()) + go p.ticktock() + return p, nil } @@ -285,7 +305,7 @@ func (p *PoolWithFunc) Reboot() { var ctx context.Context ctx, p.stopHeartbeat = context.WithCancel(context.Background()) if !p.options.DisablePurge { - go p.purgePeriodically(ctx) + go p.purgeStaleWorkers(ctx) } } } @@ -368,7 +388,7 @@ func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool { p.cond.Broadcast() return false } - worker.recycleTime = time.Now() + worker.recycleTime = p.nowTime() p.lock.Lock() // To avoid memory leaks, add a double check in the lock scope.