Asynq是一个Go语言异步任务框架,它以Redis作为消息队列,具备可伸缩性和简易性。
Asynq异步任务解决方案:
- 客户端将任务放入队列
- 服务器从队列中提取任务并为每个任务启动一个工作线程(协程)
- 多个工作协程并行处理任务
任务队列是一种将工作分配到多台机器的机制。系统可以由多个工作服务器和代理组成,实现高可用性和水平扩展。
特性
- 保证至少执行一次任务
- 任务调度
- 重试失败的任务
- 在工作线程崩溃时自动恢复任务
- 加权优先级队列
- 严格优先级队列
- 由于Redis中的写操作快速,添加任务的延迟低
- 使用唯一选项对任务进行去重
- 允许为每个任务设置超时和截止时间
- 允许聚合一组任务以批量执行多个连续操作
- 灵活的处理程序接口,支持中间件
- 允许暂停队列以停止从队列中处理任务
- 周期性任务
- 支持Redis Cluster以自动分片和实现高可用性
- 支持Redis Sentinel实现高可用性
- 与Prometheus集成,以收集和可视化队列指标
- Web界面,用于检查和远程控制队列和任务
- 命令行界面,用于检查和远程控制队列和任务
入门指南
在本教程中,我们将编写两个程序,client
和workers
。
-
client.go
将创建并安排任务,以异步地由后台工作线程处理。 -
workers.go
将启动多个并发的工作线程,处理由客户端创建的任务。
本指南假设您在localhost:6379
上运行Redis服务器。在开始之前,请确保已安装并运行Redis。
让我们首先创建我们的两个主要文件。
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
然后安装asynq
包。
go get -u github.com/hibiken/asynq
在开始编写代码之前,让我们回顾一下在这两个程序中将使用的一些核心类型。
Redis连接选项
Asynq使用Redis作为消息代理。client.go
和workers.go
都需要连接到Redis进行读写操作。我们将使用RedisClientOpt
来指定与本地运行的Redis服务器的连接。
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// 如果不需要密码,可省略
Password: "mypassword",
// 为asynq使用一个专用的数据库编号。
// 默认情况下,Redis提供16个数据库(0..15)
DB: 0,
}
任务
在asynq
中,工作单元封装在称为Task
的类型中,它概念上具有两个字段:Type
和Payload
。
// Type是一个字符串值,用于指示任务的类型。
func (t *Task) Type() string
// Payload是任务执行所需的数据。
func (t *Task) Payload() []byte
现在我们已经看了核心类型,让我们开始编写我们的程序。
客户端程序
在client.go
中,我们将创建一些任务,并使用asynq.Client
对它们进行入队。
要创建一个任务,可以使用NewTask
函数并传入任务的类型和有效负载。
Enqueue
方法接受一个任务和任意数量的选项。使用ProcessIn
或ProcessAt
选项来安排未来处理的任务。
// 与电子邮件相关任务的有效负载。
type EmailTaskPayload struct {
// 电子邮件接收者的ID。
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// 创建带有类型名称和有效负载的任务。
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// 立即处理任务。
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] 成功将任务加入队列:%+v", info)
// 在24小时后处理任务。
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] 成功将任务加入队列:%+v", info)
}
这就是我们客户端程序所需要的全部内容。
Workers程序
在workers.go
中,我们将创建一个asynq.Server
实例来启动workers。
NewServer
函数接受RedisConnOpt
和Config
作为参数。
Config
用于调整服务器的任务处理行为。
你可以查看Config
文档以了解所有可用的配置选项。
为了简单起见,我们在这个例子中只指定并发数。
// workers.go
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// 注意: 在下面的部分中,我们将介绍`handler`是什么。
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
(*Server).Run
方法的参数是一个接口asynq.Handler
,它有一个方法ProcessTask
。
type Handler interface {
// 如果任务成功处理,ProcessTask应返回nil。
// 如果ProcessTask返回一个非nil错误或导致panic,任务将稍后重试。
ProcessTask(context.Context, *Task) error
}
实现一个handler最简单的方法是定义一个具有相同签名的函数,并在将其传递给Run
时使用asynq.HandlerFunc
适配器类型。
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 给用户 %d 发送欢迎邮件", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 给用户 %d 发送提醒邮件", p.UserID)
default:
return fmt.Errorf("意外的任务类型:%s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// 使用asynq.HandlerFunc适配器来处理函数
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
我们可以继续为这个handler函数添加switch case,但在一个实际的应用中,将每个case的逻辑定义在一个单独的函数中会更方便。
为了重构我们的代码,让我们使用ServeMux
来创建我们的handler。就像来自"net/http"
包的ServeMux
一样,你可以通过调用Handle
或HandleFunc
来注册一个handler。ServeMux
满足Handler
接口,所以可以将其传递给(*Server).Run
。
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 给用户 %d 发送欢迎邮件", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 给用户 %d 发送提醒邮件", p.UserID)
return nil
}
现在,我们已将每个任务类型的处理函数提取出来,代码看起来更加有组织性。
然而,代码还是有点太隐晦了,我们有这些任务类型和负载类型的字符串值,应该将它们封装在一个有机的包中。让我们重构我们的代码,编写一个封装任务创建和处理的包。我们简单地创建一个名为task
的包。
mkdir task && touch task/task.go
package task
import (
"context"
"fmt"
"github.com/hibiken/asynq"
)
// 任务类型的列表。
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// 与任何与电子邮件相关的任务相关的任务负载。
type emailTaskPayload struct {
// 电子邮件收件人的ID。
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(emailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(emailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 向用户 %d 发送欢迎电子邮件", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 向用户 %d 发送提醒电子邮件", p.UserID)
return nil
}
现在我们可以在client.go
和workers.go
中导入这个包了。
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// 立即处理任务。
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] 任务成功入队列:%+v", info)
// 24小时后处理任务。
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] 任务成功入队列:%+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
现在代码看起来更好了!
运行程序
现在我们有了client
和workers
,我们可以运行这两个程序。让我们先运行client
程序来创建和调度任务。
go run client/client.go
这将创建两个任务:一个立即处理的任务和一个在24小时后处理的任务。
让我们使用asynq
的命令行界面来检查任务。
asynq dash
您应该能够看到一个任务处于Enqueued状态,另一个任务处于Scheduled状态。
注意:要了解每个状态的含义,请查看任务的生命周期。
最后,让我们启动workers
程序来处理任务。
go run workers/workers.go
注意:该程序不会退出,直到您发送一个信号来终止程序。有关如何安全地终止后台工作者的最佳实践,请参阅信号Wiki页面。
您应该能够在终端中看到一些文本输出,表示任务已成功处理。
您可以再次运行client
程序,看看工作者如何接受任务并处理它们。
任务重试
一个任务在第一次尝试时无法成功处理并不罕见。默认情况下,失败的任务将使用指数退避重试25次。让我们更新我们的处理程序,返回一个错误以模拟一个不成功的情况。
// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 尝试发送欢迎邮件给用户 %d...", p.UserID)
return fmt.Errorf("无法发送邮件给用户")
}
让我们重新启动我们的workers程序并加入一个任务。
go run workers/workers.go
go run client/client.go
如果你正在运行asynq dash
,你应该能够看到一个任务处于Retry状态(通过导航到队列详情视图并高亮“retry”选项卡)。
要检查哪些任务处于重试状态,您还可以运行
asynq task ls --queue=default --state=retry
这将列出所有将来将被重试的任务。输出包括任务下一次执行的预计时间。
一旦一个任务耗尽它的重试次数,任务将转为Archived状态,并且将不会再次重试(您仍然可以使用CLI或WebUI工具手动运行存档的任务)。
在结束本教程之前,让我们修复我们的处理程序。
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 给用户 %d 发送欢迎邮件", p.UserID)
return nil
}
现在我们修复了处理程序,在下一次尝试中任务将被成功处理 :)