一架梯子,一头程序猿,仰望星空!
Watermill Go事件驱动框架教程 > 内容正文

消息路由


发布者和订阅者是Watermill的比较底层的部分。在实际应用中,你通常希望使用高级接口和功能,比如关联、指标、毒消息队列、重试、限流等。

有时候,你可能不希望在处理成功时发送Ack。有时候你可能希望在另一条消息处理完成后发送一条消息。

为了满足这些需求,有一个名为Router的组件。

Watermill Router

配置

完整源码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout 确定在关闭时,路由器应该为处理程序工作的时间。
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate 检查路由器配置是否有错误。
func (c RouterConfig) Validate() error {
	return nil
}
// ...

Handler(处理程序)

首先需要实现HandlerFunc函数:

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc是当接收到消息时调用的函数。

// 在HandlerFunc不返回错误时,将自动调用msg.Ack()。

// 当HandlerFunc返回错误时,将调用msg.Nack()。

// 当处理程序中调用了msg.Ack()并且HandlerFunc返回错误时,

// 将不会发送msg.Nack(),因为已经发送了Ack。

// 当接收到多条消息时(因为在HandlerFunc中发送了msg.Ack()或Subscriber支持多个消费者时),

// HandlerFunc会并行执行。

type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

接下来,您需要使用Router.AddHandler添加一个新的处理程序:

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler添加一个新的处理程序。

// handlerName必须是唯一的。目前,它仅用于调试。

// subscribeTopic是处理程序将接收消息的主题。

// publishTopic是处理程序返回的消息将由Router产生的主题。

// 当处理程序需要发布到多个主题时,

// 建议只向处理程序注入Publisher或实现中间件,

// 中间件可以根据元数据捕获消息并发布到特定的主题。

// 如果在路由器已经运行时添加处理程序,则需要显式调用RunHandlers()。

func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("Adding handler", watermill.LogFields{

		"handler_name": handlerName,

		"topic":        subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		name:   handlerName,

		logger: r.logger,

		subscriber:     subscriber,

		subscribeTopic: subscribeTopic,

		subscriberName: subscriberName,

		publisher:     publisher,

		publishTopic:  publishTopic,

		publisherName: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,

		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped is not always waiting for handlerAdded

	}

	return &Handler{

		router:  r,

		handler: newHandler,

	}

}

// AddNoPublisherHandler添加一个新的处理程序。

// 这个处理程序不能返回消息。

// 当返回消息时,会发生错误并发送Nack。

//

// handlerName必须是唯一的。目前,它仅用于调试。

// subscribeTopic是处理程序将接收消息的主题。

// subscriber是一个订阅者,用于消费消息。

// 如果在路由器已经运行时添加处理程序,则需要显式调用RunHandlers()。

func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	handlerFunc NoPublishHandlerFunc,

) *Handler {

	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

参考"Getting Started"中的示例用法。 完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler返回一个处理程序,可以用于添加处理程序级别的中间件,或停止处理程序。
	handler := router.AddHandler(
		"struct_handler",          // 处理程序名称,必须唯一
		"incoming_messages_topic", // 从中读取事件的主题
		pubSub,
		"outgoing_messages_topic", // 发布事件的主题
		pubSub,
		structHandler{}.Handler,
	)

	// 处理程序级别的中间件仅针对特定的处理程序执行
	// 这种中间件可以与路由器级别的中间件以相同的方式添加
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("执行特定于处理程序的中间件,消息UUID为:", message.UUID)

			return h(message)
		}
	})

// ...

无发布处理程序

并非每个处理程序都会生成新消息。您可以使用 Router.AddNoPublisherHandler 来添加此类型的处理程序:

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler 添加一个新的处理程序。
// 该处理程序不能返回消息。
// 当返回消息时,将发生错误并发送 Nack。
//
// handlerName 必须是唯一的。目前,之仅用于调试目的。
//
// subscribeTopic 是处理程序将接收消息的主题。
//
// subscriber 是将使用的 Subscriber 消费消息。
//
// 如果在路由器已经运行时添加处理程序,则需要显式调用 RunHandlers()。
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

确认

默认情况下,当 HanderFunc 不返回错误时,会调用 msg.Ack()。如果返回错误,将调用 msg.Nack()。因此,在处理消息后,您无需调用 msg.Ack()msg.Nack()(当然,如果您想的话也可以调用)。

生产消息

当从处理程序返回多个消息时,请注意大多数 Publisher 实现不支持消息的原子发布。如果代理或存储不可用,则可能仅会产生一些消息并发送 msg.Nack()

如果这是个问题,考虑使用每个处理程序仅发布一条消息。

运行 Router

要运行路由器,您需要调用 Run()

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run 运行所有插件和处理程序,并开始订阅提供的主题。
// 此调用在路由器运行时阻塞。
//
// 当所有处理程序停止(例如,由于订阅已关闭),路由器也将停止。
//
// 要停止 Run(),您应该在路由器上调用 Close()。
//
// ctx 将传播到所有订阅者。
//
// 当所有处理程序停止(例如:由于连接关闭),Run() 也将停止。
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

确保路由器正在运行

了解路由器是否正在运行可能很有用。您可以使用 Running() 方法来实现。

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running 在路由器运行时关闭。
// 换句话说,您可以使用以下方式等待路由器运行:

// 	fmt.Println("Starting router")
//	go r.Run(ctx)
//	//	fmt.Println("Router is running")

// 警告:出于历史原因,此通道不知道路由器的关闭 - 如果路由器一直运行并关闭,则该通道将关闭。
func (r *Router) Running() chan struct{} {
// ...
}

您还可以使用返回布尔值的 IsRunning 函数:

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning 在路由器运行时返回 true。
//
// 警告:出于历史原因,此方法不知道路由器的关闭情况。
// 如果要了解路由器是否已关闭,请使用 IsClosed。
func (r *Router) IsRunning() bool {
// ...
}

关闭路由器

要关闭路由器,您需要调用 Close()

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close gracefully closes the router with a timeout provided in the configuration.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() 将关闭所有发布者和订阅者,并等待所有处理程序完成。

Close() 将等待配置中的 RouterConfig.CloseTimeout 设置的超时时间。如果超时时间到达,Close() 将返回一个错误。

路由器启动后添加处理程序

在路由器已经运行时可以添加一个新的处理程序。为了做到这一点,您需要调用 AddNoPublisherHandlerAddHandler,并调用 RunHandlers

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

停止正在运行的处理程序

可以通过调用 Stop() 来停止 仅一个正在运行的处理程序

请注意,路由器将在没有运行的处理程序时关闭。

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop stops the handler.
// Stop is asynchronous.
// You can check if handler was stopped with Stopped() function.
func (h *Handler) Stop() {
// ...

执行模型

订阅者 可以按顺序消费单条消息,也可以并行消费多条消息。

  • 单一消息流 是最简单的方法,这意味着在调用 msg.Ack() 之前,订阅者将不会收到任何新消息。
  • 多个消息流 仅由某些订阅者支持。通过同时订阅多个主题分区,可以并行地消费多条消息,甚至是之前没有确认的消息(例如,Kafka 订阅者就是这样工作的)。路由器通过并行运行 HandlerFunc 处理这个模型。

请参阅所选择的 Pub/Sub 文档以了解支持的执行模型。

中间件

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware 允许我们编写类似于 HandlerFunc 装饰器的东西。
// 它可以在处理程序之前(例如:修改消费的消息)或之后(修改生成的消息,对消费的消息进行 ack/nack,处理错误、记录日志等)执行某些操作。
//
// 可以使用 `AddMiddleware` 方法将其附加到路由器上。
//
// 示例:
//
// 	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("executed before handler")
// 			producedMessages, err := h(message)
// 			fmt.Println("executed after handler")
//
// 			return producedMessages, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

标准中间件的完整列表可以在 Middlewares 中找到。

插件

完整源代码:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin 是在路由器启动时执行的函数。
type RouterPlugin func(*Router) error

// ...

标准插件的完整列表可以在 message/router/plugin 中找到。

上下文(context)

处理程序接收到的每条消息都在context中存储了一些有用的值:

完整源代码:github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx从上下文中返回消费消息的路由中的消息处理程序的名称。
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx从上下文中返回发布消息的路由中的消息发布者类型的名称。
// 例如,对于Kafka,它将是 `kafka.Publisher`。
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx从上下文中返回订阅消息的路由中的消息订阅者类型的名称。
// 例如,对于Kafka,它将是 `kafka.Subscriber`。
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx从上下文中返回路由中接收到消息的主题。
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx从上下文中返回路由中将要发布消息的主题。
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...