发布者和订阅者是Watermill的比较底层的部分。在实际应用中,你通常希望使用高级接口和功能,比如关联、指标、毒消息队列、重试、限流等。
有时候,你可能不希望在处理成功时发送Ack。有时候你可能希望在另一条消息处理完成后发送一条消息。
为了满足这些需求,有一个名为Router的组件。
配置
完整源码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout 确定在关闭时,路由器应该为处理程序工作的时间。
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate 检查路由器配置是否有错误。
func (c RouterConfig) Validate() error {
return nil
}
// ...
Handler(处理程序)
首先需要实现HandlerFunc
函数:
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc是当接收到消息时调用的函数。
// 在HandlerFunc不返回错误时,将自动调用msg.Ack()。
// 当HandlerFunc返回错误时,将调用msg.Nack()。
// 当处理程序中调用了msg.Ack()并且HandlerFunc返回错误时,
// 将不会发送msg.Nack(),因为已经发送了Ack。
// 当接收到多条消息时(因为在HandlerFunc中发送了msg.Ack()或Subscriber支持多个消费者时),
// HandlerFunc会并行执行。
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
接下来,您需要使用Router.AddHandler
添加一个新的处理程序:
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler添加一个新的处理程序。
// handlerName必须是唯一的。目前,它仅用于调试。
// subscribeTopic是处理程序将接收消息的主题。
// publishTopic是处理程序返回的消息将由Router产生的主题。
// 当处理程序需要发布到多个主题时,
// 建议只向处理程序注入Publisher或实现中间件,
// 中间件可以根据元数据捕获消息并发布到特定的主题。
// 如果在路由器已经运行时添加处理程序,则需要显式调用RunHandlers()。
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("Adding handler", watermill.LogFields{
"handler_name": handlerName,
"topic": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped is not always waiting for handlerAdded
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler添加一个新的处理程序。
// 这个处理程序不能返回消息。
// 当返回消息时,会发生错误并发送Nack。
//
// handlerName必须是唯一的。目前,它仅用于调试。
// subscribeTopic是处理程序将接收消息的主题。
// subscriber是一个订阅者,用于消费消息。
// 如果在路由器已经运行时添加处理程序,则需要显式调用RunHandlers()。
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
参考"Getting Started"中的示例用法。 完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler返回一个处理程序,可以用于添加处理程序级别的中间件,或停止处理程序。
handler := router.AddHandler(
"struct_handler", // 处理程序名称,必须唯一
"incoming_messages_topic", // 从中读取事件的主题
pubSub,
"outgoing_messages_topic", // 发布事件的主题
pubSub,
structHandler{}.Handler,
)
// 处理程序级别的中间件仅针对特定的处理程序执行
// 这种中间件可以与路由器级别的中间件以相同的方式添加
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("执行特定于处理程序的中间件,消息UUID为:", message.UUID)
return h(message)
}
})
// ...
无发布处理程序
并非每个处理程序都会生成新消息。您可以使用 Router.AddNoPublisherHandler
来添加此类型的处理程序:
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler 添加一个新的处理程序。
// 该处理程序不能返回消息。
// 当返回消息时,将发生错误并发送 Nack。
//
// handlerName 必须是唯一的。目前,之仅用于调试目的。
//
// subscribeTopic 是处理程序将接收消息的主题。
//
// subscriber 是将使用的 Subscriber 消费消息。
//
// 如果在路由器已经运行时添加处理程序,则需要显式调用 RunHandlers()。
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
确认
默认情况下,当 HanderFunc
不返回错误时,会调用 msg.Ack()
。如果返回错误,将调用 msg.Nack()
。因此,在处理消息后,您无需调用 msg.Ack()
或 msg.Nack()
(当然,如果您想的话也可以调用)。
生产消息
当从处理程序返回多个消息时,请注意大多数 Publisher 实现不支持消息的原子发布。如果代理或存储不可用,则可能仅会产生一些消息并发送 msg.Nack()
。
如果这是个问题,考虑使用每个处理程序仅发布一条消息。
运行 Router
要运行路由器,您需要调用 Run()
。
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run 运行所有插件和处理程序,并开始订阅提供的主题。
// 此调用在路由器运行时阻塞。
//
// 当所有处理程序停止(例如,由于订阅已关闭),路由器也将停止。
//
// 要停止 Run(),您应该在路由器上调用 Close()。
//
// ctx 将传播到所有订阅者。
//
// 当所有处理程序停止(例如:由于连接关闭),Run() 也将停止。
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
确保路由器正在运行
了解路由器是否正在运行可能很有用。您可以使用 Running()
方法来实现。
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running 在路由器运行时关闭。
// 换句话说,您可以使用以下方式等待路由器运行:
// fmt.Println("Starting router")
// go r.Run(ctx)
// // fmt.Println("Router is running")
// 警告:出于历史原因,此通道不知道路由器的关闭 - 如果路由器一直运行并关闭,则该通道将关闭。
func (r *Router) Running() chan struct{} {
// ...
}
您还可以使用返回布尔值的 IsRunning
函数:
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning 在路由器运行时返回 true。
//
// 警告:出于历史原因,此方法不知道路由器的关闭情况。
// 如果要了解路由器是否已关闭,请使用 IsClosed。
func (r *Router) IsRunning() bool {
// ...
}
关闭路由器
要关闭路由器,您需要调用 Close()
。
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close gracefully closes the router with a timeout provided in the configuration.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
将关闭所有发布者和订阅者,并等待所有处理程序完成。
Close()
将等待配置中的 RouterConfig.CloseTimeout
设置的超时时间。如果超时时间到达,Close()
将返回一个错误。
路由器启动后添加处理程序
在路由器已经运行时可以添加一个新的处理程序。为了做到这一点,您需要调用 AddNoPublisherHandler
或 AddHandler
,并调用 RunHandlers
。
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
停止正在运行的处理程序
可以通过调用 Stop()
来停止 仅一个正在运行的处理程序。
请注意,路由器将在没有运行的处理程序时关闭。
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop stops the handler.
// Stop is asynchronous.
// You can check if handler was stopped with Stopped() function.
func (h *Handler) Stop() {
// ...
执行模型
订阅者 可以按顺序消费单条消息,也可以并行消费多条消息。
-
单一消息流 是最简单的方法,这意味着在调用
msg.Ack()
之前,订阅者将不会收到任何新消息。 -
多个消息流 仅由某些订阅者支持。通过同时订阅多个主题分区,可以并行地消费多条消息,甚至是之前没有确认的消息(例如,Kafka 订阅者就是这样工作的)。路由器通过并行运行
HandlerFunc
处理这个模型。
请参阅所选择的 Pub/Sub 文档以了解支持的执行模型。
中间件
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware 允许我们编写类似于 HandlerFunc 装饰器的东西。
// 它可以在处理程序之前(例如:修改消费的消息)或之后(修改生成的消息,对消费的消息进行 ack/nack,处理错误、记录日志等)执行某些操作。
//
// 可以使用 `AddMiddleware` 方法将其附加到路由器上。
//
// 示例:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("executed before handler")
// producedMessages, err := h(message)
// fmt.Println("executed after handler")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
标准中间件的完整列表可以在 Middlewares 中找到。
插件
完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin 是在路由器启动时执行的函数。
type RouterPlugin func(*Router) error
// ...
标准插件的完整列表可以在 message/router/plugin 中找到。
上下文(context)
处理程序接收到的每条消息都在context
中存储了一些有用的值:
完整源代码:github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx从上下文中返回消费消息的路由中的消息处理程序的名称。
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx从上下文中返回发布消息的路由中的消息发布者类型的名称。
// 例如,对于Kafka,它将是 `kafka.Publisher`。
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx从上下文中返回订阅消息的路由中的消息订阅者类型的名称。
// 例如,对于Kafka,它将是 `kafka.Subscriber`。
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx从上下文中返回路由中接收到消息的主题。
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx从上下文中返回路由中将要发布消息的主题。
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...