Conversation
| // ErrGroupAlreadySet groupId already set for PubOptions object | ||
| ErrGroupAlreadySet = errors.New("ErrGroupAlreadySet") | ||
| // ErrInvalidGroupId groupId is invalid | ||
| ErrInvalidGroupId = errors.New("ErrInvalidGroupId") |
There was a problem hiding this comment.
[golint-pr-review] reported by reviewdog 🐶
var ErrInvalidGroupId should be ErrInvalidGroupID
| ) | ||
|
|
||
| type SubResponseHandler = func(msg Message, err error) common.ConsumptionCode | ||
| type SubResponseHandler = func(context context.Context, msg Message, err error) common.ConsumptionCode |
There was a problem hiding this comment.
[golint-pr-review] reported by reviewdog 🐶
exported type SubResponseHandler should have comment or be unexported
|
|
||
| // WithGroup 设置订阅的 GroupId | ||
| // 注意:对于 AtMostOnce 语义,GroupId 在 WithAtMostOnceDelivery 中已设置 | ||
| func WithGroup(groupId common.GroupId) SubOption { |
There was a problem hiding this comment.
[golint-pr-review] reported by reviewdog 🐶
func parameter groupId should be groupID
There was a problem hiding this comment.
Pull Request Overview
This PR addresses caching fixes by switching to a hash‐based implementation and refines related NoSQL document write-back functionality while also polishing message queue subscription options.
- Updated ICache interface and RedisCache implementation to use HGET/HSET for hash storage.
- Introduced a WriteBackWorker for asynchronous cache write-back using message queues.
- Refactored document operations to update or delete cache appropriately and improved subscription option validations.
Reviewed Changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| orm/nosql/worker.go | Added WriteBackWorker for delayed write-back via message queue. |
| orm/nosql/mongo/internal/driver.go | Changed update payload structure by removing extra nesting syntax. |
| orm/nosql/document.go | Updated synchronous and asynchronous save methods and cache updates. |
| orm/nosql/diface/icache.go | Updated ICache interface signatures for hash-based caching. |
| orm/nosql/common.go | Added utility functions for marshaling maps and struct conversions. |
| orm/nosql/cache/redis_cache.go | Refactored GetCache and SetCache to use Redis hash commands. |
| mq/miface/sub_options.go, handler.go, etc. | Adjusted subscription options and handler signatures for consistency. |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull Request Overview
This pull request implements fixes to the cache functionality by switching to a HASH‐based approach while also updating associated document and message queue logic. Key changes include:
- Introducing a WriteBackWorker for asynchronous delayed writeback.
- Modifying the MongoDB driver to update documents using a raw source in the "$set" operation.
- Updating the ICache interface and the Redis cache implementation for HASH-based operations, along with adjustments to MQ handler signatures.
Reviewed Changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| orm/nosql/worker.go | Added WriteBackWorker for asynchronous delayed writeback. |
| orm/nosql/mongo/internal/driver.go | Changed "$set" update to use o.Source directly. |
| orm/nosql/document_test.go | New tests for CRUD and concurrent updates; uses os.Exit(0). |
| orm/nosql/document.go | Adjusted document operations for cache and writeback support. |
| orm/nosql/diface/icache.go | Updated ICache interface to work with HASH-based caching. |
| orm/nosql/cache/redis_cache.go | Revised Redis cache implementation to use HGET/HSET. |
| mq/* (several files) | Updated MQ handler signatures and subscription options. |
Comments suppressed due to low confidence (2)
orm/nosql/document_test.go:126
- Using os.Exit(0) in tests causes premature termination of the test suite. Consider removing these calls to allow all tests to run to completion.
os.Exit(0)
orm/nosql/document_test.go:185
- Using os.Exit(0) in tests causes premature termination of the test suite. Consider removing these calls so that test cleanup and subsequent tests can run.
os.Exit(0)
|
|
||
| update := bson.M{ | ||
| "$set": bson.M{"data": o.Source}, | ||
| "$set": o.Source, |
There was a problem hiding this comment.
The removal of the encapsulating 'data' field in the $set mapping may change the document structure. Ensure that o.Source is already structured as required for the update operation.
| "$set": o.Source, | |
| "$set": bson.M{"data": o.Source}, |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull Request Overview
This PR refactors the caching mechanism to use Redis HASH commands and updates related components, including document caching and message queue handler signatures. Key changes include:
- Updating RedisCache to use HGETALL/HSET for hash-based operations.
- Modifying document caching functions to update or delete cache appropriately.
- Adjusting MQ handler signatures to propagate context.
Reviewed Changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| orm/nosql/worker.go | Introduces WriteBackWorker for delayed database write-back via MQ. |
| orm/nosql/mongo/internal/driver.go | Alters Mongo update document structure by setting o.Source directly. |
| orm/nosql/document.go | Updates cache integration and write-back scheduling logic. |
| orm/nosql/diface/icache.go | Revises ICache interface to work with hash-based cache data formats. |
| orm/nosql/cache/redis_cache.go | Implements Redis cache Get/Set methods using Redis hash commands. |
| mq/miface/handler.go, mq/internal/* | Adds context parameters to MQ handler functions and adjusts related APIs. |
Comments suppressed due to low confidence (3)
orm/nosql/cache/redis_cache.go:64
- The new SetCache method now returns an error, which differs from the previous implementation. Ensure that all callers of SetCache are updated to handle the error return appropriately.
func (c *RedisCache) SetCache(ctx context.Context, key key.Key, data map[string]any, expire time.Duration) error {
orm/nosql/mongo/internal/driver.go:48
- The Mongo update document now directly assigns o.Source to "$set" rather than wrapping it in an object (e.g., {"data": o.Source}). This change could be breaking if the schema expects a nested structure; verify that the document structure aligns with the intended design.
"$set": o.Source,
mq/miface/handler.go:9
- The signature of SubResponseHandler has been updated to include a context parameter. Confirm that all implementations of this handler are adjusted to accept the new signature to avoid runtime issues.
type SubResponseHandler = func(context context.Context, msg Message, err error) common.ConsumptionCode
There was a problem hiding this comment.
Pull Request Overview
This PR fixes the cache implementation to use Redis HASH commands and updates the corresponding document and message queue components accordingly. Key changes include refactoring the cache interface and RedisCache implementation, enhancing the write-back mechanism in DocumentBase and WriteBackWorker, and updating MQ handler signatures to include a context parameter.
- Introduces new write-back logic in orm/nosql/worker.go and updates DocumentBase logic in orm/nosql/document.go.
- Modifies cache interfaces (ICache) and updates the RedisCache implementation to use HSET/HMGET/HGetAll.
- Adjusts MQ handler functions and subscription implementations to pass context parameters.
Reviewed Changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| orm/nosql/worker.go | New write-back worker implementation for delayed document write-back. |
| orm/nosql/mongo/internal/driver.go | Updates the update query to use o.Source directly with "$set" for MongoDB operations. |
| orm/nosql/document_test.go | Adds tests for CRUD and concurrent updates; note the use of os.Exit(0) may affect test execution. |
| orm/nosql/document.go | Enhances caching with updateCache and write-back scheduling functionality. |
| orm/nosql/diface/icache.go | Changes ICache interface methods to return map data and to propagate errors. |
| orm/nosql/cache/redis_cache.go | Refactors RedisCache implementation to use Redis HASH commands for storing cache data. |
| mq/miface/handler.go, mq/internal/* | Updates handler and subscription functions to include context parameters. |
| mq/miface/sub_options.go | Adds additional validation and options adjustments in subscription options. |
orm/nosql/document_test.go
Outdated
| os.Exit(0) | ||
| return nil |
There was a problem hiding this comment.
Using os.Exit(0) in tests can prematurely terminate the test suite and block subsequent tests; consider using t.FailNow() or returning errors instead.
| os.Exit(0) | |
| return nil |
| os.Exit(0) | ||
| return nil |
There was a problem hiding this comment.
The os.Exit(0) call in this test forces early termination; replacing it with proper test assertions will ensure that all tests run.
| os.Exit(0) | |
| return nil |
|
|
||
| update := bson.M{ | ||
| "$set": bson.M{"data": o.Source}, | ||
| "$set": o.Source, |
There was a problem hiding this comment.
Ensure that using o.Source directly in the update query produces the intended schema change; if the original logic expected a nested document (e.g., "data": o.Source), update downstream code accordingly.
| "$set": o.Source, | |
| "$set": bson.M{"data": o.Source}, |
| // 这可能会影响消息的顺序性和处理性能 | ||
| func WithConcurrency(concurrency int) SubOption { | ||
| return func(o *SubOptions) error { | ||
| if concurrency <= 0 { |
There was a problem hiding this comment.
The check for concurrency returns qerrors.ErrInvalidGroupId which is inconsistent with the validation of a concurrency value; consider defining a specific error for invalid concurrency input.
主要修改: - 完善延迟回写机制(WriteBackWorker, WriteBackManager) - 添加配置管理和验证(WriteBackConfig) - 补充缺失的工具函数(struct2MapShallow, diffMapAny) - 增强错误处理和指标监控 - 完善单元测试覆盖 - 修复类型转换问题(支持[]byte和string JSON数据) - 将所有繁体中文注释转换为简体中文 - 添加配置验证和健康检查机制 - 优化缓存更新策略和异步回写流程 新增文件: - config.go: 回写配置管理 - manager.go: 回写工作器管理器 - config_test.go: 配置测试 - writeback_test.go: 回写功能测试 - REFACTOR_SUMMARY.md: 重构总结文档
- 移除 document_test.go 中的 os.Exit(0) 调用 - 确保测试能够正常完成而不是意外退出 - 验证所有新添加的测试都能正常通过
| if !isBasicType(reflect.TypeOf(v).Kind()) { | ||
| if js, err := json.Marshal(v); err != nil { | ||
| return nil, fmt.Errorf("failed to marshal: %w", err) | ||
| } else { |
There was a problem hiding this comment.
[golint-pr-review] reported by reviewdog 🐶
if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
| DefaultCacheTTL = 30 * time.Minute | ||
| // DefaultWriteBackDelay 默认回写延迟时间 | ||
| DefaultWriteBackDelay = 500 * time.Millisecond | ||
| ExpireRangeMin = 6 * time.Hour |
There was a problem hiding this comment.
[golint-pr-review] reported by reviewdog 🐶
exported const ExpireRangeMin should have comment (or a comment on this block) or be unexported
| field.Set(mv1.Convert(field.Type())) | ||
| continue | ||
| } | ||
|
|
There was a problem hiding this comment.
[gofmt] reported by reviewdog 🐶
| return fmt.Errorf("failed to marshal value for field %s: %w", k1, err) | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
[gofmt] reported by reviewdog 🐶
| type WriteBackConfig struct { | ||
| // Enabled 是否启用回写功能 | ||
| Enabled bool `json:"enabled" yaml:"enabled" envconfig:"WRITEBACK_ENABLED" default:"false"` | ||
|
|
There was a problem hiding this comment.
[gofmt] reported by reviewdog 🐶
|
|
||
| // Delay 回写延迟时间 | ||
| Delay time.Duration `json:"delay" yaml:"delay" envconfig:"WRITEBACK_DELAY" default:"500ms"` | ||
|
|
There was a problem hiding this comment.
[gofmt] reported by reviewdog 🐶
|
|
||
| // BatchSize 批处理大小 | ||
| BatchSize int `json:"batch_size" yaml:"batch_size" envconfig:"WRITEBACK_BATCH_SIZE" default:"100"` | ||
|
|
There was a problem hiding this comment.
[gofmt] reported by reviewdog 🐶
| totalLatency += workerMetrics.AverageLatency | ||
| workerCount++ | ||
| } | ||
|
|
There was a problem hiding this comment.
[gofmt] reported by reviewdog 🐶
| if workerCount > 0 { | ||
| metrics.AverageLatency = totalLatency / time.Duration(workerCount) | ||
| } | ||
|
|
There was a problem hiding this comment.
[gofmt] reported by reviewdog 🐶
|
|
||
| metrics.TotalProcessed = totalProcessed | ||
| metrics.TotalFailed = totalFailed | ||
|
|
There was a problem hiding this comment.
[gofmt] reported by reviewdog 🐶
|
|
||
|
|
||
| w.logger.Debug("Successfully wrote back document", | ||
| zap.String("key", payload.Key), | ||
| zap.String("collection", payload.CollectionName), | ||
| zap.Duration("latency", latency), | ||
| ) | ||
|
|
There was a problem hiding this comment.
[gofmt] reported by reviewdog 🐶
| w.logger.Debug("Successfully wrote back document", | |
| zap.String("key", payload.Key), | |
| zap.String("collection", payload.CollectionName), | |
| zap.Duration("latency", latency), | |
| ) | |
| w.logger.Debug("Successfully wrote back document", | |
| zap.String("key", payload.Key), | |
| zap.String("collection", payload.CollectionName), | |
| zap.Duration("latency", latency), | |
| ) | |
| func TestWriteBackPayload_JSON(t *testing.T) { | ||
| payload := WriteBackPayload{ | ||
| CollectionName: "test_collection", | ||
| Key: "test_key", |
There was a problem hiding this comment.
[gofmt] reported by reviewdog 🐶
| Key: "test_key", | |
| Key: "test_key", |
- 记录了重构的完成状态 - 总结了测试结果和提交记录 - 标记为部署就绪状态
No description provided.