我们建议在生产环境中使用监控工具例如 Prometheus 来监控你的工作进程和队列。
队列指标
如果你使用 Web UI,你可以通过传递两个参数来启用与 Prometheus 的集成:
-
--enable-metrics-exporter
: 启用队列指标的收集,并将其导出到/metrics
端点。 -
--prometheus-addr
: 在 Web UI 内部启用队列指标的可视化。
队列指标页面如下所示:
如果你不使用 Web UI,Asynq 附带了一个二进制文件,你可以运行它来导出队列指标。它还有一个用于收集队列指标的包 x/metrics
。
工作进程指标
Asynq Handler
接口和 ServeMux
可以使用指标跟踪代码进行仪表化。
以下是使用 Prometheus 导出工作进程指标的示例。我们可以在代码中仪表化我们的代码,以跟踪额外的应用特定指标,以及 prometheus 跟踪的默认指标(如内存、CPU)。
这里是示例代码中跟踪的应用特定指标:
- 工作进程处理的任务总数(包括成功和失败的任务)
- 工作进程处理失败的任务数
- 工作进程当前正在处理的任务数
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"runtime"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/examples/tasks"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sys/unix"
)
// 指标变量。
var (
processedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "processed_tasks_total",
Help: "处理任务的总数",
},
[]string{"task_type"},
)
failedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "failed_tasks_total",
Help: "处理失败的总次数",
},
[]string{"task_type"},
)
inProgressGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "in_progress_tasks",
Help: "当前正在处理的任务数",
},
[]string{"task_type"},
)
)
func metricsMiddleware(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
inProgressGauge.WithLabelValues(t.Type()).Inc()
err := next.ProcessTask(ctx, t)
inProgressGauge.WithLabelValues(t.Type()).Dec()
if err != nil {
failedCounter.WithLabelValues(t.Type()).Inc()
}
processedCounter.WithLabelValues(t.Type()).Inc()
return err
})
}
func main() {
httpServeMux := http.NewServeMux()
httpServeMux.Handle("/metrics", promhttp.Handler())
metricsSrv := &http.Server{
Addr: ":2112",
Handler: httpServeMux,
}
done := make(chan struct{})
// 启动指标服务器。
go func() {
err := metricsSrv.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
log.Printf("错误:指标服务器出错:%v", err)
}
close(done)
}()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
mux := asynq.NewServeMux()
mux.Use(metricsMiddleware)
mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)
// 启动工作服务器。
if err := srv.Start(mux); err != nil {
log.Fatalf("启动工作服务器失败:%v", err)
}
// 等待终止信号。
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
}