一架梯子,一头程序猿,仰望星空!
Watermill Go事件驱动框架教程 > 内容正文

自定义消息队列


自定义发布/订阅的底层使用什么消息队列。

发布/订阅接口

要为自定义的发布/订阅添加支持,您需要实现message.Publishermessage.Subscriber接口。

完整源码:github.com/ThreeDotsLabs/watermill/message/pubsub.go

// ...
type Publisher interface {
	// Publish将提供的消息发布到指定的主题。
	//
	// Publish可以同步或异步执行-这取决于实现方式。
	//
	// 大多数发布者实现不支持原子发布消息。
	// 这意味着如果发布其中一个消息失败,则不会发布下一个消息。
	//
	// Publish必须是线程安全的。
	Publish(topic string, messages ...*Message) error
	// 如果发布者是异步的,Close应该刷新未发送的消息。
	Close() error
}

// Subscriber是发布/订阅的消费部分。
type Subscriber interface {
	// Subscribe返回从提供的主题获取到的消息的输出通道。
	// 当在订阅程序上调用Close()时,通道将关闭。
	//
	// 要接收下一条消息,必须在接收到的消息上调用`Ack()`。
	// 如果处理消息失败并且消息应该被重新传送,则应调用`Nack()`。
	//
	// 当提供的ctx取消时,订阅者将关闭订阅并关闭输出通道。
	// 提供的ctx设置到所有生成的消息上。
	// 当在消息上调用Nack或Ack时,消息的上下文会被取消。
	Subscribe(ctx context.Context, topic string) (

待办事项列表

以下是您不应忘记的几点:

  1. 日志记录(良好的消息和适当的级别)。
  2. 可替代和可配置的消息编组器。
  3. 针对发布者和订阅者的Close()实现,应该是:
    • 幂等的
    • 当发布者或订阅者被阻塞时(例如,等待Ack)能够正常工作
    • 当订阅者输出通道被阻塞时(因为没有任何监听它的内容)能够正常工作
  4. 对消费的消息支持Ack()Nack()
  5. 对已消费消息的Nack()支持重新投递。
  6. 使用通用的发布/订阅测试。您应该查看测试故障排除指南以获取调试提示。
  7. 性能优化。
  8. GoDocs、Markdown文档和入门示例。