Watermill是什么?
Watermill是一个用于高效处理消息流的Golang库。它旨在用于构建事件驱动的应用程序。它可以用于事件溯源、基于消息的RPC、saga和其他你能想到的用途。你可以使用传统的发布/订阅实现,比如Kafka或RabbitMQ,也可以使用HTTP或MySQL的binlog,根据你的使用场景选择。
Watermill提供了一组发布/订阅的实现,并且可以很容易地通过扩展自己的实现。
Watermill还附带了标准的中间件,如仪表盘、队列、限流、关联等,这些工具被每个消息驱动的应用程序使用。
为什么使用Watermill?
随着越来越多的项目在最近几年采用微服务架构,并不是所有业务场景都可以通过RPC这种同步调用方式解决。异步任务这种处理机制反而成为了很好的补充。
Watermill目标是成为Go语言的标准消息库,将所有这些复杂性隐藏在易于理解的API后面。它提供了构建基于事件或其他异步模式的应用程序所需的一切。在查看示例之后,您应该能够快速集成Watermill到您的项目中。
安装
go get -u github.com/ThreeDotsLabs/watermill
准备背景
事件驱动应用程序背后的基本思想始终是相同的:监听传入的消息并对其做出反应。Watermill支持为多个发布者和订阅者实现此行为。
Watermill的核心部分是Message。它就像http
包中的http.Request
一样重要。大多数Watermill功能都以某种方式使用这个结构体。
尽管PubSub库提供了复杂的功能,但对于Watermill来说,只需实现两个接口即可开始使用它们:Publisher
和Subscriber
。
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
订阅消息
让我们从订阅开始。Subscribe
期望一个主题名称,并返回一个接收传入消息的通道。主题的具体含义取决于PubSub的实现。
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
支持的PubSub的详细示例请参见下面的内容。
Go Channel 例子
完整示例代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
完整示例代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
//我们需要确认我们已经接收并处理了消息,否则它将被重发多次。
msg.Ack()
}
}
Kafka 例子
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// 相当于 auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("接收消息: %s, payload: %s", msg.UUID, string(msg.Payload))
// 我们需要确认已经接收并处理了消息,
// 否则消息将会一次又一次地被重新发送。
msg.Ack()
}
}
RabbitMQ (AMQP) 例子
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// 此配置基于以下示例:https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// 它被用作一个简单的队列。
//
// 如果你想要实现Pub/Sub风格的服务,请参考
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("接收消息: %s, payload: %s", msg.UUID, string(msg.Payload))
// 我们需要确认已经接收并处理了消息,
// 否则消息将会一次又一次地被重新发送。
msg.Ack()
}
}
SQL 例子
完整源码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
完整源码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
// 需要确认我们已经接收并处理了消息,
// 否则,消息将一遍又一遍地重复投递。
msg.Ack()
}
}
创建消息
Watermill不强制任何消息格式。NewMessage
期望负载为字节切片。您可以使用字符串、JSON、protobuf、Avro、gob或任何其他序列化为[]byte
的格式。
消息UUID是可选的,但建议使用,因为它有助于调试。
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
发布消息
Publish
方法需要一个主题和一个或多个要发布的消息。
err := publisher.Publish("example.topic", msg)
if err != nil {
panic(err)
}
Go Channel 例子
完整源码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Kafka 例子
完整源码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
RabbitMQ (AMQP) 例子
完整代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
SQL 例子
完整代码:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
使用Message Router
发布者和订阅者是Watermill的较低级别的部分。在大多数情况下,您通常希望使用高级接口和功能,如关联、指标、毒队列、重试、限流等。
您可能只想在成功处理消息时才发送确认。在其他情况下,您将立即发送确认,然后再考虑处理。有时,您希望根据传入的消息执行某些操作,并在回应中发布另一条消息。
为了满足这些需求,有一个名为Router的组件。
Message Router的示例应用程序
示例应用程序的流程如下:
- 每秒钟在
incoming_messages_topic
主题上产生一条消息。 struct_handler
处理程序监听incoming_messages_topic
。当收到消息时,打印UUID并在outgoing_messages_topic
上产生一条新消息。print_incoming_messages
处理程序监听incoming_messages_topic
并打印消息的UUID、有效载荷和元数据。print_outgoing_messages
处理程序监听outgoing_messages_topic
并打印消息的UUID、有效载荷和元数据。关联ID应与incoming_messages_topic
上的消息相同。
路由器配置
首先配置路由器,添加插件和中间件。然后设置路由器将使用的处理程序。每个处理程序将独立处理消息。
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// 对于这个示例,我们使用了一个简单的日志记录器实现,
// 您可能想提供自己的`watermill.LoggerAdapter`实现。
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// 当接收到SIGTERM信号时,SignalsHandler会优雅地关闭Router。
// 您也可以通过调用`r.Close()`来关闭路由器。
router.AddPlugin(plugin.SignalsHandler)
// 路由器级别的中间件将为发送到路由器的每个消息执行
router.AddMiddleware(
// CorrelationID将从传入消息的元数据复制关联ID到生成的消息中
middleware.CorrelationID,
// 如果处理程序返回错误,则重试处理程序函数。
// 在达到MaxRetries后,消息被Nacked,由PubSub负责重新发送。
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer处理处理程序的panic。
// 在这种情况下,它将它们作为错误传递给Retry中间件。
middleware.Recoverer,
)
// 为了简化,我们在这里使用gochannel Pub/Sub,
// 您可以用任何Pub/Sub实现替换它,效果将是一样的。
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// 在后台产生一些传入消息
go publishMessages(pubSub)
// AddHandler返回一个处理程序,可用于添加处理程序级别的中间件
// 或停止处理程序。
handler := router.AddHandler(
"struct_handler", // 处理程序名称,必须是唯一的
"incoming_messages_topic", // 从中读取事件的主题
pubSub,
"outgoing_messages_topic", // 发布事件到哪个主题
pubSub,
structHandler{}.Handler,
)
// 处理程序级别的中间件仅对特定处理程序执行
// 可以通过与路由器级别中间件相同的方式添加此类中间件
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("为", message.UUID, "执行处理程序特定中间件")
return h(message)
}
})
// 仅用于调试,我们打印在“incoming_messages_topic”上收到的所有消息
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// 仅用于调试,我们打印发送到“outgoing_messages_topic”的所有事件
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// 现在所有处理程序都已注册,我们正在运行路由器。
// Run在路由器运行时会阻塞。
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
入站消息
struct_handler
从 incoming_messages_topic
中消耗消息,因此我们通过在后台调用 publishMessages()
来模拟传入的流量。注意,我们添加了 SetCorrelationID
中间件。路由器将为所有生成的消息添加一个关联ID(存储在元数据中)。
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("sending message %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
处理器(Handlers )
您可能已经注意到有两种类型的处理器函数:
- 函数
func(msg *message.Message) ([]*message.Message, error)
- 方法
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
如果您的处理器是一个不依赖于任何依赖项的函数,使用第一个选项是可以的。当您的处理器需要一些依赖项(如数据库句柄、记录器等)时,第二个选项很有用。
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Received message: %s\n> %s\n> metadata: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// 我们可以在这里添加一些依赖项
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler received message", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by structHandler"))
return message.Messages{msg}, nil
}
完成!
您可以通过 go run main.go
运行此示例。