一架梯子,一头程序猿,仰望星空!
Asynq任务队列教程 > 内容正文

任务速率限制


配置 asynq 服务器实现任务处理速率限制的示例

本页显示了如何配置 asynq 服务器以实现任务处理的速率限制。

请注意,这是每个服务器实例的速率限制,而不是全局速率限制。

在这个示例中,我们将使用 golang.org/x/time/rate 包来演示速率限制。关键的配置在你初始化服务器时的配置中是 IsFailureRetryDelayFunc。我们将创建一个自定义的错误类型并在 IsFailureRetryDelayFunc 函数中类型断言给定的错误。

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "math/rand"
    "time"

    "golang.org/x/time/rate"
    "github.com/hibiken/asynq"
)

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: ":6379"},
        asynq.Config{
            Concurrency:    10,
            // 如果错误是由于速率限制导致的,请不将错误计为故障。
            IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
            RetryDelayFunc: retryDelay,
        },
    )

    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

type RateLimitError struct {
    RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
    return fmt.Sprintf("速率限制(%v 后重试)", e.RetryIn)
}

func IsRateLimitError(err error) bool {
    _, ok := err.(*RateLimitError)
    return ok
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
    var ratelimitErr *RateLimitError
    if errors.As(err, &ratelimitErr) {
        return ratelimitErr.RetryIn
    }
    return asynq.DefaultRetryDelayFunc(n, err, task)
}

// 每秒10次事件的速率,最多允许30个事件的突发。
var limiter = rate.NewLimiter(10, 30)

func handler(ctx context.Context, task *asynq.Task) error {
    if !limiter.Allow() {
        return &RateLimitError{
            RetryIn: time.Duration(rand.Intn(10)) * time.Second,
        }
    }
    log.Printf("[*] 处理任务 %s", task.Payload())
    return nil
}