Tunny是一个用于生成和管理goroutine池的Golang库,允许您使用同步API来限制来自任意数量的goroutine的工作。
当您的工作来自任意数量的异步源但并行处理能力有限时,固定的goroutine池非常有用。例如,在处理CPU密集型的HTTP请求作业时,您可以创建一个大小与CPU数量相匹配的池。
安装
go get github.com/Jeffail/tunny
或者,使用dep:
dep ensure -add github.com/Jeffail/tunny
使用
对于大多数情况下,您的繁重工作可以用一个简单的func()
来表示,在这种情况下您可以使用NewFunc
。让我们看看如何使用我们的HTTP请求到CPU计数的例子:
package main
import (
"io/ioutil"
"net/http"
"runtime"
"github.com/Jeffail/tunny"
)
func main() {
numCPUs := runtime.NumCPU()
pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
var result []byte
// TODO: 使用payload进行一些CPU密集型操作
return result
})
defer pool.Close()
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
input, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Internal error", http.StatusInternalServerError)
}
defer r.Body.Close()
// 将这项工作导入我们的池中。此调用是同步的,并将阻塞直到作业完成。
result := pool.Process(input)
w.Write(result.([]byte))
})
http.ListenAndServe(":8080", nil)
}
Tunny还支持超时。您可以将上面的Process
调用替换为以下代码:
result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
http.Error(w, "Request timed out", http.StatusRequestTimeout)
}
您还可以使用请求的上下文(或任何其他上下文)来处理超时和截止时间。只需将Process
调用替换为以下代码:
result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
http.Error(w, "Request timed out", http.StatusRequestTimeout)
}
修改池大小
可以使用SetSize(int)
在任何时候更改Tunny池的大小。
pool.SetSize(10) // 10个goroutine
pool.SetSize(100) // 100个goroutine
即使其他goroutine仍在处理,这也是安全的。
带状态的Goroutine
有时,在Tunny池中的每个goroutine都需要自己的管理状态。在这种情况下,您应该实现tunny.Worker
,其中包括终止、中断(如果一个作业超时并且不再需要)和阻塞下一个作业分配直到满足某个条件的调用。
在使用Worker
类型创建池时,您需要提供一个构造函数来生成您的自定义实现:
pool := tunny.New(poolSize, func() Worker {
// TODO: 在这里进行每个goroutine的状态分配。
return newCustomWorker()
})
这样,在池大小改变时,Tunny可以清理创建和销毁Worker
类型。
排序
积压的作业不能保证按顺序处理。由于当前通道和选择块的实现,积压的作业堆栈将被作为FIFO队列处理。然而,这种行为不是规范的一部分,不应依赖于它。