概述
你可以在Server
的同时运行一个Scheduler
来定期处理任务。Scheduler会定期将任务加入队列,然后由集群中可用的工作服务器来执行这些任务。
你需要确保每个调度只有一个Scheduler在运行,否则会出现重复任务。使用集中化的方法意味着不需要同步调度,并且服务可以在不使用锁的情况下运行。
如果需要动态地添加和删除周期性任务,请使用PeriodicTaskManager
而不是直接使用Scheduler
。请参阅此wiki页面以获取更多详细信息。
时区
默认情况下,周期任务使用的是UTC时间,但是你可以使用SchedulerOpts
来更改所使用的时区。
// 例如,使用America/Los_Angeles时区而不是默认的UTC时区。
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)
任务记录
为了定期将任务加入队列,你需要向调度器注册一个任务记录。
scheduler := asynq.NewScheduler(redisConnOpt, nil)
task := asynq.NewTask("example_task", nil)
// 你可以使用cron规范字符串来指定调度。
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
// 你还可以使用"@every "来指定间隔。
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
// 你也可以传递选项。
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
运行调度器
要启动调度器,请在调度器上调用Run
。
scheduler := asynq.NewScheduler(redisConnOpt, nil)
// ... 注册任务
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
调用Run
将等待TERM或INT信号(例如Ctrl-C按键)。
错误处理
你可以提供一个处理函数来处理如果调度器无法将任务加入队列的错误。
func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
// 你的错误处理逻辑
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
EnqueueErrorHandler: handleEnqueueError,
},
)
通过CLI进行检查
CLI有一个名为cron
的子命令用于检查调度器记录。
要查看当前正在运行的调度器的所有记录,可以运行以下命令:
asynq cron ls
这个命令将输出一个包含每个记录的ID、计划规范、下次加入队列时间和上次加入队列时间的列表。
你还可以运行以下命令来查看每个记录的历史记录:
asynq cron history