配置 asynq 服务器实现任务处理速率限制的示例
本页显示了如何配置 asynq 服务器以实现任务处理的速率限制。
请注意,这是每个服务器实例的速率限制,而不是全局速率限制。
在这个示例中,我们将使用 golang.org/x/time/rate
包来演示速率限制。关键的配置在你初始化服务器时的配置中是 IsFailure
和 RetryDelayFunc
。我们将创建一个自定义的错误类型并在 IsFailure
和 RetryDelayFunc
函数中类型断言给定的错误。
package main
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"time"
"golang.org/x/time/rate"
"github.com/hibiken/asynq"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{
Concurrency: 10,
// 如果错误是由于速率限制导致的,请不将错误计为故障。
IsFailure: func(err error) bool { return !IsRateLimitError(err) },
RetryDelayFunc: retryDelay,
},
)
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
type RateLimitError struct {
RetryIn time.Duration
}
func (e *RateLimitError) Error() string {
return fmt.Sprintf("速率限制(%v 后重试)", e.RetryIn)
}
func IsRateLimitError(err error) bool {
_, ok := err.(*RateLimitError)
return ok
}
func retryDelay(n int, err error, task *asynq.Task) time.Duration {
var ratelimitErr *RateLimitError
if errors.As(err, &ratelimitErr) {
return ratelimitErr.RetryIn
}
return asynq.DefaultRetryDelayFunc(n, err, task)
}
// 每秒10次事件的速率,最多允许30个事件的突发。
var limiter = rate.NewLimiter(10, 30)
func handler(ctx context.Context, task *asynq.Task) error {
if !limiter.Allow() {
return &RateLimitError{
RetryIn: time.Duration(rand.Intn(10)) * time.Second,
}
}
log.Printf("[*] 处理任务 %s", task.Payload())
return nil
}