# ants
[英文](README.md) | [项目博客](https://taohuawu.club/high-performance-implementation-of-goroutine-pool) `ants`是一个高性能的协程池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制协程数量,复用资源,达到更高效执行任务的效果。 ## 功能: - 实现了自动调度并发的 goroutine,复用 goroutine - 定时清理过期的 goroutine,进一步节省资源 - 提供了友好的接口:任务提交、获取运行中的协程数量、动态调整协程池大小 - 优雅处理 panic,防止程序崩溃 - 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能 ## 目前测试通过的Golang版本: - 1.8.x - 1.9.x - 1.10.x - 1.11.x - 1.12.x ## 安装 ``` sh go get -u github.com/panjf2000/ants ``` ## 使用 写 go 并发程序的时候如果程序会启动大量的 goroutine ,势必会消耗大量的系统资源(内存,CPU),通过使用 `ants`,可以实例化一个协程池,复用 goroutine ,节省资源,提升性能: ``` go package main import ( "fmt" "sync" "sync/atomic" "time" "github.com/panjf2000/ants/v2" ) var sum int32 func myFunc(i interface{}) { n := i.(int32) atomic.AddInt32(&sum, n) fmt.Printf("run with %d\n", n) } func demoFunc() { time.Sleep(10 * time.Millisecond) fmt.Println("Hello World!") } func main() { defer ants.Release() runTimes := 1000 // Use the common pool. var wg sync.WaitGroup syncCalculateSum := func() { demoFunc() wg.Done() } for i := 0; i < runTimes; i++ { wg.Add(1) _ = ants.Submit(syncCalculateSum) } wg.Wait() fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("finish all tasks.\n") // Use the pool with a function, // set 10 to the capacity of goroutine pool and 1 second for expired duration. p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { myFunc(i) wg.Done() }) defer p.Release() // Submit tasks one by one. for i := 0; i < runTimes; i++ { wg.Add(1) _ = p.Invoke(int32(i)) } wg.Wait() fmt.Printf("running goroutines: %d\n", p.Running()) fmt.Printf("finish all tasks, result is %d\n", sum) } ``` ## 与 http server 集成 ```go package main import ( "io/ioutil" "net/http" "github.com/panjf2000/ants/v2" ) type Request struct { Param []byte Result chan []byte } func main() { pool, _ := ants.NewPoolWithFunc(100000, func(payload interface{}) { request, ok := payload.(*Request) if !ok { return } reverseParam := func(s []byte) []byte { for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { s[i], s[j] = s[j], s[i] } return s }(request.Param) request.Result <- reverseParam }) defer pool.Release() http.HandleFunc("/reverse", func(w http.ResponseWriter, r *http.Request) { param, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, "request error", http.StatusInternalServerError) } defer r.Body.Close() request := &Request{Param: param, Result: make(chan []byte)} // Throttle the requests traffic with ants pool. This process is asynchronous and // you can receive a result from the channel defined outside. if err := pool.Invoke(request); err != nil { http.Error(w, "throttle limit error", http.StatusInternalServerError) } w.Write(<-request.Result) }) http.ListenAndServe(":8080", nil) } ``` ## Pool 配置 ```go type Options struct { // ExpiryDuration set the expired time (second) of every worker. ExpiryDuration time.Duration // PreAlloc indicate whether to make memory pre-allocation when initializing Pool. PreAlloc bool // Max number of goroutine blocking on pool.Submit. // 0 (default value) means no such limit. MaxBlockingTasks int // When Nonblocking is true, Pool.Submit will never be blocked. // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. // When Nonblocking is true, MaxBlockingTasks is inoperative. Nonblocking bool // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. PanicHandler func(interface{}) } func WithOptions(options Options) Option { return func(opts *Options) { *opts = options } } func WithExpiryDuration(expiryDuration time.Duration) Option { return func(opts *Options) { opts.ExpiryDuration = expiryDuration } } func WithPreAlloc(preAlloc bool) Option { return func(opts *Options) { opts.PreAlloc = preAlloc } } func WithMaxBlockingTasks(maxBlockingTasks int) Option { return func(opts *Options) { opts.MaxBlockingTasks = maxBlockingTasks } } func WithNonblocking(nonblocking bool) Option { return func(opts *Options) { opts.Nonblocking = nonblocking } } func WithPanicHandler(panicHandler func(interface{})) Option { return func(opts *Options) { opts.PanicHandler = panicHandler } } ``` 通过在调用`NewPool`/`NewPoolWithFunc`之时使用各种 optional function,可以设置`ants.Options`中各个配置项的值,然后用它来定制化 goroutine pool. ## 自定义池 `ants`支持实例化使用者自己的一个 Pool ,指定具体的池容量;通过调用 `NewPool` 方法可以实例化一个新的带有指定容量的 Pool ,如下: ``` go // Set 10000 the size of goroutine pool p, _ := ants.NewPool(10000) ``` ## 任务提交 提交任务通过调用 `ants.Submit(func())`方法: ```go ants.Submit(func(){}) ``` ## 动态调整协程池容量 需要动态调整协程池容量可以通过调用`Tune(int)`: ``` go pool.Tune(1000) // Tune its capacity to 1000 pool.Tune(100000) // Tune its capacity to 100000 ``` 该方法是线程安全的。 ## 预先分配 goroutine 队列内存 `ants`允许你预先把整个池的容量分配内存, 这个功能可以在某些特定的场景下提高协程池的性能。比如, 有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少 re-slice 时的复制内存损耗。 ```go // ants will pre-malloc the whole capacity of pool when you invoke this function p, _ := ants.NewPool(100000, ants.WithPreAlloc(true)) ``` ## 销毁协程池 ```go pool.Release() ``` ## Benchmarks