Asynq中的Unique任务功能可以确保在Redis队列中只有一个任务。 当您希望对任务进行去重,避免重复任务,这个特性就比较有用。
概述
有两种方法可以确保Asynq中任务的唯一性。
- 使用
TaskID
选项:自己生成唯一的任务ID - 使用
Unique
选项:让Asynq为任务创建唯一性锁
使用TaskID
选项
如果选择第一种方法,可以保证在任何时刻只有一个具有给定任务ID的任务。如果尝试使用相同的任务ID入队另一个任务,则会返回ErrTaskIDConflict
错误。
// 第一个任务应该没问题
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))
// 第二个任务将失败,err为ErrTaskIDConflict(假设第一个任务尚未处理)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))
使用Unique
选项
第二种方法基于唯一性锁。在使用Unique
选项入队任务时,Client
会检查是否可以为给定任务获取锁。只有在可以获取锁时,任务才会被入队。如果已经有另一个任务持有锁,那么Client
将返回一个错误(查看下面的示例代码以了解如何检查错误)。
唯一性锁与TTL(生存时间)相关联,以避免永久持有锁。在TTL后或者在任务在TTL之前成功处理后,锁将被释放。 需要注意的一件重要事情是,Asynq的唯一任务特性是尽力而为的唯一性。换句话说,在任务处理之前如果锁已经过期,可能会入队一个重复的任务。
任务的唯一性基于以下属性:
- 类型
- Payload(载荷)
- 队列
因此,如果存在相同类型和载荷的任务,并且入队到相同的队列,那么在锁被释放之前,不会入队具有相同属性的另一个任务。
c := asynq.NewClient(redis)
t1 := asynq.NewTask("example", []byte("hello"))
// t1将在接下来的一个小时内持有唯一性锁。
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
// 处理重复任务
case err != nil:
// 处理其他错误
}
t2 := asynq.NewTask("example", []byte("hello"))
// t2不能入队,因为它是t1的副本。
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
// 处理重复任务
case err != nil:
// 处理其他错误
}
在上面的示例中,t2
不会入队,因为它是t1
的副本。
可以使用errors.Is
来检查返回的error
值,以确定它是否包装了asynq.ErrDuplicateTask
错误。