在本页面中,我将解释 Handler
接口的设计。
Handler 接口
你提供给服务器运行的 Handler
才是你的异步任务处理逻辑的核心。Handler 的责任是接受一个任务并进行处理,同时需要考虑上下文。如果处理不成功,它应该报告任何错误以便稍后重试任务。
以下是接口定义:
type Handler interface {
ProcessTask(context.Context, *Task) error
}
这是一个简单的接口,简明扼要地描述了 Handler 的责任。
实现接口
通过多种方式可以实现这个处理器接口。
以下是定义自己的结构体类型来处理任务的示例。
type MyTaskHandler struct {
// ... fields
}
// 实现 ProcessTask 方法
func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// ... 任务处理逻辑
}
甚至可以定义一个函数来满足接口,得益于 HandlerFunc
适配器类型。
func myHandler(ctx context.Context, t *asynq.Task) error {
// ... 任务处理逻辑
}
// h 满足 Handler 接口
h := asynq.HandlerFunc(myHandler)
在大多数情况下,你可能需要检查输入任务的 Type
并相应地处理。
func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "type1":
// 处理 type1
case "type2":
// 处理 type2
case "typeN":
// 处理 typeN
default:
return fmt.Errorf("意外的任务类型: %q", t.Type())
}
}
你可以看到,处理器可以由许多不同的处理器组成,上面的示例中的每个 case 可以由一个专门的处理器处理。这就是 ServeMux
类型的用武之地。
使用 ServeMux
注意:你不一定要使用 ServeMux
类型来实现一个处理器,但在许多情况下它可能很有用。
通过 ServeMux
,你可以注册多个处理器。它会将每个任务的类型与注册的模式列表进行匹配,并调用与任务类型名称最接近的模式对应的处理器。
mux := asynq.NewServeMux()
mux.Handle("email:welcome", welcomeEmailHandler) // 注册处理器
mux.Handle("email:reminder", reminderEmailHandler)
mux.Handle("email:" defaultEmailHandler) // 用于其他以 "email:" 前缀开头的任务类型的默认处理器
使用中间件
如果你需要在处理器之前和/或之后执行一些代码,可以借助中间件来实现。中间件是一个接受 Handler
并返回 Handler
的函数。
以下是一个记录任务处理开始和结束的中间件的示例。
func loggingMiddleware(h asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
start := time.Now()
log.Printf("开始处理 %q", t.Type())
err := h.ProcessTask(ctx, t)
if err != nil {
return err
}
log.Printf("完成处理 %q: 经过时间 = %v", t.Type(), time.Since(start))
return nil
})
}
现在你可以用这个中间件来"包装"你的处理器。
myHandler = loggingMiddleware(myHandler)
另外,如果你正在使用 ServeMux
,你可以像这样提供中间件。
mux := NewServeMux()
mux.Use(loggingMiddleware)
分组中间件
如果你有一个情况,在其中你想将一个中间件应用于一组任务,你可以通过组合多个ServeMux
实例来实现。一个限制是每个组中的任务需要在其类型名称中有相同的前缀。
示例:
如果你有一些处理订单的任务和一些处理产品的任务,并且你希望对所有的“产品”任务应用一个共享逻辑,对所有的“订单”任务应用另一个共享逻辑,你可以这样实现:
productHandlers := asynq.NewServeMux()
productHandlers.Use(productMiddleware) // 对所有产品任务应用共享逻辑
productHandlers.HandleFunc("product:update", productUpdateTaskHandler)
// ... 注册其他“产品”任务处理函数
orderHandlers := asynq.NewServeMux()
orderHandler.Use(orderMiddleware) // 对所有订单任务应用共享逻辑
orderHandlers.HandleFunc("order:refund", orderRefundTaskHandler)
// ... 注册其他“订单”任务处理函数
// 顶级处理函数
mux := asynq.NewServeMux()
mux.Use(someGlobalMiddleware) // 对所有任务应用共享逻辑
mux.Handle("product:", productHandlers)
mux.Handle("order:", orderHandlers)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}