out_es: fix retry on 409 response #11422
base: master
Conversation
📝 Walkthrough
The Elasticsearch output plugin's error handling in `cb_es_flush` was changed so that 409 (version conflict) responses are reported but no longer retried.
Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Fluent Bit Client
    participant Plugin as ES Plugin
    participant ES as Elasticsearch
    Client->>Plugin: Flush data (cb_es_flush)
    Plugin->>ES: Send request
    ES-->>Plugin: HTTP Response (Status Code)
    Plugin->>Plugin: elasticsearch_error_check()
    alt Status 200 or 201
        Plugin->>Plugin: FLB_ES_STATUS_OK
        Plugin-->>Client: Success
    else Status 409
        Plugin->>Plugin: FLB_ES_STATUS_DUPLICATES
        alt trace_error enabled
            Plugin->>Plugin: Log trace (duplicates)
        end
        Plugin-->>Client: No retry (duplicate)
    else Other Status
        Plugin->>Plugin: FLB_ES_STATUS_ERROR
        alt trace_error enabled
            Plugin->>Plugin: Log trace (error)
        end
        Plugin-->>Client: Retry required
    else No Payload
        Plugin->>Plugin: Explicit retry path
        Plugin-->>Client: Retry required
    end
```
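The branching in the diagram above can be sketched as a small flag check. The `FLB_ES_STATUS_*` names mirror the plugin's constants, but the numeric values and the two helper functions here are illustrative only, not the actual `es.c` implementation:

```c
#include <assert.h>

/* Hypothetical flag values; the real definitions live in the plugin headers */
#define FLB_ES_STATUS_SUCCESS    (1 << 0)
#define FLB_ES_STATUS_ERROR      (1 << 1)
#define FLB_ES_STATUS_DUPLICATES (1 << 2)

/* Map an HTTP status code to the flag set described by the diagram */
int es_status_flags(int http_status)
{
    if (http_status == 200 || http_status == 201) {
        return FLB_ES_STATUS_SUCCESS;
    }
    if (http_status == 409) {
        return FLB_ES_STATUS_DUPLICATES;
    }
    return FLB_ES_STATUS_ERROR;
}

/* The behavior this PR introduces: retry only when the ERROR flag is set,
 * so 409 duplicates fall through without a retry */
int should_retry(int flags)
{
    return (flags & FLB_ES_STATUS_ERROR) != 0;
}
```

With this mapping, a 500 retries, while 200/201 and 409 do not.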
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~15 minutes
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8be0294fd9
```c
/* Only retry on actual errors (not 409 version conflicts) */
if (ret & FLB_ES_STATUS_ERROR) {
    goto retry;
```
Retry is skipped for parsing/response failures
Here the retry gate was narrowed to FLB_ES_STATUS_ERROR only, but elasticsearch_error_check can return other non-success flags like FLB_ES_STATUS_BAD_RESPONSE, ERROR_UNPACK, BAD_TYPE, or IMCOMPLETE when the payload is malformed or truncated. In those cases ret won’t include FLB_ES_STATUS_ERROR, so the flush will fall through to FLB_OK and the chunk is dropped instead of retried. This can happen when Elasticsearch returns invalid JSON or the HTTP client truncates the body. Consider retrying on any non-success non-duplicate flag or mapping those parse/response flags into FLB_ES_STATUS_ERROR.
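A minimal sketch of the distinction being raised, using hypothetical flag values (only the names mirror the plugin's constants): the narrow gate from the PR ignores parse/response failures, while a broader non-success, non-duplicate gate would retry them:

```c
#include <assert.h>

/* Hypothetical flag values for illustration only */
#define FLB_ES_STATUS_SUCCESS      (1 << 0)
#define FLB_ES_STATUS_ERROR        (1 << 1)
#define FLB_ES_STATUS_DUPLICATES   (1 << 2)
#define FLB_ES_STATUS_BAD_RESPONSE (1 << 3)

/* Narrow gate from the PR: a malformed/truncated payload that yields
 * BAD_RESPONSE falls through to success and the chunk is dropped */
int retry_narrow(int ret)
{
    return (ret & FLB_ES_STATUS_ERROR) != 0;
}

/* Broader gate suggested in this comment: retry anything that is
 * neither a success nor a duplicate, so parse failures retry too */
int retry_broad(int ret)
{
    return !(ret & FLB_ES_STATUS_SUCCESS) &&
           !(ret & FLB_ES_STATUS_DUPLICATES);
}
```

The two gates differ exactly on the parse/response-failure flags the comment lists.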
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@plugins/out_es/es.c`:
- Around line 971-1002: The duplicate (409) case is currently only logged when
ctx->trace_error is true; modify the block handling FLB_ES_STATUS_DUPLICATES so
that a warning is emitted even when ctx->trace_error is false: detect (ret &
FLB_ES_STATUS_DUPLICATES) and call flb_plg_warn(ctx->ins, ...) with the response
payload (use the same size check c->resp.payload_size < 4000 to decide between
flb_plg_warn with c->resp.payload or writing to stderr via fwrite/fflush) and
include pack content if helpful, while keeping the existing trace/error logic
and leaving retry behavior for FLB_ES_STATUS_ERROR unchanged.
```c
/* Trace errors/duplicates if trace_error is enabled */
if (ctx->trace_error &&
    ((ret & FLB_ES_STATUS_DUPLICATES) || (ret & FLB_ES_STATUS_ERROR))) {
    /*
     * If trace_error is set, trace the actual
     * response from Elasticsearch explaining the problem.
     * Trace_Output can be used to see the request.
     */
    if (pack_size < 4000) {
        flb_plg_debug(ctx->ins, "error caused by: Input\n%.*s\n",
                      (int) pack_size, pack);
    }
    if (c->resp.payload_size < 4000) {
        flb_plg_error(ctx->ins, "error: Output\n%s",
                      c->resp.payload);
    }
    else {
        /*
         * We must use fwrite since the flb_log functions
         * will truncate data at 4KB
         */
        fwrite(c->resp.payload, 1, c->resp.payload_size, stderr);
        fflush(stderr);
    }
}

/* Only retry on actual errors (not 409 version conflicts) */
if (ret & FLB_ES_STATUS_ERROR) {
    goto retry;
}
}
else {
    /* No payload to parse, retry */
    goto retry;
```
Emit a warning for 409 duplicates even when trace_error is off.
Right now duplicates are only visible when trace_error is enabled, which makes 409 conflicts silent in normal operation. The PR objectives call for a warning with the payload; consider logging a warning whenever FLB_ES_STATUS_DUPLICATES is set.
🛠️ Suggested patch

```diff
     if (ret & FLB_ES_STATUS_SUCCESS) {
         flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
                       c->resp.payload);
     }
+    if (ret & FLB_ES_STATUS_DUPLICATES) {
+        if (c->resp.payload_size < 4000) {
+            flb_plg_warn(ctx->ins,
+                         "Elasticsearch duplicate (409) detected: %s",
+                         c->resp.payload);
+        }
+        else {
+            flb_plg_warn(ctx->ins,
+                         "Elasticsearch duplicate (409) detected; payload follows:");
+            fwrite(c->resp.payload, 1, c->resp.payload_size, stderr);
+            fflush(stderr);
+        }
+    }
     /* Trace errors/duplicates if trace_error is enabled */
     if (ctx->trace_error &&
         ((ret & FLB_ES_STATUS_DUPLICATES) || (ret & FLB_ES_STATUS_ERROR))) {
```
```c
    }
}

/* Only retry on actual errors (not 409 version conflicts) */
if (ret & FLB_ES_STATUS_ERROR) {
```
This disrupts the retry model.

I suppose that the following would be better:

```c
/* Retry on any non-success, non-duplicate condition */
if (!(ret & FLB_ES_STATUS_SUCCESS) &&
    !(ret & FLB_ES_STATUS_DUPLICATES)) {
    goto retry;
}
```

This is because we need to handle precisely which types of responses need to be retried. Or do we need to throw away the response and permit data loss when a 409 code is mixed in?
```c
if (ctx->trace_error &&
    ((ret & FLB_ES_STATUS_DUPLICATES) || (ret & FLB_ES_STATUS_ERROR))) {
```
Also, we can simplify these lines as:

```c
if (ctx->trace_error &&
    !(ret & FLB_ES_STATUS_SUCCESS)) {
```
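One caveat worth noting: the two conditions are only equivalent if DUPLICATES and ERROR are the only non-success flags. A quick sketch with hypothetical flag values (only the names mirror the plugin's constants) shows the simplified form also traces parse-failure flags such as BAD_RESPONSE:

```c
#include <assert.h>

/* Hypothetical flag values for illustration only */
#define FLB_ES_STATUS_SUCCESS      (1 << 0)
#define FLB_ES_STATUS_ERROR        (1 << 1)
#define FLB_ES_STATUS_DUPLICATES   (1 << 2)
#define FLB_ES_STATUS_BAD_RESPONSE (1 << 3)

/* Condition as written in the PR: traces only duplicates and errors */
int trace_original(int ret)
{
    return (ret & FLB_ES_STATUS_DUPLICATES) || (ret & FLB_ES_STATUS_ERROR);
}

/* Simplified condition: traces every non-success flag set */
int trace_simplified(int ret)
{
    return !(ret & FLB_ES_STATUS_SUCCESS);
}
```

The simplified version is arguably more useful for debugging, since a malformed payload would be traced too, but it is a behavior change, not a pure simplification.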
Summary
When using generate_id or id_key with the Elasticsearch output plugin, documents that already exist return a 409 (version conflict) status in the bulk API response.
The plugin would retry 409 codes indefinitely, because the error check did not distinguish version conflicts from real errors: any non-success response triggered the retry path.
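For reference, a minimal out_es configuration that can hit this path might look like the one below. `Generate_ID`, `Write_Operation`, and `Trace_Error` are the documented es output options; the host, port, and index values are placeholders:

```ini
[OUTPUT]
    Name            es
    Match           *
    Host            127.0.0.1
    Port            9200
    Index           fluent-bit
    Generate_ID     On
    Write_Operation create
    Trace_Error     On
```

With `Generate_ID On` and `Write_Operation create`, re-delivering a chunk produces the same document IDs, so Elasticsearch answers the duplicates with 409 version conflicts.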
Changes
Test
Built the container image and deployed it. Now, when Elasticsearch returns a 409 code for a record, the output prints a warning and, if enabled, the trace_error output.
From both the logs and internal Fluent Bit metrics, no retry is issued after the 409 code is received.
No new memory is allocated.
Enter `[N/A]` in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
Apply the `ok-package-test` label to test for all targets (requires maintainer to do).

Documentation

Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
Bug Fixes
The Elasticsearch output no longer retries records that receive a 409 (version conflict) response; duplicates are reported instead of being re-sent indefinitely.