一架梯子,一头程序猿,仰望星空!
Asynq任务队列教程 > 内容正文

Handler详解


在本页面中,我将解释 Handler 接口的设计。

Handler 接口

你提供给服务器运行的 Handler 才是你的异步任务处理逻辑的核心。Handler 的责任是接受一个任务并进行处理,同时需要考虑上下文。如果处理不成功,它应该报告任何错误以便稍后重试任务。

以下是接口定义:

type Handler interface {
    ProcessTask(context.Context, *Task) error
}

这是一个简单的接口,简明扼要地描述了 Handler 的责任。

实现接口

通过多种方式可以实现这个处理器接口。

以下是定义自己的结构体类型来处理任务的示例。

type MyTaskHandler struct {
   // ... fields
}

// 实现 ProcessTask 方法
func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
   // ... 任务处理逻辑
}

甚至可以定义一个函数来满足接口,得益于 HandlerFunc 适配器类型。

func myHandler(ctx context.Context, t *asynq.Task) error {
    // ... 任务处理逻辑
}

// h 满足 Handler 接口
h := asynq.HandlerFunc(myHandler) 

在大多数情况下,你可能需要检查输入任务的 Type 并相应地处理。

func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
   switch t.Type() {
   case "type1":
      // 处理 type1
   case "type2":
      // 处理 type2
   case "typeN":
      // 处理 typeN

   default:
      return fmt.Errorf("意外的任务类型: %q", t.Type())
   }
}

你可以看到,处理器可以由许多不同的处理器组成,上面的示例中的每个 case 可以由一个专门的处理器处理。这就是 ServeMux 类型的用武之地。

使用 ServeMux

注意:你不一定要使用 ServeMux 类型来实现一个处理器,但在许多情况下它可能很有用。
通过 ServeMux,你可以注册多个处理器。它会将每个任务的类型与注册的模式列表进行匹配,并调用与任务类型名称最接近的模式对应的处理器。

mux := asynq.NewServeMux()
mux.Handle("email:welcome", welcomeEmailHandler) // 注册处理器
mux.Handle("email:reminder", reminderEmailHandler)
mux.Handle("email:" defaultEmailHandler) // 用于其他以 "email:" 前缀开头的任务类型的默认处理器

使用中间件

如果你需要在处理器之前和/或之后执行一些代码,可以借助中间件来实现。中间件是一个接受 Handler 并返回 Handler 的函数。
以下是一个记录任务处理开始和结束的中间件的示例。

func loggingMiddleware(h asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        start := time.Now()
        log.Printf("开始处理 %q", t.Type())
        err := h.ProcessTask(ctx, t)
        if err != nil {
            return err
        }
        log.Printf("完成处理 %q: 经过时间 = %v", t.Type(), time.Since(start))
        return nil
    })
}

现在你可以用这个中间件来"包装"你的处理器。

myHandler = loggingMiddleware(myHandler)

另外,如果你正在使用 ServeMux,你可以像这样提供中间件。

mux := NewServeMux()
mux.Use(loggingMiddleware)

分组中间件

如果你有一个情况,在其中你想将一个中间件应用于一组任务,你可以通过组合多个ServeMux实例来实现。一个限制是每个组中的任务需要在其类型名称中有相同的前缀

示例:
如果你有一些处理订单的任务和一些处理产品的任务,并且你希望对所有的“产品”任务应用一个共享逻辑,对所有的“订单”任务应用另一个共享逻辑,你可以这样实现:

productHandlers := asynq.NewServeMux()
productHandlers.Use(productMiddleware) // 对所有产品任务应用共享逻辑
productHandlers.HandleFunc("product:update", productUpdateTaskHandler)
// ... 注册其他“产品”任务处理函数

orderHandlers := asynq.NewServeMux()
orderHandler.Use(orderMiddleware) // 对所有订单任务应用共享逻辑
orderHandlers.HandleFunc("order:refund", orderRefundTaskHandler)
// ... 注册其他“订单”任务处理函数

// 顶级处理函数
mux := asynq.NewServeMux()
mux.Use(someGlobalMiddleware) // 对所有任务应用共享逻辑
mux.Handle("product:", productHandlers)
mux.Handle("order:", orderHandlers)

if err := srv.Run(mux); err != nil {
    log.Fatal(err)
}