消息
消息是Watermill的核心部分之一。消息通过“发布者”发布并由“订阅者”接收。当处理消息时,如果处理失败,您应该发送一个Ack()
(表示成功处理)或一个Nack()
(表示处理失败)。
消息的Ack
和Nack
由订阅者进行处理(在默认实现中,订阅者会等待Ack
或Nack
)。
完整源代码:github.com/ThreeDotsLabs/watermill/message/message.go
// ...
type Message struct {
// UUID是消息的唯一标识符。
//
// 它仅用于Watermill进行调试。
// UUID可以为空。
UUID string
// Metadata包含消息的元数据。
//
// 可用于存储不需要解码整个有效载荷的数据。
// 它类似于HTTP请求的标头。
//
// Metadata会被编组并保存到PubSub中。
Metadata Metadata
// Payload是消息的有效载荷。
Payload Payload
// ack在接收到确认时关闭。
ack chan struct{}
// noACk在接收到否定确认时关闭。
noAck chan struct{}
ackMutex sync.Mutex
ackSentType ackType
ctx context.Context
}
// ...
Ack
发送Ack
完整源代码:github.com/ThreeDotsLabs/watermill/message/message.go
// ...
// Ack发送消息的确认。
//
// Ack不会阻塞。
// Ack具有幂等性。
// 如果已经发送了Nack,则返回false。
func (m *Message) Ack() bool {
// ...
Nack
完整源代码:github.com/ThreeDotsLabs/watermill/message/message.go
// ...
// Nack发送消息的否定确认。
//
// Nack不会阻塞。
// Nack具有幂等性。
// 如果已经发送了Ack,则返回false。
func (m *Message) Nack() bool {
// ...
接收Ack/Nack
完整源代码:github.com/ThreeDotsLabs/watermill/docs/content/docs/message/receiving-ack.go
// ...
select {
case
Context
消息包含标准库的上下文(Context),就像HTTP请求一样。
完整源代码:github.com/ThreeDotsLabs/watermill/message/message.go
// ...
// Context返回消息的上下文。要更改上下文,请使用SetContext。
//
// 返回的上下文始终非nil;默认为background上下文。
func (m *Message) Context() context.Context {
if m.ctx != nil {
return m.ctx
}
return context.Background()
}
// SetContext将提供的上下文设置为消息的上下文。
func (m *Message) SetContext(ctx context.Context) {
m.ctx = ctx
}
// ...