-
Notifications
You must be signed in to change notification settings - Fork 155
✨ feat(rabbitmq): support quorum queue and delivery limit #101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
【feat】扩展 QueueConf 支持队列类型与投递限制 【feat】新增 DeclareQueueConf 方法支持声明 Quorum 队列 【feat】支持配置死信交换机和死信路由键 ✨ feat(rabbitmq): add custom error handler for listener 【feat】新增 WithErrorHandler 选项用于自定义错误处理 【feat】允许在消费失败时记录详细的上下文信息 【feat】更新日志组件依赖从 logx 改为 logc ✅ test(rabbitmq): add tests for queue args and error handler 【test】覆盖 buildQueueArgs 的各种参数组合场景 【test】验证监听器错误处理器的调用逻辑 📝 docs(rabbitmq): add quorum queue usage guide 【docs】新增 RabbitMQ Quorum Queue 使用指南文档 【docs】详细介绍 Delivery Limit 机制与死信队列配置 【docs】提供完整的声明、消费和死信处理示例
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
7 files reviewed, 1 comment
| go func() { | ||
| for d := range msg { | ||
| ctx := context.Background() | ||
| if err := q.handler.Consume(string(d.Body)); err != nil { | ||
| logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err) | ||
| q.errorHandler(ctx, d, err) | ||
| if !que.AutoAck { | ||
| if e := d.Nack(false, true); e != nil { | ||
| logc.Errorf(ctx, "nack failed, error: %v", e) | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: successful messages are never acknowledged when AutoAck is false. this will cause all successfully processed messages to remain unacknowledged, eventually blocking the consumer.
| go func() { | |
| for d := range msg { | |
| ctx := context.Background() | |
| if err := q.handler.Consume(string(d.Body)); err != nil { | |
| logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err) | |
| q.errorHandler(ctx, d, err) | |
| if !que.AutoAck { | |
| if e := d.Nack(false, true); e != nil { | |
| logc.Errorf(ctx, "nack failed, error: %v", e) | |
| } | |
| } | |
| } | |
| } | |
| go func() { | |
| for d := range msg { | |
| ctx := context.Background() | |
| if err := q.handler.Consume(string(d.Body)); err != nil { | |
| q.errorHandler(ctx, d, err) | |
| if !que.AutoAck { | |
| if e := d.Nack(false, true); e != nil { | |
| logc.Errorf(ctx, "nack failed, error: %v", e) | |
| } | |
| } | |
| } else if !que.AutoAck { | |
| if e := d.Ack(false); e != nil { | |
| logc.Errorf(ctx, "ack failed, error: %v", e) | |
| } | |
| } | |
| } | |
| }() |
Prompt To Fix With AI
This is a comment left during a code review.
Path: rabbitmq/listener.go
Line: 81:92
Comment:
**logic:** successful messages are never acknowledged when `AutoAck` is false. this will cause all successfully processed messages to remain unacknowledged, eventually blocking the consumer.
```suggestion
go func() {
for d := range msg {
ctx := context.Background()
if err := q.handler.Consume(string(d.Body)); err != nil {
q.errorHandler(ctx, d, err)
if !que.AutoAck {
if e := d.Nack(false, true); e != nil {
logc.Errorf(ctx, "nack failed, error: %v", e)
}
}
} else if !que.AutoAck {
if e := d.Ack(false); e != nil {
logc.Errorf(ctx, "ack failed, error: %v", e)
}
}
}
}()
```
How can I resolve this? If you propose a fix, please make it concise.
【feat】扩展 QueueConf 支持队列类型与投递限制
【feat】新增 DeclareQueueConf 方法支持声明 Quorum 队列
【feat】支持配置死信交换机和死信路由键
✨ feat(rabbitmq): add custom error handler for listener 【feat】新增 WithErrorHandler 选项用于自定义错误处理
【feat】允许在消费失败时记录详细的上下文信息
【feat】更新日志组件依赖从 logx 改为 logc
✅ test(rabbitmq): add tests for queue args and error handler 【test】覆盖 buildQueueArgs 的各种参数组合场景
【test】验证监听器错误处理器的调用逻辑
📝 docs(rabbitmq): add quorum queue usage guide
【docs】新增 RabbitMQ Quorum Queue 使用指南文档
【docs】详细介绍 Delivery Limit 机制与死信队列配置
【docs】提供完整的声明、消费和死信处理示例
Greptile Summary
adds support for RabbitMQ quorum queues with delivery limit and dead letter configuration to prevent poison message loops.
QueueConfwithQueueType,DeliveryLimit,DeadLetterExchange, andDeadLetterRoutingKeyfieldsDeclareQueueConf()method that usesbuildQueueArgs()to construct RabbitMQ arguments from configWithErrorHandler()to allow detailed logging of consume failureslogxtologcfor context-aware loggingCritical Issue:
listener.go:81-92only callsNack()on errors but never callsAck()on success whenAutoAckis false, which will cause all successfully processed messages to remain unacknowledged and eventually block the consumer.Confidence Score: 1/5
Ack()call inlistener.gois a critical functional bug that will cause the consumer to stop processing messages whenAutoAckis false. this breaks a core feature and makes manual ack mode unusable.rabbitmq/listener.gorequires immediate attention to add proper message acknowledgementImportant Files Changed
Sequence Diagram
sequenceDiagram participant App as Application participant Admin as RabbitMQ Admin participant Listener as RabbitMQ Listener participant Queue as RabbitMQ Queue participant Handler as Consumer Handler participant ErrorHandler as Error Handler Note over App,Admin: Setup Phase App->>Admin: DeclareQueueConf(QueueConf) Admin->>Admin: buildQueueArgs(conf) Note over Admin: Set x-queue-type=quorum<br/>x-delivery-limit=20<br/>x-dead-letter-exchange<br/>x-dead-letter-routing-key Admin->>Queue: QueueDeclare(args) Queue-->>Admin: Queue created Note over App,Queue: Consumption Phase App->>Listener: MustNewListener(conf, handler, opts) Listener->>Listener: ensureListenerOptions(opts) App->>Listener: Start() Listener->>Queue: Consume(queueName) loop For each message Queue->>Listener: Delivery (msg + headers) Note over Queue,Listener: x-delivery-count header included Listener->>Handler: Consume(message) alt Success Handler-->>Listener: nil Note over Listener: BUG: No Ack() called<br/>when AutoAck=false else Failure Handler-->>Listener: error Listener->>ErrorHandler: errorHandler(ctx, msg, err) ErrorHandler-->>Listener: (logs error) alt AutoAck=false Listener->>Queue: Nack(requeue=true) Note over Queue: Increment x-delivery-count alt delivery-count > delivery-limit Queue->>Queue: Route to dead letter exchange else delivery-count <= delivery-limit Queue->>Queue: Requeue message end end end end