一架梯子,一头程序猿,仰望星空!
Asynq任务队列教程 > 内容正文

Redis Cluster


本页面介绍了如何在Asynq中使用Redis Cluster作为消息代理。

使用Redis Cluster的优点

通过Redis Cluster,您可以获得以下优势:

  • 能够将数据轻松地分片到多个Redis节点上
  • 在某些节点故障时,仍能保持可用
  • 能够自动进行故障切换

概览

群集队列图示

Asynq根据队列对数据进行分片。 在上图中,我们有一个6个实例的Redis Cluster(3个主节点,3个从节点)和4个队列(q1、q2、q3、q4)。

  • Master1(以及它的副本Slave1)托管q1和q2。
  • Master2(以及它的副本Slave2)托管q3。
  • Master3(以及它的副本Slave3)托管q4。

当您使用asynq.Client将任务加入队列时,您可以使用Queue选项指定队列。 被加入队列的任务将由从这些队列拉取任务的asynq.Server(s)消费。

教程

在本节中,我们将介绍如何使用Redis Cluster作为Asynq的消息代理。我们假设您在端口7000-7005上运行了一个包含6个Redis实例的集群。下面是一个示例的redis.conf文件:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

接下来,我们将创建两个二进制文件:client和worker。

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

client.go中,我们将创建一个新的asynq.Client并通过传递RedisClusterClientOpt来指定如何连接到Redis Cluster。

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

一旦我们拥有了客户端,我们将创建任务并将它们加入到三个不同的队列中:

  • notifications
  • webhooks
  • images
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// 队列名称列表
const (
     QueueNotifications = "notifications"
     QueueWebhooks      = "webhooks"
     QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // 创建"notifications:email"任务并将其加入到"notifications"队列中。
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("成功加入队列: %+v\n", res)

    // 创建"webhooks:sync"任务并将其加入到"webhooks"队列中。
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("成功加入队列: %+v\n", res)

    // 创建"images:resize"任务并将其加入到"images"队列中。
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("成功加入队列: %+v\n", res)
}

让我们运行这个程序以添加三个任务到队列中。

go run client/client.go

现在,让我们转到worker来处理这三个任务。在worker.go中,我们将创建一个asynq.Server来从这三个队列中消费任务。同样地,我们将使用RedisClusterClientOpt连接到我们的Redis Cluster。 翻译结果:

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// 在这里给每个队列设置相同的优先级
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("无法启动服务器:%v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("从 %d 发送邮件给 %d\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("处理 Webhook 任务:%d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("调整图像大小:%s\n", src)
	return nil
}

让我们运行此工作服务器来处理我们之前创建的三个任务。

go run worker/worker.go

您应该能够看到从每个处理程序打印的消息。

Redis节点和队列

如概述部分所述,Asynq根据队列将数据进行分片。所有加入同一队列的任务属于同一个Redis节点。那么哪个Redis节点托管哪个队列呢?

我们可以使用CLI来回答这个问题。

asynq queue ls --cluster

这个命令将打印一个队列列表,以及:

  • 队列所属的集群节点
  • 队列映射到的集群哈希槽

输出可能类似于这样:

Queue          Cluster KeySlot  Cluster Nodes
-----          ---------------  -------------
images         9450             [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks       4418             [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications  16340            [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]

您可以运行 redis-cli --cluster reshard 命令将队列从一个节点移动到另一个节点。请注意,在重新分片期间,由于Asynq使用多键操作,某些操作可能在一段时间内不可用。