发布者和订阅者。
发布者(Publisher)
完整源码:[github.com / ThreeDotsLabs / watermill / message / pubsub.go](https://github.com/ThreeDotsLabs/watermill/tree/master/message/pubsub.go#L8)
// ...
type Publisher interface {
// Publish将提供的消息发布到给定的主题。
//
// Publish可以是同步的也可以是异步的-这取决于实现。
//
// 大多数发布者的实现不支持原子发布消息。
// 这意味着如果发布其中一条消息失败,下一条消息将不会被发布。
//
// Publish必须是线程安全的。
Publish(topic string, messages ...*Message) error
// 如果发布者是异步的,则Close应刷新未发送的消息。
Close() error
}
// ...
发布多个消息
大多数发布者的实现不支持原子发布消息。这意味着如果发布其中一条消息失败,下一条消息将不会被发布。
异步发布
发布可以是同步的也可以是异步的-这取决于实现。
Close()
如果发布者是异步的,则Close
应刷新未发送的消息。别忘记关闭订阅者。否则,您可能会丢失一些消息。
订阅者
完整源码:[github.com / ThreeDotsLabs / watermill / message / pubsub.go](https://github.com/ThreeDotsLabs/watermill/tree/master/message/pubsub.go#L23)
// ...
type Subscriber interface {
// Subscribe返回具有来自提供的主题的消息的输出通道。
// 当订阅者被调用Close()时,通道将关闭。
//
//要收到下一条消息,必须对接收到的消息调用`Ack()`。
//如果消息处理失败,并且应重新传递消息,应该调用`Nack()`。
//
//当提供的ctx取消时,订阅者将关闭订阅并关闭输出通道。
//提供的ctx设置为所有生成的消息。
//当在消息上调用Nack或Ack时,将取消消息的上下文。
Subscribe(ctx context.Context, topic string) (
}
Ack/Nack机制
订阅者有责任处理来自消息的Ack
和Nack
。正确的实现应在消耗下一条消息之前等待Ack
或Nack
。
重要订阅者实现提示:必须在来自Watermill消息的Ack之后发送 Ack / offset 到消息的存储/代理。否则,在处理完消息之前,如果进程死亡,就有可能丢失消息。
Close()
Close
会关闭所有订阅和它们的输出通道,并在需要时刷新偏移等。
至少一次传递
Watermill使用至少一次的传递语义构建。这意味着在处理消息时发生错误并且无法发送Ack
时,将重新传递消息。
您需要牢记这一点,并构建您的应用程序以进行幂等处理或实现重复机制。
不幸的是,创建一个通用的重复中间件是不可能的,因此我们鼓励您构建自己的实现。
通用测试
每个Pub / Sub在大多数方面都是相似的。为了避免为每个Pub / Sub实现单独的测试,我们创建了一个测试套件,任何Pub / Sub都应通过该测试套件。
这些测试可以在pubsub / tests / test_pubsub.go
中找到。
内置实现
要检查可用的Pub / Sub实现,请参阅已支持的Pub / Sub。
实现自定义Pub / Sub
有关如何为新的Pub / Sub引入支持的说明,请参见”实现自定义Pub / Sub”。