Conversation
| if err != nil { | ||
| worker.span.LogFields(slog.String("grabbit", "failed processing duplicate")) | ||
| worker.log().WithError(err).Error("failed checking for existing message") | ||
| return true, err |
There was a problem hiding this comment.
why returning true if you don't know the message really is a duplicate?
There was a problem hiding this comment.
you can't return nil for a bool value so I return true and an error
There was a problem hiding this comment.
@vladshub I would return the default value of a bool in go which is false
There was a problem hiding this comment.
@rhinof this way I'm more conservative since I'd rather have this message rejected as a duplicate and go to DLQ or through the error flow then to have processed it twice if I can't ensure deduplication.
| return | ||
| } | ||
|
|
||
| isDuplicate, err := worker.handleDuplicates(bm, delivery, msgSpecificLogEntry) |
There was a problem hiding this comment.
wouldnt you want to "handle duplicates" in case of global handlers/ dead letter handlers invocation as well?
There was a problem hiding this comment.
the deduplication is on the GbusMessage level in dlq/global we don't yet have the gbusMessage yet
| } | ||
|
|
||
| // | ||
| func (d *deduper) StoreMessageID(tx *sql.Tx, id string) error { |
There was a problem hiding this comment.
did you consider to store the message in another storage mechanism? like redis etc.
There was a problem hiding this comment.
I didn't want to introduce another infrastructure dependency but we can have different storages that would implement the same interface.
@rhinof we can but if we do that we can't guarantee that a service will receive only one since issues with the connection to rabbitmq might create additional duplications and also the time a message is in-flight might provide for additional duplications. |
|
|
||
| var _ deduplicator.Store = &deduper{} | ||
|
|
||
| type deduper struct { |
There was a problem hiding this comment.
nitpicking, the struct is named deduper which is the main struct here yet the file name is tx.go
gbus/worker.go
Outdated
| if worker.delicatePolicy == DeduplicationPolicyNone { | ||
| return false, nil | ||
| } | ||
| duplicate, err := worker.duplicateStore.MessageExists(message.IdempotencyKey) |
There was a problem hiding this comment.
shouldn't we be passing in the active transaction ?
There was a problem hiding this comment.
For message exists there is no real need, we can and we pass in the StoreMessage but here it brings no to little value
…ay to close the deduplicator
|
@rhinof & @adiweiss can you please re-review? |
A way to manage message de-duplication in grabbit.
Implementation details:
This PR closes issue #33