Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,7 @@ func (m tickMetricsForMetricCategory) Report(tickResult tickResultForMetricCateg
}

type aggregatorTickMetrics struct {
scope tally.Scope
flushTimesErrors tally.Counter
duration tally.Timer
standard tickMetricsForMetricCategory
Expand All @@ -1094,6 +1095,7 @@ func newAggregatorTickMetrics(scope tally.Scope) aggregatorTickMetrics {
forwardedScope := scope.Tagged(map[string]string{"metric-type": "forwarded"})
timedScope := scope.Tagged(map[string]string{"metric-type": "timed"})
return aggregatorTickMetrics{
scope: scope,
flushTimesErrors: scope.Counter("flush-times-errors"),
duration: scope.Timer("duration"),
standard: newTickMetricsForMetricCategory(standardScope),
Expand All @@ -1107,6 +1109,9 @@ func (m aggregatorTickMetrics) Report(tickResult tickResult, duration time.Durat
m.standard.Report(tickResult.standard)
m.forwarded.Report(tickResult.forwarded)
m.timed.Report(tickResult.timed)
for source, cardinality := range tickResult.cardinalityBySource {
m.scope.Tagged(map[string]string{"source": source}).Counter("aggregator-cardinality").Inc(int64(cardinality))
}
}

type aggregatorShardsMetrics struct {
Expand Down
39 changes: 39 additions & 0 deletions src/aggregator/aggregator/extract_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package aggregator

import "strings"

const (
_tagPairSplitter = ","
_tagPairSplitterByte = ','
_tagNameSplitter = "="
)

func ExtractSourceTag(id string, sourceTag string) (string, bool) {
var (
sourceTagAssign = _tagPairSplitter + sourceTag + _tagNameSplitter
minimumSourceTagLen = len(sourceTagAssign) + 1
)
idlen := len(id)
if idlen < minimumSourceTagLen {
return "", false
}

idx := strings.Index(id, sourceTagAssign)
if idx < 0 {
return "", false
}

// The front of the ID is now the tag value, along with the remainder of the
// ID's tags. Find the next tag splitter, if any.
id = id[idx+len(sourceTagAssign):]
idx = strings.IndexByte(id, _tagPairSplitterByte)

switch idx {
case -1: // delimiter not found; remainder of ID is the service name
return id, len(id) > 0
case 0: // id[0] = ',' - i.e., no tag value
return "", false
default: // id[:idx] == tag value
return id[:idx], true
}
}
65 changes: 65 additions & 0 deletions src/aggregator/aggregator/extract_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package aggregator

import (
"testing"
)

func TestExtractSourceTag(t *testing.T) {
sourceTag := "service"
tests := []struct {
name string
id string
want string
wantOk bool
}{
{
name: "valid source tag with more tags",
id: "metricname,service=myservice,env=test",
want: "myservice",
wantOk: true,
},
{
name: "valid source tag at end",
id: "metricname,service=myservice",
want: "myservice",
wantOk: true,
},
{
name: "service tag not found",
id: "metricname,env=test",
want: "",
wantOk: false,
},
{
name: "empty service tag value",
id: "metricname,service=",
want: "",
wantOk: false,
},
{
name: "empty ID",
id: "",
want: "",
wantOk: false,
},
{
name: "ID shorter than minimum length",
id: "s",
want: "",
wantOk: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, ok := ExtractSourceTag(tt.id, sourceTag)
if ok != tt.wantOk {
t.Errorf("ExtractSourceTag() ok = %v, want %v", ok, tt.wantOk)
return
}
if got != tt.want {
t.Errorf("ExtractSourceTag() = %v, want %v", got, tt.want)
}
})
}
}
20 changes: 20 additions & 0 deletions src/aggregator/aggregator/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (c metricCategory) String() string {

type entryKey struct {
idHash hash.Hash128
id string
metricType metricType
metricCategory metricCategory
}
Expand Down Expand Up @@ -174,6 +175,7 @@ func (m *metricMap) AddUntimed(
metricCategory: untimedMetric,
metricType: metricType(metric.Type),
idHash: hash.Murmur3Hash128(metric.ID),
id: string(metric.ID),
}
entry, err := m.findOrCreate(key)
if err != nil {
Expand All @@ -192,6 +194,7 @@ func (m *metricMap) AddTimed(
metricCategory: timedMetric,
metricType: metricType(metric.Type),
idHash: hash.Murmur3Hash128(metric.ID),
id: string(metric.ID),
}
entry, err := m.findOrCreate(key)
if err != nil {
Expand All @@ -210,6 +213,7 @@ func (m *metricMap) AddTimedWithStagedMetadatas(
metricCategory: timedMetric,
metricType: metricType(metric.Type),
idHash: hash.Murmur3Hash128(metric.ID),
id: string(metric.ID),
}
entry, err := m.findOrCreate(key)
if err != nil {
Expand All @@ -228,6 +232,7 @@ func (m *metricMap) AddForwarded(
metricCategory: forwardedMetric,
metricType: metricType(metric.Type),
idHash: hash.Murmur3Hash128(metric.ID),
id: string(metric.ID),
}
entry, err := m.findOrCreate(key)
if err != nil {
Expand All @@ -244,9 +249,24 @@ func (m *metricMap) Tick(target time.Duration) tickResult {
mapTickRes.standard.activeElems = listsTickRes.standard
mapTickRes.forwarded.activeElems = listsTickRes.forwarded
mapTickRes.timed.activeElems = listsTickRes.timed
if m.opts.CardinalityMetricsEnabled() {
mapTickRes.cardinalityBySource = m.tickCardinalityMetrics()
}
return mapTickRes
}

func (m *metricMap) tickCardinalityMetrics() map[string]int {
sourceTag := m.opts.SourceTag()
cardinalityBySource := make(map[string]int)
for key := range m.entries {
source, ok := ExtractSourceTag(key.id, sourceTag)
if ok {
cardinalityBySource[source]++
}
}
return cardinalityBySource
}

func (m *metricMap) SetRuntimeOptions(opts runtime.Options) {
m.Lock()
m.runtimeOpts = opts
Expand Down
41 changes: 41 additions & 0 deletions src/aggregator/aggregator/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ var (
// are issues with the instances taking over the shards and as such we need to switch
// the traffic back to the previous owner of the shards immediately.
defaultBufferDurationAfterShardCutoff = time.Hour

// By default the cardinality metrics are disabled.
defaultCardinalityMetricsEnabled = false
// By default the source tag is "service".
defaultSourceTag = "service"
)

// MaxAllowedForwardingDelayFn returns the maximum allowed forwarding delay given
Expand Down Expand Up @@ -347,6 +352,18 @@ type Options interface {
// SetWritesIgnoreCutoffCutover sets a flag controlling whether cutoff/cutover timestamps
// are ignored for incoming writes.
SetWritesIgnoreCutoffCutover(value bool) Options

// CardinalityMetricsEnabled enables the emission of cardinality metrics.
CardinalityMetricsEnabled() bool

// SetCardinalityMetricsEnabled sets the cardinality metrics enabled.
SetCardinalityMetricsEnabled(value bool) Options

// SourceTag returns the source tag.
SourceTag() string

// SetSourceTag sets the source tag.
SetSourceTag(value string) Options
}

type options struct {
Expand Down Expand Up @@ -392,6 +409,8 @@ type options struct {
timedMetricsFlushOffsetEnabled bool
featureFlagBundlesParsed []FeatureFlagBundleParsed
writesIgnoreCutoffCutover bool
cardinalityMetricsEnabled bool
sourceTag string

// Derived options.
fullCounterPrefix []byte
Expand Down Expand Up @@ -434,6 +453,8 @@ func NewOptions(clockOpts clock.Options) Options {
maxNumCachedSourceSets: defaultMaxNumCachedSourceSets,
discardNaNAggregatedValues: defaultDiscardNaNAggregatedValues,
verboseErrors: defaultVerboseErrors,
cardinalityMetricsEnabled: defaultCardinalityMetricsEnabled,
sourceTag: defaultSourceTag,
}

// Initialize pools.
Expand Down Expand Up @@ -931,6 +952,26 @@ func (o *options) SetWritesIgnoreCutoffCutover(value bool) Options {
return &opts
}

func (o *options) CardinalityMetricsEnabled() bool {
return o.cardinalityMetricsEnabled
}

func (o *options) SetCardinalityMetricsEnabled(value bool) Options {
opts := *o
opts.cardinalityMetricsEnabled = value
return &opts
}

func (o *options) SourceTag() string {
return o.sourceTag
}

func (o *options) SetSourceTag(value string) Options {
opts := *o
opts.sourceTag = value
return &opts
}

func defaultMaxAllowedForwardingDelayFn(
resolution time.Duration,
numForwardedTimes int,
Expand Down
20 changes: 13 additions & 7 deletions src/aggregator/aggregator/tick_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,22 @@ func (r *tickResultForMetricCategory) merge(
}

type tickResult struct {
standard tickResultForMetricCategory
forwarded tickResultForMetricCategory
timed tickResultForMetricCategory
standard tickResultForMetricCategory
forwarded tickResultForMetricCategory
timed tickResultForMetricCategory
cardinalityBySource map[string]int
}

// merge merges two results. Both input results may become invalid after merge is called.
func (r *tickResult) merge(other tickResult) tickResult {
return tickResult{
standard: r.standard.merge(other.standard),
forwarded: r.forwarded.merge(other.forwarded),
timed: r.timed.merge(other.timed),
res := tickResult{
standard: r.standard.merge(other.standard),
forwarded: r.forwarded.merge(other.forwarded),
timed: r.timed.merge(other.timed),
cardinalityBySource: r.cardinalityBySource,
}
for k, v := range other.cardinalityBySource {
res.cardinalityBySource[k] += v
}
return res
}