Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mq/internal/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func (m *MessageQueue) Publish(topic string, opts ...miface.PubOption) error {
}

// topic string should follow the syntax of:
// kafka://topic-name
// nats://some-other-topic
// local://some-other-topic
func parseTopic(topic string) (mqType, string, error) {
sep := "://"

Expand Down
2 changes: 1 addition & 1 deletion mq/internal/local/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func CreateSubscription(
go func() {
for msg := range msgIn {
m := message2.Msg2Message(topic, msg)
if code := sub.handler(m, nil); code == common.ConsumeNackTransientFailure {
if code := sub.handler(ctx, m, nil); code == common.ConsumeNackTransientFailure {
msg.Nack()
} else {
msg.Ack()
Expand Down
2 changes: 1 addition & 1 deletion mq/internal/nats/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (m *MessageQueue) Subscribe(
go func() {
for msg := range msgChan {
ms := message2.Msg2Message(topic, msg)
if code := handler(ms, nil); code == common.ConsumeNackTransientFailure {
if code := handler(ctx, ms, nil); code == common.ConsumeNackTransientFailure {
msg.Nack()
} else {
msg.Ack()
Expand Down
8 changes: 4 additions & 4 deletions mq/internal/qerrors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package qerrors
import "github.com/pkg/errors"

var (
//// mq package scope

// ErrSubscriptionFailure error encountered subscribing to topic
ErrSubscriptionFailure = errors.New("ErrSubscriptionFailure")
// ErrMQTypeUnsupported specified message queue type not supported
Expand All @@ -19,8 +17,6 @@ var (
ErrNoNatsQueue = errors.New("ErrNoNatsQueue")
// ErrNoLocalQueue no Local MQ implementation was provided during dependency injection
ErrNoLocalQueue = errors.New("ErrNoLocalQueue")
// ErrInvalidSubscription invalid Subscription
ErrInvalidSubscription = errors.New("ErrInvalidSubscription")
// ErrDataAlreadySet data payload already set for PubOptions object
ErrDataAlreadySet = errors.New("ErrDataAlreadySet")
// ErrEmptyTopic empty topic value passed in as argument
Expand All @@ -29,4 +25,8 @@ var (
ErrSemanticsAlreadySet = errors.New("ErrSemanticsAlreadySet")
// ErrDelayedPublishUnsupported Delayed publishing not supported.
ErrDelayedPublishUnsupported = errors.New("ErrDelayedPublishUnsupported")
// ErrGroupAlreadySet groupId already set for PubOptions object
ErrGroupAlreadySet = errors.New("ErrGroupAlreadySet")
// ErrInvalidGroupId groupId is invalid
ErrInvalidGroupId = errors.New("ErrInvalidGroupId")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[golint-pr-review] reported by reviewdog 🐶
var ErrInvalidGroupId should be ErrInvalidGroupID

)
4 changes: 3 additions & 1 deletion mq/miface/handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package miface

import (
"context"

"github.com/gstones/moke-kit/mq/common"
)

type SubResponseHandler = func(msg Message, err error) common.ConsumptionCode
type SubResponseHandler = func(context context.Context, msg Message, err error) common.ConsumptionCode

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[golint-pr-review] reported by reviewdog 🐶
exported type SubResponseHandler should have comment or be unexported

166 changes: 147 additions & 19 deletions mq/miface/sub_options.go
Original file line number Diff line number Diff line change
@@ -1,57 +1,185 @@
package miface

import (
"time"

"github.com/gstones/moke-kit/mq/common"
"github.com/gstones/moke-kit/mq/internal/qerrors"
)

// SubOptions contains all the various options that the provided WithXyz functions construct.
// DefaultSubOptions 返回默认的订阅选项
func DefaultSubOptions() SubOptions {
return SubOptions{
DeliverySemantics: common.Unset,
GroupId: "",
Concurrency: 1, // 默认单并发
MaxRetries: 3, // 默认最多重试3次
RetryDelay: time.Second, // 默认重试延迟1秒
Timeout: time.Minute, // 默认处理超时1分钟
AutoAck: true, // 默认自动确认
DLQEnabled: false, // 默认不启用死信队列
DLQTopic: "",
}
}

// SubOptions 包含由 WithXyz 函数构造的各种订阅选项
type SubOptions struct {
DeliverySemantics common.DeliverySemantics
GroupId string
Concurrency int // 订阅的并发数
MaxRetries int // 处理失败时的最大重试次数
RetryDelay time.Duration // 处理失败时的重试延迟
Timeout time.Duration // 处理超时时间
AutoAck bool // 是否自动确认消息
DLQEnabled bool // 是否启用死信队列
DLQTopic string // 死信队列主题
}

// SubOption is a closure that updates SubOptions.
// SubOption 是更新 SubOptions 的闭包
type SubOption func(o *SubOptions) error

// NewSubOptions constructs an SubOptions struct from the provided SubOption closures and returns it.
// NewSubOptions 从提供的 SubOption 闭包构造 SubOptions 结构并返回
func NewSubOptions(opts ...SubOption) (options SubOptions, err error) {
options = DefaultSubOptions()
o := &options

for _, opt := range opts {
if err = opt(o); err != nil {
break
return options, err
}
}
return

// 可以在这里添加验证逻辑
if err = o.validate(); err != nil {
return options, err
}

return options, nil
}

// validate 验证选项是否有效
func (o *SubOptions) validate() error {
// 添加必要的验证逻辑
// 例如,如果 DeliverySemantics 是 AtMostOnce,GroupId 不能为空
if o.DeliverySemantics == common.AtMostOnce && o.GroupId == "" {
return qerrors.ErrInvalidGroupId
}
return nil
}

// Configures delivery semantics of subscription to be at-least-once delivery.
// If a semantics preference is not set, mq implementation will use its default mode.
// Mutually exclusive with WithAtMostOnceDelivery()
// WithAtLeastOnceDelivery 配置订阅的交付语义为至少一次交付
// 如果没有设置语义首选项,mq 实现将使用其默认模式
// WithAtMostOnceDelivery() 互斥
func WithAtLeastOnceDelivery() SubOption {
return func(o *SubOptions) error {
if o.DeliverySemantics != common.Unset {
return qerrors.ErrSemanticsAlreadySet
} else {
o.DeliverySemantics = common.AtLeastOnce
return nil
}
o.DeliverySemantics = common.AtLeastOnce
return nil
}
}

// Configures delivery semantics of subscription to be at-most-once delivery.
// If a semantics preference is not set, mq implementation will use its default mode.
// groupId is also optional. Pass in mq.DefaultId to have the mq implementation set a default groupId.
// Mutually exclusive with WithAtLeastOnceDelivery()
// WithAtMostOnceDelivery 配置订阅的交付语义为最多一次交付
// 如果没有设置语义首选项,mq 实现将使用其默认模式
// groupId 也是可选的。传入 mq.DefaultId mq 实现设置默认 groupId
// WithAtLeastOnceDelivery() 互斥
func WithAtMostOnceDelivery(groupId common.GroupId) SubOption {
return func(o *SubOptions) error {
if o.DeliverySemantics != common.Unset {
return qerrors.ErrSemanticsAlreadySet
} else {
o.DeliverySemantics = common.AtMostOnce
o.GroupId = string(groupId)
return nil
}
o.DeliverySemantics = common.AtMostOnce
o.GroupId = string(groupId)
return nil
}
}

// WithGroup 设置订阅的 GroupId
// 注意:对于 AtMostOnce 语义,GroupId 在 WithAtMostOnceDelivery 中已设置
func WithGroup(groupId common.GroupId) SubOption {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[golint-pr-review] reported by reviewdog 🐶
func parameter groupId should be groupID

return func(o *SubOptions) error {
if o.DeliverySemantics == common.AtMostOnce {
return qerrors.ErrGroupAlreadySet
}
o.GroupId = string(groupId)
return nil
}
}

// WithConcurrency 设置订阅的并发数
// 注意:如果设置了并发数,mq 实现可能会使用不同的实现来处理消息
// 例如,Kafka 可能会使用多个消费者组来处理消息
// 这可能会影响消息的顺序性和处理性能
func WithConcurrency(concurrency int) SubOption {
return func(o *SubOptions) error {
if concurrency <= 0 {
Copy link

Copilot AI Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for concurrency returns qerrors.ErrInvalidGroupId which is inconsistent with the validation of a concurrency value; consider defining a specific error for invalid concurrency input.

Copilot uses AI. Check for mistakes.
return qerrors.ErrInvalidGroupId
}
o.Concurrency = concurrency
return nil
}
}

// WithMaxRetries 设置处理失败时的最大重试次数
// 注意:如果设置了最大重试次数,mq 实现可能会使用不同的实现来处理消息
// 例如,Kafka 可能会使用重试主题来处理消息
// 这可能会影响消息的顺序性和处理性能
func WithMaxRetries(maxRetries int) SubOption {
return func(o *SubOptions) error {
if maxRetries < 0 {
return qerrors.ErrInvalidGroupId
}
o.MaxRetries = maxRetries
return nil
}
}

// WithRetryDelay 设置处理失败时的重试延迟
func WithRetryDelay(retryDelay time.Duration) SubOption {
return func(o *SubOptions) error {
if retryDelay < 0 {
return qerrors.ErrInvalidGroupId
}
o.RetryDelay = retryDelay
return nil
}
}

// WithTimeout 设置处理超时时间
func WithTimeout(timeout time.Duration) SubOption {
return func(o *SubOptions) error {
if timeout < 0 {
return qerrors.ErrInvalidGroupId
}
o.Timeout = timeout
return nil
}
}

// WithAutoAck 设置是否自动确认消息
func WithAutoAck(autoAck bool) SubOption {
return func(o *SubOptions) error {
o.AutoAck = autoAck
return nil
}
}

// WithDLQEnabled 设置是否启用死信队列
func WithDLQEnabled(enabled bool) SubOption {
return func(o *SubOptions) error {
o.DLQEnabled = enabled
return nil
}
}

// WithDLQTopic 设置死信队列主题
func WithDLQTopic(topic string) SubOption {
return func(o *SubOptions) error {
if topic == "" {
return qerrors.ErrEmptyTopic
}
o.DLQTopic = topic
return nil
}
}
Loading
Loading