一架梯子,一头程序猿,仰望星空!
Asynq任务队列教程 > 内容正文

任务去重


Asynq中的Unique任务功能可以确保在Redis队列中只有一个任务。 当您希望对任务进行去重,避免重复任务,这个特性就比较有用。

概述

有两种方法可以确保Asynq中任务的唯一性。

  1. 使用TaskID选项:自己生成唯一的任务ID
  2. 使用Unique选项:让Asynq为任务创建唯一性锁

使用TaskID选项

如果选择第一种方法,可以保证在任何时刻只有一个具有给定任务ID的任务。如果尝试使用相同的任务ID入队另一个任务,则会返回ErrTaskIDConflict错误。

// 第一个任务应该没问题
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))

// 第二个任务将失败,err为ErrTaskIDConflict(假设第一个任务尚未处理)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))

使用Unique选项

第二种方法基于唯一性锁。在使用Unique选项入队任务时,Client会检查是否可以为给定任务获取锁。只有在可以获取锁时,任务才会被入队。如果已经有另一个任务持有锁,那么Client将返回一个错误(查看下面的示例代码以了解如何检查错误)。

唯一性锁与TTL(生存时间)相关联,以避免永久持有锁。在TTL后或者在任务在TTL之前成功处理后,锁将被释放。 需要注意的一件重要事情是,Asynq的唯一任务特性是尽力而为的唯一性。换句话说,在任务处理之前如果锁已经过期,可能会入队一个重复的任务。

任务的唯一性基于以下属性:

  • 类型
  • Payload(载荷)
  • 队列

因此,如果存在相同类型和载荷的任务,并且入队到相同的队列,那么在锁被释放之前,不会入队具有相同属性的另一个任务。

c := asynq.NewClient(redis)

t1 := asynq.NewTask("example", []byte("hello"))

// t1将在接下来的一个小时内持有唯一性锁。
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // 处理重复任务
case err != nil:
    // 处理其他错误
}

t2 := asynq.NewTask("example", []byte("hello"))
 
// t2不能入队,因为它是t1的副本。
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // 处理重复任务
case err != nil:
    // 处理其他错误
}

在上面的示例中,t2不会入队,因为它是t1的副本。 可以使用errors.Is来检查返回的error值,以确定它是否包装了asynq.ErrDuplicateTask错误。