From d04febc0b2febe12538c03962be0970841d8b201 Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:33:07 +0800 Subject: [PATCH] clear expired workers for Pool --- pool.go | 32 +++++++++++++++++++++++++++++++- worker.go | 4 ++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/pool.go b/pool.go index 003b11d..ef03d8a 100644 --- a/pool.go +++ b/pool.go @@ -26,6 +26,7 @@ import ( "math" "sync" "sync/atomic" + "time" ) type sig struct{} @@ -41,6 +42,9 @@ type Pool struct { // running is the number of the currently running goroutines. running int32 + // expiryDuration set the expired time (second) of every worker. + expiryDuration time.Duration + // freeSignal is used to notice pool there are available // workers which can be sent to work. freeSignal chan sig @@ -57,8 +61,32 @@ type Pool struct { once sync.Once } +func (p *Pool) MonitorAndClear() { + go func() { + for { + time.Sleep(p.expiryDuration) + currentTime := time.Now() + p.lock.Lock() + idleWorkers := p.workers + n := 0 + for i, w := range idleWorkers { + if currentTime.Sub(w.recycleTime) <= p.expiryDuration { + break + } + n = i + w.stop() + idleWorkers[i] = nil + } + n += 1 + p.workers = idleWorkers[n:] + p.lock.Unlock() + } + }() +} + + // NewPool generates a instance of ants pool -func NewPool(size int) (*Pool, error) { +func NewPool(size, expiry int) (*Pool, error) { if size <= 0 { return nil, ErrPoolSizeInvalid } @@ -66,6 +94,7 @@ func NewPool(size int) (*Pool, error) { capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), + expiryDuration: time.Duration(expiry)*time.Second, } return p, nil @@ -171,6 +200,7 @@ func (p *Pool) getWorker() *Worker { // putWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) putWorker(worker *Worker) { + worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/worker.go b/worker.go index 162673e..babb8af 100644 --- a/worker.go +++ b/worker.go @@ -24,6 +24,7 @@ package ants import ( "sync/atomic" + "time" ) // Worker is the actual executor who runs the tasks, @@ -35,6 +36,9 @@ type Worker struct { // task is a job should be done. task chan f + + // recycleTime will be update when putting a worker back into queue. + recycleTime time.Time } // run starts a goroutine to repeat the process