From 14d3115b9e0fc73a786dd92c30abb8d75ede5ef0 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Tue, 3 Feb 2026 21:47:43 -0500 Subject: [PATCH 1/8] feat(runtime): add support for buffered metrics --- packages/common/src/metrics.ts | 86 ++++-- .../bridge-macros/src/derive_tryintojs.rs | 40 +++ .../core-bridge/src/helpers/try_into_js.rs | 90 +++++- packages/core-bridge/src/metrics.rs | 266 +++++++++++++++++- packages/core-bridge/src/runtime.rs | 96 +++++-- packages/core-bridge/ts/native.ts | 36 ++- packages/test/src/test-otel.ts | 160 +---------- .../test/src/test-runtime-buffered-metrics.ts | 204 ++++++++++++++ packages/test/src/test-runtime-otel.ts | 209 ++++++++++++++ ...ometheus.ts => test-runtime-prometheus.ts} | 0 packages/worker/src/runtime-metrics.ts | 66 +++++ packages/worker/src/runtime-options.ts | 43 ++- packages/worker/src/runtime.ts | 24 +- packages/workflow/src/metrics.ts | 7 + 14 files changed, 1115 insertions(+), 212 deletions(-) create mode 100644 packages/test/src/test-runtime-buffered-metrics.ts create mode 100644 packages/test/src/test-runtime-otel.ts rename packages/test/src/{test-prometheus.ts => test-runtime-prometheus.ts} (100%) diff --git a/packages/common/src/metrics.ts b/packages/common/src/metrics.ts index c7fe003ca..506d8b89b 100644 --- a/packages/common/src/metrics.ts +++ b/packages/common/src/metrics.ts @@ -69,8 +69,43 @@ export interface Metric { * The description of the metric, if any. */ description?: string; + + /** + * The kind of the metric (e.g. `counter`, `histogram`, `gauge`). + */ + kind: MetricKind; + + /** + * The type of value recorded by the metric. Either `int` or `float`. + */ + valueType: NumericMetricValueType; } +/** + * Tags to be attached to some metrics. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +export type MetricTags = Record; + +/** + * Type of numerical values recorded by a metric. 
+ * + * Note that this represents the _configuration_ of the metric; however, since JavaScript doesn't + * actually have different representation for integers and floats, the actual value type is always + * a JS 'number'. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +export type NumericMetricValueType = 'int' | 'float'; + +/** + * The kind of a metric. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +export type MetricKind = 'counter' | 'histogram' | 'gauge'; + /** * A metric that supports adding values as a counter. * @@ -91,6 +126,9 @@ export interface MetricCounter extends Metric { * @param tags Tags to append to existing tags. */ withTags(tags: MetricTags): MetricCounter; + + kind: 'counter'; + valueType: 'int'; } /** @@ -99,11 +137,6 @@ export interface MetricCounter extends Metric { * @experimental The Metric API is an experimental feature and may be subject to change. */ export interface MetricHistogram extends Metric { - /** - * The type of value to record. Either `int` or `float`. - */ - valueType: NumericMetricValueType; - /** * Record the given value on the histogram. * @@ -120,6 +153,8 @@ export interface MetricHistogram extends Metric { * @param tags Tags to append to existing tags. */ withTags(tags: MetricTags): MetricHistogram; + + kind: 'histogram'; } /** @@ -128,11 +163,6 @@ export interface MetricHistogram extends Metric { * @experimental The Metric API is an experimental feature and may be subject to change. */ export interface MetricGauge extends Metric { - /** - * The type of value to set. Either `int` or `float`. - */ - valueType: NumericMetricValueType; - /** * Set the given value on the gauge. * @@ -147,16 +177,9 @@ export interface MetricGauge extends Metric { * @param tags Tags to append to existing tags. */ withTags(tags: MetricTags): MetricGauge; -} -/** - * Tags to be attached to some metrics. 
- * - * @experimental The Metric API is an experimental feature and may be subject to change. - */ -export type MetricTags = Record; - -export type NumericMetricValueType = 'int' | 'float'; + kind: 'gauge'; +} //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -170,6 +193,9 @@ class NoopMetricMeter implements MetricMeter { unit, description, + kind: 'counter', + valueType: 'int', + add(_value, _extraTags) {}, withTags(_extraTags) { @@ -186,10 +212,12 @@ class NoopMetricMeter implements MetricMeter { ): MetricHistogram { return { name, - valueType, unit, description, + kind: 'histogram', + valueType, + record(_value, _extraTags) {}, withTags(_extraTags) { @@ -198,13 +226,20 @@ class NoopMetricMeter implements MetricMeter { }; } - createGauge(name: string, valueType?: NumericMetricValueType, unit?: string, description?: string): MetricGauge { + createGauge( + name: string, + valueType: NumericMetricValueType = 'int', + unit?: string, + description?: string + ): MetricGauge { return { name, - valueType: valueType ?? 'int', unit, description, + kind: 'gauge', + valueType, + set(_value, _extraTags) {}, withTags(_extraTags) { @@ -300,6 +335,9 @@ export class MetricMeterWithComposedTags implements MetricMeter { * @experimental The Metric API is an experimental feature and may be subject to change. */ class MetricCounterWithComposedTags implements MetricCounter { + public readonly kind = 'counter'; + public readonly valueType = 'int'; + constructor( private parentCounter: MetricCounter, private contributors: MetricTagsOrFunc[] @@ -332,6 +370,8 @@ class MetricCounterWithComposedTags implements MetricCounter { * @experimental The Metric API is an experimental feature and may be subject to change. 
*/ class MetricHistogramWithComposedTags implements MetricHistogram { + public readonly kind = 'histogram'; + constructor( private parentHistogram: MetricHistogram, private contributors: MetricTagsOrFunc[] @@ -369,6 +409,8 @@ class MetricHistogramWithComposedTags implements MetricHistogram { * @hidden */ class MetricGaugeWithComposedTags implements MetricGauge { + public readonly kind = 'gauge'; + constructor( private parentGauge: MetricGauge, private contributors: MetricTagsOrFunc[] diff --git a/packages/core-bridge/bridge-macros/src/derive_tryintojs.rs b/packages/core-bridge/bridge-macros/src/derive_tryintojs.rs index 68ed64998..a9b57e5b9 100644 --- a/packages/core-bridge/bridge-macros/src/derive_tryintojs.rs +++ b/packages/core-bridge/bridge-macros/src/derive_tryintojs.rs @@ -30,6 +30,46 @@ pub fn derive_tryintojs_struct(input: &DeriveInput, data: &syn::DataStruct) -> T } pub fn derive_tryintojs_enum(input: &DeriveInput, data: &syn::DataEnum) -> TokenStream { + let all_unit = data + .variants + .iter() + .all(|v| matches!(v.fields, syn::Fields::Unit)); + if all_unit { + derive_tryintojs_enum_as_string(input, data) + } else { + derive_tryintojs_enum_as_objects(input, data) + } +} + +fn derive_tryintojs_enum_as_string(input: &DeriveInput, data: &syn::DataEnum) -> TokenStream { + let enum_ident = &input.ident; + let generics = &input.generics; + let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); + + let variant_conversions = data.variants.iter().map(|v| { + let variant_ident = &v.ident; + let js_discriminant = variant_ident.to_string().to_case(Case::Camel); + quote! { + #enum_ident::#variant_ident => cx.string(#js_discriminant) + } + }); + + let expanded = quote! 
{ + impl #impl_generics crate::helpers::TryIntoJs for #enum_ident #ty_generics #where_clause { + type Output = neon::types::JsString; + + fn try_into_js<'a>(self, cx: &mut impl neon::prelude::Context<'a>) -> neon::result::JsResult<'a, Self::Output> { + Ok(match self { + #(#variant_conversions),* + }) + } + } + }; + + TokenStream::from(expanded) +} + +fn derive_tryintojs_enum_as_objects(input: &DeriveInput, data: &syn::DataEnum) -> TokenStream { let enum_ident = &input.ident; let generics = &input.generics; let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); diff --git a/packages/core-bridge/src/helpers/try_into_js.rs b/packages/core-bridge/src/helpers/try_into_js.rs index fb0e3ca04..8685cdc09 100644 --- a/packages/core-bridge/src/helpers/try_into_js.rs +++ b/packages/core-bridge/src/helpers/try_into_js.rs @@ -1,11 +1,11 @@ use std::{ - sync::Arc, + sync::{Arc, Mutex}, time::{Duration, SystemTime, UNIX_EPOCH}, }; use neon::{ object::Object, - prelude::Context, + prelude::{Context, Root}, result::JsResult, types::{ JsArray, JsBigInt, JsBoolean, JsBuffer, JsNumber, JsString, JsUndefined, JsValue, Value, @@ -36,6 +36,13 @@ impl TryIntoJs for u32 { } } +impl TryIntoJs for f64 { + type Output = JsNumber; + fn try_into_js<'a>(self, cx: &mut impl Context<'a>) -> JsResult<'a, JsNumber> { + Ok(cx.number(self)) + } +} + impl TryIntoJs for String { type Output = JsString; fn try_into_js<'a>(self, cx: &mut impl Context<'a>) -> JsResult<'a, JsString> { @@ -127,3 +134,82 @@ impl TryIntoJs for (T0, T1) { Ok(array) } } + +/// A handle that wraps another `TryIntoJs` type, memoizing the converted JavaScript value. +/// That is, the value is converted to JavaScript only once, and then the same JavaScript value +/// is returned for subsequent calls. This notably ensures that the value sent to the JS side +/// is exactly the same object every time (i.e. `===` comparison is true). 
+#[derive(Clone, Debug)] +pub struct MemoizedHandle +where + T::Output: std::fmt::Debug, +{ + internal: Arc>>, +} + +#[derive(Debug)] +enum MemoizedInternal { + Pending(T), + Rooted(Root), +} + +impl MemoizedHandle +where + T::Output: Object + std::fmt::Debug, +{ + pub fn new(value: T) -> Self { + Self { + internal: Arc::new(Mutex::new(MemoizedInternal::Pending(value))), + } + } +} + +impl TryIntoJs for MemoizedHandle +where + T::Output: Object + std::fmt::Debug, +{ + type Output = T::Output; + fn try_into_js<'cx>(self, cx: &mut impl Context<'cx>) -> JsResult<'cx, T::Output> { + let mut guard = self.internal.lock().expect("MemoizedHandle lock"); + match *guard { + MemoizedInternal::Pending(ref value) => { + let rooted_value = value.clone().try_into_js(cx)?.root(cx); + let js_value = rooted_value.to_inner(cx); + *guard = MemoizedInternal::Rooted(rooted_value); + Ok(js_value) + } + MemoizedInternal::Rooted(ref handle) => Ok(handle.to_inner(cx)), + } + } +} + +/// To avoid some recurring error patterns when crossing the JS bridge, we normally translate +/// `Option` to `T | null` on the JS side. This however implies extra code on the JS side +/// to check for `null` and convert to `undefined` as appropriate. This generally poses no +/// problem, as manipulation of objects on the JS side is anyway desirable for other reasons. +/// +/// In rare cases, however, this extra manipulation may not be desirable. For example, when +/// passing buffered metrics to the JS side, we want to preserve object identity. Modifying +/// objects on the JS side would either break object identity or introduce unnecessary overhead. +/// +/// For those rare cases, this newtype wrapper indicates that an option property should be +/// translated to `undefined` on the JS side, rather than `null`. 
+#[derive(Clone, Debug)] +pub struct OptionAsUndefined(Option); + +impl From> for OptionAsUndefined { + fn from(value: Option) -> Self { + Self(value) + } +} + +impl TryIntoJs for OptionAsUndefined { + type Output = JsValue; + fn try_into_js<'a>(self, cx: &mut impl Context<'a>) -> JsResult<'a, JsValue> { + if let Some(value) = self.0 { + Ok(value.try_into_js(cx)?.upcast()) + } else { + Ok(cx.undefined().upcast()) + } + } +} diff --git a/packages/core-bridge/src/metrics.rs b/packages/core-bridge/src/metrics.rs index d03c71ed8..87acbc94e 100644 --- a/packages/core-bridge/src/metrics.rs +++ b/packages/core-bridge/src/metrics.rs @@ -1,28 +1,41 @@ +use std::any::Any; use std::collections::HashMap; +use std::sync::Arc; use anyhow::Context as _; use neon::prelude::*; use serde::Deserialize; use temporalio_common::telemetry::metrics::{ - CoreMeter, Counter as CoreCounter, Gauge as CoreGauge, Histogram as CoreHistogram, - MetricParametersBuilder, NewAttributes, TemporalMeter, + BufferInstrumentRef as CoreBufferInstrumentRef, CoreMeter, Counter as CoreCounter, + CustomMetricAttributes, Gauge as CoreGauge, Histogram as CoreHistogram, MetricCallBufferer, + MetricEvent as CoreMetricEvent, MetricKind as CoreMetricKind, + MetricParameters as CoreMetricParameters, MetricParametersBuilder, NewAttributes, + TemporalMeter, }; use temporalio_common::telemetry::metrics::{ GaugeF64 as CoreGaugeF64, HistogramF64 as CoreHistogramF64, }; -use temporalio_common::telemetry::metrics::{ - MetricKeyValue as CoreMetricKeyValue, MetricValue as CoreMetricValue, +use temporalio_common::telemetry::{ + metrics, + metrics::{MetricKeyValue as CoreMetricKeyValue, MetricValue as CoreMetricValue}, }; -use bridge_macros::js_function; +use bridge_macros::{TryIntoJs, js_function}; +use temporalio_sdk_core::telemetry::MetricsCallBuffer as CoreMetricsCallBuffer; +use crate::helpers::properties::ObjectExt as _; +use crate::helpers::try_into_js::{MemoizedHandle, OptionAsUndefined}; use crate::helpers::{ 
BridgeError, BridgeResult, JsonString, MutableFinalize, OpaqueInboundHandle, - OpaqueOutboundHandle, + OpaqueOutboundHandle, TryIntoJs, }; use crate::runtime::Runtime; +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Metric Meter (aka Custom Metrics) +//////////////////////////////////////////////////////////////////////////////////////////////////// + pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult<()> { cx.export_function("newMetricCounter", new_metric_counter)?; cx.export_function("newMetricHistogram", new_metric_histogram)?; @@ -245,10 +258,12 @@ pub fn add_metric_counter_value( attributes: JsonString, ) -> BridgeResult<()> { let counter_handle = counter_handle.borrow()?; + let attributes = counter_handle .meter .inner .new_attributes(parse_metric_attributes(attributes.value)); + counter_handle.counter.add(value as u64, &attributes); Ok(()) } @@ -313,6 +328,232 @@ pub fn set_metric_gauge_f64_value( Ok(()) } +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Buffered Metrics (aka lang-side metrics exporter) +//////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug)] +pub struct MetricsCallBuffer { + pub(crate) core_buffer: Arc>, + use_seconds_for_durations: bool, +} + +impl MetricsCallBuffer { + pub(crate) fn new(max_buffer_size: usize, use_seconds_for_durations: bool) -> Self { + Self { + core_buffer: Arc::new(CoreMetricsCallBuffer::new(max_buffer_size)), + use_seconds_for_durations, + } + } + + pub(crate) fn retrieve(&self) -> Vec { + self.core_buffer + .retrieve() + .iter() + .filter_map(|e| self.convert_metric_event(e)) + .collect() + } + + fn convert_metric_event( + &self, + event: &CoreMetricEvent, + ) -> Option { + match event { + CoreMetricEvent::Create { + params, + populate_into, + kind, + } => { + // Create the metric and put it on the lazy ref + 
let metric = BufferedMetric::new(params, kind, self.use_seconds_for_durations); + populate_into + .set(Arc::new(BufferedMetricRef(MemoizedHandle::new(metric)))) + .expect("Unable to set buffered metric on reference"); + + None + } + + // Create the attributes and put it on the lazy ref + CoreMetricEvent::CreateAttributes { + populate_into, + append_from, + attributes, + } => { + let append_from = append_from.as_ref().map(|f| { + f.get() + .clone() + .as_any() + .downcast::() + .expect("Unable to downcast to expected buffered metric attributes") + }); + let attributes = BufferedMetricAttributes { + new_attributes: attributes.clone(), + append_from: append_from.map(|f| f.as_ref().clone()), + }; + + let r = BufferedMetricAttributesRef(MemoizedHandle::new(attributes)); + populate_into + .set(Arc::new(r)) + .expect("Unable to set buffered metric attributes on reference"); + + None + } + + CoreMetricEvent::Update { + instrument, + attributes, + update, + } => Some(BufferedMetricUpdate { + metric: instrument.get().as_ref().clone(), + value: match update { + metrics::MetricUpdateVal::Duration(v) if self.use_seconds_for_durations => { + v.as_secs_f64() + } + metrics::MetricUpdateVal::Duration(v) => v.as_millis() as f64, + metrics::MetricUpdateVal::Delta(v) => *v as f64, + metrics::MetricUpdateVal::DeltaF64(v) => *v, + metrics::MetricUpdateVal::Value(v) => *v as f64, + metrics::MetricUpdateVal::ValueF64(v) => *v, + }, + attributes: attributes + .get() + .clone() + .as_any() + .downcast::() + .expect("Unable to downcast to expected buffered metric attributes") + .as_ref() + .clone(), + }), + } + } +} + +#[derive(TryIntoJs)] +pub struct BufferedMetricUpdate { + metric: BufferedMetricRef, + value: f64, + attributes: BufferedMetricAttributesRef, +} + +#[derive(TryIntoJs, Clone, Debug)] +struct BufferedMetric { + name: String, + description: OptionAsUndefined, + unit: OptionAsUndefined, + kind: MetricKind, + value_type: MetricValueType, +} + +impl BufferedMetric { + pub fn new( + 
params: &CoreMetricParameters, + kind: &CoreMetricKind, + use_seconds_for_durations: bool, + ) -> Self { + let unit = match *kind { + CoreMetricKind::HistogramDuration if params.unit == "duration" => { + Some((if use_seconds_for_durations { "s" } else { "ms" }).to_string()) + } + _ => (!params.unit.is_empty()).then_some(params.unit.to_string()), + }; + + let (kind, value_type) = match *kind { + CoreMetricKind::Counter => (MetricKind::Counter, MetricValueType::Int), + CoreMetricKind::Gauge => (MetricKind::Gauge, MetricValueType::Int), + CoreMetricKind::GaugeF64 => (MetricKind::Gauge, MetricValueType::Float), + CoreMetricKind::Histogram => (MetricKind::Histogram, MetricValueType::Int), + CoreMetricKind::HistogramF64 => (MetricKind::Histogram, MetricValueType::Float), + CoreMetricKind::HistogramDuration => (MetricKind::Histogram, MetricValueType::Int), + }; + + let description = + (!params.description.is_empty()).then_some(params.description.to_string()); + + Self { + name: params.name.to_string(), + description: description.into(), + unit: unit.into(), + kind, + value_type, + } + } +} + +#[derive(Clone, Debug)] +pub struct BufferedMetricRef(MemoizedHandle); +impl CoreBufferInstrumentRef for BufferedMetricRef {} + +impl TryIntoJs for BufferedMetricRef { + type Output = JsObject; + fn try_into_js<'cx>(self, cx: &mut impl Context<'cx>) -> JsResult<'cx, Self::Output> { + self.0.try_into_js(cx) + } +} + +#[derive(Clone, Debug)] +struct BufferedMetricAttributes { + new_attributes: Vec, + append_from: Option, +} + +impl TryIntoJs for BufferedMetricAttributes { + type Output = JsObject; + fn try_into_js<'cx>(self, cx: &mut impl Context<'cx>) -> JsResult<'cx, Self::Output> { + let object = cx.empty_object(); + + // Copy existing attributes, if any + if let Some(existing) = self.append_from { + let existing_attrs = existing.try_into_js(cx)?; + + object_assign(cx, object, existing_attrs)?; + } + + // Assign new attributes + for kv in self.new_attributes { + let k = 
kv.key.as_str(); + match &kv.value { + metrics::MetricValue::String(v) => object.set_property_from(cx, k, v.as_str()), + metrics::MetricValue::Int(v) => object.set_property_from(cx, k, *v as f64), + metrics::MetricValue::Float(v) => object.set_property_from(cx, k, *v), + metrics::MetricValue::Bool(v) => object.set_property_from(cx, k, *v), + }?; + } + + Ok(object) + } +} + +#[derive(Clone, Debug)] +struct BufferedMetricAttributesRef(MemoizedHandle); + +impl CustomMetricAttributes for BufferedMetricAttributesRef { + fn as_any(self: Arc) -> Arc { + self as Arc + } +} + +impl TryIntoJs for BufferedMetricAttributesRef { + type Output = JsObject; + fn try_into_js<'cx>(self, cx: &mut impl Context<'cx>) -> JsResult<'cx, Self::Output> { + self.0.try_into_js(cx) + } +} + +#[derive(TryIntoJs, Clone, Debug)] +enum MetricKind { + Counter, + Gauge, + Histogram, +} + +#[derive(TryIntoJs, Clone, Debug)] +enum MetricValueType { + Int, + Float, +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Helpers //////////////////////////////////////////////////////////////////////////////////////////////////// fn parse_metric_attributes(attrs: MetricAttributes) -> NewAttributes { @@ -326,3 +567,16 @@ fn parse_metric_attributes(attrs: MetricAttributes) -> NewAttributes { .collect(); NewAttributes { attributes: attrs } } + +fn object_assign<'cx>( + cx: &mut impl Context<'cx>, + object: Handle<'cx, JsObject>, + source: Handle<'cx, JsObject>, +) -> JsResult<'cx, JsObject> { + let object_class = cx.global::("Object")?; + let assign_function = object_class.get::(cx, "assign")?; + let null = cx.null(); + assign_function.call(cx, null, vec![object.upcast(), source.upcast()])?; + + Ok(object) +} diff --git a/packages/core-bridge/src/runtime.rs b/packages/core-bridge/src/runtime.rs index fae1d1449..0bbb8cc82 100644 --- a/packages/core-bridge/src/runtime.rs +++ b/packages/core-bridge/src/runtime.rs @@ -20,6 +20,7 @@ use 
tokio_stream::StreamExt as _; use crate::{ helpers::{handles::MutableFinalize, *}, logs::LogEntry, + metrics::{BufferedMetricUpdate, MetricsCallBuffer}, }; #[macro_export] @@ -36,6 +37,10 @@ macro_rules! enter_sync { pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult<()> { cx.export_function("newRuntime", runtime_new)?; cx.export_function("runtimeShutdown", runtime_shutdown)?; + cx.export_function( + "runtimeRetrieveBufferedMetrics", + runtime_retrieve_buffered_metrics, + )?; Ok(()) } @@ -51,6 +56,9 @@ pub struct Runtime { // For some unknown reason, the otel metrics exporter will go crazy on shutdown in some // scenarios if we don't hold on to the `CoreOtelMeter` till the `Runtime` finally gets dropped. _otel_metrics_exporter: Option>, + + // Buffered metrics call buffer, if buffered metrics are enabled + pub(crate) metrics_call_buffer: Option, } /// Initialize Core global telemetry and create the tokio runtime required to run Core. @@ -74,30 +82,44 @@ pub fn runtime_new( enter_sync!(core_runtime); // Run the metrics exporter task, if needed. 
Created after Runtime since it needs Tokio handle - let (prom_metrics_exporter_task, otel_metrics_exporter) = match metrics_options { - Some(BridgeMetricsExporter::Prometheus(prom_opts)) => { - let exporter = start_prometheus_metric_exporter(prom_opts) - .context("Failed to start prometheus metrics exporter")?; + let (prom_metrics_exporter_task, otel_metrics_exporter, metrics_call_buffer) = + match metrics_options { + Some(BridgeMetricsExporter::Prometheus(prom_opts)) => { + let exporter = start_prometheus_metric_exporter(prom_opts) + .context("Failed to start prometheus metrics exporter")?; - core_runtime - .telemetry_mut() - .attach_late_init_metrics(exporter.meter); + core_runtime + .telemetry_mut() + .attach_late_init_metrics(exporter.meter); - (Some(exporter.abort_handle), None) - } - Some(BridgeMetricsExporter::Otel(otel_opts)) => { - let exporter = build_otlp_metric_exporter(otel_opts) - .context("Failed to start OTel metrics exporter")?; + (Some(exporter.abort_handle), None, None) + } + Some(BridgeMetricsExporter::Otel(otel_opts)) => { + let exporter = build_otlp_metric_exporter(otel_opts) + .context("Failed to start OTel metrics exporter")?; - let exporter: Arc = Arc::new(exporter); - core_runtime - .telemetry_mut() - .attach_late_init_metrics(exporter.clone()); + let exporter: Arc = Arc::new(exporter); + core_runtime + .telemetry_mut() + .attach_late_init_metrics(exporter.clone()); - (None, Some(exporter)) - } - None => (None, None), - }; + (None, Some(exporter), None) + } + Some(BridgeMetricsExporter::Buffered { + max_buffer_size, + use_seconds_for_durations, + }) => { + let metrics_call_buffer = + MetricsCallBuffer::new(max_buffer_size, use_seconds_for_durations); + + core_runtime + .telemetry_mut() + .attach_late_init_metrics(metrics_call_buffer.core_buffer.clone()); + + (None, None, Some(metrics_call_buffer)) + } + None => (None, None, None), + }; // Run the log exporter task, if needed. Created after Runtime since it needs Tokio handle. 
let log_exporter_task = if let BridgeLogExporter::Push { stream, receiver } = logging_options { @@ -127,6 +149,7 @@ pub fn runtime_new( log_exporter_task, metrics_exporter_task: prom_metrics_exporter_task.map(Arc::new), _otel_metrics_exporter: otel_metrics_exporter, + metrics_call_buffer, })) } @@ -140,6 +163,24 @@ pub fn runtime_shutdown(runtime: OpaqueInboundHandle) -> BridgeResult<( Ok(()) } +/// Retrieve buffered metrics from the runtime. +/// +/// This function drains the metrics buffer and returns all metric updates that have been +/// accumulated since the last call to this function. +#[js_function] +pub fn runtime_retrieve_buffered_metrics( + runtime: OpaqueInboundHandle, +) -> BridgeResult> { + let runtime = runtime.borrow()?; + let buffer = runtime.metrics_call_buffer.as_ref().ok_or_else(|| { + BridgeError::UnexpectedError( + "Attempting to retrieve buffered metrics of a runtime without buffer".into(), + ) + })?; + + Ok(buffer.retrieve()) +} + /// Drop will handle the cleanup impl MutableFinalize for Runtime {} @@ -223,6 +264,10 @@ impl RuntimeExt for Arc { pub enum BridgeMetricsExporter { Prometheus(CorePrometheusExporterOptions), Otel(CoreOtelCollectorOptions), + Buffered { + max_buffer_size: usize, + use_seconds_for_durations: bool, + }, } pub enum BridgeLogExporter { @@ -291,6 +336,7 @@ mod config { pub(super) enum MetricsExporterOptions { Prometheus(PrometheusMetricsExporterConfig), Otel(OtelMetricsExporterConfig), + Buffered(BufferedMetricsExporterConfig), } #[derive(Debug, Clone, TryFromJs)] @@ -315,6 +361,12 @@ mod config { protocol: StringEncoded, } + #[derive(Debug, Clone, TryFromJs)] + pub(super) struct BufferedMetricsExporterConfig { + max_buffer_size: usize, + use_seconds_for_durations: bool, + } + /// A private newtype so that we can implement `TryFromJs` on simple externally defined enums #[derive(Debug, Clone)] struct StringEncoded(T); @@ -389,6 +441,10 @@ mod config { Ok(super::BridgeMetricsExporter::Prometheus(prom.try_into()?)) } 
Self::Otel(otel) => Ok(super::BridgeMetricsExporter::Otel(otel.try_into()?)), + Self::Buffered(buffered) => Ok(super::BridgeMetricsExporter::Buffered { + max_buffer_size: buffered.max_buffer_size, + use_seconds_for_durations: buffered.use_seconds_for_durations, + }), } } } diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index ac8b3284b..538e4d153 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -71,7 +71,11 @@ export type LogExporterOptions = receiver: (entries: JsonString[]) => void; }; -export type MetricExporterOptions = PrometheusMetricsExporterOptions | OtelMetricsExporterOptions | null; +export type MetricExporterOptions = + | PrometheusMetricsExporterOptions + | OtelMetricsExporterOptions + | BufferedMetricsExporterOptions + | null; export interface PrometheusMetricsExporterOptions { type: 'prometheus'; @@ -95,6 +99,12 @@ export interface OtelMetricsExporterOptions { protocol: 'http' | 'grpc'; } +export interface BufferedMetricsExporterOptions { + type: 'buffered'; + maxBufferSize: number; + useSecondsForDurations: boolean; +} + //////////////////////////////////////////////////////////////////////////////////////////////////// // Client //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -525,3 +535,27 @@ export declare function setMetricGaugeF64Value( value: number, attrs: JsonString ): void; + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Buffered Metrics +//////////////////////////////////////////////////////////////////////////////////////////////////// + +export declare function runtimeRetrieveBufferedMetrics(runtime: Runtime): BufferedMetricUpdate[]; + +export interface BufferedMetricUpdate { + metric: BufferedMetric; + value: number; + attributes: MetricAttributes; +} + +export interface BufferedMetric { + name: string; + description: string; + unit: string; + kind: 
BufferedMetricKind; + valueType: BufferedMetricValueType; +} + +export type BufferedMetricKind = 'counter' | 'histogram' | 'gauge'; + +export type BufferedMetricValueType = 'int' | 'float'; diff --git a/packages/test/src/test-otel.ts b/packages/test/src/test-otel.ts index 35f136167..7536b7791 100644 --- a/packages/test/src/test-otel.ts +++ b/packages/test/src/test-otel.ts @@ -2,7 +2,6 @@ /** * Manual tests to inspect tracing output */ -import * as http from 'http'; import * as http2 from 'http2'; import * as otelApi from '@opentelemetry/api'; import { SpanStatusCode, createTraceState } from '@opentelemetry/api'; @@ -40,7 +39,7 @@ import { } from '@temporalio/worker'; import { WorkflowInboundCallsInterceptor, WorkflowOutboundCallsInterceptor } from '@temporalio/workflow'; import * as activities from './activities'; -import { loadHistory, RUN_INTEGRATION_TESTS, TestWorkflowEnvironment, Worker } from './helpers'; +import { loadHistory, RUN_INTEGRATION_TESTS, Worker } from './helpers'; import * as workflows from './workflows'; import { createTestWorkflowBundle } from './helpers-integration'; @@ -64,7 +63,7 @@ async function withFakeGrpcServer( }); res.write( // This is a raw gRPC response, of length 0 - Buffer.from([ + new Uint8Array([ // Frame Type: Data; Not Compressed 0, // Message Length: 0 @@ -90,161 +89,6 @@ async function withFakeGrpcServer( }); } -async function withHttpServer( - fn: (port: number) => Promise, - requestListener?: (request: http.IncomingMessage) => void -): Promise { - return new Promise((resolve, reject) => { - const srv = http.createServer(); - srv.listen({ port: 0, host: '127.0.0.1' }, () => { - const addr = srv.address(); - if (typeof addr === 'string' || addr === null) { - throw new Error('Unexpected server address type'); - } - srv.on('request', async (req, res) => { - if (requestListener) await requestListener(req); - res.statusCode = 200; - res.end(); - }); - fn(addr.port) - .catch((e) => reject(e)) - .finally(() => { - resolve(); - - // 
The OTel exporter will try to flush metrics on drop, which may result in tons of ERROR - // messages on the console if the server has had time to complete shutdown before then. - // Delaying closing the server by 1 second is enough to avoid that situation, and doesn't - // need to be awaited, no that doesn't slow down tests. - setTimeout(() => { - srv.close(); - }, 1000).unref(); - }); - }); - }); -} - -test.serial('Runtime.install() throws meaningful error when passed invalid metrics.otel.url', async (t) => { - t.throws(() => Runtime.install({ telemetryOptions: { metrics: { otel: { url: ':invalid' } } } }), { - instanceOf: TypeError, - message: /metricsExporter.otel.url/, - }); -}); - -test.serial('Runtime.install() accepts metrics.otel.url without headers', async (t) => { - try { - Runtime.install({ telemetryOptions: { metrics: { otel: { url: 'http://127.0.0.1:1234' } } } }); - t.pass(); - } finally { - // Cleanup the runtime so that it doesn't interfere with other tests - await Runtime._instance?.shutdown(); - } -}); - -test.serial('Exporting OTEL metrics from Core works', async (t) => { - let resolveCapturedRequest = (_req: http2.Http2ServerRequest) => undefined as void; - const capturedRequest = new Promise((r) => (resolveCapturedRequest = r)); - try { - await withFakeGrpcServer(async (port: number) => { - Runtime.install({ - telemetryOptions: { - metrics: { - otel: { - url: `http://127.0.0.1:${port}`, - headers: { - 'x-test-header': 'test-value', - }, - metricsExportInterval: 10, - }, - }, - }, - }); - - const localEnv = await TestWorkflowEnvironment.createLocal(); - try { - const worker = await Worker.create({ - connection: localEnv.nativeConnection, - workflowsPath: require.resolve('./workflows'), - taskQueue: 'test-otel', - }); - const client = new WorkflowClient({ - connection: localEnv.connection, - }); - await worker.runUntil(async () => { - await client.execute(workflows.successString, { - taskQueue: 'test-otel', - workflowId: uuid4(), - }); - const 
req = await Promise.race([ - capturedRequest, - await new Promise((resolve) => setTimeout(() => resolve(undefined), 2000)), - ]); - t.truthy(req); - t.is(req?.url, '/opentelemetry.proto.collector.metrics.v1.MetricsService/Export'); - t.is(req?.headers['x-test-header'], 'test-value'); - }); - } finally { - await localEnv.teardown(); - } - }, resolveCapturedRequest); - } finally { - // Cleanup the runtime so that it doesn't interfere with other tests - await Runtime._instance?.shutdown(); - } -}); - -test.serial('Exporting OTEL metrics using OTLP/HTTP from Core works', async (t) => { - let resolveCapturedRequest = (_req: http.IncomingMessage) => undefined as void; - const capturedRequest = new Promise((r) => (resolveCapturedRequest = r)); - try { - await withHttpServer(async (port: number) => { - Runtime.install({ - telemetryOptions: { - metrics: { - otel: { - url: `http://127.0.0.1:${port}/v1/metrics`, - http: true, - headers: { - 'x-test-header': 'test-value', - }, - metricsExportInterval: 10, - }, - }, - }, - }); - - const localEnv = await TestWorkflowEnvironment.createLocal(); - try { - const worker = await Worker.create({ - connection: localEnv.nativeConnection, - workflowsPath: require.resolve('./workflows'), - taskQueue: 'test-otel', - }); - const client = new WorkflowClient({ - connection: localEnv.connection, - }); - await worker.runUntil(async () => { - await client.execute(workflows.successString, { - taskQueue: 'test-otel', - workflowId: uuid4(), - }); - const req = await Promise.race([ - capturedRequest, - await new Promise((resolve) => setTimeout(() => resolve(undefined), 2000)), - ]); - t.truthy(req); - t.is(req?.url, '/v1/metrics'); - t.is(req?.headers['x-test-header'], 'test-value'); - }); - } finally { - await localEnv.teardown(); - } - }, resolveCapturedRequest); - } finally { - // Cleanup the runtime so that it doesn't interfere with other tests - await Runtime._instance?.shutdown(); - } -}); - if (RUN_INTEGRATION_TESTS) { test.serial('Otel 
interceptor spans are connected and complete', async (t) => { Runtime.install({}); diff --git a/packages/test/src/test-runtime-buffered-metrics.ts b/packages/test/src/test-runtime-buffered-metrics.ts new file mode 100644 index 000000000..07dd42233 --- /dev/null +++ b/packages/test/src/test-runtime-buffered-metrics.ts @@ -0,0 +1,204 @@ +import { Runtime } from '@temporalio/worker'; +import { Context as BaseContext, makeTestFunction } from './helpers-integration'; + +interface Context extends BaseContext { + port: number; +} + +const test = makeTestFunction({ + workflowsPath: __filename, + runtimeOpts: { + telemetryOptions: { + metrics: { + buffered: { + maxBufferSize: 1000, + useSecondsForDurations: false, + }, + }, + }, + }, +}); + +// Asserts that: +// - Buffered metrics can be retrieved from Core +// - Metric parameters are properly reported in update events +// - A Metric object is reused across update events for the same metric +// - Metric update events contain the correct value +test.serial('Buffered Metrics - Exporting buffered metrics from Core works properly', async (t) => { + const meter = Runtime.instance().metricMeter; + + // Counter (events 0-2) + const counter = meter.createCounter('my-counter', 'my-counter-unit', 'my-counter-description'); + counter.add(1); + counter.add(1); + counter.add(40); // 1+1+40 => 42 + + // Int Gauge (events 3-4) + const gaugeInt = meter.createGauge('my-int-gauge', 'int', 'my-int-gauge-unit', 'my-int-gauge-description'); + gaugeInt.set(1); + gaugeInt.set(40); + + // Float Gauge (events 5-7) + const gaugeFloat = meter.createGauge('my-float-gauge', 'float', 'my-float-gauge-unit', 'my-float-gauge-description'); + gaugeFloat.set(1.1); + gaugeFloat.set(1.1); + gaugeFloat.set(40.1); + + // Int Histogram (events 8-10) + const histogramInt = meter.createHistogram( + 'my-int-histogram', + 'int', + 'my-int-histogram-unit', + 'my-int-histogram-description' + ); + histogramInt.record(20); + histogramInt.record(200); + 
histogramInt.record(2000); + + // Float Histogram (events 11-13) + const histogramFloat = meter.createHistogram( + 'my-float-histogram', + 'float', + 'my-float-histogram-unit', + 'my-float-histogram-description' + ); + histogramFloat.record(0.02); + histogramFloat.record(0.07); + histogramFloat.record(0.99); + + const updatesIterator = Runtime.instance().retrieveBufferedMetrics(); + + // Metric events may include other metrics emitted by Core, which would be too + // flaky to test here. Hence we filter out all metrics that do not start with 'my-'. + const updates = Array.from(updatesIterator).filter((update) => update.metric.name.startsWith('my-')); + t.is(updates.length, 14); + + t.deepEqual(updates[0].metric, { + name: 'my-counter', + kind: 'counter', + valueType: 'int', + unit: 'my-counter-unit', + description: 'my-counter-description', + }); + t.is(updates[0].value, 1); + t.is(updates[1].metric, updates[0].metric); + t.is(updates[1].value, 1); + t.is(updates[2].metric, updates[0].metric); + t.is(updates[2].value, 40); + + t.deepEqual(updates[3].metric, { + name: 'my-int-gauge', + kind: 'gauge', + valueType: 'int', + unit: 'my-int-gauge-unit', + description: 'my-int-gauge-description', + }); + t.is(updates[3].value, 1); + t.is(updates[4].metric, updates[3].metric); + t.is(updates[4].value, 40); + + t.deepEqual(updates[5].metric, { + name: 'my-float-gauge', + kind: 'gauge', + // valueType: 'float', + valueType: 'int', // FIXME: pending on https://github.com/temporalio/sdk-core/pull/1108 + unit: 'my-float-gauge-unit', + description: 'my-float-gauge-description', + }); + t.is(updates[5].value, 1.1); + t.is(updates[6].metric, updates[5].metric); + t.is(updates[6].value, 1.1); + t.is(updates[7].metric, updates[5].metric); + t.is(updates[7].value, 40.1); + + t.deepEqual(updates[8].metric, { + name: 'my-int-histogram', + kind: 'histogram', + valueType: 'int', + unit: 'my-int-histogram-unit', + description: 'my-int-histogram-description', + }); + t.is(updates[8].value, 
20); + t.is(updates[9].metric, updates[8].metric); + t.is(updates[9].value, 200); + t.is(updates[10].metric, updates[8].metric); + t.is(updates[10].value, 2000); + + t.deepEqual(updates[11].metric, { + name: 'my-float-histogram', + kind: 'histogram', + // valueType: 'float', + valueType: 'int', // FIXME: pending on https://github.com/temporalio/sdk-core/pull/1108 + unit: 'my-float-histogram-unit', + description: 'my-float-histogram-description', + }); + t.is(updates[11].value, 0.02); + t.is(updates[12].metric, updates[11].metric); + t.is(updates[12].value, 0.07); + t.is(updates[13].metric, updates[11].metric); + t.is(updates[13].value, 0.99); +}); + +test.serial('Buffered Metrics - Metric attributes are properly reported', async (t) => { + const meter = Runtime.instance().metricMeter; + + const counter = meter.createCounter('my-counter', 'my-counter-unit', 'my-counter-description'); + const counter2 = counter.withTags({ labelA: 'value-a', labelB: true, labelC: 123, labelD: 123.456 }); + + counter2.add(1); + counter2.add(2, { labelA: 'value-a2', labelE: 12.34 }); + counter2.add(3, { labelC: 'value-c2', labelE: 23.45 }); + + const updatesIterator = Runtime.instance().retrieveBufferedMetrics(); + + // Metric events may include other metrics emitted by Core, which would be too + // flaky to test here. Hence we filter out all metrics that do not start with 'my-'. 
+ const updates = Array.from(updatesIterator).filter((update) => update.metric.name.startsWith('my-')); + t.is(updates.length, 3); + + t.is(updates[0].value, 1); + t.deepEqual(updates[0].attributes, { + labelA: 'value-a', + labelB: true, + labelC: 123, + labelD: 123.456, + }); + t.is(updates[1].value, 2); + t.deepEqual(updates[1].attributes, { + labelA: 'value-a2', + labelB: true, + labelC: 123, + labelD: 123.456, + labelE: 12.34, + }); + t.is(updates[2].value, 3); + t.deepEqual(updates[2].attributes, { + labelA: 'value-a', + labelB: true, + labelC: 'value-c2', + labelD: 123.456, + labelE: 23.45, + }); +}); + +test.serial('Buffered Metrics - empty description and unit are reported as undefined', async (t) => { + const meter = Runtime.instance().metricMeter; + + const counter = meter.createCounter('my-counter', undefined, undefined); + counter.add(1); + + // Metric events may include other metrics emitted by Core, which would be too + // flaky to test here. Hence we filter out all metrics that do not start with 'my-'. 
+ const updatesIterator = Runtime.instance().retrieveBufferedMetrics(); + const updates = Array.from(updatesIterator).filter((update) => update.metric.name.startsWith('my-')); + t.is(updates.length, 1); + + t.deepEqual(updates[0].metric, { + name: 'my-counter', + kind: 'counter', + valueType: 'int', + unit: undefined, + description: undefined, + }); + t.is(updates[0].value, 1); +}); diff --git a/packages/test/src/test-runtime-otel.ts b/packages/test/src/test-runtime-otel.ts new file mode 100644 index 000000000..0de128e1b --- /dev/null +++ b/packages/test/src/test-runtime-otel.ts @@ -0,0 +1,209 @@ +import * as http from 'http'; +import * as http2 from 'http2'; +import test from 'ava'; +import { v4 as uuid4 } from 'uuid'; +import { WorkflowClient } from '@temporalio/client'; +import { Runtime } from '@temporalio/worker'; +import { TestWorkflowEnvironment, Worker } from './helpers'; +import * as workflows from './workflows'; + +async function withFakeGrpcServer( + fn: (port: number) => Promise, + requestListener?: (request: http2.Http2ServerRequest, response: http2.Http2ServerResponse) => void +): Promise { + return new Promise((resolve, reject) => { + const srv = http2.createServer(); + srv.listen({ port: 0, host: '127.0.0.1' }, () => { + const addr = srv.address(); + if (typeof addr === 'string' || addr === null) { + throw new Error('Unexpected server address type'); + } + srv.on('request', async (req, res) => { + if (requestListener) requestListener(req, res); + res.statusCode = 200; + res.addTrailers({ + 'grpc-status': '0', + 'grpc-message': 'OK', + }); + res.write( + // This is a raw gRPC response, of length 0 + new Uint8Array([ + // Frame Type: Data; Not Compressed + 0, + // Message Length: 0 + 0, 0, 0, 0, + ]) + ); + res.end(); + }); + fn(addr.port) + .catch((e) => reject(e)) + .finally(() => { + resolve(); + + // The OTel exporter will try to flush metrics on drop, which may result in tons of ERROR + // messages on the console if the server has had time to 
complete shutdown before then. + // Delaying closing the server by 1 second is enough to avoid that situation, and doesn't + // need to be awaited, no that doesn't slow down tests. + setTimeout(() => { + srv.close(); + }, 1000).unref(); + }); + }); + }); +} + +async function withHttpServer( + fn: (port: number) => Promise, + requestListener?: (request: http.IncomingMessage) => void +): Promise { + return new Promise((resolve, reject) => { + const srv = http.createServer(); + srv.listen({ port: 0, host: '127.0.0.1' }, () => { + const addr = srv.address(); + if (typeof addr === 'string' || addr === null) { + throw new Error('Unexpected server address type'); + } + srv.on('request', async (req, res) => { + if (requestListener) await requestListener(req); + res.statusCode = 200; + res.end(); + }); + fn(addr.port) + .catch((e) => reject(e)) + .finally(() => { + resolve(); + + // The OTel exporter will try to flush metrics on drop, which may result in tons of ERROR + // messages on the console if the server has had time to complete shutdown before then. + // Delaying closing the server by 1 second is enough to avoid that situation, and doesn't + // need to be awaited, no that doesn't slow down tests. 
+ setTimeout(() => { + srv.close(); + }, 1000).unref(); + }); + }); + }); +} + +test.serial('Runtime.install() throws meaningful error when passed invalid metrics.otel.url', async (t) => { + t.throws(() => Runtime.install({ telemetryOptions: { metrics: { otel: { url: ':invalid' } } } }), { + instanceOf: TypeError, + message: /metricsExporter.otel.url/, + }); +}); + +test.serial('Runtime.install() accepts metrics.otel.url without headers', async (t) => { + try { + Runtime.install({ telemetryOptions: { metrics: { otel: { url: 'http://127.0.0.1:1234' } } } }); + t.pass(); + } finally { + // Cleanup the runtime so that it doesn't interfere with other tests + await Runtime._instance?.shutdown(); + } +}); + +test.serial('Exporting OTEL metrics from Core works', async (t) => { + let resolveCapturedRequest = (_req: http2.Http2ServerRequest) => undefined as void; + const capturedRequest = new Promise((r) => (resolveCapturedRequest = r)); + try { + await withFakeGrpcServer(async (port: number) => { + Runtime.install({ + telemetryOptions: { + metrics: { + otel: { + url: `http://127.0.0.1:${port}`, + headers: { + 'x-test-header': 'test-value', + }, + metricsExportInterval: 10, + }, + }, + }, + }); + + const localEnv = await TestWorkflowEnvironment.createLocal(); + try { + const worker = await Worker.create({ + connection: localEnv.nativeConnection, + workflowsPath: require.resolve('./workflows'), + taskQueue: 'test-otel', + }); + const client = new WorkflowClient({ + connection: localEnv.connection, + }); + await worker.runUntil(async () => { + await client.execute(workflows.successString, { + taskQueue: 'test-otel', + workflowId: uuid4(), + }); + const req = await Promise.race([ + capturedRequest, + await new Promise((resolve) => setTimeout(() => resolve(undefined), 2000)), + ]); + t.truthy(req); + t.is(req?.url, '/opentelemetry.proto.collector.metrics.v1.MetricsService/Export'); + t.is(req?.headers['x-test-header'], 'test-value'); + }); + } finally { + await 
localEnv.teardown(); + } + }, resolveCapturedRequest); + } finally { + // Cleanup the runtime so that it doesn't interfere with other tests + await Runtime._instance?.shutdown(); + } +}); + +test.serial('Exporting OTEL metrics using OTLP/HTTP from Core works', async (t) => { + let resolveCapturedRequest = (_req: http.IncomingMessage) => undefined as void; + const capturedRequest = new Promise((r) => (resolveCapturedRequest = r)); + try { + await withHttpServer(async (port: number) => { + Runtime.install({ + telemetryOptions: { + metrics: { + otel: { + url: `http://127.0.0.1:${port}/v1/metrics`, + http: true, + headers: { + 'x-test-header': 'test-value', + }, + metricsExportInterval: 10, + }, + }, + }, + }); + + const localEnv = await TestWorkflowEnvironment.createLocal(); + try { + const worker = await Worker.create({ + connection: localEnv.nativeConnection, + workflowsPath: require.resolve('./workflows'), + taskQueue: 'test-otel', + }); + const client = new WorkflowClient({ + connection: localEnv.connection, + }); + await worker.runUntil(async () => { + await client.execute(workflows.successString, { + taskQueue: 'test-otel', + workflowId: uuid4(), + }); + const req = await Promise.race([ + capturedRequest, + await new Promise((resolve) => setTimeout(() => resolve(undefined), 2000)), + ]); + t.truthy(req); + t.is(req?.url, '/v1/metrics'); + t.is(req?.headers['x-test-header'], 'test-value'); + }); + } finally { + await localEnv.teardown(); + } + }, resolveCapturedRequest); + } finally { + // Cleanup the runtime so that it doesn't interfere with other tests + await Runtime._instance?.shutdown(); + } +}); diff --git a/packages/test/src/test-prometheus.ts b/packages/test/src/test-runtime-prometheus.ts similarity index 100% rename from packages/test/src/test-prometheus.ts rename to packages/test/src/test-runtime-prometheus.ts diff --git a/packages/worker/src/runtime-metrics.ts b/packages/worker/src/runtime-metrics.ts index bc9a7f0ea..dee8ed80d 100644 --- 
a/packages/worker/src/runtime-metrics.ts +++ b/packages/worker/src/runtime-metrics.ts @@ -1,4 +1,5 @@ import { + Metric, MetricCounter, MetricGauge, MetricHistogram, @@ -8,6 +9,16 @@ import { } from '@temporalio/common'; import { native } from '@temporalio/core-bridge'; +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Metric Meter (aka Custom Metrics) +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/** + * An implementation of the {@link MetricMeter} interface that pushes emitted metrics through + * the bridge, to be collected by whatever exporter is configured on the Core Runtime. + * + * @internal + */ export class RuntimeMetricMeter implements MetricMeter { public constructor(protected runtime: native.Runtime) {} @@ -59,6 +70,9 @@ export class RuntimeMetricMeter implements MetricMeter { } class RuntimeMetricCounter implements MetricCounter { + public readonly kind = 'counter'; + public readonly valueType = 'int'; + public constructor( private readonly native: native.MetricCounter, public readonly name: string, @@ -80,6 +94,7 @@ class RuntimeMetricCounter implements MetricCounter { } class RuntimeMetricHistogram implements MetricHistogram { + public readonly kind = 'histogram'; public readonly valueType = 'int'; public constructor( @@ -103,6 +118,7 @@ class RuntimeMetricHistogram implements MetricHistogram { } class RuntimeMetricHistogramF64 implements MetricHistogram { + public readonly kind = 'histogram'; public readonly valueType = 'float'; public constructor( @@ -126,6 +142,7 @@ class RuntimeMetricHistogramF64 implements MetricHistogram { } class RuntimeMetricGauge implements MetricGauge { + public readonly kind = 'gauge'; public readonly valueType = 'int'; public constructor( @@ -149,6 +166,7 @@ class RuntimeMetricGauge implements MetricGauge { } class RuntimeMetricGaugeF64 implements MetricGauge { + public readonly kind = 'gauge'; public 
readonly valueType = 'float'; public constructor( @@ -170,3 +188,51 @@ class RuntimeMetricGaugeF64 implements MetricGauge { throw new Error('withTags is not supported directly on RuntimeMetricGaugeF64'); } } + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Buffered Metrics (aka lang-side metrics exporter) +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/** + * A single update event on a metric, recorded by buffered metrics exporter. + * + * When the {@link Runtime} is configured to buffer metrics, user code is expected to periodically + * call {@link Runtime.retrieveBufferedMetrics} to retrieve the buffered metric updates. Each update + * event will be represented as a single instance of this interface. + * + * @experimental Buffered metrics is an experiemental feature. APIs may be subject to change. + */ +export interface BufferedMetricUpdate { + /** + * The metric being updated. + * + * For performance reasons, the SDK tries to re-use the same object across updates for the same + * metric. User code may take advantage of this, e.g. by attaching downstream metric references to + * as a supplementary property on the `Metric` object. Note that the SDK may sometimes miss + * deduplication opportunities, notably when a same metric is accessed from different execution + * contexts (e.g. from both activity code and workflow code). + */ + metric: Metric; + + /** + * Value for this update event. + * + * For counters this is a delta; for gauges and histograms this is the value itself. + */ + value: number; + + /** + * Attributes for this update event. + * + * For performance reasons, the SDK tries to re-use the same object across updates for the same + * attribute set. User code may take advantage of this, e.g. by attaching downstream attribute + * sets references as a supplementary, _non-enumerable_ property on the `MetricTags` object. 
Make + * sure however not to add, modify or delete any enumerable properties on the `MetricTags` object, + * as those changes would affect future update events using the same `MetricTags` object, as well + * as further events that extend that `MetricTags` object. + * + * Note that the SDK may miss deduplication oppportunities, notably when a same set of attributes + * is recreated by the code emitting the metric updates, e.g. when extending an existing set of + */ + attributes: MetricTags; +} diff --git a/packages/worker/src/runtime-options.ts b/packages/worker/src/runtime-options.ts index 2d4a9cbc4..fc8f802ed 100644 --- a/packages/worker/src/runtime-options.ts +++ b/packages/worker/src/runtime-options.ts @@ -214,6 +214,8 @@ export type MetricsExporterConfig = { /** * Tags to add to all metrics emitted by the worker. + * + * Note that this is not supported when the metrics are buffered. */ globalTags?: Record; @@ -223,7 +225,7 @@ export type MetricsExporterConfig = { * @default true */ attachServiceName?: boolean; -} & (PrometheusMetricsExporter | OtelCollectorExporter); +} & (PrometheusMetricsExporter | OtelCollectorExporter | BufferedMetricsExporter); /** * OpenTelemetry Collector options for exporting metrics or traces @@ -359,6 +361,33 @@ export interface PrometheusMetricsExporter { }; } +/** + * Buffered metrics exporter options + * + * @experimental Buffered metrics is an experiemental feature. APIs may be subject to change. + */ +export interface BufferedMetricsExporter { + buffered: { + /** + * Maximum number of metric events to buffer before dropping new events. + * + * The buffer accumulates metric updates from Core and should be drained regularly by calling + * {@link Runtime.retrieveBufferedMetrics}. If the buffer fills up, new metric updates will be + * dropped and an error will be logged. + * + * @default 10000 + */ + maxBufferSize?: number; + + /** + * If set to true, the exporter will use seconds for durations instead of milliseconds. 
+ * + * @default false + */ + useSecondsForDurations?: boolean; + }; +} + // Compile Options //////////////////////////////////////////////////////////////////////////////// /** @@ -409,7 +438,13 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions histogramBucketOverrides: metrics.otel.histogramBucketOverrides ?? {}, globalTags: metrics.globalTags ?? {}, } satisfies native.MetricExporterOptions) - : null, + : metrics && isBufferedMetricsExporter(metrics) + ? ({ + type: 'buffered', + maxBufferSize: metrics.buffered.maxBufferSize ?? 10000, + useSecondsForDurations: metrics.buffered.useSecondsForDurations ?? false, + } satisfies native.MetricExporterOptions) + : null, workerHeartbeatIntervalMillis: heartbeatMillis === 0 ? null : heartbeatMillis, }, }; @@ -490,6 +525,10 @@ function isPrometheusMetricsExporter(metrics: MetricsExporterConfig): metrics is return 'prometheus' in metrics && typeof metrics.prometheus === 'object'; } +function isBufferedMetricsExporter(metrics: MetricsExporterConfig): metrics is BufferedMetricsExporter { + return 'buffered' in metrics && typeof metrics.buffered === 'object'; +} + function isForwardingLogger(options: LogExporterConfig): boolean { return 'forward' in options && typeof options.forward === 'object'; } diff --git a/packages/worker/src/runtime.ts b/packages/worker/src/runtime.ts index e98227e68..e74eba88a 100644 --- a/packages/worker/src/runtime.ts +++ b/packages/worker/src/runtime.ts @@ -8,7 +8,7 @@ import { temporal } from '@temporalio/proto'; import { History } from '@temporalio/common/lib/proto-utils'; import { MetricMeterWithComposedTags } from '@temporalio/common/lib/metrics'; import { isFlushableLogger } from './logger'; -import { RuntimeMetricMeter } from './runtime-metrics'; +import { RuntimeMetricMeter, BufferedMetricUpdate } from './runtime-metrics'; import { toNativeClientOptions, NativeConnectionOptions } from './connection-options'; import { byteArrayToBuffer, toMB } from './utils'; 
import { CompiledRuntimeOptions, compileOptions, RuntimeOptions } from './runtime-options'; @@ -288,6 +288,28 @@ export class Runtime { } } + /** + * Retrieve buffered metric updates from the runtime. + * + * This method drains the metrics buffer and returns all metric events that have accumulated + * since the last call to this method. This method should be called regularly when using + * buffered metrics to prevent buffer overflow. + * + * @throws {IllegalStateError} If buffered metrics have not been enabled for this runtime, + * or if the runtime has been shut down. + * @returns Array of buffered metric updates, each containing the metric metadata, + * current value, and attributes + * @experimental Buffered metrics is an experiemental feature. APIs may be subject to change. + */ + public retrieveBufferedMetrics(): ArrayIterator { + if (this.native === undefined) throw new IllegalStateError('Runtime has been shut down'); + + // We return an iterator instead of an array in case we should, at some point in the future, + // need to apply per item transformation. Copying to an array would obviously be possible, the + // buffered metric array might be large, so avoiding the extra array allocation makes sense. 
+ return native.runtimeRetrieveBufferedMetrics(this.native).values(); + } + /** * Used by Workers to register for shutdown signals * diff --git a/packages/workflow/src/metrics.ts b/packages/workflow/src/metrics.ts index b13819263..cf38b74cc 100644 --- a/packages/workflow/src/metrics.ts +++ b/packages/workflow/src/metrics.ts @@ -48,6 +48,9 @@ class WorkflowMetricMeterImpl implements MetricMeter { } class WorkflowMetricCounter implements MetricCounter { + public readonly kind = 'counter'; + public readonly valueType = 'int'; + constructor( public readonly name: string, public readonly unit: string | undefined, @@ -70,6 +73,8 @@ class WorkflowMetricCounter implements MetricCounter { } class WorkflowMetricHistogram implements MetricHistogram { + public readonly kind = 'histogram'; + constructor( public readonly name: string, public readonly valueType: NumericMetricValueType, @@ -93,6 +98,8 @@ class WorkflowMetricHistogram implements MetricHistogram { } class WorkflowMetricGauge implements MetricGauge { + public readonly kind = 'gauge'; + constructor( public readonly name: string, public readonly valueType: NumericMetricValueType, From 476853bb5a2dbacc0baa7a02c7fdc8b6572194c6 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Wed, 4 Feb 2026 10:32:36 -0500 Subject: [PATCH 2/8] Fix spelling mistakes reported by AI agent review --- .../core-bridge/src/helpers/try_into_js.rs | 2 +- packages/test/src/test-otel.ts | 121 +++++++++--------- packages/test/src/test-runtime-otel.ts | 4 +- packages/worker/src/runtime-metrics.ts | 16 +-- packages/worker/src/runtime-options.ts | 46 +++---- packages/worker/src/runtime.ts | 14 +- 6 files changed, 101 insertions(+), 102 deletions(-) diff --git a/packages/core-bridge/src/helpers/try_into_js.rs b/packages/core-bridge/src/helpers/try_into_js.rs index 8685cdc09..9818b6a9e 100644 --- a/packages/core-bridge/src/helpers/try_into_js.rs +++ b/packages/core-bridge/src/helpers/try_into_js.rs @@ -183,7 +183,7 @@ where } } -/// To avoid some 
recuring error patterns when crossing the JS bridge, we normally translate +/// To avoid some recurring error patterns when crossing the JS bridge, we normally translate /// `Option` to `T | null` on the JS side. This however implies extra code on the JS side /// to check for `null` and convert to `undefined` as appropriate. This generally poses no /// problem, as manipulation of objects on the JS side is anyway desirable for other reasons. diff --git a/packages/test/src/test-otel.ts b/packages/test/src/test-otel.ts index 7536b7791..c1efcae81 100644 --- a/packages/test/src/test-otel.ts +++ b/packages/test/src/test-otel.ts @@ -80,7 +80,7 @@ async function withFakeGrpcServer( // The OTel exporter will try to flush metrics on drop, which may result in tons of ERROR // messages on the console if the server has had time to complete shutdown before then. // Delaying closing the server by 1 second is enough to avoid that situation, and doesn't - // need to be awaited, no that doesn't slow down tests. + // need to be awaited, so that doesn't slow down tests. setTimeout(() => { srv.close(); }, 1000).unref(); @@ -386,7 +386,7 @@ if (RUN_INTEGRATION_TESTS) { export(_spans, resultCallback) { resultCallback({ code: ExportResultCode.SUCCESS }); }, - async shutdown() {}, + async shutdown() { }, }; const sinks: InjectedSinks = { @@ -438,74 +438,73 @@ if (RUN_INTEGRATION_TESTS) { // - Using SpanExporter directly: async attributes are not awaited // - Using SpanProcessor: async attributes are awaited before export for (const useSpanProcessor of [false, true]) { - test(`makeWorkflowExporter with ${ - useSpanProcessor ? 'SpanProcessor does' : 'SpanExporter does not' - } await async resource attributes`, async (t) => { - const taskQueue = `test-otel-async-${useSpanProcessor ? 'processor' : 'exporter'}`; - const serviceName = `ts-test-otel-async-attributes`; - - // Create a promise for async attributes that we control. - // If not using a span processor it will never resolve. 
- let resolveAsyncAttrs: (attrs: opentelemetry.resources.ResourceAttributes) => void; - const asyncAttributesPromise = new Promise((resolve) => { - resolveAsyncAttrs = resolve; - }); + test(`makeWorkflowExporter with ${useSpanProcessor ? 'SpanProcessor does' : 'SpanExporter does not' + } await async resource attributes`, async (t) => { + const taskQueue = `test-otel-async-${useSpanProcessor ? 'processor' : 'exporter'}`; + const serviceName = `ts-test-otel-async-attributes`; + + // Create a promise for async attributes that we control. + // If not using a span processor it will never resolve. + let resolveAsyncAttrs: (attrs: opentelemetry.resources.ResourceAttributes) => void; + const asyncAttributesPromise = new Promise((resolve) => { + resolveAsyncAttrs = resolve; + }); - const resource = new opentelemetry.resources.Resource( - { [SEMRESATTRS_SERVICE_NAME]: serviceName }, - asyncAttributesPromise - ); + const resource = new opentelemetry.resources.Resource( + { [SEMRESATTRS_SERVICE_NAME]: serviceName }, + asyncAttributesPromise + ); - const spans: opentelemetry.tracing.ReadableSpan[] = []; - const traceExporter: opentelemetry.tracing.SpanExporter = { - export(spans_, resultCallback) { - spans.push(...spans_); - resultCallback({ code: ExportResultCode.SUCCESS }); - }, - async shutdown() {}, - }; + const spans: opentelemetry.tracing.ReadableSpan[] = []; + const traceExporter: opentelemetry.tracing.SpanExporter = { + export(spans_, resultCallback) { + spans.push(...spans_); + resultCallback({ code: ExportResultCode.SUCCESS }); + }, + async shutdown() { }, + }; - // Custom SpanProcessor that resolves async resource attributes after the first onEnd is called. - // SpanProcessors are expected to wait on async resource attributes to settle before exporting the span. 
- class TestSpanProcessor extends SimpleSpanProcessor { - override onEnd(span: opentelemetry.tracing.ReadableSpan): void { - super.onEnd(span); - // Resolve async attributes so waitForAsyncAttributes can complete - resolveAsyncAttrs({ 'async.attr': 'resolved' }); + // Custom SpanProcessor that resolves async resource attributes after the first onEnd is called. + // SpanProcessors are expected to wait on async resource attributes to settle before exporting the span. + class TestSpanProcessor extends SimpleSpanProcessor { + override onEnd(span: opentelemetry.tracing.ReadableSpan): void { + super.onEnd(span); + // Resolve async attributes so waitForAsyncAttributes can complete + resolveAsyncAttrs({ 'async.attr': 'resolved' }); + } } - } - const sinks: InjectedSinks = { - exporter: useSpanProcessor - ? makeWorkflowExporter(new TestSpanProcessor(traceExporter), resource) - : makeWorkflowExporter(traceExporter, resource), - }; + const sinks: InjectedSinks = { + exporter: useSpanProcessor + ? makeWorkflowExporter(new TestSpanProcessor(traceExporter), resource) + : makeWorkflowExporter(traceExporter, resource), + }; - const worker = await Worker.create({ - workflowsPath: require.resolve('./workflows'), - activities, - taskQueue, - interceptors: { - workflowModules: [require.resolve('./workflows/otel-interceptors')], - activity: [ - (ctx) => ({ - inbound: new OpenTelemetryActivityInboundInterceptor(ctx), - outbound: new OpenTelemetryActivityOutboundInterceptor(ctx), - }), - ], - }, - sinks, - }); + const worker = await Worker.create({ + workflowsPath: require.resolve('./workflows'), + activities, + taskQueue, + interceptors: { + workflowModules: [require.resolve('./workflows/otel-interceptors')], + activity: [ + (ctx) => ({ + inbound: new OpenTelemetryActivityInboundInterceptor(ctx), + outbound: new OpenTelemetryActivityOutboundInterceptor(ctx), + }), + ], + }, + sinks, + }); - const client = new WorkflowClient(); - await 
worker.runUntil(client.execute(workflows.successString, { taskQueue, workflowId: uuid4() })); + const client = new WorkflowClient(); + await worker.runUntil(client.execute(workflows.successString, { taskQueue, workflowId: uuid4() })); - t.deepEqual(spans[0].resource.attributes, { - [SEMRESATTRS_SERVICE_NAME]: serviceName, - // If not using a span processor, then we do not expect the async attr to be present - ...(useSpanProcessor ? { 'async.attr': 'resolved' } : {}), + t.deepEqual(spans[0].resource.attributes, { + [SEMRESATTRS_SERVICE_NAME]: serviceName, + // If not using a span processor, then we do not expect the async attr to be present + ...(useSpanProcessor ? { 'async.attr': 'resolved' } : {}), + }); }); - }); } // Regression test for https://github.com/temporalio/sdk-typescript/issues/1738 diff --git a/packages/test/src/test-runtime-otel.ts b/packages/test/src/test-runtime-otel.ts index 0de128e1b..8ab9469de 100644 --- a/packages/test/src/test-runtime-otel.ts +++ b/packages/test/src/test-runtime-otel.ts @@ -44,7 +44,7 @@ async function withFakeGrpcServer( // The OTel exporter will try to flush metrics on drop, which may result in tons of ERROR // messages on the console if the server has had time to complete shutdown before then. // Delaying closing the server by 1 second is enough to avoid that situation, and doesn't - // need to be awaited, no that doesn't slow down tests. + // need to be awaited, so that doesn't slow down tests. setTimeout(() => { srv.close(); }, 1000).unref(); @@ -77,7 +77,7 @@ async function withHttpServer( // The OTel exporter will try to flush metrics on drop, which may result in tons of ERROR // messages on the console if the server has had time to complete shutdown before then. // Delaying closing the server by 1 second is enough to avoid that situation, and doesn't - // need to be awaited, no that doesn't slow down tests. + // need to be awaited, so that doesn't slow down tests. 
setTimeout(() => { srv.close(); }, 1000).unref(); diff --git a/packages/worker/src/runtime-metrics.ts b/packages/worker/src/runtime-metrics.ts index dee8ed80d..4d0561981 100644 --- a/packages/worker/src/runtime-metrics.ts +++ b/packages/worker/src/runtime-metrics.ts @@ -20,7 +20,7 @@ import { native } from '@temporalio/core-bridge'; * @internal */ export class RuntimeMetricMeter implements MetricMeter { - public constructor(protected runtime: native.Runtime) {} + public constructor(protected runtime: native.Runtime) { } createCounter(name: string, unit: string = '', description: string = ''): MetricCounter { const nativeMetric = native.newMetricCounter(this.runtime, name, unit, description); @@ -78,7 +78,7 @@ class RuntimeMetricCounter implements MetricCounter { public readonly name: string, public readonly unit: string, public readonly description: string - ) {} + ) { } add(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -102,7 +102,7 @@ class RuntimeMetricHistogram implements MetricHistogram { public readonly name: string, public readonly unit: string, public readonly description: string - ) {} + ) { } record(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -126,7 +126,7 @@ class RuntimeMetricHistogramF64 implements MetricHistogram { public readonly name: string, public readonly unit: string, public readonly description: string - ) {} + ) { } record(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -150,7 +150,7 @@ class RuntimeMetricGauge implements MetricGauge { public readonly name: string, public readonly unit: string, public readonly description: string - ) {} + ) { } set(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -174,7 +174,7 @@ class RuntimeMetricGaugeF64 implements MetricGauge { public readonly name: string, public readonly unit: string, public readonly description: string - ) {} + ) { } set(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -200,7 +200,7 @@ class 
RuntimeMetricGaugeF64 implements MetricGauge { * call {@link Runtime.retrieveBufferedMetrics} to retrieve the buffered metric updates. Each update * event will be represented as a single instance of this interface. * - * @experimental Buffered metrics is an experiemental feature. APIs may be subject to change. + * @experimental Buffered metrics is an experimental feature. APIs may be subject to change. */ export interface BufferedMetricUpdate { /** @@ -231,7 +231,7 @@ export interface BufferedMetricUpdate { * as those changes would affect future update events using the same `MetricTags` object, as well * as further events that extend that `MetricTags` object. * - * Note that the SDK may miss deduplication oppportunities, notably when a same set of attributes + * Note that the SDK may miss deduplication opportunities, notably when a same set of attributes * is recreated by the code emitting the metric updates, e.g. when extending an existing set of */ attributes: MetricTags; diff --git a/packages/worker/src/runtime-options.ts b/packages/worker/src/runtime-options.ts index fc8f802ed..d72d299b8 100644 --- a/packages/worker/src/runtime-options.ts +++ b/packages/worker/src/runtime-options.ts @@ -364,7 +364,7 @@ export interface PrometheusMetricsExporter { /** * Buffered metrics exporter options * - * @experimental Buffered metrics is an experiemental feature. APIs may be subject to change. + * @experimental Buffered metrics is an experimental feature. APIs may be subject to change. */ export interface BufferedMetricsExporter { buffered: { @@ -418,32 +418,32 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions metricsExporter: metrics && isPrometheusMetricsExporter(metrics) ? ({ - type: 'prometheus', - socketAddr: metrics.prometheus.bindAddress, - countersTotalSuffix: metrics.prometheus.countersTotalSuffix ?? false, - unitSuffix: metrics.prometheus.unitSuffix ?? false, - useSecondsForDurations: metrics.prometheus.useSecondsForDurations ?? 
false, - histogramBucketOverrides: metrics.prometheus.histogramBucketOverrides ?? {}, - globalTags: metrics.globalTags ?? {}, - } satisfies native.MetricExporterOptions) + type: 'prometheus', + socketAddr: metrics.prometheus.bindAddress, + countersTotalSuffix: metrics.prometheus.countersTotalSuffix ?? false, + unitSuffix: metrics.prometheus.unitSuffix ?? false, + useSecondsForDurations: metrics.prometheus.useSecondsForDurations ?? false, + histogramBucketOverrides: metrics.prometheus.histogramBucketOverrides ?? {}, + globalTags: metrics.globalTags ?? {}, + } satisfies native.MetricExporterOptions) : metrics && isOtelCollectorExporter(metrics) ? ({ - type: 'otel', - url: metrics.otel.url, - protocol: metrics.otel.http ? 'http' : 'grpc', - headers: metrics.otel.headers ?? {}, - metricPeriodicity: msToNumber(metrics.otel.metricsExportInterval ?? '1s'), - useSecondsForDurations: metrics.otel.useSecondsForDurations ?? false, - metricTemporality: metrics.otel.temporality ?? metrics.temporality ?? 'cumulative', // eslint-disable-line deprecation/deprecation - histogramBucketOverrides: metrics.otel.histogramBucketOverrides ?? {}, - globalTags: metrics.globalTags ?? {}, - } satisfies native.MetricExporterOptions) + type: 'otel', + url: metrics.otel.url, + protocol: metrics.otel.http ? 'http' : 'grpc', + headers: metrics.otel.headers ?? {}, + metricPeriodicity: msToNumber(metrics.otel.metricsExportInterval ?? '1s'), + useSecondsForDurations: metrics.otel.useSecondsForDurations ?? false, + metricTemporality: metrics.otel.temporality ?? metrics.temporality ?? 'cumulative', // eslint-disable-line deprecation/deprecation + histogramBucketOverrides: metrics.otel.histogramBucketOverrides ?? {}, + globalTags: metrics.globalTags ?? {}, + } satisfies native.MetricExporterOptions) : metrics && isBufferedMetricsExporter(metrics) ? ({ - type: 'buffered', - maxBufferSize: metrics.buffered.maxBufferSize ?? 10000, - useSecondsForDurations: metrics.buffered.useSecondsForDurations ?? 
false, - } satisfies native.MetricExporterOptions) + type: 'buffered', + maxBufferSize: metrics.buffered.maxBufferSize ?? 10000, + useSecondsForDurations: metrics.buffered.useSecondsForDurations ?? false, + } satisfies native.MetricExporterOptions) : null, workerHeartbeatIntervalMillis: heartbeatMillis === 0 ? null : heartbeatMillis, }, diff --git a/packages/worker/src/runtime.ts b/packages/worker/src/runtime.ts index e74eba88a..fabe0ce9b 100644 --- a/packages/worker/src/runtime.ts +++ b/packages/worker/src/runtime.ts @@ -299,7 +299,7 @@ export class Runtime { * or if the runtime has been shut down. * @returns Array of buffered metric updates, each containing the metric metadata, * current value, and attributes - * @experimental Buffered metrics is an experiemental feature. APIs may be subject to change. + * @experimental Buffered metrics is an experimental feature. APIs may be subject to change. */ public retrieveBufferedMetrics(): ArrayIterator { if (this.native === undefined) throw new IllegalStateError('Runtime has been shut down'); @@ -396,12 +396,12 @@ export class Runtime { this.logger.warn( `This program is running inside a containerized environment with a memory constraint ` + - `(eg. '${dockerArgs}' or similar). Node itself does not consider this memory constraint ` + - `in how it manages its heap memory. There is consequently a high probability that ` + - `the process will crash due to running out of memory. To increase reliability, we recommend ` + - `adding '--max-old-space-size=${suggestedOldSpaceSizeInMb}' to your node arguments. ` + - `Refer to https://docs.temporal.io/develop/typescript/core-application#run-a-worker-on-docker ` + - `for more advice on tuning your Workers.`, + `(eg. '${dockerArgs}' or similar). Node itself does not consider this memory constraint ` + + `in how it manages its heap memory. There is consequently a high probability that ` + + `the process will crash due to running out of memory. 
To increase reliability, we recommend ` + + `adding '--max-old-space-size=${suggestedOldSpaceSizeInMb}' to your node arguments. ` + + `Refer to https://docs.temporal.io/develop/typescript/core-application#run-a-worker-on-docker ` + + `for more advice on tuning your Workers.`, { sdkComponent: SdkComponent.worker } ); } From 25614ce9e52591964445aa634bba0ce9d3c4361b Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Wed, 4 Feb 2026 13:58:55 -0500 Subject: [PATCH 3/8] lint and format --- packages/core-bridge/src/metrics.rs | 11 ++- packages/test/src/test-ai-sdk.ts | 2 +- packages/test/src/test-otel.ts | 121 +++++++++++++------------ packages/worker/src/runtime-metrics.ts | 10 +- 4 files changed, 74 insertions(+), 70 deletions(-) diff --git a/packages/core-bridge/src/metrics.rs b/packages/core-bridge/src/metrics.rs index 87acbc94e..f6f0b46eb 100644 --- a/packages/core-bridge/src/metrics.rs +++ b/packages/core-bridge/src/metrics.rs @@ -365,7 +365,7 @@ impl MetricsCallBuffer { kind, } => { // Create the metric and put it on the lazy ref - let metric = BufferedMetric::new(params, kind, self.use_seconds_for_durations); + let metric = BufferedMetric::new(params, *kind, self.use_seconds_for_durations); populate_into .set(Arc::new(BufferedMetricRef(MemoizedHandle::new(metric)))) .expect("Unable to set buffered metric on reference"); @@ -405,6 +405,7 @@ impl MetricsCallBuffer { update, } => Some(BufferedMetricUpdate { metric: instrument.get().as_ref().clone(), + #[allow(clippy::match_same_arms, clippy::cast_precision_loss)] value: match update { metrics::MetricUpdateVal::Duration(v) if self.use_seconds_for_durations => { v.as_secs_f64() @@ -447,17 +448,18 @@ struct BufferedMetric { impl BufferedMetric { pub fn new( params: &CoreMetricParameters, - kind: &CoreMetricKind, + kind: CoreMetricKind, use_seconds_for_durations: bool, ) -> Self { - let unit = match *kind { + let unit = match kind { CoreMetricKind::HistogramDuration if params.unit == "duration" => { Some((if 
use_seconds_for_durations { "s" } else { "ms" }).to_string()) } _ => (!params.unit.is_empty()).then_some(params.unit.to_string()), }; - let (kind, value_type) = match *kind { + #[allow(clippy::match_same_arms)] + let (kind, value_type) = match kind { CoreMetricKind::Counter => (MetricKind::Counter, MetricValueType::Int), CoreMetricKind::Gauge => (MetricKind::Gauge, MetricValueType::Int), CoreMetricKind::GaugeF64 => (MetricKind::Gauge, MetricValueType::Float), @@ -511,6 +513,7 @@ impl TryIntoJs for BufferedMetricAttributes { // Assign new attributes for kv in self.new_attributes { let k = kv.key.as_str(); + #[allow(clippy::cast_precision_loss)] match &kv.value { metrics::MetricValue::String(v) => object.set_property_from(cx, k, v.as_str()), metrics::MetricValue::Int(v) => object.set_property_from(cx, k, *v as f64), diff --git a/packages/test/src/test-ai-sdk.ts b/packages/test/src/test-ai-sdk.ts index 2a18568c4..152fc3e26 100644 --- a/packages/test/src/test-ai-sdk.ts +++ b/packages/test/src/test-ai-sdk.ts @@ -439,7 +439,7 @@ test('Telemetry', async (t) => { }); await otel.start(); const sinks: InjectedSinks = { - exporter: makeWorkflowExporter(traceExporter, staticResource), + exporter: makeWorkflowExporter(traceExporter, staticResource), // eslint-disable-line deprecation/deprecation }; const worker = await Worker.create({ diff --git a/packages/test/src/test-otel.ts b/packages/test/src/test-otel.ts index c1efcae81..568bcf6e3 100644 --- a/packages/test/src/test-otel.ts +++ b/packages/test/src/test-otel.ts @@ -386,7 +386,7 @@ if (RUN_INTEGRATION_TESTS) { export(_spans, resultCallback) { resultCallback({ code: ExportResultCode.SUCCESS }); }, - async shutdown() { }, + async shutdown() {}, }; const sinks: InjectedSinks = { @@ -438,73 +438,74 @@ if (RUN_INTEGRATION_TESTS) { // - Using SpanExporter directly: async attributes are not awaited // - Using SpanProcessor: async attributes are awaited before export for (const useSpanProcessor of [false, true]) { - 
test(`makeWorkflowExporter with ${useSpanProcessor ? 'SpanProcessor does' : 'SpanExporter does not' - } await async resource attributes`, async (t) => { - const taskQueue = `test-otel-async-${useSpanProcessor ? 'processor' : 'exporter'}`; - const serviceName = `ts-test-otel-async-attributes`; - - // Create a promise for async attributes that we control. - // If not using a span processor it will never resolve. - let resolveAsyncAttrs: (attrs: opentelemetry.resources.ResourceAttributes) => void; - const asyncAttributesPromise = new Promise((resolve) => { - resolveAsyncAttrs = resolve; - }); + test(`makeWorkflowExporter with ${ + useSpanProcessor ? 'SpanProcessor does' : 'SpanExporter does not' + } await async resource attributes`, async (t) => { + const taskQueue = `test-otel-async-${useSpanProcessor ? 'processor' : 'exporter'}`; + const serviceName = `ts-test-otel-async-attributes`; + + // Create a promise for async attributes that we control. + // If not using a span processor it will never resolve. 
+ let resolveAsyncAttrs: (attrs: opentelemetry.resources.ResourceAttributes) => void; + const asyncAttributesPromise = new Promise((resolve) => { + resolveAsyncAttrs = resolve; + }); - const resource = new opentelemetry.resources.Resource( - { [SEMRESATTRS_SERVICE_NAME]: serviceName }, - asyncAttributesPromise - ); + const resource = new opentelemetry.resources.Resource( + { [SEMRESATTRS_SERVICE_NAME]: serviceName }, + asyncAttributesPromise + ); - const spans: opentelemetry.tracing.ReadableSpan[] = []; - const traceExporter: opentelemetry.tracing.SpanExporter = { - export(spans_, resultCallback) { - spans.push(...spans_); - resultCallback({ code: ExportResultCode.SUCCESS }); - }, - async shutdown() { }, - }; + const spans: opentelemetry.tracing.ReadableSpan[] = []; + const traceExporter: opentelemetry.tracing.SpanExporter = { + export(spans_, resultCallback) { + spans.push(...spans_); + resultCallback({ code: ExportResultCode.SUCCESS }); + }, + async shutdown() {}, + }; - // Custom SpanProcessor that resolves async resource attributes after the first onEnd is called. - // SpanProcessors are expected to wait on async resource attributes to settle before exporting the span. - class TestSpanProcessor extends SimpleSpanProcessor { - override onEnd(span: opentelemetry.tracing.ReadableSpan): void { - super.onEnd(span); - // Resolve async attributes so waitForAsyncAttributes can complete - resolveAsyncAttrs({ 'async.attr': 'resolved' }); - } + // Custom SpanProcessor that resolves async resource attributes after the first onEnd is called. + // SpanProcessors are expected to wait on async resource attributes to settle before exporting the span. 
+ class TestSpanProcessor extends SimpleSpanProcessor { + override onEnd(span: opentelemetry.tracing.ReadableSpan): void { + super.onEnd(span); + // Resolve async attributes so waitForAsyncAttributes can complete + resolveAsyncAttrs({ 'async.attr': 'resolved' }); } + } - const sinks: InjectedSinks = { - exporter: useSpanProcessor - ? makeWorkflowExporter(new TestSpanProcessor(traceExporter), resource) - : makeWorkflowExporter(traceExporter, resource), - }; + const sinks: InjectedSinks = { + exporter: useSpanProcessor + ? makeWorkflowExporter(new TestSpanProcessor(traceExporter), resource) + : makeWorkflowExporter(traceExporter, resource), // eslint-disable-line deprecation/deprecation + }; - const worker = await Worker.create({ - workflowsPath: require.resolve('./workflows'), - activities, - taskQueue, - interceptors: { - workflowModules: [require.resolve('./workflows/otel-interceptors')], - activity: [ - (ctx) => ({ - inbound: new OpenTelemetryActivityInboundInterceptor(ctx), - outbound: new OpenTelemetryActivityOutboundInterceptor(ctx), - }), - ], - }, - sinks, - }); + const worker = await Worker.create({ + workflowsPath: require.resolve('./workflows'), + activities, + taskQueue, + interceptors: { + workflowModules: [require.resolve('./workflows/otel-interceptors')], + activity: [ + (ctx) => ({ + inbound: new OpenTelemetryActivityInboundInterceptor(ctx), + outbound: new OpenTelemetryActivityOutboundInterceptor(ctx), + }), + ], + }, + sinks, + }); - const client = new WorkflowClient(); - await worker.runUntil(client.execute(workflows.successString, { taskQueue, workflowId: uuid4() })); + const client = new WorkflowClient(); + await worker.runUntil(client.execute(workflows.successString, { taskQueue, workflowId: uuid4() })); - t.deepEqual(spans[0].resource.attributes, { - [SEMRESATTRS_SERVICE_NAME]: serviceName, - // If not using a span processor, then we do not expect the async attr to be present - ...(useSpanProcessor ? 
{ 'async.attr': 'resolved' } : {}), - }); + t.deepEqual(spans[0].resource.attributes, { + [SEMRESATTRS_SERVICE_NAME]: serviceName, + // If not using a span processor, then we do not expect the async attr to be present + ...(useSpanProcessor ? { 'async.attr': 'resolved' } : {}), }); + }); } // Regression test for https://github.com/temporalio/sdk-typescript/issues/1738 @@ -555,7 +556,7 @@ if (RUN_INTEGRATION_TESTS) { otel.start(); const sinks: InjectedSinks = { - exporter: makeWorkflowExporter(workflowSpanExporter, staticResource), + exporter: makeWorkflowExporter(workflowSpanExporter, staticResource), // eslint-disable-line deprecation/deprecation }; const worker = await Worker.create({ diff --git a/packages/worker/src/runtime-metrics.ts b/packages/worker/src/runtime-metrics.ts index 4d0561981..131f80876 100644 --- a/packages/worker/src/runtime-metrics.ts +++ b/packages/worker/src/runtime-metrics.ts @@ -78,7 +78,7 @@ class RuntimeMetricCounter implements MetricCounter { public readonly name: string, public readonly unit: string, public readonly description: string - ) { } + ) {} add(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -102,7 +102,7 @@ class RuntimeMetricHistogram implements MetricHistogram { public readonly name: string, public readonly unit: string, public readonly description: string - ) { } + ) {} record(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -126,7 +126,7 @@ class RuntimeMetricHistogramF64 implements MetricHistogram { public readonly name: string, public readonly unit: string, public readonly description: string - ) { } + ) {} record(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -150,7 +150,7 @@ class RuntimeMetricGauge implements MetricGauge { public readonly name: string, public readonly unit: string, public readonly description: string - ) { } + ) {} set(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -174,7 +174,7 @@ class RuntimeMetricGaugeF64 implements 
MetricGauge { public readonly name: string, public readonly unit: string, public readonly description: string - ) { } + ) {} set(value: number, tags: MetricTags = {}): void { if (value < 0) { From 74104acd7d1c6ff0455de0f70e54f355b38b7511 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Wed, 4 Feb 2026 14:00:09 -0500 Subject: [PATCH 4/8] Add an explicit MetricsBuffer user-facing class --- packages/core-bridge/src/runtime.rs | 8 +- packages/core-bridge/ts/native.ts | 2 +- .../test/src/test-runtime-buffered-metrics.ts | 458 +++++++++++------- packages/worker/src/index.ts | 1 + packages/worker/src/runtime-metrics.ts | 134 ++++- packages/worker/src/runtime-options.ts | 69 ++- packages/worker/src/runtime.ts | 48 +- 7 files changed, 460 insertions(+), 260 deletions(-) diff --git a/packages/core-bridge/src/runtime.rs b/packages/core-bridge/src/runtime.rs index 0bbb8cc82..55b4f26a3 100644 --- a/packages/core-bridge/src/runtime.rs +++ b/packages/core-bridge/src/runtime.rs @@ -105,7 +105,7 @@ pub fn runtime_new( (None, Some(exporter), None) } - Some(BridgeMetricsExporter::Buffered { + Some(BridgeMetricsExporter::Buffer { max_buffer_size, use_seconds_for_durations, }) => { @@ -264,7 +264,7 @@ impl RuntimeExt for Arc { pub enum BridgeMetricsExporter { Prometheus(CorePrometheusExporterOptions), Otel(CoreOtelCollectorOptions), - Buffered { + Buffer { max_buffer_size: usize, use_seconds_for_durations: bool, }, @@ -336,7 +336,7 @@ mod config { pub(super) enum MetricsExporterOptions { Prometheus(PrometheusMetricsExporterConfig), Otel(OtelMetricsExporterConfig), - Buffered(BufferedMetricsExporterConfig), + Buffer(BufferedMetricsExporterConfig), } #[derive(Debug, Clone, TryFromJs)] @@ -441,7 +441,7 @@ mod config { Ok(super::BridgeMetricsExporter::Prometheus(prom.try_into()?)) } Self::Otel(otel) => Ok(super::BridgeMetricsExporter::Otel(otel.try_into()?)), - Self::Buffered(buffered) => Ok(super::BridgeMetricsExporter::Buffered { + Self::Buffer(buffered) => 
Ok(super::BridgeMetricsExporter::Buffer { max_buffer_size: buffered.max_buffer_size, use_seconds_for_durations: buffered.use_seconds_for_durations, }), diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index 538e4d153..b9f552be3 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -100,7 +100,7 @@ export interface OtelMetricsExporterOptions { } export interface BufferedMetricsExporterOptions { - type: 'buffered'; + type: 'buffer'; maxBufferSize: number; useSecondsForDurations: boolean; } diff --git a/packages/test/src/test-runtime-buffered-metrics.ts b/packages/test/src/test-runtime-buffered-metrics.ts index 07dd42233..d5ea0d651 100644 --- a/packages/test/src/test-runtime-buffered-metrics.ts +++ b/packages/test/src/test-runtime-buffered-metrics.ts @@ -1,23 +1,5 @@ -import { Runtime } from '@temporalio/worker'; -import { Context as BaseContext, makeTestFunction } from './helpers-integration'; - -interface Context extends BaseContext { - port: number; -} - -const test = makeTestFunction({ - workflowsPath: __filename, - runtimeOpts: { - telemetryOptions: { - metrics: { - buffered: { - maxBufferSize: 1000, - useSecondsForDurations: false, - }, - }, - }, - }, -}); +import test from 'ava'; +import { MetricsBuffer, Runtime } from '@temporalio/worker'; // Asserts that: // - Buffered metrics can be retrieved from Core @@ -25,180 +7,294 @@ const test = makeTestFunction({ // - A Metric object is reused across update events for the same metric // - Metric update events contain the correct value test.serial('Buffered Metrics - Exporting buffered metrics from Core works properly', async (t) => { - const meter = Runtime.instance().metricMeter; - - // Counter (events 0-2) - const counter = meter.createCounter('my-counter', 'my-counter-unit', 'my-counter-description'); - counter.add(1); - counter.add(1); - counter.add(40); // 1+1+40 => 42 - - // Int Gauge (events 3-4) - const gaugeInt = 
meter.createGauge('my-int-gauge', 'int', 'my-int-gauge-unit', 'my-int-gauge-description'); - gaugeInt.set(1); - gaugeInt.set(40); - - // Float Gauge (events 5-7) - const gaugeFloat = meter.createGauge('my-float-gauge', 'float', 'my-float-gauge-unit', 'my-float-gauge-description'); - gaugeFloat.set(1.1); - gaugeFloat.set(1.1); - gaugeFloat.set(40.1); - - // Int Histogram (events 8-10) - const histogramInt = meter.createHistogram( - 'my-int-histogram', - 'int', - 'my-int-histogram-unit', - 'my-int-histogram-description' - ); - histogramInt.record(20); - histogramInt.record(200); - histogramInt.record(2000); - - // Float Histogram (events 11-13) - const histogramFloat = meter.createHistogram( - 'my-float-histogram', - 'float', - 'my-float-histogram-unit', - 'my-float-histogram-description' - ); - histogramFloat.record(0.02); - histogramFloat.record(0.07); - histogramFloat.record(0.99); - - const updatesIterator = Runtime.instance().retrieveBufferedMetrics(); - - // Metric events may include other metrics emitted by Core, which would be too - // flaky to test here. Hence we filter out all metrics that do not start with 'my-'. 
- const updates = Array.from(updatesIterator).filter((update) => update.metric.name.startsWith('my-')); - t.is(updates.length, 14); - - t.deepEqual(updates[0].metric, { - name: 'my-counter', - kind: 'counter', - valueType: 'int', - unit: 'my-counter-unit', - description: 'my-counter-description', - }); - t.is(updates[0].value, 1); - t.is(updates[1].metric, updates[0].metric); - t.is(updates[1].value, 1); - t.is(updates[2].metric, updates[0].metric); - t.is(updates[2].value, 40); - - t.deepEqual(updates[3].metric, { - name: 'my-int-gauge', - kind: 'gauge', - valueType: 'int', - unit: 'my-int-gauge-unit', - description: 'my-int-gauge-description', - }); - t.is(updates[3].value, 1); - t.is(updates[4].metric, updates[3].metric); - t.is(updates[4].value, 40); - - t.deepEqual(updates[5].metric, { - name: 'my-float-gauge', - kind: 'gauge', - // valueType: 'float', - valueType: 'int', // FIXME: pending on https://github.com/temporalio/sdk-core/pull/1108 - unit: 'my-float-gauge-unit', - description: 'my-float-gauge-description', - }); - t.is(updates[5].value, 1.1); - t.is(updates[6].metric, updates[5].metric); - t.is(updates[6].value, 1.1); - t.is(updates[7].metric, updates[5].metric); - t.is(updates[7].value, 40.1); - - t.deepEqual(updates[8].metric, { - name: 'my-int-histogram', - kind: 'histogram', - valueType: 'int', - unit: 'my-int-histogram-unit', - description: 'my-int-histogram-description', + const buffer = new MetricsBuffer({ + maxBufferSize: 1000, + useSecondsForDurations: false, }); - t.is(updates[8].value, 20); - t.is(updates[9].metric, updates[8].metric); - t.is(updates[9].value, 200); - t.is(updates[10].metric, updates[8].metric); - t.is(updates[10].value, 2000); - - t.deepEqual(updates[11].metric, { - name: 'my-float-histogram', - kind: 'histogram', - // valueType: 'float', - valueType: 'int', // FIXME: pending on https://github.com/temporalio/sdk-core/pull/1108 - unit: 'my-float-histogram-unit', - description: 'my-float-histogram-description', + const 
runtime = Runtime.install({ + telemetryOptions: { + metrics: { + buffer, + }, + }, }); - t.is(updates[11].value, 0.02); - t.is(updates[12].metric, updates[11].metric); - t.is(updates[12].value, 0.07); - t.is(updates[13].metric, updates[11].metric); - t.is(updates[13].value, 0.99); + const meter = runtime.metricMeter; + + try { + // Counter (events 0-2) + const counter = meter.createCounter('my-counter', 'my-counter-unit', 'my-counter-description'); + counter.add(1); + counter.add(1); + counter.add(40); // 1+1+40 => 42 + + // Int Gauge (events 3-4) + const gaugeInt = meter.createGauge('my-int-gauge', 'int', 'my-int-gauge-unit', 'my-int-gauge-description'); + gaugeInt.set(1); + gaugeInt.set(40); + + // Float Gauge (events 5-7) + const gaugeFloat = meter.createGauge( + 'my-float-gauge', + 'float', + 'my-float-gauge-unit', + 'my-float-gauge-description' + ); + gaugeFloat.set(1.1); + gaugeFloat.set(1.1); + gaugeFloat.set(40.1); + + // Int Histogram (events 8-10) + const histogramInt = meter.createHistogram( + 'my-int-histogram', + 'int', + 'my-int-histogram-unit', + 'my-int-histogram-description' + ); + histogramInt.record(20); + histogramInt.record(200); + histogramInt.record(2000); + + // Float Histogram (events 11-13) + const histogramFloat = meter.createHistogram( + 'my-float-histogram', + 'float', + 'my-float-histogram-unit', + 'my-float-histogram-description' + ); + histogramFloat.record(0.02); + histogramFloat.record(0.07); + histogramFloat.record(0.99); + + const updatesIterator = buffer.retrieveUpdates(); + + // Metric events may include other metrics emitted by Core, which would be too + // flaky to test here. Hence we filter out all metrics that do not start with 'my-'. 
+ const updates = Array.from(updatesIterator).filter((update) => update.metric.name.startsWith('my-')); + t.is(updates.length, 14); + + t.deepEqual(updates[0].metric, { + name: 'my-counter', + kind: 'counter', + valueType: 'int', + unit: 'my-counter-unit', + description: 'my-counter-description', + }); + t.is(updates[0].value, 1); + t.is(updates[1].metric, updates[0].metric); + t.is(updates[1].value, 1); + t.is(updates[2].metric, updates[0].metric); + t.is(updates[2].value, 40); + + t.deepEqual(updates[3].metric, { + name: 'my-int-gauge', + kind: 'gauge', + valueType: 'int', + unit: 'my-int-gauge-unit', + description: 'my-int-gauge-description', + }); + t.is(updates[3].value, 1); + t.is(updates[4].metric, updates[3].metric); + t.is(updates[4].value, 40); + + t.deepEqual(updates[5].metric, { + name: 'my-float-gauge', + kind: 'gauge', + // valueType: 'float', + valueType: 'int', // FIXME: pending on https://github.com/temporalio/sdk-core/pull/1108 + unit: 'my-float-gauge-unit', + description: 'my-float-gauge-description', + }); + t.is(updates[5].value, 1.1); + t.is(updates[6].metric, updates[5].metric); + t.is(updates[6].value, 1.1); + t.is(updates[7].metric, updates[5].metric); + t.is(updates[7].value, 40.1); + + t.deepEqual(updates[8].metric, { + name: 'my-int-histogram', + kind: 'histogram', + valueType: 'int', + unit: 'my-int-histogram-unit', + description: 'my-int-histogram-description', + }); + t.is(updates[8].value, 20); + t.is(updates[9].metric, updates[8].metric); + t.is(updates[9].value, 200); + t.is(updates[10].metric, updates[8].metric); + t.is(updates[10].value, 2000); + + t.deepEqual(updates[11].metric, { + name: 'my-float-histogram', + kind: 'histogram', + // valueType: 'float', + valueType: 'int', // FIXME: pending on https://github.com/temporalio/sdk-core/pull/1108 + unit: 'my-float-histogram-unit', + description: 'my-float-histogram-description', + }); + t.is(updates[11].value, 0.02); + t.is(updates[12].metric, updates[11].metric); + 
t.is(updates[12].value, 0.07); + t.is(updates[13].metric, updates[11].metric); + t.is(updates[13].value, 0.99); + } finally { + await runtime.shutdown(); + } }); test.serial('Buffered Metrics - Metric attributes are properly reported', async (t) => { - const meter = Runtime.instance().metricMeter; + const buffer = new MetricsBuffer({ + maxBufferSize: 1000, + useSecondsForDurations: false, + }); + const runtime = Runtime.install({ + telemetryOptions: { + metrics: { + buffer, + }, + }, + }); + const meter = runtime.metricMeter; - const counter = meter.createCounter('my-counter', 'my-counter-unit', 'my-counter-description'); - const counter2 = counter.withTags({ labelA: 'value-a', labelB: true, labelC: 123, labelD: 123.456 }); + try { + const counter = meter.createCounter('my-counter', 'my-counter-unit', 'my-counter-description'); + const counter2 = counter.withTags({ labelA: 'value-a', labelB: true, labelC: 123, labelD: 123.456 }); - counter2.add(1); - counter2.add(2, { labelA: 'value-a2', labelE: 12.34 }); - counter2.add(3, { labelC: 'value-c2', labelE: 23.45 }); + counter2.add(1); + counter2.add(2, { labelA: 'value-a2', labelE: 12.34 }); + counter2.add(3, { labelC: 'value-c2', labelE: 23.45 }); - const updatesIterator = Runtime.instance().retrieveBufferedMetrics(); + const updatesIterator = buffer.retrieveUpdates(); - // Metric events may include other metrics emitted by Core, which would be too - // flaky to test here. Hence we filter out all metrics that do not start with 'my-'. - const updates = Array.from(updatesIterator).filter((update) => update.metric.name.startsWith('my-')); - t.is(updates.length, 3); + // Metric events may include other metrics emitted by Core, which would be too + // flaky to test here. Hence we filter out all metrics that do not start with 'my-'. 
+ const updates = Array.from(updatesIterator).filter((update) => update.metric.name.startsWith('my-')); + t.is(updates.length, 3); - t.is(updates[0].value, 1); - t.deepEqual(updates[0].attributes, { - labelA: 'value-a', - labelB: true, - labelC: 123, - labelD: 123.456, - }); - t.is(updates[1].value, 2); - t.deepEqual(updates[1].attributes, { - labelA: 'value-a2', - labelB: true, - labelC: 123, - labelD: 123.456, - labelE: 12.34, + t.is(updates[0].value, 1); + t.deepEqual(updates[0].attributes, { + labelA: 'value-a', + labelB: true, + labelC: 123, + labelD: 123.456, + }); + t.is(updates[1].value, 2); + t.deepEqual(updates[1].attributes, { + labelA: 'value-a2', + labelB: true, + labelC: 123, + labelD: 123.456, + labelE: 12.34, + }); + t.is(updates[2].value, 3); + t.deepEqual(updates[2].attributes, { + labelA: 'value-a', + labelB: true, + labelC: 'value-c2', + labelD: 123.456, + labelE: 23.45, + }); + } finally { + await runtime.shutdown(); + } +}); + +test.serial('Buffered Metrics - empty description and unit are reported as undefined', async (t) => { + const buffer = new MetricsBuffer({ + maxBufferSize: 1000, + useSecondsForDurations: false, }); - t.is(updates[2].value, 3); - t.deepEqual(updates[2].attributes, { - labelA: 'value-a', - labelB: true, - labelC: 'value-c2', - labelD: 123.456, - labelE: 23.45, + const runtime = Runtime.install({ + telemetryOptions: { + metrics: { + buffer, + }, + }, }); + const meter = runtime.metricMeter; + + try { + const counter = meter.createCounter('my-counter', undefined, undefined); + counter.add(1); + + // Metric events may include other metrics emitted by Core, which would be too + // flaky to test here. Hence we filter out all metrics that do not start with 'my-'. 
+ const updatesIterator = buffer.retrieveUpdates(); + const updates = Array.from(updatesIterator).filter((update) => update.metric.name.startsWith('my-')); + t.is(updates.length, 1); + + t.deepEqual(updates[0].metric, { + name: 'my-counter', + kind: 'counter', + valueType: 'int', + unit: undefined, + description: undefined, + }); + t.is(updates[0].value, 1); + } finally { + await runtime.shutdown(); + } }); -test.serial('Buffered Metrics - empty description and unit are reported as undefined', async (t) => { - const meter = Runtime.instance().metricMeter; - - const counter = meter.createCounter('my-counter', undefined, undefined); - counter.add(1); - - // Metric events may include other metrics emitted by Core, which would be too - // flaky to test here. Hence we filter out all metrics that do not start with 'my-'. - const updatesIterator = Runtime.instance().retrieveBufferedMetrics(); - const updates = Array.from(updatesIterator).filter((update) => update.metric.name.startsWith('my-')); - t.is(updates.length, 1); - - t.deepEqual(updates[0].metric, { - name: 'my-counter', - kind: 'counter', - valueType: 'int', - unit: undefined, - description: undefined, +test.serial('Buffered Metrics - MetricsBuffer does not lose metric events on runtime shutdown', async (t) => { + const buffer = new MetricsBuffer({ + maxBufferSize: 1000, + useSecondsForDurations: false, }); - t.is(updates[0].value, 1); + + { + const runtime = Runtime.install({ + telemetryOptions: { + metrics: { + buffer, + }, + }, + }); + const meter = runtime.metricMeter; + + try { + const counter = meter.createCounter('my-counter', undefined, undefined); + counter.add(1); + + const updatesIterator = buffer.retrieveUpdates(); + const updates = Array.from(updatesIterator).filter((update) => update.metric.name === 'my-counter'); + t.is(updates.length, 1); + t.is(updates[0].value, 1); + + // That metric event should not be lost; we'll retrieve it after the runtime is shut down. 
+ counter.add(2); + } finally { + await runtime.shutdown(); + } + } + + { + // This will create a new runtime instance, reusing the previous RuntimeOptions. + // That notably means that the same MetricsBuffer instance will be reused. + // This is the behavior we want to test here. + const runtime = Runtime.instance(); + const meter = runtime.metricMeter; + + try { + const counter = meter.createCounter('my-counter', undefined, undefined); + counter.add(3); + + const updatesIterator = buffer.retrieveUpdates(); + const updates = Array.from(updatesIterator).filter((update) => update.metric.name === 'my-counter'); + t.is(updates.length, 2); + t.is(updates[0].value, 2); + t.is(updates[1].value, 3); + + // That metric event should not be lost; we'll retrieve it after the runtime is shut down. + counter.add(4); + } finally { + await runtime.shutdown(); + } + } + + { + const updatesIterator = buffer.retrieveUpdates(); + const updates = Array.from(updatesIterator).filter((update) => update.metric.name === 'my-counter'); + t.is(updates.length, 1); + t.is(updates[0].value, 4); + } }); diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index e8a72da9b..87f9a35d3 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -22,6 +22,7 @@ export { export * from './interceptors'; export { DefaultLogger, LogEntry, LogLevel, LogMetadata, LogTimestamp, Logger } from './logger'; export { History, Runtime } from './runtime'; +export { MetricsBuffer, MetricsBufferOptions, BufferedMetricUpdate } from './runtime-metrics'; export { RuntimeOptions, makeTelemetryFilterString, diff --git a/packages/worker/src/runtime-metrics.ts b/packages/worker/src/runtime-metrics.ts index 131f80876..e86fd2899 100644 --- a/packages/worker/src/runtime-metrics.ts +++ b/packages/worker/src/runtime-metrics.ts @@ -1,4 +1,5 @@ import { + IllegalStateError, Metric, MetricCounter, MetricGauge, @@ -8,6 +9,7 @@ import { NumericMetricValueType, } from '@temporalio/common'; 
import { native } from '@temporalio/core-bridge'; +import type { Runtime } from './runtime'; //////////////////////////////////////////////////////////////////////////////////////////////////// // Metric Meter (aka Custom Metrics) @@ -20,7 +22,7 @@ import { native } from '@temporalio/core-bridge'; * @internal */ export class RuntimeMetricMeter implements MetricMeter { - public constructor(protected runtime: native.Runtime) { } + public constructor(protected runtime: native.Runtime) {} createCounter(name: string, unit: string = '', description: string = ''): MetricCounter { const nativeMetric = native.newMetricCounter(this.runtime, name, unit, description); @@ -193,11 +195,137 @@ class RuntimeMetricGaugeF64 implements MetricGauge { // Buffered Metrics (aka lang-side metrics exporter) //////////////////////////////////////////////////////////////////////////////////////////////////// +/** + * A buffer that can be set on {@link RuntimeOptions.telemetry.metricsExporter} to record + metrics instead of ignoring/exporting them. + + .. warning:: + It is important that the buffer size is set to a high number and that + :py:meth:`retrieve_updates` is called regularly to drain the buffer. If + the buffer is full, metric updates will be dropped and an error will be + logged. + * + * @experimental Buffered metrics is an experimental feature. APIs may be subject to change. + */ +export class MetricsBuffer { + public readonly maxBufferSize: number; + public readonly useSecondsForDurations: boolean; + + private runtime: Runtime | undefined = undefined; + private pendingUpdates: BufferedMetricUpdate[] | undefined = undefined; + + public constructor(options: MetricsBufferOptions = {}) { + this.maxBufferSize = options.maxBufferSize ?? 10000; + this.useSecondsForDurations = options.useSecondsForDurations ?? false; + } + + /** + * Bind the MetricsBuffer to the given runtime. 
+ * + * @internal + * @hidden + */ + bind(runtime: Runtime): MetricsBuffer { + if (this.runtime !== undefined) { + throw new IllegalStateError('MetricsBuffer already bound to a runtime'); + } + this.runtime = runtime; + return this; + } + + /** + * Unbind the MetricsBuffer from the given runtime. + * + * @internal + * @hidden + */ + unbind(runtime: Runtime): void { + if (this.runtime !== undefined) { + if (this.runtime !== runtime) throw new IllegalStateError('MetricsBuffer is bound to a different runtime'); + + try { + // We proactively drain buffered metrics from the native side, and keep them in the + // pendingUpdates buffer until the user code calls retrieveUpdates(). Without this, + // we would lose metric events on runtime shutdown. + this.retrieveUpdatesInternal(); + } finally { + this.runtime = undefined; + } + } + } + + /** + * Retrieve buffered metric updates. + * + * This method drains the metrics buffer and returns all metric events that have accumulated + * since the last call to this method. This method should be called regularly when using + * buffered metrics to prevent buffer overflow. + * + * @returns Array of buffered metric updates, each containing the metric metadata, + * current value, and attributes + * @experimental Buffered metrics is an experimental feature. APIs may be subject to change. + */ + public retrieveUpdates(): ArrayIterator { + this.retrieveUpdatesInternal(); + + const updates = this.pendingUpdates ?? []; + this.pendingUpdates = undefined; + + // We return an iterator instead of an array in case we should, at some point in the future, + // need to apply per item transformation. Copying to an array would obviously be possible, the + // buffered metric array might be large, so avoiding the extra array allocation makes sense. + return updates.values(); + } + + /** + * Fetch buffered metric updates from the native side, storing them in the pendingUpdates buffer. 
+ * + * @internal + * @hidden + */ + private retrieveUpdatesInternal(): void { + if (this.runtime === undefined) return; + try { + const updates = native.runtimeRetrieveBufferedMetrics(this.runtime.native); + if (updates.length > 0) { + if (this.pendingUpdates === undefined) { + this.pendingUpdates = updates; + } else { + this.pendingUpdates.push(...updates); + } + } + } catch (error) { + // Ignore errors on retrieving buffered metrics after the runtime has been shut down. + if (!(error instanceof IllegalStateError)) throw error; + } + } +} + +export interface MetricsBufferOptions { + /** + * Maximum number of metric events to buffer before dropping new events. + * + * The buffer accumulates metric updates from Core and should be drained regularly by calling + * {@link Runtime.retrieveBufferedMetrics}. If the buffer fills up, new metric updates will be + * dropped and an error will be logged. + * + * @default 10000 + */ + maxBufferSize?: number; + + /** + * If set to true, the exporter will use seconds for durations instead of milliseconds. + * + * @default false + */ + useSecondsForDurations?: boolean; +} + /** * A single update event on a metric, recorded by buffered metrics exporter. * - * When the {@link Runtime} is configured to buffer metrics, user code is expected to periodically - * call {@link Runtime.retrieveBufferedMetrics} to retrieve the buffered metric updates. Each update + * When the {@link Runtime} is configured to buffer metrics, user code must regularly call + * {@link MetricsBuffer.retrieveUpdates} to retrieve the buffered metric updates. Each update * event will be represented as a single instance of this interface. * * @experimental Buffered metrics is an experimental feature. APIs may be subject to change. 
diff --git a/packages/worker/src/runtime-options.ts b/packages/worker/src/runtime-options.ts index d72d299b8..b178a4450 100644 --- a/packages/worker/src/runtime-options.ts +++ b/packages/worker/src/runtime-options.ts @@ -3,6 +3,7 @@ import { Logger, LogLevel } from '@temporalio/common'; import { Duration, msToNumber } from '@temporalio/common/lib/time'; import { DefaultLogger } from './logger'; import { NativeLogCollector } from './runtime-logger'; +import { MetricsBuffer } from './runtime-metrics'; /** * Options used to create a Temporal Runtime. @@ -367,25 +368,7 @@ export interface PrometheusMetricsExporter { * @experimental Buffered metrics is an experimental feature. APIs may be subject to change. */ export interface BufferedMetricsExporter { - buffered: { - /** - * Maximum number of metric events to buffer before dropping new events. - * - * The buffer accumulates metric updates from Core and should be drained regularly by calling - * {@link Runtime.retrieveBufferedMetrics}. If the buffer fills up, new metric updates will be - * dropped and an error will be logged. - * - * @default 10000 - */ - maxBufferSize?: number; - - /** - * If set to true, the exporter will use seconds for durations instead of milliseconds. - * - * @default false - */ - useSecondsForDurations?: boolean; - }; + buffer: MetricsBuffer; } // Compile Options //////////////////////////////////////////////////////////////////////////////// @@ -398,6 +381,7 @@ export interface CompiledRuntimeOptions { shutdownSignals: NodeJS.Signals[]; runtimeOptions: native.RuntimeOptions; logger: Logger; + metricsBuffer: MetricsBuffer | undefined; } export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions { @@ -418,35 +402,36 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions metricsExporter: metrics && isPrometheusMetricsExporter(metrics) ? 
({ - type: 'prometheus', - socketAddr: metrics.prometheus.bindAddress, - countersTotalSuffix: metrics.prometheus.countersTotalSuffix ?? false, - unitSuffix: metrics.prometheus.unitSuffix ?? false, - useSecondsForDurations: metrics.prometheus.useSecondsForDurations ?? false, - histogramBucketOverrides: metrics.prometheus.histogramBucketOverrides ?? {}, - globalTags: metrics.globalTags ?? {}, - } satisfies native.MetricExporterOptions) - : metrics && isOtelCollectorExporter(metrics) - ? ({ - type: 'otel', - url: metrics.otel.url, - protocol: metrics.otel.http ? 'http' : 'grpc', - headers: metrics.otel.headers ?? {}, - metricPeriodicity: msToNumber(metrics.otel.metricsExportInterval ?? '1s'), - useSecondsForDurations: metrics.otel.useSecondsForDurations ?? false, - metricTemporality: metrics.otel.temporality ?? metrics.temporality ?? 'cumulative', // eslint-disable-line deprecation/deprecation - histogramBucketOverrides: metrics.otel.histogramBucketOverrides ?? {}, + type: 'prometheus', + socketAddr: metrics.prometheus.bindAddress, + countersTotalSuffix: metrics.prometheus.countersTotalSuffix ?? false, + unitSuffix: metrics.prometheus.unitSuffix ?? false, + useSecondsForDurations: metrics.prometheus.useSecondsForDurations ?? false, + histogramBucketOverrides: metrics.prometheus.histogramBucketOverrides ?? {}, globalTags: metrics.globalTags ?? {}, } satisfies native.MetricExporterOptions) + : metrics && isOtelCollectorExporter(metrics) + ? ({ + type: 'otel', + url: metrics.otel.url, + protocol: metrics.otel.http ? 'http' : 'grpc', + headers: metrics.otel.headers ?? {}, + metricPeriodicity: msToNumber(metrics.otel.metricsExportInterval ?? '1s'), + useSecondsForDurations: metrics.otel.useSecondsForDurations ?? false, + metricTemporality: metrics.otel.temporality ?? metrics.temporality ?? 'cumulative', // eslint-disable-line deprecation/deprecation + histogramBucketOverrides: metrics.otel.histogramBucketOverrides ?? {}, + globalTags: metrics.globalTags ?? 
{}, + } satisfies native.MetricExporterOptions) : metrics && isBufferedMetricsExporter(metrics) ? ({ - type: 'buffered', - maxBufferSize: metrics.buffered.maxBufferSize ?? 10000, - useSecondsForDurations: metrics.buffered.useSecondsForDurations ?? false, - } satisfies native.MetricExporterOptions) + type: 'buffer', + maxBufferSize: metrics.buffer.maxBufferSize ?? 10000, + useSecondsForDurations: metrics.buffer.useSecondsForDurations ?? false, + } satisfies native.MetricExporterOptions) : null, workerHeartbeatIntervalMillis: heartbeatMillis === 0 ? null : heartbeatMillis, }, + metricsBuffer: metrics && isBufferedMetricsExporter(metrics) ? metrics.buffer : undefined, }; } @@ -526,7 +511,7 @@ function isPrometheusMetricsExporter(metrics: MetricsExporterConfig): metrics is } function isBufferedMetricsExporter(metrics: MetricsExporterConfig): metrics is BufferedMetricsExporter { - return 'buffered' in metrics && typeof metrics.buffered === 'object'; + return 'buffer' in metrics && typeof metrics.buffer === 'object'; } function isForwardingLogger(options: LogExporterConfig): boolean { diff --git a/packages/worker/src/runtime.ts b/packages/worker/src/runtime.ts index fabe0ce9b..d44a8ed82 100644 --- a/packages/worker/src/runtime.ts +++ b/packages/worker/src/runtime.ts @@ -8,7 +8,7 @@ import { temporal } from '@temporalio/proto'; import { History } from '@temporalio/common/lib/proto-utils'; import { MetricMeterWithComposedTags } from '@temporalio/common/lib/metrics'; import { isFlushableLogger } from './logger'; -import { RuntimeMetricMeter, BufferedMetricUpdate } from './runtime-metrics'; +import { RuntimeMetricMeter, MetricsBuffer } from './runtime-metrics'; import { toNativeClientOptions, NativeConnectionOptions } from './connection-options'; import { byteArrayToBuffer, toMB } from './utils'; import { CompiledRuntimeOptions, compileOptions, RuntimeOptions } from './runtime-options'; @@ -28,6 +28,13 @@ export class Runtime { /** The metric meter associated with this 
runtime. */ public readonly metricMeter: MetricMeter; + /** + * The metrics buffer associated with this runtime, if buffered metrics are enabled. + * + * @experimental Buffered metrics is an experimental feature. APIs may be subject to change. + */ + public readonly metricsBuffer: MetricsBuffer | undefined; + /** Track the number of pending creation calls into the tokio runtime to prevent shut down */ protected pendingCreations = 0; /** Track the registered native objects to automatically shutdown when all have been deregistered */ @@ -50,6 +57,7 @@ export class Runtime { public readonly options: CompiledRuntimeOptions ) { this.logger = options.logger; + this.metricsBuffer = options.metricsBuffer?.bind(this); this.metricMeter = options.runtimeOptions.metricsExporter ? MetricMeterWithComposedTags.compose(new RuntimeMetricMeter(this.native), {}, true) : noopMetricMeter; @@ -277,6 +285,10 @@ export class Runtime { try { if (Runtime._instance === this) delete Runtime._instance; (this as any).metricMeter = noopMetricMeter; + if (this.metricsBuffer !== undefined) { + this.metricsBuffer.unbind(this); + (this as any).metricsBuffer = undefined; + } this.teardownShutdownHook(); // FIXME(JWH): I think we no longer need this, but will have to thoroughly validate. native.runtimeShutdown(this.native); @@ -288,28 +300,6 @@ export class Runtime { } } - /** - * Retrieve buffered metric updates from the runtime. - * - * This method drains the metrics buffer and returns all metric events that have accumulated - * since the last call to this method. This method should be called regularly when using - * buffered metrics to prevent buffer overflow. - * - * @throws {IllegalStateError} If buffered metrics have not been enabled for this runtime, - * or if the runtime has been shut down. - * @returns Array of buffered metric updates, each containing the metric metadata, - * current value, and attributes - * @experimental Buffered metrics is an experimental feature. 
APIs may be subject to change. - */ - public retrieveBufferedMetrics(): ArrayIterator { - if (this.native === undefined) throw new IllegalStateError('Runtime has been shut down'); - - // We return an iterator instead of an array in case we should, at some point in the future, - // need to apply per item transformation. Copying to an array would obviously be possible, the - // buffered metric array might be large, so avoiding the extra array allocation makes sense. - return native.runtimeRetrieveBufferedMetrics(this.native).values(); - } - /** * Used by Workers to register for shutdown signals * @@ -396,12 +386,12 @@ export class Runtime { this.logger.warn( `This program is running inside a containerized environment with a memory constraint ` + - `(eg. '${dockerArgs}' or similar). Node itself does not consider this memory constraint ` + - `in how it manages its heap memory. There is consequently a high probability that ` + - `the process will crash due to running out of memory. To increase reliability, we recommend ` + - `adding '--max-old-space-size=${suggestedOldSpaceSizeInMb}' to your node arguments. ` + - `Refer to https://docs.temporal.io/develop/typescript/core-application#run-a-worker-on-docker ` + - `for more advice on tuning your Workers.`, + `(eg. '${dockerArgs}' or similar). Node itself does not consider this memory constraint ` + + `in how it manages its heap memory. There is consequently a high probability that ` + + `the process will crash due to running out of memory. To increase reliability, we recommend ` + + `adding '--max-old-space-size=${suggestedOldSpaceSizeInMb}' to your node arguments. 
` + + `Refer to https://docs.temporal.io/develop/typescript/core-application#run-a-worker-on-docker ` + + `for more advice on tuning your Workers.`, { sdkComponent: SdkComponent.worker } ); } From 38b343a36b718fa9aed5c5923b03c5872b7d236f Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Wed, 4 Feb 2026 14:19:29 -0500 Subject: [PATCH 5/8] Update core to latest; fix incorrect metric value type on buffered metrics in tests --- packages/core-bridge/sdk-core | 2 +- packages/test/src/test-runtime-buffered-metrics.ts | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/core-bridge/sdk-core b/packages/core-bridge/sdk-core index 9dedad111..231e21cad 160000 --- a/packages/core-bridge/sdk-core +++ b/packages/core-bridge/sdk-core @@ -1 +1 @@ -Subproject commit 9dedad1115ab64a0ffb8decf3ad3f338979dea9f +Subproject commit 231e21cadb8004197a5b1dba2ecb1313596cc34c diff --git a/packages/test/src/test-runtime-buffered-metrics.ts b/packages/test/src/test-runtime-buffered-metrics.ts index d5ea0d651..f5af8a693 100644 --- a/packages/test/src/test-runtime-buffered-metrics.ts +++ b/packages/test/src/test-runtime-buffered-metrics.ts @@ -99,8 +99,7 @@ test.serial('Buffered Metrics - Exporting buffered metrics from Core works prope t.deepEqual(updates[5].metric, { name: 'my-float-gauge', kind: 'gauge', - // valueType: 'float', - valueType: 'int', // FIXME: pending on https://github.com/temporalio/sdk-core/pull/1108 + valueType: 'float', unit: 'my-float-gauge-unit', description: 'my-float-gauge-description', }); @@ -126,8 +125,7 @@ test.serial('Buffered Metrics - Exporting buffered metrics from Core works prope t.deepEqual(updates[11].metric, { name: 'my-float-histogram', kind: 'histogram', - // valueType: 'float', - valueType: 'int', // FIXME: pending on https://github.com/temporalio/sdk-core/pull/1108 + valueType: 'float', unit: 'my-float-histogram-unit', description: 'my-float-histogram-description', }); From 223e928fed6c50094e2e225c876e5b27b1c971f4 Mon 
Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Wed, 4 Feb 2026 14:24:45 -0500 Subject: [PATCH 6/8] Address review comments --- packages/worker/src/runtime-metrics.ts | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/packages/worker/src/runtime-metrics.ts b/packages/worker/src/runtime-metrics.ts index e86fd2899..ef39b4ea8 100644 --- a/packages/worker/src/runtime-metrics.ts +++ b/packages/worker/src/runtime-metrics.ts @@ -22,7 +22,7 @@ import type { Runtime } from './runtime'; * @internal */ export class RuntimeMetricMeter implements MetricMeter { - public constructor(protected runtime: native.Runtime) {} + public constructor(protected runtime: native.Runtime) { } createCounter(name: string, unit: string = '', description: string = ''): MetricCounter { const nativeMetric = native.newMetricCounter(this.runtime, name, unit, description); @@ -80,7 +80,7 @@ class RuntimeMetricCounter implements MetricCounter { public readonly name: string, public readonly unit: string, public readonly description: string - ) {} + ) { } add(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -104,7 +104,7 @@ class RuntimeMetricHistogram implements MetricHistogram { public readonly name: string, public readonly unit: string, public readonly description: string - ) {} + ) { } record(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -128,7 +128,7 @@ class RuntimeMetricHistogramF64 implements MetricHistogram { public readonly name: string, public readonly unit: string, public readonly description: string - ) {} + ) { } record(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -152,7 +152,7 @@ class RuntimeMetricGauge implements MetricGauge { public readonly name: string, public readonly unit: string, public readonly description: string - ) {} + ) { } set(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -176,7 +176,7 @@ class RuntimeMetricGaugeF64 implements MetricGauge { public 
readonly name: string, public readonly unit: string, public readonly description: string - ) {} + ) { } set(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -197,13 +197,11 @@ class RuntimeMetricGaugeF64 implements MetricGauge { /** * A buffer that can be set on {@link RuntimeOptions.telemetry.metricsExporter} to record - metrics instead of ignoring/exporting them. - - .. warning:: - It is important that the buffer size is set to a high number and that - :py:meth:`retrieve_updates` is called regularly to drain the buffer. If - the buffer is full, metric updates will be dropped and an error will be - logged. + * metrics instead of ignoring/exporting them. + * + * It is important that the buffer size is set to a high number and that `retrieveUpdates` is + * called regularly to drain the buffer. If the buffer is full, metric updates will be dropped + * and an error will be logged. * * @experimental Buffered metrics is an experimental feature. APIs may be subject to change. */ @@ -360,7 +358,7 @@ export interface BufferedMetricUpdate { * as further events that extend that `MetricTags` object. * * Note that the SDK may miss deduplication opportunities, notably when a same set of attributes - * is recreated by the code emitting the metric updates, e.g. when extending an existing set of + * is recreated by the code emitting the metric updates. 
*/ attributes: MetricTags; } From 9db55882e31c3f3b0e43b24c4ba0ba6124f2e5a2 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Wed, 4 Feb 2026 14:53:51 -0500 Subject: [PATCH 7/8] format --- packages/worker/src/runtime-metrics.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/worker/src/runtime-metrics.ts b/packages/worker/src/runtime-metrics.ts index ef39b4ea8..cfb5aa070 100644 --- a/packages/worker/src/runtime-metrics.ts +++ b/packages/worker/src/runtime-metrics.ts @@ -22,7 +22,7 @@ import type { Runtime } from './runtime'; * @internal */ export class RuntimeMetricMeter implements MetricMeter { - public constructor(protected runtime: native.Runtime) { } + public constructor(protected runtime: native.Runtime) {} createCounter(name: string, unit: string = '', description: string = ''): MetricCounter { const nativeMetric = native.newMetricCounter(this.runtime, name, unit, description); @@ -80,7 +80,7 @@ class RuntimeMetricCounter implements MetricCounter { public readonly name: string, public readonly unit: string, public readonly description: string - ) { } + ) {} add(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -104,7 +104,7 @@ class RuntimeMetricHistogram implements MetricHistogram { public readonly name: string, public readonly unit: string, public readonly description: string - ) { } + ) {} record(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -128,7 +128,7 @@ class RuntimeMetricHistogramF64 implements MetricHistogram { public readonly name: string, public readonly unit: string, public readonly description: string - ) { } + ) {} record(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -152,7 +152,7 @@ class RuntimeMetricGauge implements MetricGauge { public readonly name: string, public readonly unit: string, public readonly description: string - ) { } + ) {} set(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -176,7 +176,7 @@ class 
RuntimeMetricGaugeF64 implements MetricGauge { public readonly name: string, public readonly unit: string, public readonly description: string - ) { } + ) {} set(value: number, tags: MetricTags = {}): void { if (value < 0) { @@ -198,7 +198,7 @@ class RuntimeMetricGaugeF64 implements MetricGauge { /** * A buffer that can be set on {@link RuntimeOptions.telemetry.metricsExporter} to record * metrics instead of ignoring/exporting them. - * + * * It is important that the buffer size is set to a high number and that `retrieveUpdates` is * called regularly to drain the buffer. If the buffer is full, metric updates will be dropped * and an error will be logged. From 09b22c04e6114219e6ce5b3690f0b5ed3d812417 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Tue, 10 Feb 2026 00:20:19 -0500 Subject: [PATCH 8/8] typedoc --- packages/common/src/metrics.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/common/src/metrics.ts b/packages/common/src/metrics.ts index 506d8b89b..92a7788dd 100644 --- a/packages/common/src/metrics.ts +++ b/packages/common/src/metrics.ts @@ -92,7 +92,7 @@ export type MetricTags = Record; * Type of numerical values recorded by a metric. * * Note that this represents the _configuration_ of the metric; however, since JavaScript doesn't - * actually have different representation for integers and floats, the actual value type is always + * have different runtime representation for integers and floats, the actual value type is always * a JS 'number'. * * @experimental The Metric API is an experimental feature and may be subject to change.