本页面介绍了如何在Asynq中使用Redis Cluster作为消息代理。
使用Redis Cluster的优点
通过Redis Cluster,您可以获得以下优势:
- 能够将数据轻松地分片到多个Redis节点上
- 在某些节点故障时,仍能保持可用
- 能够自动进行故障切换
概览
Asynq根据队列对数据进行分片。 在上图中,我们有一个6个实例的Redis Cluster(3个主节点,3个从节点)和4个队列(q1、q2、q3、q4)。
- Master1(以及它的副本Slave1)托管q1和q2。
- Master2(以及它的副本Slave2)托管q3。
- Master3(以及它的副本Slave3)托管q4。
当您使用asynq.Client
将任务加入队列时,您可以使用Queue
选项指定队列。
被加入队列的任务将由从这些队列拉取任务的asynq.Server
(s)消费。
教程
在本节中,我们将介绍如何使用Redis Cluster作为Asynq的消息代理。我们假设您在端口7000-7005上运行了一个包含6个Redis实例的集群。下面是一个示例的redis.conf
文件:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
接下来,我们将创建两个二进制文件:client和worker。
go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go
在client.go
中,我们将创建一个新的asynq.Client
并通过传递RedisClusterClientOpt
来指定如何连接到Redis Cluster。
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
一旦我们拥有了客户端,我们将创建任务并将它们加入到三个不同的队列中:
- notifications
- webhooks
- images
// client.go
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// 队列名称列表
const (
QueueNotifications = "notifications"
QueueWebhooks = "webhooks"
QueueImages = "images"
)
func main() {
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer client.Close()
// 创建"notifications:email"任务并将其加入到"notifications"队列中。
task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
if err != nil {
log.Fatal(err)
}
fmt.Printf("成功加入队列: %+v\n", res)
// 创建"webhooks:sync"任务并将其加入到"webhooks"队列中。
task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("成功加入队列: %+v\n", res)
// 创建"images:resize"任务并将其加入到"images"队列中。
task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
res, err = client.Enqueue(task, asynq.Queue(QueueImages))
if err != nil {
log.Fatal(err)
}
fmt.Printf("成功加入队列: %+v\n", res)
}
让我们运行这个程序以添加三个任务到队列中。
go run client/client.go
现在,让我们转到worker来处理这三个任务。在worker.go
中,我们将创建一个asynq.Server
来从这三个队列中消费任务。同样地,我们将使用RedisClusterClientOpt
连接到我们的Redis Cluster。
翻译结果:
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// 在这里给每个队列设置相同的优先级
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", handleEmailTask)
mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
mux.HandleFunc("images:resize", handleImageResizeTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("无法启动服务器:%v", err)
}
}
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("从 %d 发送邮件给 %d\n", from, to)
return nil
}
func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("处理 Webhook 任务:%d\n", data)
return nil
}
func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("调整图像大小:%s\n", src)
return nil
}
让我们运行此工作服务器来处理我们之前创建的三个任务。
go run worker/worker.go
您应该能够看到从每个处理程序打印的消息。
Redis节点和队列
如概述部分所述,Asynq根据队列将数据进行分片。所有加入同一队列的任务属于同一个Redis节点。那么哪个Redis节点托管哪个队列呢?
我们可以使用CLI来回答这个问题。
asynq queue ls --cluster
这个命令将打印一个队列列表,以及:
- 队列所属的集群节点
- 队列映射到的集群哈希槽
输出可能类似于这样:
Queue Cluster KeySlot Cluster Nodes
----- --------------- -------------
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]
您可以运行 redis-cli --cluster reshard
命令将队列从一个节点移动到另一个节点。请注意,在重新分片期间,由于Asynq使用多键操作,某些操作可能在一段时间内不可用。