Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/in_http/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ static int in_http_exit(void *data, struct flb_config *config)

/* Configuration properties map */
static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "fixed_tag", NULL,
0, FLB_TRUE, offsetof(struct flb_http, tag),
"Set a fixed tag for all incoming records, overriding URL path-based tags."
},

{
FLB_CONFIG_MAP_BOOL, "http2", "true",
0, FLB_TRUE, offsetof(struct flb_http, enable_http2),
Expand Down
1 change: 1 addition & 0 deletions plugins/in_http/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct flb_http {
int successful_response_code;
flb_sds_t listen;
flb_sds_t tcp_port;
flb_sds_t tag; /* User configured fixed_tag */
flb_sds_t tag_key;
struct flb_record_accessor *ra_tag_key;

Expand Down
4 changes: 4 additions & 0 deletions plugins/in_http/http_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ int http_config_destroy(struct flb_http *ctx)
flb_ra_destroy(ctx->ra_tag_key);
}

if (ctx->tag) {
flb_sds_destroy(ctx->tag);
}

/* release all connections */
http_conn_release_all(ctx);

Expand Down
10 changes: 9 additions & 1 deletion plugins/in_http/http_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,16 @@ static int process_pack_record(struct flb_http *ctx, struct flb_time *tm,
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}
else if (ctx->tag) {
/* use user configured tag */
ret = flb_input_log_append(ctx->ins,
ctx->tag,
flb_sds_len(ctx->tag),
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}
else {
/* use default plugin Tag (it internal name, e.g: http.0 */
/* use default plugin Tag (its internal name, e.g: http.0) */
ret = flb_input_log_append(ctx->ins, NULL, 0,
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
Expand Down
81 changes: 81 additions & 0 deletions tests/runtime/in_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,86 @@ void flb_test_http_oauth2_accepts_valid_token()
jwks_mock_server_stop(&jwks);
}

/* Test fixed tag configuration */
void flb_test_http_fixed_tag()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
struct flb_http_client *c;
int ret;
int num;
size_t b_sent;

char *buf = "{\"test\":\"msg\"}";

clear_output_num();

cb_data.cb = cb_check_result_json;
cb_data.data = "\"test\":\"msg\"";

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

/* Configure fixed tag */
ret = flb_input_set(ctx->flb, ctx->i_ffd,
"fixed_tag", "fixed_tag",
NULL);
TEST_CHECK(ret == 0);

/* Output matches the fixed tag */
ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "fixed_tag",
"format", "json",
NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

ctx->httpc = http_client_ctx_create();
TEST_CHECK(ctx->httpc != NULL);

/* Send to root path (not using URL path as tag) */
c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/",
buf, strlen(buf),
"127.0.0.1", 9880, NULL, 0);
if (!TEST_CHECK(c != NULL)) {
TEST_MSG("http_client failed");
exit(EXIT_FAILURE);
}
ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE,
strlen(FLB_HTTP_HEADER_CONTENT_TYPE),
JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE));
TEST_CHECK(ret == 0);

ret = flb_http_do(c, &b_sent);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("ret error. ret=%d\n", ret);
}
else if (!TEST_CHECK(b_sent > 0)){
TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent);
}
else if (!TEST_CHECK(c->resp.status == 201)) {
TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status);
}

/* waiting to flush */
flb_time_msleep(1500);

num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs - tag may not match");
}

flb_http_client_destroy(c);
flb_upstream_conn_release(ctx->httpc->u_conn);
test_ctx_destroy(ctx);
}

void test_http_add_remote_addr(char *input, char *xff_content, char *expected_ip, char *http2_cfg)
{
struct flb_lib_out_cb cb_data;
Expand Down Expand Up @@ -1067,6 +1147,7 @@ TEST_LIST = {
{"failure_response_code_400_bad_disk_write", flb_test_http_failure_400_bad_disk_write},
{"tag_key_with_map_input", flb_test_http_tag_key_with_map_input},
{"tag_key_with_array_input", flb_test_http_tag_key_with_array_input},
{"fixed_tag", flb_test_http_fixed_tag},
{"oauth2_requires_token", flb_test_http_oauth2_requires_token},
{"oauth2_accepts_valid_token", flb_test_http_oauth2_accepts_valid_token},
{"add_remote_addr_skip_colliding_ng", flb_test_http_remote_addr_skip_colliding_ng},
Expand Down