Skip to content

Conversation

@lambda-hj
Copy link

@lambda-hj lambda-hj commented Jan 22, 2026

【feat】扩展 QueueConf 支持队列类型与投递限制
【feat】新增 DeclareQueueConf 方法支持声明 Quorum 队列
【feat】支持配置死信交换机和死信路由键

✨ feat(rabbitmq): add custom error handler for listener 【feat】新增 WithErrorHandler 选项用于自定义错误处理
【feat】允许在消费失败时记录详细的上下文信息
【feat】更新日志组件依赖从 logx 改为 logc

✅ test(rabbitmq): add tests for queue args and error handler 【test】覆盖 buildQueueArgs 的各种参数组合场景
【test】验证监听器错误处理器的调用逻辑

📝 docs(rabbitmq): add quorum queue usage guide
【docs】新增 RabbitMQ Quorum Queue 使用指南文档
【docs】详细介绍 Delivery Limit 机制与死信队列配置
【docs】提供完整的声明、消费和死信处理示例

Greptile Summary

adds support for RabbitMQ quorum queues with delivery limit and dead letter configuration to prevent poison message loops.

  • Extended QueueConf with QueueType, DeliveryLimit, DeadLetterExchange, and DeadLetterRoutingKey fields
  • Added DeclareQueueConf() method that uses buildQueueArgs() to construct RabbitMQ arguments from config
  • Introduced custom error handler option via WithErrorHandler() to allow detailed logging of consume failures
  • Migrated from logx to logc for context-aware logging
  • Provided comprehensive documentation with examples for quorum queue usage patterns
  • Added test coverage for queue argument construction and error handler options

Critical Issue: listener.go:81-92 only calls Nack() on errors but never calls Ack() on success when AutoAck is false, which will cause all successfully processed messages to remain unacknowledged and eventually block the consumer.

Confidence Score: 1/5

  • this PR has a critical bug that breaks manual acknowledgement mode
  • the missing Ack() call in listener.go is a critical functional bug that will cause the consumer to stop processing messages when AutoAck is false. this breaks a core feature and makes manual ack mode unusable.
  • rabbitmq/listener.go requires immediate attention to add proper message acknowledgement

Important Files Changed

Filename Overview
rabbitmq/listener.go Added custom error handler and context support, but missing Ack call for successful messages in manual ack mode
rabbitmq/config.go Extended QueueConf with quorum queue support, delivery limit, and dead letter configuration fields
rabbitmq/rabbitmqadmin.go Added DeclareQueueConf method and buildQueueArgs helper for configuring queue arguments

Sequence Diagram

sequenceDiagram
    participant App as Application
    participant Admin as RabbitMQ Admin
    participant Listener as RabbitMQ Listener
    participant Queue as RabbitMQ Queue
    participant Handler as Consumer Handler
    participant ErrorHandler as Error Handler

    Note over App,Admin: Setup Phase
    App->>Admin: DeclareQueueConf(QueueConf)
    Admin->>Admin: buildQueueArgs(conf)
    Note over Admin: Set x-queue-type=quorum<br/>x-delivery-limit=20<br/>x-dead-letter-exchange<br/>x-dead-letter-routing-key
    Admin->>Queue: QueueDeclare(args)
    Queue-->>Admin: Queue created

    Note over App,Queue: Consumption Phase
    App->>Listener: MustNewListener(conf, handler, opts)
    Listener->>Listener: ensureListenerOptions(opts)
    App->>Listener: Start()
    Listener->>Queue: Consume(queueName)
    
    loop For each message
        Queue->>Listener: Delivery (msg + headers)
        Note over Queue,Listener: x-delivery-count header included
        Listener->>Handler: Consume(message)
        
        alt Success
            Handler-->>Listener: nil
            Note over Listener: BUG: No Ack() called<br/>when AutoAck=false
        else Failure
            Handler-->>Listener: error
            Listener->>ErrorHandler: errorHandler(ctx, msg, err)
            ErrorHandler-->>Listener: (logs error)
            alt AutoAck=false
                Listener->>Queue: Nack(requeue=true)
                Note over Queue: Increment x-delivery-count
                alt delivery-count > delivery-limit
                    Queue->>Queue: Route to dead letter exchange
                else delivery-count <= delivery-limit
                    Queue->>Queue: Requeue message
                end
            end
        end
    end
Loading

【feat】扩展 QueueConf 支持队列类型与投递限制
【feat】新增 DeclareQueueConf 方法支持声明 Quorum 队列
【feat】支持配置死信交换机和死信路由键

✨ feat(rabbitmq): add custom error handler for listener
【feat】新增 WithErrorHandler 选项用于自定义错误处理
【feat】允许在消费失败时记录详细的上下文信息
【feat】更新日志组件依赖从 logx 改为 logc

✅ test(rabbitmq): add tests for queue args and error handler
【test】覆盖 buildQueueArgs 的各种参数组合场景
【test】验证监听器错误处理器的调用逻辑

📝 docs(rabbitmq): add quorum queue usage guide
【docs】新增 RabbitMQ Quorum Queue 使用指南文档
【docs】详细介绍 Delivery Limit 机制与死信队列配置
【docs】提供完整的声明、消费和死信处理示例
Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Comment on lines 81 to 92
go func() {
for d := range msg {
ctx := context.Background()
if err := q.handler.Consume(string(d.Body)); err != nil {
logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err)
q.errorHandler(ctx, d, err)
if !que.AutoAck {
if e := d.Nack(false, true); e != nil {
logc.Errorf(ctx, "nack failed, error: %v", e)
}
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: successful messages are never acknowledged when AutoAck is false. this will cause all successfully processed messages to remain unacknowledged, eventually blocking the consumer.

Suggested change
go func() {
for d := range msg {
ctx := context.Background()
if err := q.handler.Consume(string(d.Body)); err != nil {
logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err)
q.errorHandler(ctx, d, err)
if !que.AutoAck {
if e := d.Nack(false, true); e != nil {
logc.Errorf(ctx, "nack failed, error: %v", e)
}
}
}
}
go func() {
for d := range msg {
ctx := context.Background()
if err := q.handler.Consume(string(d.Body)); err != nil {
q.errorHandler(ctx, d, err)
if !que.AutoAck {
if e := d.Nack(false, true); e != nil {
logc.Errorf(ctx, "nack failed, error: %v", e)
}
}
} else if !que.AutoAck {
if e := d.Ack(false); e != nil {
logc.Errorf(ctx, "ack failed, error: %v", e)
}
}
}
}()
Prompt To Fix With AI
This is a comment left during a code review.
Path: rabbitmq/listener.go
Line: 81:92

Comment:
**logic:** successful messages are never acknowledged when `AutoAck` is false. this will cause all successfully processed messages to remain unacknowledged, eventually blocking the consumer.

```suggestion
		go func() {
			for d := range msg {
				ctx := context.Background()
				if err := q.handler.Consume(string(d.Body)); err != nil {
					q.errorHandler(ctx, d, err)
					if !que.AutoAck {
						if e := d.Nack(false, true); e != nil {
							logc.Errorf(ctx, "nack failed, error: %v", e)
						}
					}
				} else if !que.AutoAck {
					if e := d.Ack(false); e != nil {
						logc.Errorf(ctx, "ack failed, error: %v", e)
					}
				}
			}
		}()
```

How can I resolve this? If you propose a fix, please make it concise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants