-
Notifications
You must be signed in to change notification settings - Fork 1.9k
out_es: fix retry on 409 reponse #11422
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -793,8 +793,17 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, | |
| if (item_val.via.i64 == 200 || item_val.via.i64 == 201) { | ||
| check |= FLB_ES_STATUS_SUCCESS; | ||
| } | ||
| /* Check for errors other than version conflict (document already exists) */ | ||
| if (item_val.via.i64 != 409) { | ||
| /* Check for version conflicts (document already exists) */ | ||
| if (item_val.via.i64 == 409) { | ||
| check |= FLB_ES_STATUS_DUPLICATES; | ||
| } | ||
| /* | ||
| * Check for actual errors, excluding: | ||
| * - 200/201: success | ||
| * - 409: version conflict | ||
| */ | ||
| if (item_val.via.i64 != 200 && item_val.via.i64 != 201 && | ||
| item_val.via.i64 != 409) { | ||
| check |= FLB_ES_STATUS_ERROR; | ||
| } | ||
| } | ||
|
|
@@ -959,34 +968,37 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, | |
| flb_plg_debug(ctx->ins, "Elasticsearch response\n%s", | ||
| c->resp.payload); | ||
| } | ||
| else { | ||
| /* we got an error */ | ||
| if (ctx->trace_error) { | ||
| /* Trace errors/duplicates if trace_error is enabled */ | ||
| if (ctx->trace_error && | ||
| ((ret & FLB_ES_STATUS_DUPLICATES) || (ret & FLB_ES_STATUS_ERROR))) { | ||
| /* | ||
| * If trace_error is set, trace the actual | ||
| * response from Elasticsearch explaining the problem. | ||
| * Trace_Output can be used to see the request. | ||
| */ | ||
| if (pack_size < 4000) { | ||
| flb_plg_debug(ctx->ins, "error caused by: Input\n%.*s\n", | ||
| (int) pack_size, pack); | ||
| } | ||
| if (c->resp.payload_size < 4000) { | ||
| flb_plg_error(ctx->ins, "error: Output\n%s", | ||
| c->resp.payload); | ||
| } else { | ||
| /* | ||
| * If trace_error is set, trace the actual | ||
| * response from Elasticsearch explaining the problem. | ||
| * Trace_Output can be used to see the request. | ||
| */ | ||
| if (pack_size < 4000) { | ||
| flb_plg_debug(ctx->ins, "error caused by: Input\n%.*s\n", | ||
| (int) pack_size, pack); | ||
| } | ||
| if (c->resp.payload_size < 4000) { | ||
| flb_plg_error(ctx->ins, "error: Output\n%s", | ||
| c->resp.payload); | ||
| } else { | ||
| /* | ||
| * We must use fwrite since the flb_log functions | ||
| * will truncate data at 4KB | ||
| */ | ||
| fwrite(c->resp.payload, 1, c->resp.payload_size, stderr); | ||
| fflush(stderr); | ||
| } | ||
| * We must use fwrite since the flb_log functions | ||
| * will truncate data at 4KB | ||
| */ | ||
| fwrite(c->resp.payload, 1, c->resp.payload_size, stderr); | ||
| fflush(stderr); | ||
| } | ||
| } | ||
| /* Only retry on actual errors (not 409 version conflicts) */ | ||
| if (ret & FLB_ES_STATUS_ERROR) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This causes retrying model disruption. I suppose that the following would be better: /* Retry on any non-success, non-duplicate condition */
if (!(ret & FLB_ES_STATUS_SUCCESS) &&
!(ret & FLB_ES_STATUS_DUPLICATES)) {
goto retry;
}This is because we need to handle precisely which types responses are needed to retry. |
||
| goto retry; | ||
|
Comment on lines
+995
to
997
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Here the retry gate was narrowed to Useful? React with 👍 / 👎. |
||
| } | ||
| } | ||
| else { | ||
| /* No payload to parse, retry */ | ||
| goto retry; | ||
|
Comment on lines
+971
to
1002
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Emit a warning for 409 duplicates even when 🛠️ Suggested patch if (ret & FLB_ES_STATUS_SUCCESS) {
flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
c->resp.payload);
}
+ if (ret & FLB_ES_STATUS_DUPLICATES) {
+ if (c->resp.payload_size < 4000) {
+ flb_plg_warn(ctx->ins,
+ "Elasticsearch duplicate (409) detected: %s",
+ c->resp.payload);
+ }
+ else {
+ flb_plg_warn(ctx->ins,
+ "Elasticsearch duplicate (409) detected; payload follows:");
+ fwrite(c->resp.payload, 1, c->resp.payload_size, stderr);
+ fflush(stderr);
+ }
+ }
/* Trace errors/duplicates if trace_error is enabled */
if (ctx->trace_error &&
((ret & FLB_ES_STATUS_DUPLICATES) || (ret & FLB_ES_STATUS_ERROR))) {🤖 Prompt for AI Agents |
||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we can simplify these lines as: