一架梯子,一头程序猿,仰望星空!
Watermill Go事件驱动框架教程 > 内容正文

Watermill 转发器组件(消息事务)


在一个事务中同时存储数据和发布事件,本章主要讲解事务消息机制。

在事务中发布消息的重要性

在处理事件驱动的应用程序时,您可能在某些时候需要存储应用程序状态并发布消息,告诉系统的其他部分刚刚发生了什么。在一个理想的场景中,您希望在一个事务中持久化应用程序状态并发布消息,因为不这样做可能会导致数据一致性方面的麻烦。为了在一个事务中同时提交存储数据和发布事件,您必须能够将消息发布到用于数据存储的同一个数据库中,或者自己实现2PC。如果您不想将消息代理更改为数据库,也不想再次发明轮子,可以通过使用Watermill的Forwarder组件来简化您的工作!

Forwarder组件

您可以将Forwarder视为一个后台运行的守护进程,它等待发布到数据库的消息,并确保它们最终到达消息代理。

Watermill Forwarder组件为了使Forwarder通用且可透明使用,它在中间数据库上的一个主题上监听,使用装饰的Forwarder发布者发送封装的消息。Forwarder解包它们,并发送到消息代理上的指定目标主题。

Forwarder封装

示例

让我们考虑以下示例:有一个负责执行彩票抽奖的命令。它必须随机选择已经在系统中注册的用户作为赢家。在执行此操作的同时,它还应该通过存储数据库条目将其所做的决定持久化,将唯一彩票ID与所选用户的ID相关联。此外,作为一个事件驱动的系统,它还应该发布LotteryConcluded事件,以便其他组件可以适当地对其进行反应。要确切 - 将有负责向彩票赢家发送奖品的组件。它将接收LotteryConcluded事件,并使用嵌入在事件中的彩票ID,与数据库条目进行验证,以确定赢家是谁。

在我们的案例中,数据库是MySQL,消息代理是Google Pub/Sub,但也可以是任何其他两种技术。

在实现这样一个命令时,我们可以采用各种方法。下面我们将介绍三种可能的尝试,并指出它们的漏洞。

先发布事件,然后存储数据

在这种方法中,命令首先会发布一个事件,然后再存储数据。虽然在大多数情况下这种方法可能能正常工作,但让我们尝试找出可能出现的问题。

命令需要执行三个基本操作:

  1. 选择一个随机的用户 A 作为中奖者。
  2. 发布一个 LotteryConcluded 事件,告知彩票 B 已经结束。
  3. 在数据库中存储彩票 B 已被用户 A 赢得。

每个步骤都有可能失败,从而打破我们的命令流程。如果第一个步骤失败,后果不会很严重 - 我们只需返回错误并认为整个命令失败。不会存储任何数据,也不会发布任何事件。可以简单地重新运行命令。

如果第二个步骤失败,我们仍然没有发布事件,也没有将数据存储在数据库中。我们可以重新运行命令,再试一次。

最有趣的是,如果第三个步骤失败会产生什么情况。在第二个步骤后,我们已经发布了事件,但最终数据库中将不会存储任何数据。其他组件会收到彩票已经结束的信号,但是没有赢家与事件中发送的彩票ID相关联。它们无法验证谁是赢家,因此它们的操作也必须被视为失败。

我们仍然可以摆脱这种情况,但很可能需要一些手动操作,例如使用已发出事件中的彩票ID重新运行命令。

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 1. 先将事件发布到 Google Cloud Pub/Sub,然后将数据存储到 MySQL。
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
    publisher, err := googlecloud.NewPublisher(
        googlecloud.PublisherConfig{
            ProjectID: projectID,
        },
        logger,
    )
    if err != nil {
        return err
    }
    event := LotteryConcludedEvent{LotteryID: lotteryID}
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }

    err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
    if err != nil {
        return err
    }

    // 如果出现错误,已经发出了事件,但是还没有保存数据。
    if err = simulateError(); err != nil {
        logger.Error("无法持久化数据", err, nil)
        return err
    }

    _, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
    if err != nil {
        return err
    }
// ...

先储存数据,再发布事件

在第二种方法中,我们将尝试解决先处理地址法的缺点。为了防止在数据库中没有正确持久化状态时不发布事件而向外部组件泄露失败情况,我们将改变动作的顺序如下所示:

  1. 随机选择用户 A 作为彩票的中奖者。
  2. 在数据库中储存彩票 B 已被用户 A 赢得的信息。
  3. 发布一个 LotteryConcluded 事件,告知彩票 B 已经结束。

与第一种方法相同,如果前两个动作失败了,我们没有任何后果。在第3个动作失败的情况下,我们的数据将持久化在数据库中,但是没有事件被发布。在这种情况下,我们不会向彩票组件之外泄露失败情况。但是,考虑到预期的系统行为,我们的获奖者将无法收到奖品,因为没有事件传递给负责此操作的组件。

这个问题可能也可以通过手动操作解决,即手动发布事件。但我们可以做得更好。

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 2. 先将数据持久化到 MySQL 中,然后直接发布事件到 Google Cloud Pub/Sub。
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
    _, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
    if err != nil {
        return err
    }

    var publisher message.Publisher
    publisher, err = googlecloud.NewPublisher(
        googlecloud.PublisherConfig{
            ProjectID: projectID,
        },
        logger,
    )
    if err != nil {
        return err
    }

    event := LotteryConcludedEvent{LotteryID: lotteryID}
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }

    // 如果此处失败,我们的数据已经持久化,但没有事件被发布。
    if err = simulateError(); err != nil {
        logger.Error("无法发布事件", err, nil)
        return err
    }

    err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
    if err != nil {
        return err
    }
// ...

在一个事务中存储数据和发布事件

设想一下,我们的命令可以同时执行第二和第三个任务。它们将被原子方式提交,这意味着任何一个任务失败时另一个任务也不能成功。这可以通过利用大多数数据库中已实现的事务机制来实现。我们的示例中使用的MySQL就是其中之一。

为了能够在一个事务中同时存储数据和发布事件,我们需要能够将消息发布到MySQL中。由于我们不希望在整个系统中将消息代理改为由MySQL支持,我们必须找到其他方法来实现这一点。

好消息是:Watermill提供了所有必需的工具!如果你正在使用的数据库是MySQL、PostgreSQL(或其他任何SQL数据库)、Firestore或Bolt,你可以将消息发布到它们中去。Forwarder组件将帮助你选择所有你发布到数据库中的消息,并将它们转发到你的消息代理中。

你需要确保:

  1. 你的命令使用在数据库事务上下文中工作的发布者(例如SQLFirestoreBolt)。
  2. Forwarder组件正在运行,使用数据库订阅者和消息代理发布者。

在这种情况下,命令可能如下所示:

完整源码:github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
为了使Forwarder组件能够在后台工作,并将MySQL的消息转发到Google Pub/Sub,您需要按照以下方式进行设置:

完整代码,请参考github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 在事务中将数据持久化到MySQL,并通过MySQL向Google Cloud Pub/Sub发送事件。
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }

    defer func() {
        if err == nil {
            tx.Commit()
        } else {
            logger.Info("由于错误,回滚事务", watermill.LogFields{"error": err.Error()})
            // 在出现错误的情况下,由于MySQL事务回滚,我们可以确保不会发生以下不期望的情况:
            // - 发布了事件,但未持久化数据,
            // - 持久化了数据,但未发布事件。
            tx.Rollback()
        }
    }()

    _, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
    if err != nil {
        return err
    }

    var publisher message.Publisher
    publisher, err = sql.NewPublisher(
        tx,
        sql.PublisherConfig{
            SchemaAdapter: sql.DefaultMySQLSchema{},
        },
        logger,
    )
    if err != nil {
        return err
    }

    // 通过给发布者装饰器,将事件包装在转发器组件可以理解的信封中。
    publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
        ForwarderTopic: forwarderSQLTopic,
    })

    // 发布一个公告中奖的事件。请注意,我们在这里发布到了一个Google Cloud主题,同时使用了装饰过的MySQL发布者。
    event := LotteryConcludedEvent{LotteryID: lotteryID}
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }

    err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
    if err != nil {
        return err
    }

    return nil
// ...

如果您想进一步了解该示例,请在此处找到其实现。