60 changes: 36 additions & 24 deletions plugins/out_es/es.c
@@ -793,8 +793,17 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
if (item_val.via.i64 == 200 || item_val.via.i64 == 201) {
check |= FLB_ES_STATUS_SUCCESS;
}
/* Check for errors other than version conflict (document already exists) */
if (item_val.via.i64 != 409) {
/* Check for version conflicts (document already exists) */
if (item_val.via.i64 == 409) {
check |= FLB_ES_STATUS_DUPLICATES;
}
/*
* Check for actual errors, excluding:
* - 200/201: success
* - 409: version conflict
*/
if (item_val.via.i64 != 200 && item_val.via.i64 != 201 &&
item_val.via.i64 != 409) {
check |= FLB_ES_STATUS_ERROR;
}
}
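
The classification in this hunk amounts to a three-way mapping of each bulk item's HTTP status, OR-ed together across items. A minimal standalone sketch of that reading follows. The `EX_STATUS_*` values and the helper names `classify_item` and `classify_items` are hypothetical and exist only for illustration; the plugin defines its own `FLB_ES_STATUS_*` constants elsewhere.

```c
#include <stdint.h>

/* Hypothetical flag values for illustration only; these are NOT the
 * plugin's FLB_ES_STATUS_* definitions. */
#define EX_STATUS_SUCCESS    (1 << 0)
#define EX_STATUS_DUPLICATES (1 << 1)
#define EX_STATUS_ERROR      (1 << 2)

/* Map one bulk item's HTTP status to exactly one flag. */
static int classify_item(int64_t status)
{
    if (status == 200 || status == 201) {
        return EX_STATUS_SUCCESS;
    }
    if (status == 409) {
        /* version conflict: the document already exists */
        return EX_STATUS_DUPLICATES;
    }
    /* anything else counts as an actual error */
    return EX_STATUS_ERROR;
}

/* OR the per-item flags together to summarize a whole bulk response. */
static int classify_items(const int64_t *statuses, int count)
{
    int check = 0;
    int i;

    for (i = 0; i < count; i++) {
        check |= classify_item(statuses[i]);
    }
    return check;
}
```

Because the flags accumulate, a single response can carry success, duplicate, and error bits at once, which is what makes the later retry decision non-trivial.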
@@ -959,34 +968,37 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
c->resp.payload);
}
else {
/* we got an error */
if (ctx->trace_error) {
/* Trace errors/duplicates if trace_error is enabled */
if (ctx->trace_error &&
((ret & FLB_ES_STATUS_DUPLICATES) || (ret & FLB_ES_STATUS_ERROR))) {
Comment on lines +972 to +973

Also, we can simplify these lines as:

if (ctx->trace_error &&
    !(ret & FLB_ES_STATUS_SUCCESS)) {
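
Before adopting the shorter form, it may be worth checking whether the two conditions agree on mixed or malformed responses. The sketch below compares them in isolation. The `EX_STATUS_*` values and both function names are hypothetical stand-ins, not the plugin's definitions.

```c
#include <stdbool.h>

/* Hypothetical flag values for illustration; not the plugin's definitions. */
#define EX_STATUS_SUCCESS      (1 << 0)
#define EX_STATUS_BAD_RESPONSE (1 << 1)
#define EX_STATUS_DUPLICATES   (1 << 2)
#define EX_STATUS_ERROR        (1 << 3)

/* Condition as written in the diff: trace only duplicates or errors. */
static bool trace_as_written(bool trace_error, int ret)
{
    return trace_error &&
           ((ret & EX_STATUS_DUPLICATES) || (ret & EX_STATUS_ERROR));
}

/* Condition as suggested: trace whenever no item succeeded. */
static bool trace_as_suggested(bool trace_error, int ret)
{
    return trace_error && !(ret & EX_STATUS_SUCCESS);
}
```

Under these assumptions the two forms differ in two cases: a response with both success and duplicate bits is traced only by the written form, and a response with only a bad-response bit is traced only by the suggested form.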

/*
* If trace_error is set, trace the actual
* response from Elasticsearch explaining the problem.
* Trace_Output can be used to see the request.
*/
if (pack_size < 4000) {
flb_plg_debug(ctx->ins, "error caused by: Input\n%.*s\n",
(int) pack_size, pack);
}
if (c->resp.payload_size < 4000) {
flb_plg_error(ctx->ins, "error: Output\n%s",
c->resp.payload);
} else {
/*
* If trace_error is set, trace the actual
* response from Elasticsearch explaining the problem.
* Trace_Output can be used to see the request.
*/
if (pack_size < 4000) {
flb_plg_debug(ctx->ins, "error caused by: Input\n%.*s\n",
(int) pack_size, pack);
}
if (c->resp.payload_size < 4000) {
flb_plg_error(ctx->ins, "error: Output\n%s",
c->resp.payload);
} else {
/*
* We must use fwrite since the flb_log functions
* will truncate data at 4KB
*/
fwrite(c->resp.payload, 1, c->resp.payload_size, stderr);
fflush(stderr);
}
* We must use fwrite since the flb_log functions
* will truncate data at 4KB
*/
fwrite(c->resp.payload, 1, c->resp.payload_size, stderr);
fflush(stderr);
}
}
/* Only retry on actual errors (not 409 version conflicts) */
if (ret & FLB_ES_STATUS_ERROR) {

This disrupts the retry model.

I suppose that the following would be better:

/* Retry on any non-success, non-duplicate condition */
if (!(ret & FLB_ES_STATUS_SUCCESS) &&
    !(ret & FLB_ES_STATUS_DUPLICATES)) {
    goto retry;
}

The reason is that we need to be precise about which kinds of responses should be retried.
Otherwise, are we prepared to discard the response and accept data loss when 409 codes are mixed in with other results?

goto retry;
Comment on lines +995 to 997


P2: Retry is skipped for parsing/response failures

Here the retry gate was narrowed to FLB_ES_STATUS_ERROR only, but elasticsearch_error_check can return other non-success flags like FLB_ES_STATUS_BAD_RESPONSE, ERROR_UNPACK, BAD_TYPE, or IMCOMPLETE when the payload is malformed or truncated. In those cases ret won’t include FLB_ES_STATUS_ERROR, so the flush will fall through to FLB_OK and the chunk is dropped instead of retried. This can happen when Elasticsearch returns invalid JSON or the HTTP client truncates the body. Consider retrying on any non-success non-duplicate flag or mapping those parse/response flags into FLB_ES_STATUS_ERROR.
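
Both remedies mentioned above can be sketched side by side. This is an illustration under stated assumptions, not the plugin's code: the `EX_STATUS_*` values, `retry_error_only`, `retry_non_success_non_duplicate`, and `fold_parse_failures` are all hypothetical names chosen for the example.

```c
#include <stdbool.h>

/* Hypothetical flag values for illustration; not the plugin's definitions. */
#define EX_STATUS_SUCCESS      (1 << 0)
#define EX_STATUS_INCOMPLETE   (1 << 1)
#define EX_STATUS_ERROR_UNPACK (1 << 2)
#define EX_STATUS_BAD_TYPE     (1 << 3)
#define EX_STATUS_BAD_RESPONSE (1 << 4)
#define EX_STATUS_DUPLICATES   (1 << 5)
#define EX_STATUS_ERROR        (1 << 6)

/* Gate as narrowed in the diff: retry only when the error bit is set. */
static bool retry_error_only(int ret)
{
    return (ret & EX_STATUS_ERROR) != 0;
}

/* Option 1: retry on any non-success, non-duplicate result. */
static bool retry_non_success_non_duplicate(int ret)
{
    return !(ret & EX_STATUS_SUCCESS) && !(ret & EX_STATUS_DUPLICATES);
}

/* Option 2: fold parse/response failures into the error bit so that
 * the narrowed gate still retries them. */
static int fold_parse_failures(int ret)
{
    const int parse_flags = EX_STATUS_INCOMPLETE | EX_STATUS_ERROR_UNPACK |
                            EX_STATUS_BAD_TYPE | EX_STATUS_BAD_RESPONSE;

    if (ret & parse_flags) {
        ret |= EX_STATUS_ERROR;
    }
    return ret;
}
```

With these definitions, a bad-response result is dropped by the narrowed gate but retried under either option. The options are not interchangeable, though: a response carrying both duplicate and error bits is retried by the narrowed gate (with or without folding) and not by option 1, so the choice also decides how mixed 409 responses are handled.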


}
}
else {
/* No payload to parse, retry */
goto retry;
Comment on lines +971 to 1002

⚠️ Potential issue | 🟡 Minor

Emit a warning for 409 duplicates even when trace_error is off.
Right now duplicates are only visible when trace_error is enabled, which makes 409 conflicts silent in normal operation. The PR objectives call for a warning with the payload; consider logging a warning whenever FLB_ES_STATUS_DUPLICATES is set.

🛠️ Suggested patch
             if (ret & FLB_ES_STATUS_SUCCESS) {
                 flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
                               c->resp.payload);
             }
+            if (ret & FLB_ES_STATUS_DUPLICATES) {
+                if (c->resp.payload_size < 4000) {
+                    flb_plg_warn(ctx->ins,
+                                 "Elasticsearch duplicate (409) detected: %s",
+                                 c->resp.payload);
+                }
+                else {
+                    flb_plg_warn(ctx->ins,
+                                 "Elasticsearch duplicate (409) detected; payload follows:");
+                    fwrite(c->resp.payload, 1, c->resp.payload_size, stderr);
+                    fflush(stderr);
+                }
+            }
             /* Trace errors/duplicates if trace_error is enabled */
             if (ctx->trace_error &&
                 ((ret & FLB_ES_STATUS_DUPLICATES) || (ret & FLB_ES_STATUS_ERROR))) {
🤖 Prompt for AI Agents
In `@plugins/out_es/es.c` around lines 971 - 1002, The duplicate (409) case is
currently only logged when ctx->trace_error is true; modify the block handling
FLB_ES_STATUS_DUPLICATES so that a warning is emitted even when ctx->trace_error
is false: detect (ret & FLB_ES_STATUS_DUPLICATES) and call
flb_plg_warn(ctx->ins, ...) with the response payload (use the same size check
c->resp.payload_size < 4000 to decide between flb_plg_warn with c->resp.payload
or writing to stderr via fwrite/fflush) and include pack content if helpful,
while keeping the existing trace/error logic and leaving retry behavior for
FLB_ES_STATUS_ERROR unchanged.
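
The size-gated logging pattern described here can be tried outside the plugin. In the sketch below, `fprintf` stands in for `flb_plg_warn`, and `emit_duplicate_warning` and `EX_LOG_LIMIT` are hypothetical names; only the 4000-byte threshold and the `fwrite`/`fflush` fallback come from the diff.

```c
#include <stddef.h>
#include <stdio.h>

/* Threshold taken from the diff; the name is made up for this sketch. */
#define EX_LOG_LIMIT 4000

/*
 * Write a duplicate (409) warning to `out`.
 * Returns 1 if the payload went through the formatted path,
 * 0 if it was written raw because it is too large for that path.
 * fprintf is a stand-in for the plugin's logging call.
 */
static int emit_duplicate_warning(FILE *out, const char *payload, size_t size)
{
    if (size < EX_LOG_LIMIT) {
        fprintf(out, "[warn] Elasticsearch duplicate (409) detected: %.*s\n",
                (int) size, payload);
        return 1;
    }

    fprintf(out,
            "[warn] Elasticsearch duplicate (409) detected; payload follows:\n");
    /* Large payloads bypass the formatted path to avoid truncation. */
    fwrite(payload, 1, size, out);
    fflush(out);
    return 0;
}
```

Passing the length explicitly with `%.*s` means the helper does not depend on the payload being NUL-terminated, which the `%s` form in the suggested patch does.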

}
}