一架梯子,一头程序猿,仰望星空!
Watermill Go事件驱动框架教程 > 内容正文

发布&订阅


发布者和订阅者。

发布者(Publisher)

完整源码:[github.com / ThreeDotsLabs / watermill / message / pubsub.go](https://github.com/ThreeDotsLabs/watermill/tree/master/message/pubsub.go#L8)

// ...
type Publisher interface {
    // Publish将提供的消息发布到给定的主题。
    //
    // Publish可以是同步的也可以是异步的-这取决于实现。
    //
    // 大多数发布者的实现不支持原子发布消息。
    // 这意味着如果发布其中一条消息失败,下一条消息将不会被发布。
    //
    // Publish必须是线程安全的。
    Publish(topic string, messages ...*Message) error
    // 如果发布者是异步的,则Close应刷新未发送的消息。
    Close() error
}
// ...

发布多个消息

大多数发布者的实现不支持原子发布消息。这意味着如果发布其中一条消息失败,下一条消息将不会被发布。

异步发布

发布可以是同步的也可以是异步的-这取决于实现。

Close()

如果发布者是异步的,则Close应刷新未发送的消息。别忘记关闭订阅者。否则,您可能会丢失一些消息。

订阅者

完整源码:[github.com / ThreeDotsLabs / watermill / message / pubsub.go](https://github.com/ThreeDotsLabs/watermill/tree/master/message/pubsub.go#L23)

// ...
type Subscriber interface {
    // Subscribe返回具有来自提供的主题的消息的输出通道。
    // 当订阅者被调用Close()时,通道将关闭。
    //
    //要收到下一条消息,必须对接收到的消息调用`Ack()`。
    //如果消息处理失败,并且应重新传递消息,应该调用`Nack()`。
    //
    //当提供的ctx取消时,订阅者将关闭订阅并关闭输出通道。
    //提供的ctx设置为所有生成的消息。
    //当在消息上调用Nack或Ack时,将取消消息的上下文。
    Subscribe(ctx context.Context, topic string) (
}

Ack/Nack机制

订阅者有责任处理来自消息的AckNack。正确的实现应在消耗下一条消息之前等待AckNack

重要订阅者实现提示:必须在来自Watermill消息的Ack之后发送 Ack / offset 到消息的存储/代理。否则,在处理完消息之前,如果进程死亡,就有可能丢失消息。

Close()

Close会关闭所有订阅和它们的输出通道,并在需要时刷新偏移等。

至少一次传递

Watermill使用至少一次的传递语义构建。这意味着在处理消息时发生错误并且无法发送Ack时,将重新传递消息。

您需要牢记这一点,并构建您的应用程序以进行幂等处理或实现重复机制。

不幸的是,创建一个通用的重复中间件是不可能的,因此我们鼓励您构建自己的实现。

通用测试

每个Pub / Sub在大多数方面都是相似的。为了避免为每个Pub / Sub实现单独的测试,我们创建了一个测试套件,任何Pub / Sub都应通过该测试套件。

这些测试可以在pubsub / tests / test_pubsub.go中找到。

内置实现

要检查可用的Pub / Sub实现,请参阅已支持的Pub / Sub。

实现自定义Pub / Sub

有关如何为新的Pub / Sub引入支持的说明,请参见”实现自定义Pub / Sub”。