简介
中间件用于扩展事件框架,增加自定义功能,提供了对主处理程序的逻辑不相关但重要的功能。例如,在返回错误后重试处理程序,或者在处理程序中从恐慌中恢复并捕获堆栈跟踪。
中间件函数签名定义如下:
完整源码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware 允许我们编写类似于处理程序的装饰器。
// 它可以在处理程序之前执行某些操作(例如:修改消费的消息)
// 还可以在处理程序之后执行某些操作(修改产生的消息、ACK/NACK消费的消息、处理错误、日志记录等)。
//
// 它可以通过使用`AddMiddleware`方法将其附加到路由器上。
//
// 示例:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("执行处理程序之前")
// producedMessages, err := h(message)
// fmt.Println("执行处理程序之后")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
用法
中间件可以应用于路由器中的所有处理程序,也可以应用于特定的处理程序。当中间件直接添加到路由器时,它将应用于所有为路由器提供的处理程序。如果一个中间件只应用于特定的处理程序,它需要被添加到路由器中的处理程序上。
下面是一个示例用法:
完整源码:github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// 当接收到SIGTERM信号时,SignalsHandler将优雅地关闭路由器。
// 你也可以通过调用`r.Close()`来关闭路由器。
router.AddPlugin(plugin.SignalsHandler)
// 路由器级别的中间件将在发送到路由器的每条消息上执行
router.AddMiddleware(
// CorrelationID将从传入消息的元数据复制关联ID到生成的消息中
middleware.CorrelationID,
// 如果处理函数返回错误,则会重试该处理函数。
// 最多重试MaxRetries次,之后该消息将被Nacked,由PubSub重新发送。
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer处理处理函数中的恐慌。
// 在这种情况下,它将它们作为错误传递给Retry中间件。
middleware.Recoverer,
)
// 为了简单起见,我们在这里使用gochannel Pub/Sub,
// 你可以将它替换为任何Pub/Sub实现,它将工作一样。
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// 在后台发布一些传入的消息
go publishMessages(pubSub)
// AddHandler返回一个处理程序,可以用于添加处理程序级别的中间件
// 或停止处理程序。
handler := router.AddHandler(
"struct_handler", // 处理程序名称,必须是唯一的
"incoming_messages_topic", // 我们将要读取事件的话题
pubSub,
"outgoing_messages_topic", // 我们将要发布事件的话题
pubSub,
structHandler{}.Handler,
)
// 处理程序级别的中间件仅对特定的处理程序执行
// 这样的中间件可以和路由器级别的中间件以同样的方式添加
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("对 ", message.UUID, " 执行处理程序特定的中间件")
return h(message)
}
})
// 仅用于调试,我们打印所有在`incoming_messages_topic`接收到的消息
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// 仅用于调试,我们打印所有发送到`outgoing_messages_topic`的事件
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// 现在所有的处理程序都已经注册,我们可以运行路由器了。
// Run会阻塞,直到路由器停止运行。
// ...
可用的中间件
下列是Watermill提供的可复用中间件,您也可以轻松实现自己的中间件。例如,如果您想以某种类型的日志形式存储每个接收到的消息,那么这是最好的方式。
断路器
// CircuitBreaker是一个将处理程序包装在断路器中的中间件。
// 基于配置,如果处理程序持续返回错误,断路器将会快速失败。
// 这对于防止级联故障很有用。
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker返回一个新的CircuitBreaker中间件。
// 有关可用设置,请参考gobreaker文档。
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// Middleware返回CircuitBreaker中间件。
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
关联
// SetCorrelationID为消息设置关联ID。
//
// 当消息进入系统时,应调用SetCorrelationID。
// 当消息在请求中产生时(例如HTTP),消息关联ID应与请求的关联ID相同。
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID从消息中返回关联ID。
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID向处理程序生成的所有消息添加关联ID。
// ID基于处理程序接收到的消息ID。
//
// 要使CorrelationID正确工作,必须首先调用SetCorrelationID以便消息进入系统。
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
重复
// Duplicator会对消息进行两次处理,以确保端点是幂等的。
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
忽略错误
// IgnoreErrors提供了一个使处理程序忽略某些明确定义的错误的中间件。
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors创建一个新的IgnoreErrors中间件。
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware返回IgnoreErrors中间件。
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
Instant Ack(即时应答)
// InstantAck使处理程序立即应答传入的消息,无论是否有任何错误。
// 它可用于提高吞吐量,但代价是:
// 如果你需要确保恰好一次的传递,你可能会得到至少一次的传递。
// 如果你要求有序的消息,可能会破坏顺序。
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
Poison
// PoisonQueue提供一个中间件功能,用于处理无法处理的消息,并将其发布到单独的主题上。
// 然后,主要的中间件链继续执行,业务和往常一样。
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter与PoisonQueue相似,但接受一个函数来决定哪些错误符合毒品队列的条件。
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
随机失败(Random Fail)
// RandomFail使处理程序基于随机概率失败。错误概率应在范围(0,1)内。
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("发生了随机错误")
}
return h(message)
}
}
}
// RandomPanic使处理程序基于随机概率发生panic。恐慌概率应在范围(0,1)内。
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("发生了随机panic")
}
return h(message)
}
}
}
恢复器(Recoverer)
// RecoveredPanicError保存了恢复的panic的错误及其堆栈跟踪信息。
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer从处理程序中恢复任何panic,并在从处理程序返回的任何错误中附加RecoveredPanicError与堆栈跟踪。
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
重试
// Retry提供了一个中间件,如果返回错误,它将重试处理程序。
// 可以配置重试行为,指数回退和最大经过时间。
type Retry struct {
// MaxRetries是将尝试的最大次数。
MaxRetries int
// InitialInterval是重试之间的第一个间隔。 后续的间隔将按Multiplier比例缩放。
InitialInterval time.Duration
// MaxInterval设置重试的指数回退限制。
MaxInterval time.Duration
// Multiplier是重试之间的等待间隔将被乘以的因子。
Multiplier float64
// MaxElapsedTime设置重试的最长时间限制。 如果为0,则禁用。
MaxElapsedTime time.Duration
// RandomizationFactor在以下区间内随机散布回退时间:
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
RandomizationFactor float64
// OnRetryHook是一个可选的函数,将在每次重试尝试时执行。
// 当前重试编号通过retryNum传递。
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware返回Retry中间件。
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
限流
// Throttle提供一个中间件,一定时间内限制处理的信息数量。
// 这可以用来避免由于在未处理的长长队列上运行处理程序而造成过载。
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle创建一个新的Throttle中间件。
// 示例持续时间和计数:NewThrottle(10, time.Second)表示每秒10个消息
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware返回Throttle中间件。
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// throttle由多个处理程序共享,它们将等待其“tick”
超时
// Timeout使处理程序在指定的时间后取消传入消息的上下文。
// 处理程序的任何超时敏感功能都应该监听 msg.Context().Done(),以知道何时失败。
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}