一架梯子,一头程序猿,仰望星空!
Watermill Go事件驱动框架教程 > 内容正文

Watermill 快速入门


Watermill是什么?

Watermill是一个用于高效处理消息流的Golang库。它旨在用于构建事件驱动的应用程序。它可以用于事件溯源、基于消息的RPC、saga和其他你能想到的用途。你可以使用传统的发布/订阅实现,比如Kafka或RabbitMQ,也可以使用HTTP或MySQL的binlog,根据你的使用场景选择。

Watermill提供了一组发布/订阅的实现,并且可以很容易地通过扩展自己的实现。

Watermill还附带了标准的中间件,如仪表盘、队列、限流、关联等,这些工具被每个消息驱动的应用程序使用。

为什么使用Watermill?

随着越来越多的项目在最近几年采用微服务架构,并不是所有业务场景都可以通过RPC这种同步调用方式解决。异步任务这种处理机制反而成为了很好的补充。

Watermill目标是成为Go语言的标准消息库,将所有这些复杂性隐藏在易于理解的API后面。它提供了构建基于事件或其他异步模式的应用程序所需的一切。在查看示例之后,您应该能够快速集成Watermill到您的项目中。

安装

go get -u github.com/ThreeDotsLabs/watermill

准备背景

事件驱动应用程序背后的基本思想始终是相同的:监听传入的消息并对其做出反应。Watermill支持为多个发布者和订阅者实现此行为。

Watermill的核心部分是Message。它就像http包中的http.Request一样重要。大多数Watermill功能都以某种方式使用这个结构体。

尽管PubSub库提供了复杂的功能,但对于Watermill来说,只需实现两个接口即可开始使用它们:PublisherSubscriber

type Publisher interface {
    Publish(topic string, messages ...*Message) error
    Close() error
}

type Subscriber interface {
    Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

订阅消息

让我们从订阅开始。Subscribe期望一个主题名称,并返回一个接收传入消息的通道。主题的具体含义取决于PubSub的实现。

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
    panic(err)
}

for msg := range messages {
    fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
    msg.Ack()
}

支持的PubSub的详细示例请参见下面的内容。

Go Channel 例子

完整示例代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
    pubSub := gochannel.NewGoChannel(
        gochannel.Config{},
        watermill.NewStdLogger(false, false),
    )

    messages, err := pubSub.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

完整示例代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

        //我们需要确认我们已经接收并处理了消息,否则它将被重发多次。
        msg.Ack()
    }
}

Kafka 例子

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
    "context"
    "log"
    "time"

    "github.com/Shopify/sarama"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
    // 相当于 auto.offset.reset: earliest
    saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

    subscriber, err := kafka.NewSubscriber(
        kafka.SubscriberConfig{
            Brokers:               []string{"kafka:9092"},
            Unmarshaler:           kafka.DefaultMarshaler{},
            OverwriteSaramaConfig: saramaSubscriberConfig,
            ConsumerGroup:         "test_consumer_group",
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...
}

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("接收消息: %s, payload: %s", msg.UUID, string(msg.Payload))

        // 我们需要确认已经接收并处理了消息,
        // 否则消息将会一次又一次地被重新发送。
        msg.Ack()
    }
}

RabbitMQ (AMQP) 例子

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
    "context"
    "log"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
    "github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
    amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

    subscriber, err := amqp.NewSubscriber(
        // 此配置基于以下示例:https://www.rabbitmq.com/tutorials/tutorial-two-go.html
        // 它被用作一个简单的队列。
        //
        // 如果你想要实现Pub/Sub风格的服务,请参考
        // https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
        amqpConfig,
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...
}

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("接收消息: %s, payload: %s", msg.UUID, string(msg.Payload))

        // 我们需要确认已经接收并处理了消息,
        // 否则消息将会一次又一次地被重新发送。
        msg.Ack()
    }
}

SQL 例子

完整源码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
    "context"
    stdSQL "database/sql"
    "log"
    "time"

    driver "github.com/go-sql-driver/mysql"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    db := createDB()
    logger := watermill.NewStdLogger(false, false)

    subscriber, err := sql.NewSubscriber(
        db,
        sql.SubscriberConfig{
            SchemaAdapter:    sql.DefaultMySQLSchema{},
            OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
            InitializeSchema: true,
        },
        logger,
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example_topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

完整源码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // 需要确认我们已经接收并处理了消息,
        // 否则,消息将一遍又一遍地重复投递。
        msg.Ack()
    }
}

创建消息

Watermill不强制任何消息格式。NewMessage期望负载为字节切片。您可以使用字符串、JSON、protobuf、Avro、gob或任何其他序列化为[]byte的格式。

消息UUID是可选的,但建议使用,因为它有助于调试。

msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

发布消息

Publish方法需要一个主题和一个或多个要发布的消息。

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

Go Channel 例子

完整源码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
    go process(messages)

    publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

Kafka 例子

完整源码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
    go process(messages)

    publisher, err := kafka.NewPublisher(
        kafka.PublisherConfig{
            Brokers:   []string{"kafka:9092"},
            Marshaler: kafka.DefaultMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

RabbitMQ (AMQP) 例子

完整代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
    go process(messages)

    publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

SQL 例子

完整代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
    go process(messages)

    publisher, err := sql.NewPublisher(
        db,
        sql.PublisherConfig{
            SchemaAdapter: sql.DefaultMySQLSchema{},
        },
        logger,
    )
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func createDB() *stdSQL.DB {
    conf := driver.NewConfig()
    conf.Net = "tcp"
    conf.User = "root"
    conf.Addr = "mysql"
    conf.DBName = "watermill"

    db, err := stdSQL.Open("mysql", conf.FormatDSN())
    if err != nil {
        panic(err)
    }

    err = db.Ping()
    if err != nil {
        panic(err)
    }

    return db
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))

        if err := publisher.Publish("example_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

使用Message Router

发布者和订阅者是Watermill的较低级别的部分。在大多数情况下,您通常希望使用高级接口和功能,如关联、指标、毒队列、重试、限流等。

您可能只想在成功处理消息时才发送确认。在其他情况下,您将立即发送确认,然后再考虑处理。有时,您希望根据传入的消息执行某些操作,并在回应中发布另一条消息。

为了满足这些需求,有一个名为Router的组件。

Message Router的示例应用程序

示例应用程序的流程如下:

  1. 每秒钟在incoming_messages_topic主题上产生一条消息。
  2. struct_handler处理程序监听incoming_messages_topic。当收到消息时,打印UUID并在outgoing_messages_topic上产生一条新消息。
  3. print_incoming_messages处理程序监听incoming_messages_topic并打印消息的UUID、有效载荷和元数据。
  4. print_outgoing_messages处理程序监听outgoing_messages_topic并打印消息的UUID、有效载荷和元数据。关联ID应与incoming_messages_topic上的消息相同。

路由器配置

首先配置路由器,添加插件和中间件。然后设置路由器将使用的处理程序。每个处理程序将独立处理消息。

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/message/router/middleware"
    "github.com/ThreeDotsLabs/watermill/message/router/plugin"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
    // 对于这个示例,我们使用了一个简单的日志记录器实现,
    // 您可能想提供自己的`watermill.LoggerAdapter`实现。
    logger = watermill.NewStdLogger(false, false)
)

func main() {
    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        panic(err)
    }

    // 当接收到SIGTERM信号时,SignalsHandler会优雅地关闭Router。
    // 您也可以通过调用`r.Close()`来关闭路由器。
    router.AddPlugin(plugin.SignalsHandler)

    // 路由器级别的中间件将为发送到路由器的每个消息执行
    router.AddMiddleware(
        // CorrelationID将从传入消息的元数据复制关联ID到生成的消息中
        middleware.CorrelationID,

        // 如果处理程序返回错误,则重试处理程序函数。
        // 在达到MaxRetries后,消息被Nacked,由PubSub负责重新发送。
        middleware.Retry{
            MaxRetries:      3,
            InitialInterval: time.Millisecond * 100,
            Logger:          logger,
        }.Middleware,

        // Recoverer处理处理程序的panic。
        // 在这种情况下,它将它们作为错误传递给Retry中间件。
        middleware.Recoverer,
    )

    // 为了简化,我们在这里使用gochannel Pub/Sub,
    // 您可以用任何Pub/Sub实现替换它,效果将是一样的。
    pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

    // 在后台产生一些传入消息
    go publishMessages(pubSub)

    // AddHandler返回一个处理程序,可用于添加处理程序级别的中间件
    // 或停止处理程序。
    handler := router.AddHandler(
        "struct_handler",          // 处理程序名称,必须是唯一的
        "incoming_messages_topic", // 从中读取事件的主题
        pubSub,
        "outgoing_messages_topic", // 发布事件到哪个主题
        pubSub,
        structHandler{}.Handler,
    )

    // 处理程序级别的中间件仅对特定处理程序执行
    // 可以通过与路由器级别中间件相同的方式添加此类中间件
    handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
        return func(message *message.Message) ([]*message.Message, error) {
            log.Println("为", message.UUID, "执行处理程序特定中间件")

            return h(message)
        }
    })

    // 仅用于调试,我们打印在“incoming_messages_topic”上收到的所有消息
    router.AddNoPublisherHandler(
        "print_incoming_messages",
        "incoming_messages_topic",
        pubSub,
        printMessages,
    )

    // 仅用于调试,我们打印发送到“outgoing_messages_topic”的所有事件
    router.AddNoPublisherHandler(
        "print_outgoing_messages",
        "outgoing_messages_topic",
        pubSub,
        printMessages,
    )

    // 现在所有处理程序都已注册,我们正在运行路由器。
    // Run在路由器运行时会阻塞。
    ctx := context.Background()
    if err := router.Run(ctx); err != nil {
        panic(err)
    }
}
// ...

入站消息

struct_handlerincoming_messages_topic 中消耗消息,因此我们通过在后台调用 publishMessages() 来模拟传入的流量。注意,我们添加了 SetCorrelationID 中间件。路由器将为所有生成的消息添加一个关联ID(存储在元数据中)。

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("sending message %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

处理器(Handlers )

您可能已经注意到有两种类型的处理器函数

  1. 函数 func(msg *message.Message) ([]*message.Message, error)
  2. 方法 func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

如果您的处理器是一个不依赖于任何依赖项的函数,使用第一个选项是可以的。当您的处理器需要一些依赖项(如数据库句柄、记录器等)时,第二个选项很有用。

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Received message: %s\n> %s\n> metadata: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // 我们可以在这里添加一些依赖项
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler received message", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by structHandler"))
    return message.Messages{msg}, nil
}

完成!

您可以通过 go run main.go 运行此示例。