Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ minor_behavior_changes:
Updated libcircllhist to 0.3.2, which changes how bucket bounds are interpreted. This should not impact
production monitoring if the number of samples in the histograms is high. Affected tests were adjusted
to account for histogram changes.
- area: stat_sinks
change: |
OpenTelemetry :ref:`SinkConfig <envoy_v3_api_msg_extensions.stat_sinks.open_telemetry.v3.SinkConfig>`
stopped reporting empty delta counters and histograms.

bug_fixes:
# *Changes expected to improve the state of the world and are unlikely to have negative effects*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void MetricAggregator::addGauge(
Metric metric;
metric.set_name(metric_name);
NumberDataPoint* data_point = metric.mutable_gauge()->add_data_points();
setCommonDataPoint(*data_point, &attributes,
setCommonDataPoint(*data_point, attributes,
AggregationTemporality::AGGREGATION_TEMPORALITY_UNSPECIFIED);
data_point->set_as_int(value);
non_aggregated_metrics_.push_back(std::move(metric));
Expand All @@ -52,8 +52,6 @@ void MetricAggregator::addGauge(
if (it != metric_data.gauge_points.end()) {
// If the data point exists, update it and return.
NumberDataPoint* data_point = it->second;
setCommonDataPoint(*data_point, nullptr,
AggregationTemporality::AGGREGATION_TEMPORALITY_UNSPECIFIED);

// Multiple stats are mapped to the same metric and we
// aggregate by summing the new value to the existing one.
Expand All @@ -64,7 +62,7 @@ void MetricAggregator::addGauge(
// If the data point does not exist, create a new one.
NumberDataPoint* data_point = metric_data.metric.mutable_gauge()->add_data_points();
metric_data.gauge_points[key] = data_point;
setCommonDataPoint(*data_point, &attributes,
setCommonDataPoint(*data_point, attributes,
AggregationTemporality::AGGREGATION_TEMPORALITY_UNSPECIFIED);
data_point->set_as_int(value);
}
Expand All @@ -73,15 +71,19 @@ void MetricAggregator::addCounter(
absl::string_view metric_name, uint64_t value, uint64_t delta,
AggregationTemporality temporality,
const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attributes) {
const uint64_t point_value =
(temporality == AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA) ? delta : value;
if (point_value == 0 && temporality == AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA) {
return;
}
if (!enable_metric_aggregation_) {
Metric metric;
metric.set_name(metric_name);
metric.mutable_sum()->set_is_monotonic(true);
metric.mutable_sum()->set_aggregation_temporality(temporality);
NumberDataPoint* data_point = metric.mutable_sum()->add_data_points();
setCommonDataPoint(*data_point, &attributes, temporality);
data_point->set_as_int(
(temporality == AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA) ? delta : value);
setCommonDataPoint(*data_point, attributes, temporality);
data_point->set_as_int(point_value);
non_aggregated_metrics_.push_back(std::move(metric));
return;
}
Expand All @@ -92,14 +94,9 @@ void MetricAggregator::addCounter(
if (it != metric_data.counter_points.end()) {
// If the data point exists, update it and return.
NumberDataPoint* data_point = it->second;
// Update time for the existing data point.
setCommonDataPoint(*data_point, nullptr, temporality);

// For DELTA, add the change since the last export. For CUMULATIVE, add the
// total value.
data_point->set_as_int(
data_point->as_int() +
((temporality == AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA) ? delta : value));
data_point->set_as_int(data_point->as_int() + point_value);
return;
}

Expand All @@ -108,21 +105,24 @@ void MetricAggregator::addCounter(
metric_data.metric.mutable_sum()->set_is_monotonic(true);
metric_data.metric.mutable_sum()->set_aggregation_temporality(temporality);
metric_data.counter_points[key] = data_point;
setCommonDataPoint(*data_point, &attributes, temporality);
data_point->set_as_int(
(temporality == AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA) ? delta : value);
setCommonDataPoint(*data_point, attributes, temporality);
data_point->set_as_int(point_value);
}

void MetricAggregator::addHistogram(
absl::string_view stat_name, absl::string_view metric_name,
const Stats::HistogramStatistics& stats, AggregationTemporality temporality,
const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attributes) {
if (stats.sampleCount() == 0 &&
temporality == AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA) {
return;
}
if (!enable_metric_aggregation_) {
Metric metric;
metric.set_name(metric_name);
metric.mutable_histogram()->set_aggregation_temporality(temporality);
HistogramDataPoint* data_point = metric.mutable_histogram()->add_data_points();
setCommonDataPoint(*data_point, &attributes, temporality);
setCommonDataPoint(*data_point, attributes, temporality);

data_point->set_count(stats.sampleCount());
data_point->set_sum(stats.sampleSum());
Expand All @@ -147,10 +147,6 @@ void MetricAggregator::addHistogram(
if (static_cast<size_t>(data_point->explicit_bounds_size()) ==
stats.supportedBuckets().size() &&
static_cast<size_t>(data_point->bucket_counts_size()) == new_bucket_counts.size() + 1) {

// Update time.
setCommonDataPoint(*data_point, nullptr, temporality);

// Aggregate count and sum.
data_point->set_count(data_point->count() + stats.sampleCount());
data_point->set_sum(data_point->sum() + stats.sampleSum());
Expand All @@ -174,7 +170,7 @@ void MetricAggregator::addHistogram(
metric_data.metric.mutable_histogram()->set_aggregation_temporality(temporality);
metric_data.histogram_points[key] = data_point;
// Set common fields directly here
setCommonDataPoint(*data_point, &attributes, temporality);
setCommonDataPoint(*data_point, attributes, temporality);

data_point->set_count(stats.sampleCount());
data_point->set_sum(stats.sampleSum());
Expand Down
29 changes: 12 additions & 17 deletions source/extensions/stat_sinks/open_telemetry/open_telemetry_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,28 +110,23 @@ class MetricAggregator : public Logger::Loggable<Logger::Id::stats> {
// Sets common fields for a data point.
// For gauge metrics,
// temporality should be AGGREGATION_TEMPORALITY_UNSPECIFIED.
// For the aggregation case, attributes should be nullptr as they have already been
// set.
template <typename DataPoint>
void setCommonDataPoint(
DataPoint& data_point,
const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>* attributes,
const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attributes,
::opentelemetry::proto::metrics::v1::AggregationTemporality temporality) {
data_point.set_time_unix_nano(snapshot_time_ns_);
if (attributes) {
// When attributes are present, set the start time for delta/cumulative metrics.
data_point.mutable_attributes()->CopyFrom(*attributes);
switch (temporality) {
case AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA:
data_point.set_start_time_unix_nano(delta_start_time_ns_);
break;
case AggregationTemporality::AGGREGATION_TEMPORALITY_CUMULATIVE:
data_point.set_start_time_unix_nano(cumulative_start_time_ns_);
break;
default:
// Do not set start time for UNSPECIFIED.
break;
}
data_point.mutable_attributes()->CopyFrom(attributes);
switch (temporality) {
case AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA:
data_point.set_start_time_unix_nano(delta_start_time_ns_);
break;
case AggregationTemporality::AGGREGATION_TEMPORALITY_CUMULATIVE:
data_point.set_start_time_unix_nano(cumulative_start_time_ns_);
break;
default:
// Do not set start time for UNSPECIFIED.
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class OpenTelemetryStatsSinkTests : public testing::Test {
}

void addHistogramToSnapshot(const std::string& name, bool is_delta = false, bool used = true,
const Stats::TagVector& tags = {{"hist_key", "hist_val"}}) {
const Stats::TagVector& tags = {{"hist_key", "hist_val"}},
bool add_values = true) {
auto histogram = std::make_unique<NiceMock<Stats::MockParentHistogram>>();

histogram_t* hist = hist_alloc();
Expand All @@ -130,8 +131,10 @@ class OpenTelemetryStatsSinkTests : public testing::Test {
values = {0.7, 7, 35, 200, 750, 4000, 20000, 200000, 1500000, 4000000};
}

for (auto value : values) {
hist_insert(hist, value, 1);
if (add_values) {
for (auto value : values) {
hist_insert(hist, value, 1);
}
}

histogram_ptrs_.push_back(hist);
Expand Down Expand Up @@ -554,6 +557,7 @@ TEST_F(OtlpMetricsFlusherTests, DeltaCounterMetric) {

addCounterToSnapshot("test_counter1", 1, 1);
addCounterToSnapshot("test_counter2", 2, 3);
addCounterToSnapshot("test_counter3", 0, 4);
addHostCounterToSnapshot("test_host_counter1", 2, 4);
addHostCounterToSnapshot("test_host_counter2", 5, 10);

Expand Down Expand Up @@ -591,6 +595,7 @@ TEST_F(OtlpMetricsFlusherTests, DeltaHistogramMetric) {

addHistogramToSnapshot("test_histogram1", true);
addHistogramToSnapshot("test_histogram2", true);
addHistogramToSnapshot("test_histogram3", true, true, {}, false);

MetricsExportRequestSharedPtr metrics =
flusher.flush(snapshot_, delta_start_time_ns_, cumulative_start_time_ns_);
Expand Down
Loading