自定义发布/订阅的底层使用什么消息队列。
发布/订阅接口
要为自定义的发布/订阅添加支持,您需要实现message.Publisher
和message.Subscriber
接口。
完整源码:github.com/ThreeDotsLabs/watermill/message/pubsub.go
// ...
type Publisher interface {
// Publish将提供的消息发布到指定的主题。
//
// Publish可以同步或异步执行-这取决于实现方式。
//
// 大多数发布者实现不支持原子发布消息。
// 这意味着如果发布其中一个消息失败,则不会发布下一个消息。
//
// Publish必须是线程安全的。
Publish(topic string, messages ...*Message) error
// 如果发布者是异步的,Close应该刷新未发送的消息。
Close() error
}
// Subscriber是发布/订阅的消费部分。
type Subscriber interface {
// Subscribe返回从提供的主题获取到的消息的输出通道。
// 当在订阅程序上调用Close()时,通道将关闭。
//
// 要接收下一条消息,必须在接收到的消息上调用`Ack()`。
// 如果处理消息失败并且消息应该被重新传送,则应调用`Nack()`。
//
// 当提供的ctx取消时,订阅者将关闭订阅并关闭输出通道。
// 提供的ctx设置到所有生成的消息上。
// 当在消息上调用Nack或Ack时,消息的上下文会被取消。
Subscribe(ctx context.Context, topic string) (
待办事项列表
以下是您不应忘记的几点:
- 日志记录(良好的消息和适当的级别)。
- 可替代和可配置的消息编组器。
- 针对发布者和订阅者的
Close()
实现,应该是:- 幂等的
- 当发布者或订阅者被阻塞时(例如,等待Ack)能够正常工作
- 当订阅者输出通道被阻塞时(因为没有任何监听它的内容)能够正常工作
- 对消费的消息支持
Ack()
和Nack()
。 - 对已消费消息的
Nack()
支持重新投递。 - 使用通用的发布/订阅测试。您应该查看测试故障排除指南以获取调试提示。
- 性能优化。
- GoDocs、Markdown文档和入门示例。