Skip to content

Commit 72ffb4a

Browse files
Merge pull request thanos-io#8511 from thanos-io/mhoffmann/compute-proper-labelset-for-remote-engine
query: use proper labelset for fan-out pruning in distributed engine
2 parents 802f43f + d3df5e4 commit 72ffb4a

File tree

2 files changed

+105
-90
lines changed

2 files changed

+105
-90
lines changed

pkg/query/remote_engine.go

Lines changed: 77 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/thanos-io/thanos/pkg/api/query/querypb"
2727
"github.com/thanos-io/thanos/pkg/info/infopb"
2828
"github.com/thanos-io/thanos/pkg/server/http/middleware"
29-
"github.com/thanos-io/thanos/pkg/store/labelpb"
3029
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
3130
grpc_tracing "github.com/thanos-io/thanos/pkg/tracing/tracing_middleware"
3231
)
@@ -121,15 +120,14 @@ func (r remoteEndpoints) Engines() []api.RemoteEngine {
121120
type remoteEngine struct {
122121
opts Opts
123122
logger log.Logger
124-
125123
client Client
126124

127-
mintOnce sync.Once
128-
mint int64
129-
maxtOnce sync.Once
130-
maxt int64
131-
labelSetsOnce sync.Once
132-
labelSets []labels.Labels
125+
initOnce sync.Once
126+
127+
mint int64
128+
maxt int64
129+
labelSets []labels.Labels
130+
partitionLabelSets []labels.Labels
133131
}
134132

135133
func NewRemoteEngine(logger log.Logger, queryClient Client, opts Opts) *remoteEngine {
@@ -140,100 +138,109 @@ func NewRemoteEngine(logger log.Logger, queryClient Client, opts Opts) *remoteEn
140138
}
141139
}
142140

143-
// MinT returns the minimum timestamp that is safe to query in the remote engine.
144-
// In order to calculate it, we find the highest min time for each label set, and we return
145-
// the lowest of those values.
146-
// Calculating the MinT this way makes remote queries resilient to cases where one tsdb replica would delete
147-
// a block due to retention before other replicas did the same.
148-
// See https://github.com/thanos-io/promql-engine/issues/187.
149-
func (r *remoteEngine) MinT() int64 {
141+
func (r *remoteEngine) init() {
142+
r.initOnce.Do(func() {
143+
replicaLabelSet := make(map[string]struct{})
144+
for _, lbl := range r.opts.ReplicaLabels {
145+
replicaLabelSet[lbl] = struct{}{}
146+
}
147+
partitionLabelsSet := make(map[string]struct{})
148+
for _, lbl := range r.opts.PartitionLabels {
149+
partitionLabelsSet[lbl] = struct{}{}
150+
}
150151

151-
r.mintOnce.Do(func() {
152+
// strip out replica labels and scopes the remaining labels
153+
// onto the partition labels if they are set.
154+
155+
// partitionLabelSets are used to compute how to push down, they are the minimum set of labels
156+
// that form a partition of the remote engines.
157+
// labelSets are all labelsets of the remote engine, they are used for fan-out pruning on labels
158+
// that dont meaningfully contribute to the partitioning but are still useful.
152159
var (
153160
hashBuf = make([]byte, 0, 128)
154161
highestMintByLabelSet = make(map[uint64]int64)
162+
163+
labelSetsBuilder labels.ScratchBuilder
164+
partitionLabelSetsBuilder labels.ScratchBuilder
165+
166+
labelSets = make([]labels.Labels, 0, len(r.client.tsdbInfos))
167+
partitionLabelSets = make([]labels.Labels, 0, len(r.client.tsdbInfos))
155168
)
156-
for _, lset := range r.adjustedInfos() {
157-
key, _ := labelpb.ZLabelsToPromLabels(lset.Labels.Labels).HashWithoutLabels(hashBuf)
169+
for _, info := range r.client.tsdbInfos {
170+
labelSetsBuilder.Reset()
171+
partitionLabelSetsBuilder.Reset()
172+
for _, lbl := range info.Labels.Labels {
173+
if _, ok := replicaLabelSet[lbl.Name]; ok {
174+
continue
175+
}
176+
labelSetsBuilder.Add(lbl.Name, lbl.Value)
177+
if _, ok := partitionLabelsSet[lbl.Name]; !ok && len(partitionLabelsSet) > 0 {
178+
continue
179+
}
180+
partitionLabelSetsBuilder.Add(lbl.Name, lbl.Value)
181+
}
182+
183+
partitionLabelSet := partitionLabelSetsBuilder.Labels()
184+
labelSet := labelSetsBuilder.Labels()
185+
labelSets = append(labelSets, labelSet)
186+
partitionLabelSets = append(partitionLabelSets, partitionLabelSet)
187+
188+
key, _ := partitionLabelSet.HashWithoutLabels(hashBuf)
158189
lsetMinT, ok := highestMintByLabelSet[key]
159190
if !ok {
160-
highestMintByLabelSet[key] = lset.MinTime
191+
highestMintByLabelSet[key] = info.MinTime
161192
continue
162193
}
163194
// If we are querying with overlapping intervals, we want to find the first available timestamp
164195
// otherwise we want to find the last available timestamp.
165-
if r.opts.QueryDistributedWithOverlappingInterval && lset.MinTime < lsetMinT {
166-
highestMintByLabelSet[key] = lset.MinTime
167-
} else if !r.opts.QueryDistributedWithOverlappingInterval && lset.MinTime > lsetMinT {
168-
highestMintByLabelSet[key] = lset.MinTime
196+
if r.opts.QueryDistributedWithOverlappingInterval && info.MinTime < lsetMinT {
197+
highestMintByLabelSet[key] = info.MinTime
198+
} else if !r.opts.QueryDistributedWithOverlappingInterval && info.MinTime > lsetMinT {
199+
highestMintByLabelSet[key] = info.MinTime
169200
}
170201
}
171-
var mint int64 = math.MaxInt64
202+
203+
// mint is the minimum timestamp that is safe to query in the remote engine.
204+
// In order to calculate it, we find the highest min time for each label set, and we return
205+
// the lowest of those values.
206+
// Calculating the MinT this way makes remote queries resilient to cases where one tsdb replica would delete
207+
// a block due to retention before other replicas did the same.
208+
// See https://github.com/thanos-io/promql-engine/issues/187.
209+
var (
210+
mint = int64(math.MaxInt64)
211+
maxt = r.client.tsdbInfos.MaxT()
212+
)
172213
for _, m := range highestMintByLabelSet {
173214
if m < mint {
174215
mint = m
175216
}
176217
}
218+
177219
r.mint = mint
220+
r.maxt = maxt
221+
r.labelSets = labelSets
222+
r.partitionLabelSets = partitionLabelSets
178223
})
224+
}
179225

226+
func (r *remoteEngine) MinT() int64 {
227+
r.init()
180228
return r.mint
181229
}
182230

183231
func (r *remoteEngine) MaxT() int64 {
184-
r.maxtOnce.Do(func() {
185-
r.maxt = r.client.tsdbInfos.MaxT()
186-
})
232+
r.init()
187233
return r.maxt
188234
}
189235

190-
func (r *remoteEngine) PartitionLabelSets() []labels.Labels {
191-
r.labelSetsOnce.Do(func() {
192-
r.labelSets = r.adjustedInfos().LabelSets()
193-
})
194-
return r.labelSets
195-
}
196-
197236
func (r *remoteEngine) LabelSets() []labels.Labels {
198-
r.labelSetsOnce.Do(func() {
199-
r.labelSets = r.adjustedInfos().LabelSets()
200-
})
237+
r.init()
201238
return r.labelSets
202239
}
203240

204-
// adjustedInfos strips out replica labels and scopes the remaining labels
205-
// onto the partition labels if they are set.
206-
func (r *remoteEngine) adjustedInfos() infopb.TSDBInfos {
207-
replicaLabelSet := make(map[string]struct{})
208-
for _, lbl := range r.opts.ReplicaLabels {
209-
replicaLabelSet[lbl] = struct{}{}
210-
}
211-
partitionLabelsSet := make(map[string]struct{})
212-
for _, lbl := range r.opts.PartitionLabels {
213-
partitionLabelsSet[lbl] = struct{}{}
214-
}
215-
216-
// Strip replica labels from the result.
217-
infos := make(infopb.TSDBInfos, 0, len(r.client.tsdbInfos))
218-
var builder labels.ScratchBuilder
219-
for _, info := range r.client.tsdbInfos {
220-
builder.Reset()
221-
for _, lbl := range info.Labels.Labels {
222-
if _, ok := replicaLabelSet[lbl.Name]; ok {
223-
continue
224-
}
225-
if _, ok := partitionLabelsSet[lbl.Name]; !ok && len(partitionLabelsSet) > 0 {
226-
continue
227-
}
228-
builder.Add(lbl.Name, lbl.Value)
229-
}
230-
infos = append(infos, infopb.NewTSDBInfo(
231-
info.MinTime,
232-
info.MaxTime,
233-
labelpb.ZLabelsFromPromLabels(builder.Labels())),
234-
)
235-
}
236-
return infos
241+
func (r *remoteEngine) PartitionLabelSets() []labels.Labels {
242+
r.init()
243+
return r.partitionLabelSets
237244
}
238245

239246
func (r *remoteEngine) NewRangeQuery(_ context.Context, _ promql.QueryOpts, plan api.RemoteQuery, start, end time.Time, interval time.Duration) (promql.Query, error) {

pkg/query/remote_engine_test.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -105,37 +105,42 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
105105
t.Parallel()
106106

107107
tests := []struct {
108-
name string
109-
tsdbInfos []infopb.TSDBInfo
110-
replicaLabels []string
111-
expected []labels.Labels
112-
partitionLabels []string
108+
name string
109+
tsdbInfos []infopb.TSDBInfo
110+
replicaLabels []string
111+
partitionLabels []string
112+
expectedLabelSets []labels.Labels
113+
expectedPartitionLabelSets []labels.Labels
113114
}{
114115
{
115-
name: "empty label sets",
116-
tsdbInfos: []infopb.TSDBInfo{},
117-
expected: []labels.Labels{},
116+
name: "empty label sets",
117+
tsdbInfos: []infopb.TSDBInfo{},
118+
expectedLabelSets: []labels.Labels{},
119+
expectedPartitionLabelSets: []labels.Labels{},
118120
},
119121
{
120-
name: "empty label sets with replica labels",
121-
tsdbInfos: []infopb.TSDBInfo{},
122-
replicaLabels: []string{"replica"},
123-
expected: []labels.Labels{},
122+
name: "empty label sets with replica labels",
123+
tsdbInfos: []infopb.TSDBInfo{},
124+
replicaLabels: []string{"replica"},
125+
expectedLabelSets: []labels.Labels{},
126+
expectedPartitionLabelSets: []labels.Labels{},
124127
},
125128
{
126129
name: "non-empty label sets",
127130
tsdbInfos: []infopb.TSDBInfo{{
128131
Labels: zLabelSetFromStrings("a", "1"),
129132
}},
130-
expected: []labels.Labels{labels.FromStrings("a", "1")},
133+
expectedLabelSets: []labels.Labels{labels.FromStrings("a", "1")},
134+
expectedPartitionLabelSets: []labels.Labels{labels.FromStrings("a", "1")},
131135
},
132136
{
133137
name: "non-empty label sets with replica labels",
134138
tsdbInfos: []infopb.TSDBInfo{{
135139
Labels: zLabelSetFromStrings("a", "1", "b", "2"),
136140
}},
137-
replicaLabels: []string{"a"},
138-
expected: []labels.Labels{labels.FromStrings("b", "2")},
141+
replicaLabels: []string{"a"},
142+
expectedLabelSets: []labels.Labels{labels.FromStrings("b", "2")},
143+
expectedPartitionLabelSets: []labels.Labels{labels.FromStrings("b", "2")},
139144
},
140145
{
141146
name: "replica labels not in label sets",
@@ -144,8 +149,9 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
144149
Labels: zLabelSetFromStrings("a", "1", "c", "2"),
145150
},
146151
},
147-
replicaLabels: []string{"a", "b"},
148-
expected: []labels.Labels{labels.FromStrings("c", "2")},
152+
replicaLabels: []string{"a", "b"},
153+
expectedLabelSets: []labels.Labels{labels.FromStrings("c", "2")},
154+
expectedPartitionLabelSets: []labels.Labels{labels.FromStrings("c", "2")},
149155
},
150156
{
151157
name: "non-empty label sets with partition labels",
@@ -154,8 +160,9 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
154160
Labels: zLabelSetFromStrings("a", "1", "c", "2"),
155161
},
156162
},
157-
partitionLabels: []string{"a"},
158-
expected: []labels.Labels{labels.FromStrings("a", "1")},
163+
partitionLabels: []string{"a"},
164+
expectedLabelSets: []labels.Labels{labels.FromStrings("a", "1", "c", "2")},
165+
expectedPartitionLabelSets: []labels.Labels{labels.FromStrings("a", "1")},
159166
},
160167
}
161168

@@ -167,7 +174,8 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
167174
PartitionLabels: testCase.partitionLabels,
168175
})
169176

170-
testutil.Equals(t, testCase.expected, engine.LabelSets())
177+
testutil.Equals(t, testCase.expectedPartitionLabelSets, engine.PartitionLabelSets())
178+
testutil.Equals(t, testCase.expectedLabelSets, engine.LabelSets())
171179
})
172180
}
173181
}

0 commit comments

Comments
 (0)