Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
return -1;
}

ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39);
/* scope depends on cloud environment */
const char *scope = flb_azure_kusto_get_scope(ctx->cloud_environment);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you define this variable on top of this function?
This is because we still need to compile CentOS7 and gcc4.2.
This coding style causes compilation error(s) in that platform.

ret = flb_oauth2_payload_append(ctx->o, "scope", 5, scope, strlen(scope));

if (ret == -1) {
flb_plg_error(ctx->ins, "error appending oauth2 params");
return -1;
Expand Down
36 changes: 30 additions & 6 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,21 @@ typedef enum {
FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY /* Workload Identity */
} flb_azure_kusto_auth_type;

/* Kusto streaming inserts oauth scope */
#define FLB_AZURE_KUSTO_SCOPE "https://help.kusto.windows.net/.default"
/* Cloud environment */
typedef enum {
FLB_AZURE_CLOUD_GLOBAL = 0,
FLB_AZURE_CLOUD_CHINA
} flb_azure_cloud_environment;

/* Kusto streaming inserts oauth scope (per cloud) */
#define FLB_AZURE_KUSTO_SCOPE_GLOBAL "https://help.kusto.windows.net/.default"
#define FLB_AZURE_KUSTO_SCOPE_CHINA "https://help.kusto.chinacloudapi.cn/.default"

/* MSAL authorization URL */
#define FLB_MSAL_AUTH_URL_TEMPLATE \
/* MSAL authorization URL (per cloud) */
#define FLB_MSAL_AUTH_URL_TEMPLATE_GLOBAL \
"https://login.microsoftonline.com/%s/oauth2/v2.0/token"
#define FLB_MSAL_AUTH_URL_TEMPLATE_CHINA \
"https://login.chinacloudapi.cn/%s/oauth2/v2.0/token"

#define FLB_AZURE_KUSTO_MGMT_URI_PATH "/v1/rest/mgmt"
#define FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE "{\"csl\":\"%s\", \"db\": \"NetDefaultDB\"}"
Expand All @@ -74,8 +83,8 @@ typedef enum {

#define FLB_AZURE_IMDS_ENDPOINT "/metadata/identity/oauth2/token"
#define FLB_AZURE_IMDS_API_VERSION "2018-02-01"
#define FLB_AZURE_IMDS_RESOURCE "https://api.kusto.windows.net/"

#define FLB_AZURE_IMDS_RESOURCE_GLOBAL "https://api.kusto.windows.net/"
#define FLB_AZURE_IMDS_RESOURCE_CHINA "https://api.kusto.chinacloudapi.cn/"

struct flb_azure_kusto_resources {
struct flb_upstream_ha *blob_ha;
Expand Down Expand Up @@ -105,6 +114,9 @@ struct flb_azure_kusto {
char *auth_type_str;
char *workload_identity_token_file;

/* Cloud environment selection (default: Global) */
int cloud_environment;

/* compress payload */
int compression_enabled;

Expand Down Expand Up @@ -176,6 +188,18 @@ struct flb_azure_kusto {
struct flb_output_instance *ins;
};

/* Helper to get per-cloud scope string */
static inline const char *flb_azure_kusto_get_scope(int cloud_env)
{
return cloud_env == FLB_AZURE_CLOUD_CHINA ? FLB_AZURE_KUSTO_SCOPE_CHINA : FLB_AZURE_KUSTO_SCOPE_GLOBAL;
}

/* Helper to get IMDS resource per cloud */
static inline const char *flb_azure_kusto_get_imds_resource(int cloud_env)
{
return cloud_env == FLB_AZURE_CLOUD_CHINA ? FLB_AZURE_IMDS_RESOURCE_CHINA : FLB_AZURE_IMDS_RESOURCE_GLOBAL;
}

flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx);
flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl);

Expand Down
45 changes: 36 additions & 9 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@
#include "azure_kusto_conf.h"
#include "azure_msiauth.h"

/* Cloud helpers: resolve MSAL auth URL template and Kusto scope/IMDS resource */
static const char *get_msal_auth_url_template(int cloud_env)
{
if (cloud_env == FLB_AZURE_CLOUD_CHINA) {
return FLB_MSAL_AUTH_URL_TEMPLATE_CHINA;
}
return FLB_MSAL_AUTH_URL_TEMPLATE_GLOBAL;
}

static const char *get_imds_resource(int cloud_env)
{
return flb_azure_kusto_get_imds_resource(cloud_env);
}

/* Constants for PCG random number generator */
#define PCG_DEFAULT_MULTIPLIER_64 6364136223846793005ULL
#define PCG_DEFAULT_INCREMENT_64 1442695040888963407ULL
Expand Down Expand Up @@ -782,6 +796,12 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
return NULL;
}

/* Determine cloud environment dynamically from ingestion_endpoint */
ctx->cloud_environment = FLB_AZURE_CLOUD_GLOBAL; /* default */
if (ctx->ingestion_endpoint && strstr(ctx->ingestion_endpoint, "chinacloudapi.cn") != NULL) {
ctx->cloud_environment = FLB_AZURE_CLOUD_CHINA;
}

/* config: 'database_name' */
if (ctx->database_name == NULL) {
flb_plg_error(ctx->ins, "property 'database_name' is not defined");
Expand All @@ -800,40 +820,47 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM ||
ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER) {
/* MSI auth */
const char *imds_resource;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to define this variable on top of this function scope, too.


imds_resource = get_imds_resource(ctx->cloud_environment);

/* Construct the URL template with or without client_id for managed identity */
if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM) {
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1);
ctx->oauth_url = flb_sds_create_size(strlen(FLB_AZURE_MSIAUTH_URL_TEMPLATE) + strlen(imds_resource) + 1);
if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "", "");
FLB_AZURE_MSIAUTH_URL_TEMPLATE,
"", "", imds_resource);
} else {
/* User-assigned managed identity */
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1 +
sizeof("&client_id=") - 1 +
flb_sds_len(ctx->client_id));
ctx->oauth_url = flb_sds_create_size(strlen(FLB_AZURE_MSIAUTH_URL_TEMPLATE) +
strlen("&client_id=") +
flb_sds_len(ctx->client_id) +
strlen(imds_resource) + 1);
if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", ctx->client_id);
FLB_AZURE_MSIAUTH_URL_TEMPLATE,
"&client_id=", ctx->client_id, imds_resource);
}
} else {
/* Standard OAuth2 for service principal or workload identity */
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 +
flb_sds_len(ctx->tenant_id));
const char *tmpl = get_msal_auth_url_template(ctx->cloud_environment);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.
And we need to use slightly more concrete and meaningful name here.

E.g.)

const char *auth_url_tmpl;

would be better.

ctx->oauth_url = flb_sds_create_size(strlen(tmpl) + flb_sds_len(ctx->tenant_id) + 1);
if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id);
tmpl, ctx->tenant_id);
}

ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources));
Expand Down
12 changes: 11 additions & 1 deletion plugins/out_azure_kusto/azure_msiauth.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fluent-bit/flb_http_client.h>

#include "azure_msiauth.h"
#include "azure_kusto.h"

char *flb_azure_msiauth_token_get(struct flb_oauth2 *ctx)
{
Expand Down Expand Up @@ -176,7 +177,16 @@ int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *to
body = flb_sds_cat(body, "&client_assertion=", 18);
body = flb_sds_cat(body, federated_token, flb_sds_len(federated_token));
/* Use the correct scope and length for Kusto */
body = flb_sds_cat(body, "&scope=https://help.kusto.windows.net/.default", 46);
{
int cloud_env = FLB_AZURE_CLOUD_GLOBAL;
if ((ctx->host && strstr(ctx->host, "chinacloudapi.cn") != NULL) ||
(ctx->uri && strstr(ctx->uri, "chinacloudapi.cn") != NULL)) {
cloud_env = FLB_AZURE_CLOUD_CHINA;
}
const char *scope = flb_azure_kusto_get_scope(cloud_env);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to define this variable on top of flb_azure_workload_identity_token_get function scope.

body = flb_sds_cat(body, "&scope=", 7);
body = flb_sds_cat(body, scope, strlen(scope));
}

if (!body) {
/* This check might be redundant if flb_sds_cat handles errors, but safe */
Expand Down
2 changes: 1 addition & 1 deletion plugins/out_azure_kusto/azure_msiauth.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

/* MSAL authorization URL */
#define FLB_AZURE_MSIAUTH_URL_TEMPLATE \
"http://169.254.169.254/metadata/identity/oauth2/token?api-version=2021-02-01%s%s&resource=https://api.kusto.windows.net"
"http://169.254.169.254/metadata/identity/oauth2/token?api-version=2021-02-01%s%s&resource=%s"

char *flb_azure_msiauth_token_get(struct flb_oauth2 *ctx);
int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *token_file, const char *client_id, const char *tenant_id);
Expand Down
74 changes: 73 additions & 1 deletion tests/runtime/out_azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ void flb_test_azure_kusto_managed_identity_system(void);
void flb_test_azure_kusto_managed_identity_user(void);
void flb_test_azure_kusto_service_principal(void);
void flb_test_azure_kusto_workload_identity(void);
void flb_test_azure_kusto_cloud_global_inference(void);
void flb_test_azure_kusto_cloud_china_inference(void);

/* Test list */
TEST_LIST = {
Expand All @@ -38,6 +40,8 @@ TEST_LIST = {
{"managed_identity_user", flb_test_azure_kusto_managed_identity_user},
{"service_principal", flb_test_azure_kusto_service_principal},
{"workload_identity", flb_test_azure_kusto_workload_identity},
{"cloud_global_inference", flb_test_azure_kusto_cloud_global_inference},
{"cloud_china_inference", flb_test_azure_kusto_cloud_china_inference},
{NULL, NULL}
};

Expand Down Expand Up @@ -210,4 +214,72 @@ void flb_test_azure_kusto_workload_identity(void)

flb_stop(ctx);
flb_destroy(ctx);
}
}

/* Test for cloud inference: "Global", based on ingestion_endpoint */
void flb_test_azure_kusto_cloud_global_inference(void)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

ctx = flb_create();
flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL);

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd, "match", "test", NULL);
flb_output_set(ctx, out_ffd, "auth_type", "service_principal", NULL);
flb_output_set(ctx, out_ffd, "tenant_id", "your-tenant-id", NULL);
flb_output_set(ctx, out_ffd, "client_id", "your-client-id", NULL);
flb_output_set(ctx, out_ffd, "client_secret", "your-client-secret", NULL);
/* Global ingestion endpoint */
flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL);
flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL);
flb_output_set(ctx, out_ffd, "table_name", "logs", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0);

flb_stop(ctx);
flb_destroy(ctx);
}

/* Test for cloud inference: "China", based on ingestion_endpoint */
void flb_test_azure_kusto_cloud_china_inference(void)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

ctx = flb_create();
flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL);

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd, "match", "test", NULL);
flb_output_set(ctx, out_ffd, "auth_type", "service_principal", NULL);
flb_output_set(ctx, out_ffd, "tenant_id", "your-tenant-id", NULL);
flb_output_set(ctx, out_ffd, "client_id", "your-client-id", NULL);
flb_output_set(ctx, out_ffd, "client_secret", "your-client-secret", NULL);
/* China ingestion endpoint */
flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.chinacloudapi.cn", NULL);
flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL);
flb_output_set(ctx, out_ffd, "table_name", "logs", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0);

flb_stop(ctx);
flb_destroy(ctx);
}
Loading