mirror of
https://github.com/panjf2000/ants.git
synced 2025-12-16 01:41:02 +00:00
feat: implement generic pool (#351)
This commit is contained in:
parent
9a1446b823
commit
60bd4c42f9
4
ants.go
4
ants.go
@ -84,6 +84,9 @@ var (
|
|||||||
// ErrInvalidLoadBalancingStrategy will be returned when trying to create a MultiPool with an invalid load-balancing strategy.
|
// ErrInvalidLoadBalancingStrategy will be returned when trying to create a MultiPool with an invalid load-balancing strategy.
|
||||||
ErrInvalidLoadBalancingStrategy = errors.New("invalid load-balancing strategy")
|
ErrInvalidLoadBalancingStrategy = errors.New("invalid load-balancing strategy")
|
||||||
|
|
||||||
|
// ErrInvalidMultiPoolSize will be returned when trying to create a MultiPool with an invalid size.
|
||||||
|
ErrInvalidMultiPoolSize = errors.New("invalid size for multiple pool")
|
||||||
|
|
||||||
// workerChanCap determines whether the channel of a worker should be a buffered channel
|
// workerChanCap determines whether the channel of a worker should be a buffered channel
|
||||||
// to get the best performance. Inspired by fasthttp at
|
// to get the best performance. Inspired by fasthttp at
|
||||||
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
|
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
|
||||||
@ -387,6 +390,7 @@ func (p *poolCommon) Release() {
|
|||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
p.workers.reset()
|
p.workers.reset()
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
|
|
||||||
// There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent
|
// There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent
|
||||||
// those callers blocking infinitely.
|
// those callers blocking infinitely.
|
||||||
p.cond.Broadcast()
|
p.cond.Broadcast()
|
||||||
|
|||||||
@ -48,6 +48,10 @@ func demoPoolFunc(args any) {
|
|||||||
time.Sleep(time.Duration(n) * time.Millisecond)
|
time.Sleep(time.Duration(n) * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func demoPoolFuncInt(n int) {
|
||||||
|
time.Sleep(time.Duration(n) * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
var stopLongRunningFunc int32
|
var stopLongRunningFunc int32
|
||||||
|
|
||||||
func longRunningFunc() {
|
func longRunningFunc() {
|
||||||
@ -56,16 +60,12 @@ func longRunningFunc() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var stopLongRunningPoolFunc int32
|
|
||||||
|
|
||||||
func longRunningPoolFunc(arg any) {
|
func longRunningPoolFunc(arg any) {
|
||||||
if ch, ok := arg.(chan struct{}); ok {
|
<-arg.(chan struct{})
|
||||||
<-ch
|
}
|
||||||
return
|
|
||||||
}
|
func longRunningPoolFuncCh(ch chan struct{}) {
|
||||||
for atomic.LoadInt32(&stopLongRunningPoolFunc) == 0 {
|
<-ch
|
||||||
runtime.Gosched()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkGoroutines(b *testing.B) {
|
func BenchmarkGoroutines(b *testing.B) {
|
||||||
|
|||||||
832
ants_test.go
832
ants_test.go
File diff suppressed because it is too large
Load Diff
12
multipool.go
12
multipool.go
@ -25,6 +25,7 @@ package ants
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -58,6 +59,10 @@ type MultiPool struct {
|
|||||||
// NewMultiPool instantiates a MultiPool with a size of the pool list and a size
|
// NewMultiPool instantiates a MultiPool with a size of the pool list and a size
|
||||||
// per pool, and the load-balancing strategy.
|
// per pool, and the load-balancing strategy.
|
||||||
func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...Option) (*MultiPool, error) {
|
func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...Option) (*MultiPool, error) {
|
||||||
|
if size <= 0 {
|
||||||
|
return nil, ErrInvalidMultiPoolSize
|
||||||
|
}
|
||||||
|
|
||||||
if lbs != RoundRobin && lbs != LeastTasks {
|
if lbs != RoundRobin && lbs != LeastTasks {
|
||||||
return nil, ErrInvalidLoadBalancingStrategy
|
return nil, ErrInvalidLoadBalancingStrategy
|
||||||
}
|
}
|
||||||
@ -69,16 +74,13 @@ func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...O
|
|||||||
}
|
}
|
||||||
pools[i] = pool
|
pools[i] = pool
|
||||||
}
|
}
|
||||||
return &MultiPool{pools: pools, lbs: lbs}, nil
|
return &MultiPool{pools: pools, index: math.MaxUint32, lbs: lbs}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MultiPool) next(lbs LoadBalancingStrategy) (idx int) {
|
func (mp *MultiPool) next(lbs LoadBalancingStrategy) (idx int) {
|
||||||
switch lbs {
|
switch lbs {
|
||||||
case RoundRobin:
|
case RoundRobin:
|
||||||
if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
|
return int(atomic.AddUint32(&mp.index, 1) % uint32(len(mp.pools)))
|
||||||
idx = 0
|
|
||||||
}
|
|
||||||
return
|
|
||||||
case LeastTasks:
|
case LeastTasks:
|
||||||
leastTasks := 1<<31 - 1
|
leastTasks := 1<<31 - 1
|
||||||
for i, pool := range mp.pools {
|
for i, pool := range mp.pools {
|
||||||
|
|||||||
@ -25,6 +25,7 @@ package ants
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -47,6 +48,10 @@ type MultiPoolWithFunc struct {
|
|||||||
// NewMultiPoolWithFunc instantiates a MultiPoolWithFunc with a size of the pool list and a size
|
// NewMultiPoolWithFunc instantiates a MultiPoolWithFunc with a size of the pool list and a size
|
||||||
// per pool, and the load-balancing strategy.
|
// per pool, and the load-balancing strategy.
|
||||||
func NewMultiPoolWithFunc(size, sizePerPool int, fn func(any), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) {
|
func NewMultiPoolWithFunc(size, sizePerPool int, fn func(any), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) {
|
||||||
|
if size <= 0 {
|
||||||
|
return nil, ErrInvalidMultiPoolSize
|
||||||
|
}
|
||||||
|
|
||||||
if lbs != RoundRobin && lbs != LeastTasks {
|
if lbs != RoundRobin && lbs != LeastTasks {
|
||||||
return nil, ErrInvalidLoadBalancingStrategy
|
return nil, ErrInvalidLoadBalancingStrategy
|
||||||
}
|
}
|
||||||
@ -58,16 +63,13 @@ func NewMultiPoolWithFunc(size, sizePerPool int, fn func(any), lbs LoadBalancing
|
|||||||
}
|
}
|
||||||
pools[i] = pool
|
pools[i] = pool
|
||||||
}
|
}
|
||||||
return &MultiPoolWithFunc{pools: pools, lbs: lbs}, nil
|
return &MultiPoolWithFunc{pools: pools, index: math.MaxUint32, lbs: lbs}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MultiPoolWithFunc) next(lbs LoadBalancingStrategy) (idx int) {
|
func (mp *MultiPoolWithFunc) next(lbs LoadBalancingStrategy) (idx int) {
|
||||||
switch lbs {
|
switch lbs {
|
||||||
case RoundRobin:
|
case RoundRobin:
|
||||||
if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
|
return int(atomic.AddUint32(&mp.index, 1) % uint32(len(mp.pools)))
|
||||||
idx = 0
|
|
||||||
}
|
|
||||||
return
|
|
||||||
case LeastTasks:
|
case LeastTasks:
|
||||||
leastTasks := 1<<31 - 1
|
leastTasks := 1<<31 - 1
|
||||||
for i, pool := range mp.pools {
|
for i, pool := range mp.pools {
|
||||||
|
|||||||
215
multipool_func_generic.go
Normal file
215
multipool_func_generic.go
Normal file
@ -0,0 +1,215 @@
|
|||||||
|
// MIT License
|
||||||
|
|
||||||
|
// Copyright (c) 2025 Andy Pan
|
||||||
|
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
// of this software and associated documentation files (the "Software"), to deal
|
||||||
|
// in the Software without restriction, including without limitation the rights
|
||||||
|
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
// copies of the Software, and to permit persons to whom the Software is
|
||||||
|
// furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in all
|
||||||
|
// copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
// SOFTWARE.
|
||||||
|
|
||||||
|
package ants
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MultiPoolWithFuncGeneric is the generic version of MultiPoolWithFunc.
|
||||||
|
type MultiPoolWithFuncGeneric[T any] struct {
|
||||||
|
pools []*PoolWithFuncGeneric[T]
|
||||||
|
index uint32
|
||||||
|
state int32
|
||||||
|
lbs LoadBalancingStrategy
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMultiPoolWithFuncGeneric instantiates a MultiPoolWithFunc with a size of the pool list and a size
|
||||||
|
// per pool, and the load-balancing strategy.
|
||||||
|
func NewMultiPoolWithFuncGeneric[T any](size, sizePerPool int, fn func(T), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFuncGeneric[T], error) {
|
||||||
|
if size <= 0 {
|
||||||
|
return nil, ErrInvalidMultiPoolSize
|
||||||
|
}
|
||||||
|
|
||||||
|
if lbs != RoundRobin && lbs != LeastTasks {
|
||||||
|
return nil, ErrInvalidLoadBalancingStrategy
|
||||||
|
}
|
||||||
|
pools := make([]*PoolWithFuncGeneric[T], size)
|
||||||
|
for i := 0; i < size; i++ {
|
||||||
|
pool, err := NewPoolWithFuncGeneric(sizePerPool, fn, options...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pools[i] = pool
|
||||||
|
}
|
||||||
|
return &MultiPoolWithFuncGeneric[T]{pools: pools, index: math.MaxUint32, lbs: lbs}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) next(lbs LoadBalancingStrategy) (idx int) {
|
||||||
|
switch lbs {
|
||||||
|
case RoundRobin:
|
||||||
|
return int(atomic.AddUint32(&mp.index, 1) % uint32(len(mp.pools)))
|
||||||
|
case LeastTasks:
|
||||||
|
leastTasks := 1<<31 - 1
|
||||||
|
for i, pool := range mp.pools {
|
||||||
|
if n := pool.Running(); n < leastTasks {
|
||||||
|
leastTasks = n
|
||||||
|
idx = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invoke submits a task to a pool selected by the load-balancing strategy.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) Invoke(args T) (err error) {
|
||||||
|
if mp.IsClosed() {
|
||||||
|
return ErrPoolClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = mp.pools[mp.next(mp.lbs)].Invoke(args); err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err == ErrPoolOverload && mp.lbs == RoundRobin {
|
||||||
|
return mp.pools[mp.next(LeastTasks)].Invoke(args)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Running returns the number of the currently running workers across all pools.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) Running() (n int) {
|
||||||
|
for _, pool := range mp.pools {
|
||||||
|
n += pool.Running()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunningByIndex returns the number of the currently running workers in the specific pool.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) RunningByIndex(idx int) (int, error) {
|
||||||
|
if idx < 0 || idx >= len(mp.pools) {
|
||||||
|
return -1, ErrInvalidPoolIndex
|
||||||
|
}
|
||||||
|
return mp.pools[idx].Running(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Free returns the number of available workers across all pools.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) Free() (n int) {
|
||||||
|
for _, pool := range mp.pools {
|
||||||
|
n += pool.Free()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// FreeByIndex returns the number of available workers in the specific pool.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) FreeByIndex(idx int) (int, error) {
|
||||||
|
if idx < 0 || idx >= len(mp.pools) {
|
||||||
|
return -1, ErrInvalidPoolIndex
|
||||||
|
}
|
||||||
|
return mp.pools[idx].Free(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Waiting returns the number of the currently waiting tasks across all pools.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) Waiting() (n int) {
|
||||||
|
for _, pool := range mp.pools {
|
||||||
|
n += pool.Waiting()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitingByIndex returns the number of the currently waiting tasks in the specific pool.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) WaitingByIndex(idx int) (int, error) {
|
||||||
|
if idx < 0 || idx >= len(mp.pools) {
|
||||||
|
return -1, ErrInvalidPoolIndex
|
||||||
|
}
|
||||||
|
return mp.pools[idx].Waiting(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cap returns the capacity of this multi-pool.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) Cap() (n int) {
|
||||||
|
for _, pool := range mp.pools {
|
||||||
|
n += pool.Cap()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tune resizes each pool in multi-pool.
|
||||||
|
//
|
||||||
|
// Note that this method doesn't resize the overall
|
||||||
|
// capacity of multi-pool.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) Tune(size int) {
|
||||||
|
for _, pool := range mp.pools {
|
||||||
|
pool.Tune(size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsClosed indicates whether the multi-pool is closed.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) IsClosed() bool {
|
||||||
|
return atomic.LoadInt32(&mp.state) == CLOSED
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReleaseTimeout closes the multi-pool with a timeout,
|
||||||
|
// it waits all pools to be closed before timing out.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) ReleaseTimeout(timeout time.Duration) error {
|
||||||
|
if !atomic.CompareAndSwapInt32(&mp.state, OPENED, CLOSED) {
|
||||||
|
return ErrPoolClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
errCh := make(chan error, len(mp.pools))
|
||||||
|
var wg errgroup.Group
|
||||||
|
for i, pool := range mp.pools {
|
||||||
|
func(p *PoolWithFuncGeneric[T], idx int) {
|
||||||
|
wg.Go(func() error {
|
||||||
|
err := p.ReleaseTimeout(timeout)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("pool %d: %v", idx, err)
|
||||||
|
}
|
||||||
|
errCh <- err
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}(pool, i)
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = wg.Wait()
|
||||||
|
|
||||||
|
var errStr strings.Builder
|
||||||
|
for i := 0; i < len(mp.pools); i++ {
|
||||||
|
if err := <-errCh; err != nil {
|
||||||
|
errStr.WriteString(err.Error())
|
||||||
|
errStr.WriteString(" | ")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if errStr.Len() == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.New(strings.TrimSuffix(errStr.String(), " | "))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reboot reboots a released multi-pool.
|
||||||
|
func (mp *MultiPoolWithFuncGeneric[T]) Reboot() {
|
||||||
|
if atomic.CompareAndSwapInt32(&mp.state, CLOSED, OPENED) {
|
||||||
|
atomic.StoreUint32(&mp.index, 0)
|
||||||
|
for _, pool := range mp.pools {
|
||||||
|
pool.Reboot()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
12
pool_func.go
12
pool_func.go
@ -26,8 +26,8 @@ package ants
|
|||||||
type PoolWithFunc struct {
|
type PoolWithFunc struct {
|
||||||
*poolCommon
|
*poolCommon
|
||||||
|
|
||||||
// poolFunc is the unified function for processing tasks.
|
// fn is the unified function for processing tasks.
|
||||||
poolFunc func(any)
|
fn func(any)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Invoke passes arguments to the pool.
|
// Invoke passes arguments to the pool.
|
||||||
@ -36,14 +36,14 @@ type PoolWithFunc struct {
|
|||||||
// but what calls for special attention is that you will get blocked with the last
|
// but what calls for special attention is that you will get blocked with the last
|
||||||
// Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this,
|
// Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this,
|
||||||
// you should instantiate a PoolWithFunc with ants.WithNonblocking(true).
|
// you should instantiate a PoolWithFunc with ants.WithNonblocking(true).
|
||||||
func (p *PoolWithFunc) Invoke(args any) error {
|
func (p *PoolWithFunc) Invoke(arg any) error {
|
||||||
if p.IsClosed() {
|
if p.IsClosed() {
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
w, err := p.retrieveWorker()
|
w, err := p.retrieveWorker()
|
||||||
if w != nil {
|
if w != nil {
|
||||||
w.inputParam(args)
|
w.inputArg(arg)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -61,13 +61,13 @@ func NewPoolWithFunc(size int, pf func(any), options ...Option) (*PoolWithFunc,
|
|||||||
|
|
||||||
pool := &PoolWithFunc{
|
pool := &PoolWithFunc{
|
||||||
poolCommon: pc,
|
poolCommon: pc,
|
||||||
poolFunc: pf,
|
fn: pf,
|
||||||
}
|
}
|
||||||
|
|
||||||
pool.workerCache.New = func() any {
|
pool.workerCache.New = func() any {
|
||||||
return &goWorkerWithFunc{
|
return &goWorkerWithFunc{
|
||||||
pool: pool,
|
pool: pool,
|
||||||
args: make(chan any, workerChanCap),
|
arg: make(chan any, workerChanCap),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
71
pool_func_generic.go
Normal file
71
pool_func_generic.go
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
// MIT License
|
||||||
|
|
||||||
|
// Copyright (c) 2025 Andy Pan
|
||||||
|
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
// of this software and associated documentation files (the "Software"), to deal
|
||||||
|
// in the Software without restriction, including without limitation the rights
|
||||||
|
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
// copies of the Software, and to permit persons to whom the Software is
|
||||||
|
// furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in all
|
||||||
|
// copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
// SOFTWARE.
|
||||||
|
|
||||||
|
package ants
|
||||||
|
|
||||||
|
// PoolWithFuncGeneric is the generic version of PoolWithFunc.
|
||||||
|
type PoolWithFuncGeneric[T any] struct {
|
||||||
|
*poolCommon
|
||||||
|
|
||||||
|
// fn is the unified function for processing tasks.
|
||||||
|
fn func(T)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invoke passes the argument to the pool to start a new task.
|
||||||
|
func (p *PoolWithFuncGeneric[T]) Invoke(arg T) error {
|
||||||
|
if p.IsClosed() {
|
||||||
|
return ErrPoolClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
w, err := p.retrieveWorker()
|
||||||
|
if w != nil {
|
||||||
|
w.(*goWorkerWithFuncGeneric[T]).arg <- arg
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPoolWithFuncGeneric instantiates a PoolWithFuncGeneric[T] with customized options.
|
||||||
|
func NewPoolWithFuncGeneric[T any](size int, pf func(T), options ...Option) (*PoolWithFuncGeneric[T], error) {
|
||||||
|
if pf == nil {
|
||||||
|
return nil, ErrLackPoolFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
pc, err := newPool(size, options...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := &PoolWithFuncGeneric[T]{
|
||||||
|
poolCommon: pc,
|
||||||
|
fn: pf,
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.workerCache.New = func() any {
|
||||||
|
return &goWorkerWithFuncGeneric[T]{
|
||||||
|
pool: pool,
|
||||||
|
arg: make(chan T, workerChanCap),
|
||||||
|
exit: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pool, nil
|
||||||
|
}
|
||||||
12
worker.go
12
worker.go
@ -31,6 +31,8 @@ import (
|
|||||||
// it starts a goroutine that accepts tasks and
|
// it starts a goroutine that accepts tasks and
|
||||||
// performs function calls.
|
// performs function calls.
|
||||||
type goWorker struct {
|
type goWorker struct {
|
||||||
|
worker
|
||||||
|
|
||||||
// pool who owns this worker.
|
// pool who owns this worker.
|
||||||
pool *Pool
|
pool *Pool
|
||||||
|
|
||||||
@ -64,11 +66,11 @@ func (w *goWorker) run() {
|
|||||||
w.pool.cond.Signal()
|
w.pool.cond.Signal()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for f := range w.task {
|
for fn := range w.task {
|
||||||
if f == nil {
|
if fn == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
f()
|
fn()
|
||||||
if ok := w.pool.revertWorker(w); !ok {
|
if ok := w.pool.revertWorker(w); !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -91,7 +93,3 @@ func (w *goWorker) setLastUsedTime(t time.Time) {
|
|||||||
func (w *goWorker) inputFunc(fn func()) {
|
func (w *goWorker) inputFunc(fn func()) {
|
||||||
w.task <- fn
|
w.task <- fn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *goWorker) inputParam(any) {
|
|
||||||
panic("unreachable")
|
|
||||||
}
|
|
||||||
|
|||||||
@ -31,11 +31,13 @@ import (
|
|||||||
// it starts a goroutine that accepts tasks and
|
// it starts a goroutine that accepts tasks and
|
||||||
// performs function calls.
|
// performs function calls.
|
||||||
type goWorkerWithFunc struct {
|
type goWorkerWithFunc struct {
|
||||||
|
worker
|
||||||
|
|
||||||
// pool who owns this worker.
|
// pool who owns this worker.
|
||||||
pool *PoolWithFunc
|
pool *PoolWithFunc
|
||||||
|
|
||||||
// args is a job should be done.
|
// arg is the argument for the function.
|
||||||
args chan any
|
arg chan any
|
||||||
|
|
||||||
// lastUsed will be updated when putting a worker back into queue.
|
// lastUsed will be updated when putting a worker back into queue.
|
||||||
lastUsed time.Time
|
lastUsed time.Time
|
||||||
@ -64,11 +66,11 @@ func (w *goWorkerWithFunc) run() {
|
|||||||
w.pool.cond.Signal()
|
w.pool.cond.Signal()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for args := range w.args {
|
for arg := range w.arg {
|
||||||
if args == nil {
|
if arg == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.pool.poolFunc(args)
|
w.pool.fn(arg)
|
||||||
if ok := w.pool.revertWorker(w); !ok {
|
if ok := w.pool.revertWorker(w); !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -77,7 +79,7 @@ func (w *goWorkerWithFunc) run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *goWorkerWithFunc) finish() {
|
func (w *goWorkerWithFunc) finish() {
|
||||||
w.args <- nil
|
w.arg <- nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *goWorkerWithFunc) lastUsedTime() time.Time {
|
func (w *goWorkerWithFunc) lastUsedTime() time.Time {
|
||||||
@ -88,10 +90,6 @@ func (w *goWorkerWithFunc) setLastUsedTime(t time.Time) {
|
|||||||
w.lastUsed = t
|
w.lastUsed = t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *goWorkerWithFunc) inputFunc(func()) {
|
func (w *goWorkerWithFunc) inputArg(arg any) {
|
||||||
panic("unreachable")
|
w.arg <- arg
|
||||||
}
|
|
||||||
|
|
||||||
func (w *goWorkerWithFunc) inputParam(arg any) {
|
|
||||||
w.args <- arg
|
|
||||||
}
|
}
|
||||||
|
|||||||
96
worker_func_generic.go
Normal file
96
worker_func_generic.go
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
// MIT License
|
||||||
|
|
||||||
|
// Copyright (c) 2025 Andy Pan
|
||||||
|
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
// of this software and associated documentation files (the "Software"), to deal
|
||||||
|
// in the Software without restriction, including without limitation the rights
|
||||||
|
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
// copies of the Software, and to permit persons to whom the Software is
|
||||||
|
// furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in all
|
||||||
|
// copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
// SOFTWARE.
|
||||||
|
|
||||||
|
package ants
|
||||||
|
|
||||||
|
import (
|
||||||
|
"runtime/debug"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// goWorkerWithFunc is the actual executor who runs the tasks,
|
||||||
|
// it starts a goroutine that accepts tasks and
|
||||||
|
// performs function calls.
|
||||||
|
type goWorkerWithFuncGeneric[T any] struct {
|
||||||
|
worker
|
||||||
|
|
||||||
|
// pool who owns this worker.
|
||||||
|
pool *PoolWithFuncGeneric[T]
|
||||||
|
|
||||||
|
// arg is a job should be done.
|
||||||
|
arg chan T
|
||||||
|
|
||||||
|
// exit signals the goroutine to exit.
|
||||||
|
exit chan struct{}
|
||||||
|
|
||||||
|
// lastUsed will be updated when putting a worker back into queue.
|
||||||
|
lastUsed time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// run starts a goroutine to repeat the process
|
||||||
|
// that performs the function calls.
|
||||||
|
func (w *goWorkerWithFuncGeneric[T]) run() {
|
||||||
|
w.pool.addRunning(1)
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {
|
||||||
|
w.pool.once.Do(func() {
|
||||||
|
close(w.pool.allDone)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
w.pool.workerCache.Put(w)
|
||||||
|
if p := recover(); p != nil {
|
||||||
|
if ph := w.pool.options.PanicHandler; ph != nil {
|
||||||
|
ph(p)
|
||||||
|
} else {
|
||||||
|
w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Call Signal() here in case there are goroutines waiting for available workers.
|
||||||
|
w.pool.cond.Signal()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-w.exit:
|
||||||
|
return
|
||||||
|
case arg := <-w.arg:
|
||||||
|
w.pool.fn(arg)
|
||||||
|
if ok := w.pool.revertWorker(w); !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *goWorkerWithFuncGeneric[T]) finish() {
|
||||||
|
w.exit <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *goWorkerWithFuncGeneric[T]) lastUsedTime() time.Time {
|
||||||
|
return w.lastUsed
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *goWorkerWithFuncGeneric[T]) setLastUsedTime(t time.Time) {
|
||||||
|
w.lastUsed = t
|
||||||
|
}
|
||||||
@ -12,6 +12,9 @@ type loopQueue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newWorkerLoopQueue(size int) *loopQueue {
|
func newWorkerLoopQueue(size int) *loopQueue {
|
||||||
|
if size <= 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return &loopQueue{
|
return &loopQueue{
|
||||||
items: make([]worker, size),
|
items: make([]worker, size),
|
||||||
size: size,
|
size: size,
|
||||||
@ -39,10 +42,6 @@ func (wq *loopQueue) isEmpty() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wq *loopQueue) insert(w worker) error {
|
func (wq *loopQueue) insert(w worker) error {
|
||||||
if wq.size == 0 {
|
|
||||||
return errQueueIsReleased
|
|
||||||
}
|
|
||||||
|
|
||||||
if wq.isFull {
|
if wq.isFull {
|
||||||
return errQueueIsFull
|
return errQueueIsFull
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,15 +6,17 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNewLoopQueue(t *testing.T) {
|
func TestNewLoopQueue(t *testing.T) {
|
||||||
size := 100
|
size := 100
|
||||||
q := newWorkerLoopQueue(size)
|
q := newWorkerLoopQueue(size)
|
||||||
assert.EqualValues(t, 0, q.len(), "Len error")
|
require.EqualValues(t, 0, q.len(), "Len error")
|
||||||
assert.Equal(t, true, q.isEmpty(), "IsEmpty error")
|
require.Equal(t, true, q.isEmpty(), "IsEmpty error")
|
||||||
assert.Nil(t, q.detach(), "Dequeue error")
|
require.Nil(t, q.detach(), "Dequeue error")
|
||||||
|
|
||||||
|
require.Nil(t, newWorkerLoopQueue(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLoopQueue(t *testing.T) {
|
func TestLoopQueue(t *testing.T) {
|
||||||
@ -27,9 +29,9 @@ func TestLoopQueue(t *testing.T) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert.EqualValues(t, 5, q.len(), "Len error")
|
require.EqualValues(t, 5, q.len(), "Len error")
|
||||||
_ = q.detach()
|
_ = q.detach()
|
||||||
assert.EqualValues(t, 4, q.len(), "Len error")
|
require.EqualValues(t, 4, q.len(), "Len error")
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
@ -39,13 +41,13 @@ func TestLoopQueue(t *testing.T) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert.EqualValues(t, 10, q.len(), "Len error")
|
require.EqualValues(t, 10, q.len(), "Len error")
|
||||||
|
|
||||||
err := q.insert(&goWorker{lastUsed: time.Now()})
|
err := q.insert(&goWorker{lastUsed: time.Now()})
|
||||||
assert.Error(t, err, "Enqueue, error")
|
require.Error(t, err, "Enqueue, error")
|
||||||
|
|
||||||
q.refresh(time.Second)
|
q.refresh(time.Second)
|
||||||
assert.EqualValuesf(t, 6, q.len(), "Len error: %d", q.len())
|
require.EqualValuesf(t, 6, q.len(), "Len error: %d", q.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRotatedQueueSearch(t *testing.T) {
|
func TestRotatedQueueSearch(t *testing.T) {
|
||||||
@ -57,18 +59,18 @@ func TestRotatedQueueSearch(t *testing.T) {
|
|||||||
|
|
||||||
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
||||||
|
|
||||||
assert.EqualValues(t, 0, q.binarySearch(time.Now()), "index should be 0")
|
require.EqualValues(t, 0, q.binarySearch(time.Now()), "index should be 0")
|
||||||
assert.EqualValues(t, -1, q.binarySearch(expiry1), "index should be -1")
|
require.EqualValues(t, -1, q.binarySearch(expiry1), "index should be -1")
|
||||||
|
|
||||||
// 2
|
// 2
|
||||||
expiry2 := time.Now()
|
expiry2 := time.Now()
|
||||||
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
||||||
|
|
||||||
assert.EqualValues(t, -1, q.binarySearch(expiry1), "index should be -1")
|
require.EqualValues(t, -1, q.binarySearch(expiry1), "index should be -1")
|
||||||
|
|
||||||
assert.EqualValues(t, 0, q.binarySearch(expiry2), "index should be 0")
|
require.EqualValues(t, 0, q.binarySearch(expiry2), "index should be 0")
|
||||||
|
|
||||||
assert.EqualValues(t, 1, q.binarySearch(time.Now()), "index should be 1")
|
require.EqualValues(t, 1, q.binarySearch(time.Now()), "index should be 1")
|
||||||
|
|
||||||
// more
|
// more
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
@ -83,7 +85,7 @@ func TestRotatedQueueSearch(t *testing.T) {
|
|||||||
err = q.insert(&goWorker{lastUsed: time.Now()})
|
err = q.insert(&goWorker{lastUsed: time.Now()})
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.EqualValues(t, 7, q.binarySearch(expiry3), "index should be 7")
|
require.EqualValues(t, 7, q.binarySearch(expiry3), "index should be 7")
|
||||||
|
|
||||||
// rotate
|
// rotate
|
||||||
for i := 0; i < 6; i++ {
|
for i := 0; i < 6; i++ {
|
||||||
@ -98,7 +100,7 @@ func TestRotatedQueueSearch(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// head = 6, tail = 5, insert direction ->
|
// head = 6, tail = 5, insert direction ->
|
||||||
// [expiry4, time, time, time, time, nil/tail, time/head, time, time, time]
|
// [expiry4, time, time, time, time, nil/tail, time/head, time, time, time]
|
||||||
assert.EqualValues(t, 0, q.binarySearch(expiry4), "index should be 0")
|
require.EqualValues(t, 0, q.binarySearch(expiry4), "index should be 0")
|
||||||
|
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
_ = q.detach()
|
_ = q.detach()
|
||||||
@ -108,17 +110,17 @@ func TestRotatedQueueSearch(t *testing.T) {
|
|||||||
|
|
||||||
// head = 6, tail = 5, insert direction ->
|
// head = 6, tail = 5, insert direction ->
|
||||||
// [expiry4, time, time, time, time, expiry5, nil/tail, nil, nil, time/head]
|
// [expiry4, time, time, time, time, expiry5, nil/tail, nil, nil, time/head]
|
||||||
assert.EqualValues(t, 5, q.binarySearch(expiry5), "index should be 5")
|
require.EqualValues(t, 5, q.binarySearch(expiry5), "index should be 5")
|
||||||
|
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
||||||
}
|
}
|
||||||
// head = 9, tail = 9, insert direction ->
|
// head = 9, tail = 9, insert direction ->
|
||||||
// [expiry4, time, time, time, time, expiry5, time, time, time, time/head/tail]
|
// [expiry4, time, time, time, time, expiry5, time, time, time, time/head/tail]
|
||||||
assert.EqualValues(t, -1, q.binarySearch(expiry2), "index should be -1")
|
require.EqualValues(t, -1, q.binarySearch(expiry2), "index should be -1")
|
||||||
|
|
||||||
assert.EqualValues(t, 9, q.binarySearch(q.items[9].lastUsedTime()), "index should be 9")
|
require.EqualValues(t, 9, q.binarySearch(q.items[9].lastUsedTime()), "index should be 9")
|
||||||
assert.EqualValues(t, 8, q.binarySearch(time.Now()), "index should be 8")
|
require.EqualValues(t, 8, q.binarySearch(time.Now()), "index should be 8")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRetrieveExpiry(t *testing.T) {
|
func TestRetrieveExpiry(t *testing.T) {
|
||||||
@ -139,7 +141,7 @@ func TestRetrieveExpiry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
workers := q.refresh(u)
|
workers := q.refresh(u)
|
||||||
|
|
||||||
assert.EqualValues(t, expirew, workers, "expired workers aren't right")
|
require.EqualValues(t, expirew, workers, "expired workers aren't right")
|
||||||
|
|
||||||
// test [ time, time, time, time, time, time+1s, time+1s, time+1s, time+1s, time+1s]
|
// test [ time, time, time, time, time, time+1s, time+1s, time+1s, time+1s, time+1s]
|
||||||
time.Sleep(u)
|
time.Sleep(u)
|
||||||
@ -152,7 +154,7 @@ func TestRetrieveExpiry(t *testing.T) {
|
|||||||
|
|
||||||
workers2 := q.refresh(u)
|
workers2 := q.refresh(u)
|
||||||
|
|
||||||
assert.EqualValues(t, expirew, workers2, "expired workers aren't right")
|
require.EqualValues(t, expirew, workers2, "expired workers aren't right")
|
||||||
|
|
||||||
// test [ time+1s, time+1s, time+1s, nil, nil, time+1s, time+1s, time+1s, time+1s, time+1s]
|
// test [ time+1s, time+1s, time+1s, nil, nil, time+1s, time+1s, time+1s, time+1s, time+1s]
|
||||||
for i := 0; i < size/2; i++ {
|
for i := 0; i < size/2; i++ {
|
||||||
@ -172,5 +174,5 @@ func TestRetrieveExpiry(t *testing.T) {
|
|||||||
|
|
||||||
workers3 := q.refresh(u)
|
workers3 := q.refresh(u)
|
||||||
|
|
||||||
assert.EqualValues(t, expirew, workers3, "expired workers aren't right")
|
require.EqualValues(t, expirew, workers3, "expired workers aren't right")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,13 +5,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
// errQueueIsFull will be returned when the worker queue is full.
|
||||||
// errQueueIsFull will be returned when the worker queue is full.
|
var errQueueIsFull = errors.New("the queue is full")
|
||||||
errQueueIsFull = errors.New("the queue is full")
|
|
||||||
|
|
||||||
// errQueueIsReleased will be returned when trying to insert item to a released worker queue.
|
|
||||||
errQueueIsReleased = errors.New("the queue length is zero")
|
|
||||||
)
|
|
||||||
|
|
||||||
type worker interface {
|
type worker interface {
|
||||||
run()
|
run()
|
||||||
@ -19,7 +14,7 @@ type worker interface {
|
|||||||
lastUsedTime() time.Time
|
lastUsedTime() time.Time
|
||||||
setLastUsedTime(t time.Time)
|
setLastUsedTime(t time.Time)
|
||||||
inputFunc(func())
|
inputFunc(func())
|
||||||
inputParam(any)
|
inputArg(any)
|
||||||
}
|
}
|
||||||
|
|
||||||
type workerQueue interface {
|
type workerQueue interface {
|
||||||
|
|||||||
@ -13,57 +13,57 @@ func newWorkerStack(size int) *workerStack {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wq *workerStack) len() int {
|
func (ws *workerStack) len() int {
|
||||||
return len(wq.items)
|
return len(ws.items)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wq *workerStack) isEmpty() bool {
|
func (ws *workerStack) isEmpty() bool {
|
||||||
return len(wq.items) == 0
|
return len(ws.items) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wq *workerStack) insert(w worker) error {
|
func (ws *workerStack) insert(w worker) error {
|
||||||
wq.items = append(wq.items, w)
|
ws.items = append(ws.items, w)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wq *workerStack) detach() worker {
|
func (ws *workerStack) detach() worker {
|
||||||
l := wq.len()
|
l := ws.len()
|
||||||
if l == 0 {
|
if l == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
w := wq.items[l-1]
|
w := ws.items[l-1]
|
||||||
wq.items[l-1] = nil // avoid memory leaks
|
ws.items[l-1] = nil // avoid memory leaks
|
||||||
wq.items = wq.items[:l-1]
|
ws.items = ws.items[:l-1]
|
||||||
|
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wq *workerStack) refresh(duration time.Duration) []worker {
|
func (ws *workerStack) refresh(duration time.Duration) []worker {
|
||||||
n := wq.len()
|
n := ws.len()
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
expiryTime := time.Now().Add(-duration)
|
expiryTime := time.Now().Add(-duration)
|
||||||
index := wq.binarySearch(0, n-1, expiryTime)
|
index := ws.binarySearch(0, n-1, expiryTime)
|
||||||
|
|
||||||
wq.expiry = wq.expiry[:0]
|
ws.expiry = ws.expiry[:0]
|
||||||
if index != -1 {
|
if index != -1 {
|
||||||
wq.expiry = append(wq.expiry, wq.items[:index+1]...)
|
ws.expiry = append(ws.expiry, ws.items[:index+1]...)
|
||||||
m := copy(wq.items, wq.items[index+1:])
|
m := copy(ws.items, ws.items[index+1:])
|
||||||
for i := m; i < n; i++ {
|
for i := m; i < n; i++ {
|
||||||
wq.items[i] = nil
|
ws.items[i] = nil
|
||||||
}
|
}
|
||||||
wq.items = wq.items[:m]
|
ws.items = ws.items[:m]
|
||||||
}
|
}
|
||||||
return wq.expiry
|
return ws.expiry
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
|
func (ws *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
|
||||||
for l <= r {
|
for l <= r {
|
||||||
mid := l + ((r - l) >> 1) // avoid overflow when computing mid
|
mid := l + ((r - l) >> 1) // avoid overflow when computing mid
|
||||||
if expiryTime.Before(wq.items[mid].lastUsedTime()) {
|
if expiryTime.Before(ws.items[mid].lastUsedTime()) {
|
||||||
r = mid - 1
|
r = mid - 1
|
||||||
} else {
|
} else {
|
||||||
l = mid + 1
|
l = mid + 1
|
||||||
@ -72,10 +72,10 @@ func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wq *workerStack) reset() {
|
func (ws *workerStack) reset() {
|
||||||
for i := 0; i < wq.len(); i++ {
|
for i := 0; i < ws.len(); i++ {
|
||||||
wq.items[i].finish()
|
ws.items[i].finish()
|
||||||
wq.items[i] = nil
|
ws.items[i] = nil
|
||||||
}
|
}
|
||||||
wq.items = wq.items[:0]
|
ws.items = ws.items[:0]
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,15 +6,15 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNewWorkerStack(t *testing.T) {
|
func TestNewWorkerStack(t *testing.T) {
|
||||||
size := 100
|
size := 100
|
||||||
q := newWorkerStack(size)
|
q := newWorkerStack(size)
|
||||||
assert.EqualValues(t, 0, q.len(), "Len error")
|
require.EqualValues(t, 0, q.len(), "Len error")
|
||||||
assert.Equal(t, true, q.isEmpty(), "IsEmpty error")
|
require.Equal(t, true, q.isEmpty(), "IsEmpty error")
|
||||||
assert.Nil(t, q.detach(), "Dequeue error")
|
require.Nil(t, q.detach(), "Dequeue error")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWorkerStack(t *testing.T) {
|
func TestWorkerStack(t *testing.T) {
|
||||||
@ -26,7 +26,7 @@ func TestWorkerStack(t *testing.T) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert.EqualValues(t, 5, q.len(), "Len error")
|
require.EqualValues(t, 5, q.len(), "Len error")
|
||||||
|
|
||||||
expired := time.Now()
|
expired := time.Now()
|
||||||
|
|
||||||
@ -43,9 +43,9 @@ func TestWorkerStack(t *testing.T) {
|
|||||||
t.Fatal("Enqueue error")
|
t.Fatal("Enqueue error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert.EqualValues(t, 12, q.len(), "Len error")
|
require.EqualValues(t, 12, q.len(), "Len error")
|
||||||
q.refresh(time.Second)
|
q.refresh(time.Second)
|
||||||
assert.EqualValues(t, 6, q.len(), "Len error")
|
require.EqualValues(t, 6, q.len(), "Len error")
|
||||||
}
|
}
|
||||||
|
|
||||||
// It seems that something wrong with time.Now() on Windows, not sure whether it is a bug on Windows,
|
// It seems that something wrong with time.Now() on Windows, not sure whether it is a bug on Windows,
|
||||||
@ -58,18 +58,18 @@ func TestSearch(t *testing.T) {
|
|||||||
|
|
||||||
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
||||||
|
|
||||||
assert.EqualValues(t, 0, q.binarySearch(0, q.len()-1, time.Now()), "index should be 0")
|
require.EqualValues(t, 0, q.binarySearch(0, q.len()-1, time.Now()), "index should be 0")
|
||||||
assert.EqualValues(t, -1, q.binarySearch(0, q.len()-1, expiry1), "index should be -1")
|
require.EqualValues(t, -1, q.binarySearch(0, q.len()-1, expiry1), "index should be -1")
|
||||||
|
|
||||||
// 2
|
// 2
|
||||||
expiry2 := time.Now()
|
expiry2 := time.Now()
|
||||||
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
||||||
|
|
||||||
assert.EqualValues(t, -1, q.binarySearch(0, q.len()-1, expiry1), "index should be -1")
|
require.EqualValues(t, -1, q.binarySearch(0, q.len()-1, expiry1), "index should be -1")
|
||||||
|
|
||||||
assert.EqualValues(t, 0, q.binarySearch(0, q.len()-1, expiry2), "index should be 0")
|
require.EqualValues(t, 0, q.binarySearch(0, q.len()-1, expiry2), "index should be 0")
|
||||||
|
|
||||||
assert.EqualValues(t, 1, q.binarySearch(0, q.len()-1, time.Now()), "index should be 1")
|
require.EqualValues(t, 1, q.binarySearch(0, q.len()-1, time.Now()), "index should be 1")
|
||||||
|
|
||||||
// more
|
// more
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
@ -84,5 +84,5 @@ func TestSearch(t *testing.T) {
|
|||||||
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
_ = q.insert(&goWorker{lastUsed: time.Now()})
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.EqualValues(t, 7, q.binarySearch(0, q.len()-1, expiry3), "index should be 7")
|
require.EqualValues(t, 7, q.binarySearch(0, q.len()-1, expiry3), "index should be 7")
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user