Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions internal/xds/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,23 @@ type MetricsReporter interface {
// Each client will produce different metrics. Please see the client's
// documentation for a list of possible metrics events.
ReportMetric(metric any)

// RegisterAsyncReporter registers a reporter to produce metric values for
// only the listed descriptors. The returned function must be called when
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are these descriptors listed? Can that be specified in this docstring?

// the metrics are no longer needed, which will remove the reporter.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which will remove the reporter

Does this guarantee that once the returned cancel func is called, the Report method on the registered AsyncReporter is never called? If so, does it make sense to mention that in the docstring.

FYI: Even though this is an internal package, this xDS client is used by non-grpc folks, and we might fork this off to a separate repo or sub-module at some point in time. So, the requirements for docstrings here should be the same as that of any public APIs in our repo.

RegisterAsyncReporter(reporter AsyncReporter) func()
}

// AsyncReporter is an interface for types that record metrics asynchronously.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can this line be replaced with:
// AsyncReporter allows implementations to record metrics asynchronously

The fact that it is an interface is clear by the definition of the type. So, it need not be specified in the docstring.

// Implementations must be concurrent-safe.
type AsyncReporter interface {
// Report records metric values using the provided recorder.
Report(AsyncMetricsRecorder) error
}

// AsyncMetricsRecorder is a recorder for async metrics.
type AsyncMetricsRecorder interface {
// ReportMetric reports a metric. The metric will be one of the predefined
// set of types in the metrics.go file.
ReportMetric(metric any)
Comment on lines +128 to +130
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this method is called and passed a metric that is not supposed to be reported asynchronously?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of metric being of type any, can this instead be a marker interface? That way, we can ensure that this method is invoked only for types that are to be reported asynchronously. Doing that would also mean that the types newly added for the two metrics in metrics.go would have to implement that interface.

}
6 changes: 6 additions & 0 deletions internal/xds/clients/xdsclient/ads_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,3 +715,9 @@ func (fc *adsFlowControl) wait() bool {

return fc.stopped
}

func (fc *adsFlowControl) isStopped() bool {
fc.mu.Lock()
defer fc.mu.Unlock()
return fc.stopped
}
15 changes: 15 additions & 0 deletions internal/xds/clients/xdsclient/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,18 @@ type ResourceUpdateInvalid struct {
type ServerFailure struct {
ServerURI string
}

// XDSClientConnected reports the connectivity state of the xDS stream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: s/xDS stream/ADS stream/

FYI: The xDS client establishes two streams, ADS and LRS.

// Value is 1 if connected, 0 otherwise.
type XDSClientConnected struct {
ServerURI string
Value int64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this not be of type bool?

}

// XDSClientResourceStats reports the number of resources currently cached.
type XDSClientResourceStats struct {
Authority string
ResourceType string
CacheState string
Count int64
Comment on lines +53 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that the previously defined metric structs did not document the fields. But I feel we should do one of the following:

  • trialing line comments for all of the fields
  • a package level comment that talks about every single field
  • a package level comment pointing to the section in A78 that contains a description of the labels, which is what these fields correspond to

}
43 changes: 41 additions & 2 deletions internal/xds/clients/xdsclient/test/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"

"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/internal/pretty"
"google.golang.org/grpc/internal/xds/clients/internal/testutils"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
Expand Down Expand Up @@ -279,12 +281,16 @@ func buildResourceName(typeName, auth, id string, ctxParams map[string]string) s
// have taken place.
type testMetricsReporter struct {
metricsCh *testutils.Channel

mu sync.Mutex
asyncReporters map[clients.AsyncReporter]struct{}
}

// newTestMetricsReporter returns a new testMetricsReporter.
func newTestMetricsReporter() *testMetricsReporter {
return &testMetricsReporter{
metricsCh: testutils.NewChannelWithSize(1),
metricsCh: testutils.NewChannelWithSize(50),
asyncReporters: make(map[clients.AsyncReporter]struct{}),
}
}

Expand All @@ -302,7 +308,40 @@ func (r *testMetricsReporter) waitForMetric(ctx context.Context, metricsDataWant
return nil
}

func (r *testMetricsReporter) waitForSpecificMetric(ctx context.Context, metricsDataWant any) error {
for {
got, err := r.metricsCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout waiting for metric: %v (want %T)", err, metricsDataWant)
}
if diff := cmp.Diff(got, metricsDataWant); diff == "" {
return nil
}
// Continue if mismatch.

}
}

// ReportMetric sends the metrics data to the metricsCh channel.
func (r *testMetricsReporter) ReportMetric(m any) {
r.metricsCh.Replace(m)
r.metricsCh.Send(m)
}

func (r *testMetricsReporter) RegisterAsyncReporter(reporter clients.AsyncReporter) func() {
r.mu.Lock()
defer r.mu.Unlock()
r.asyncReporters[reporter] = struct{}{}
return func() {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.asyncReporters, reporter)
}
}

func (r *testMetricsReporter) triggerAsyncMetrics() {
r.mu.Lock()
defer r.mu.Unlock()
for reporter := range r.asyncReporters {
reporter.Report(r)
}
}
Loading
Loading