Golang实现的Watermill中的CQRS。
CQRS机制
CQRS意味着“命令查询职责分离”。我们将命令(写请求)和查询(读请求)的责任分离开。写请求和读请求由不同的对象处理。
这就是CQRS。我们还可以进一步分割数据存储,拥有单独的读和写存储。一旦这样做,可能会有许多读存储,针对处理不同类型的查询或跨越许多有界上下文进行优化。尽管单独的读/写存储通常是与CQRS相关讨论的主题,但这不是CQRS本身。CQRS仅是命令和查询的第一次分割。
cqrs
组件提供了一些有用的抽象,构建在Pub/Sub和Router的基础之上,有助于实现CQRS模式。
您不需要实现整个CQRS。通常只使用该组件的事件部分构建事件驱动的应用程序。
构建模块
事件
事件表示已经发生的事情。事件是不可变的。
事件总线
完整源代码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus将事件传输到事件处理程序。
type EventBus struct {
// ...
完整源代码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
type EventBusConfig struct {
// GeneratePublishTopic用于生成发布事件的主题名称。
GeneratePublishTopic GenerateEventPublishTopicFn
// OnPublish在发送事件之前调用。
// 可以修改*message.Message。
//
// 此选项不是必需的。
OnPublish OnEventSendFn
// Marshaler用于编码和解码事件。
// 这是必需的。
Marshaler CommandEventMarshaler
// 用于记录的Logger实例。
// 如果未提供,则使用watermill.NopLogger。
Logger watermill.LoggerAdapter
}
func (c *EventBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
事件处理器
完整代码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor 用于确定应该处理从事件总线接收到的事件的 EventHandler。
type EventProcessor struct {
// ...
完整代码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
type EventProcessorConfig struct {
// GenerateSubscribeTopic 用于生成订阅事件的主题。
// 如果事件处理器使用处理程序组,则使用 GenerateSubscribeTopic。
GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn
// SubscriberConstructor 用于为 EventHandler 创建订阅者。
//
// 此函数对每个 EventHandler 实例调用一次。
// 如果要为多个处理程序重用一个订阅者,请使用 GroupEventProcessor。
SubscriberConstructor EventProcessorSubscriberConstructorFn
// OnHandle 在处理事件之前被调用。
// OnHandle 的工作方式类似于中间件:您可以在处理事件之前和之后注入其他逻辑。
//
// 因此,您需要显式调用 params.Handler.Handle() 来处理事件。
//
// func(params EventProcessorOnHandleParams) (err error) {
// // 处理之前的逻辑
// // (...)
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // 处理之后的逻辑
// // (...)
// return err
// }
//
// 此选项不是必需的。
OnHandle EventProcessorOnHandleFn
// AckOnUnknownEvent 用于确定是否应在事件没有定义处理程序时确认消息。
AckOnUnknownEvent bool
// Marshaler 用于编组和解组事件。
// 必需的。
Marshaler CommandEventMarshaler
// 用于日志记录的 Logger 实例。
// 如果未提供,将使用 watermill.NopLogger。
Logger watermill.LoggerAdapter
// disableRouterAutoAddHandlers 是用于保持向后兼容性的。
// 当使用 NewEventProcessor 创建 EventProcessor 时,将设置该值。
// 已弃用:请迁移到 NewEventProcessorWithConfig。
disableRouterAutoAddHandlers bool
}
func (c *EventProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
事件组处理器
完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessor 决定应该由哪个事件处理程序处理从事件总线接收到的事件。
// 与 EventProcessor 相比,EventGroupProcessor 允许具有共享同一订阅者实例的多个处理程序。
type EventGroupProcessor struct {
// ...
完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic 用于生成订阅处理程序组事件的主题。
// 如果使用处理程序组,则此选项对于 EventProcessor 是必需的。
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor 用于为 GroupEventHandler 创建订阅者。
// 此函数在每个事件组中调用一次 - 这样可以为每个组创建一个订阅。
// 当我们要按顺序处理来自一个流的事件时,它非常有用。
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// OnHandle 在处理事件之前调用。
// OnHandle 类似于中间件:您可以在处理事件之前和之后注入其他逻辑。
//
// 因此,您需要显式调用 params.Handler.Handle() 来处理事件。
//
// func(params EventGroupProcessorOnHandleParams) (err error) {
// // 处理之前的逻辑
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // 处理之后的逻辑
// // (...)
//
// return err
// }
//
// 此选项不是必需的。
OnHandle EventGroupProcessorOnHandleFn
// AckOnUnknownEvent 用于决定是否应该确认如果事件没有定义处理程序。
AckOnUnknownEvent bool
// Marshaler 用于编码和解码事件。
// 这是必需的。
Marshaler CommandEventMarshaler
// 用于记录的 Logger 实例。
// 如果未提供,则使用 watermill.NopLogger。
Logger watermill.LoggerAdapter
}
func (c *EventGroupProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
了解更多关于事件组处理器的信息。
事件处理程序
完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler 接收由 NewEvent 定义的事件,并使用其 Handle 方法处理它们。
// 如果使用 DDD,事件处理程序可以修改和持久化聚合。
// 它还可以调用流程管理器、saga 或只是构建读模型。
//
// 与命令处理程序不同,每个事件可以有多个事件处理程序。
//
// 在处理消息期间,使用一个 EventHandler 实例。
// 当同时传递多个事件时,Handle 方法可以同时执行多次。
// 因此,Handle 方法需要是线程安全的!
type EventHandler interface {
// ...
命令
命令是一个简单的数据结构,表示执行某些操作的请求。
命令总线
完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus是将命令传输给命令处理程序的组件。
type CommandBus struct {
// ...
完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
type CommandBusConfig struct {
// GeneratePublishTopic用于生成发布命令的主题。
GeneratePublishTopic CommandBusGeneratePublishTopicFn
// OnSend在发布命令之前调用。
// 可以修改 *message.Message。
//
// 该选项不是必需的。
OnSend CommandBusOnSendFn
// Marshaler用于序列化和反序列化命令。
// 必需的。
Marshaler CommandEventMarshaler
// 用于记录日志的Logger实例。
// 如果未提供,将使用watermill.NopLogger。
Logger watermill.LoggerAdapter
}
func (c *CommandBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
命令处理器
完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessorSubscriberConstructorFn为CommandHandler创建订阅者。
// 它允许您为每个命令处理程序创建单独的自定义订阅者。
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...
完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
type CommandProcessorConfig struct {
// GenerateSubscribeTopic用于生成订阅命令的主题。
GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn
// SubscriberConstructor用于为CommandHandler创建订阅者。
SubscriberConstructor CommandProcessorSubscriberConstructorFn
// OnHandle在处理命令之前调用。
// OnHandle的工作方式与中间件类似:您可以在处理命令之前和之后注入附加的逻辑。
//
// 由于这一点,您需要显式调用params.Handler.Handle()来处理命令。
// func(params CommandProcessorOnHandleParams) (err error) {
// // 逻辑在处理之前
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Command)
//
// // 逻辑在处理之后
// // (...)
//
// return err
// }
//
// 该选项不是必需的。
OnHandle CommandProcessorOnHandleFn
// Marshaler用于序列化和反序列化命令。
// 必需的。
Marshaler CommandEventMarshaler
// 用于记录日志的Logger实例。
// 如果未提供,将使用watermill.NopLogger。
Logger watermill.LoggerAdapter
// 如果为true,即使CommandHandler返回错误,CommandProcessor也会ack消息。
// 如果RequestReplyBackend不为null并且发送回复失败,消息仍将被否认。
//
// 警告:在使用requestreply组件(requestreply.NewCommandHandler或requestreply.NewCommandHandlerWithResult)时,不建议使用此选项,
// 因为当发送回复失败时,它可能会ack命令。
//
// 当使用requestreply时,您应该使用requestreply.PubSubBackendConfig.AckCommandErrors。
AckCommandHandlingErrors bool
// disableRouterAutoAddHandlers用于保持向后兼容性。
// 当由NewCommandProcessor创建CommandProcessor时设置它。
// 已弃用:请迁移到NewCommandProcessorWithConfig。
disableRouterAutoAddHandlers bool
}
func (c *CommandProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
命令处理器
完整的源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// CommandHandler接收由NewCommand定义的命令,并使用Handle方法处理它。
// 如果使用DDD,CommandHandler可能会修改和持久化聚合。
//
// 与EventHandler不同,每个Command只能有一个CommandHandler。
//
// 在处理消息期间,使用一个CommandHandler实例。
// 当多个命令同时被投递时,Handle方法可以同时执行多次。
// 因此,Handle方法需要是线程安全的!
type CommandHandler interface {
// ...
命令和事件编组器
完整的源码:github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler将命令和事件编组为Watermill的消息,反之亦然。
// 命令的有效负载需要被编组为[]bytes。
type CommandEventMarshaler interface {
// Marshal将命令或事件编组为Watermill的消息。
Marshal(v interface{}) (*message.Message, error)
// Unmarshal将Watermill的消息解码为v命令或事件。
Unmarshal(msg *message.Message, v interface{}) (err error)
// Name返回命令或事件的名称。
// 名称可用于确定所接收到的命令或事件是否是我们要处理的事件。
Name(v interface{}) string
// NameFromMessage从Watermill的消息中返回命令或事件的名称(由Marshal生成)。
//
// 当我们有编组为Watermill的消息的命令或事件时,
// 我们应该使用NameFromMessage而不是Name,以避免不必要的解码。
NameFromMessage(msg *message.Message) string
}
// ...
使用方法
示例领域
以一个简单的领域为例,该领域负责在酒店中处理房间预订。
我们将使用Event Storming符号来展示此领域的模型。
符号说明:
- 蓝色便笺是命令
- 橙色便笺是事件
- 绿色便笺是从事件异步生成的读取模型
- 紫色便笺是由事件触发并生成命令的策略
- 粉色便笺是热点区域;我们标记经常出现问题的地方
领域很简单:
- 客户可以预订房间。
-
每当预订一间房,我们就为客户订购一瓶啤酒(因为我们热爱我们的客人)。
- 我们知道有时啤酒不够。
- 我们会基于预订生成一份财务报告。
发送命令
首先,我们需要模拟客户的动作。
完整的源码:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
// ...
命令处理器
BookRoomHandler
将处理我们的命令。
完整源代码: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler是一个命令处理器,处理BookRoom命令并发出RoomBooked事件。
//
// 在CQRS中,一种命令必须由一个处理器处理。
// 当添加另一个处理此命令的处理器到命令处理器时,将返回错误。
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}
// NewCommand返回此处理器应处理的命令类型。它必须是一个指针。
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c始终是由`NewCommand`返回的类型,强制转换始终是安全的
cmd := c.(*BookRoom)
// 一些随机价格,在实际生产中可能会以更明智的方式计算
price := (rand.Int63n(40) + 1) * 10
log.Printf(
"预订了%s,从%s到%s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked将由OrderBeerOnRoomBooked事件处理器处理,
// 将来RoomBooked可以由多个事件处理器处理
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Price: price,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked是一个事件处理器,处理RoomBooked事件并发出OrderBeer命令。
// ...
事件处理器
如前所述,我们希望每次订房时都能订一瓶啤酒(*“当房间被预订时”*贴纸)。我们通过使用OrderBeer
命令来实现。
完整源代码: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked是一个事件处理器,处理RoomBooked事件并发出OrderBeer命令。
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) HandlerName() string {
// 这个名称传给EventsSubscriberConstructor用于生成队列名称
return "OrderBeerOnRoomBooked"
}
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler是一个命令处理器,处理OrderBeer命令并发出BeerOrdered事件。
// ...
OrderBeerHandler
与BookRoomHandler
非常相似。唯一的区别是,当啤酒不足时,它有时会返回错误,导致命令被重新传递。您可以在示例源代码中找到完整的实现。
事件处理程序组
默认情况下,每个事件处理程序都有一个单独的订阅者实例。如果只有一个事件类型发送到主题,那么这种方式可以正常工作。
在主题上存在多个事件类型的情况下,有两种选择:
- 您可以将
EventConfig.AckOnUnknownEvent
设置为true
- 这将确认所有未由处理程序处理的事件。 - 您可以使用事件处理程序组机制。
要使用事件组,您需要在 EventConfig
中设置 GenerateHandlerGroupSubscribeTopic
和 GroupSubscriberConstructor
选项。
然后,您可以在 EventProcessor
上使用 AddHandlersGroup
。
完整源码:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
// ...
GenerateHandlerGroupSubscribeTopic
和 GroupSubscriberConstructor
都在函数参数中接收有关组名称的信息。
通用处理程序
从 Watermill v1.3 开始,可以使用通用处理程序来处理命令和事件。当您有大量命令/事件且不想为每个命令/事件创建一个处理程序时,这非常有用。
完整源码:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
// ...
在幕后,它创建了 EventHandler 或 CommandHandler 实现。它适用于所有类型的处理程序。
完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler 根据提供的函数和从函数参数中推断出的命令类型创建一个新的 CommandHandler 实现。
func NewCommandHandler[Command any](
// ...
完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler 根据提供的函数和从函数参数中推断出的事件类型创建一个新的 EventHandler 实现。
func NewEventHandler[T any](
// ...
完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler 根据提供的函数和从函数参数中推断出的事件类型创建一个新的 GroupEventHandler 实现。
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
使用事件处理程序构建读模型
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport是一个读模型,它计算我们可以从预订中赚取多少钱。
// 就像在发生RoomBooked事件时,它监听RoomBooked事件。
//
// 这个实现只是写入内存。在生产环境中,您可能会使用某种持久存储。
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b BookingsFinancialReport) HandlerName() string {
// 此名称传递给EventsSubscriberConstructor并用于生成队列名称
return "BookingsFinancialReport"
}
func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
// Handle可能会并发调用,因此需要线程安全。
b.lock.Lock()
defer b.lock.Unlock()
event := e.(*RoomBooked)
// 当我们使用不提供精确一次交付语义的Pub/Sub时,我们需要对消息进行去重。
// GoChannel Pub/Sub提供了精确一次交付,
// 但是让我们为其他Pub/Sub实现准备这个示例。
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
fmt.Printf(">>> 已经预订了价值%d美元的房间\n", b.totalCharge)
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
连接一切
我们已经拥有了构建CQRS应用所需的所有组件。
我们将使用AMQP(RabbitMQ)作为我们的消息代理:AMQP。
在底层,CQRS使用了Watermill的消息路由器。如果您对此不熟悉,并且想要了解其工作原理,您应该查看入门指南。它还将向您展示如何使用某些标准的消息模式,如度量、毒消息队列、限流、关联和其他每个消息驱动应用程序所使用的工具。这些工具已经内置在Watermill中。
让我们回到CQRS。正如您已经知道的,CQRS由多个组件构成,如命令或事件总线、处理器等。
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
那就是全部了。我们有一个可运行的CQRS应用程序。