Skip to content

Commit c83de37

Browse files
committed
feat(seq-ql): add histogram pipe
1 parent ea417d7 commit c83de37

File tree

9 files changed

+177
-39
lines changed

9 files changed

+177
-39
lines changed

asyncsearcher/async_searcher.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,14 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.Lis
237237
}
238238
r.Params.AST = ast.Root
239239

240+
// extract parameters from pipes
241+
for _, pipe := range ast.Pipes {
242+
p, ok := pipe.(*parser.PipeHistogram)
243+
if ok && r.Params.HistInterval == 0 {
244+
r.Params.HistInterval = uint64(p.Interval)
245+
}
246+
}
247+
240248
now := timeNow()
241249
if r.Retention < minRetention {
242250
return fmt.Errorf("retention time should be at least %s, got %s", minRetention, r.Retention)
@@ -350,6 +358,14 @@ func (as *AsyncSearcher) doSearch(id string, fracs fracmanager.List) {
350358
panic(fmt.Errorf("BUG: search query must be valid: %s", err))
351359
}
352360
info.Request.Params.AST = ast.Root
361+
362+
// extract parameters from pipes
363+
for _, pipe := range ast.Pipes {
364+
p, ok := pipe.(*parser.PipeHistogram)
365+
if ok && info.Request.Params.HistInterval == 0 {
366+
info.Request.Params.HistInterval = uint64(p.Interval)
367+
}
368+
}
353369
}
354370

355371
fracsByName := make(map[string]frac.Fraction)

docs/en/05-seq-ql.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ trace_id:in(123e4567-e89b-12d3-a456-426655440000, '123e4567-e89b-12d3-a456-42665
123123
Pipes in seq-ql are used to sequentially process data.
124124
Pipes enable data transformation, enrichment, filtering, aggregation, and formatting of results.
125125

126-
#### `fields` pipe
126+
### `fields` pipe
127127

128128
The `fields` pipe allows removing unnecessary fields from documents in the output.
129129
This is useful when only specific fields are needed, especially when exporting large datasets to reduce network load.
@@ -147,6 +147,18 @@ source_type:access* | fields except payload, cookies
147147

148148
In this example, the `payload` and `cookies` fields will be excluded from the result.
149149

150+
### `histogram` pipe
151+
152+
The `histogram` pipe allows to get a histogram by query.
153+
154+
The pipe takes `interval` parameter that specifies bucket size in time duration format: `10s` for ten seconds, `1h` for one hour etc.
155+
156+
Example:
157+
158+
```seq-ql
159+
source_type:access* | histogram 30s
160+
```
161+
150162
## Comments
151163

152164
Comments are user-provided text that will be ignored when executing a query.

docs/ru/05-seq-ql.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ trace_id:in(123e4567-e89b-12d3-a456-426655440000, '123e4567-e89b-12d3-a456-42665
116116
Pipes в seq-ql — это механизм для последовательной обработки данных.
117117
Pipes позволяют трансформировать, обогащать, фильтровать, агрегировать данные и форматировать результаты.
118118

119-
#### `fields` pipe
119+
### `fields` pipe
120120

121121
Pipe `fields` позволяет удалить лишние поля у документов в выдаче.
122122
Это может пригодиться, когда нужно оставить только нужные поля,
@@ -142,6 +142,19 @@ source_type:access* | fields except payload, cookies
142142

143143
В этом примере поля `payload`, `cookies` будут исключены из результата.
144144

145+
### `histogram` pipe
146+
147+
Pipe `histogram` позволяет получить агрегацию по поисковому запросу.
148+
149+
Pipe принимает параметр `interval`, который позволяет указать размер временного интервала для группировки данных:
150+
`10s` для десяти секунды, `1h` для одного часа и так далее.
151+
152+
Пример:
153+
154+
```seq-ql
155+
source_type:access* | histogram 30s
156+
```
157+
145158
## Комментарии
146159

147160
Комментарии – текст, предоставленный пользователем, который будет проигнорирован при выполнении запроса.

parser/seqql_filter_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,10 @@ func TestParseSeqQLError(t *testing.T) {
391391
// Test pipes.
392392
test(`message:--||`, `unknown pipe: |`)
393393
test(`source_type:access* | fields message | fields except login:admin`, `parsing 'fields' pipe: unexpected symbol ":"`)
394-
test(`source_type:access* | fields message | fields login`, `multiple field filters is not allowed`)
394+
test(`source_type:access* | fields message | fields login`, `multiple field filters are not allowed`)
395395
test(`* | fields event, `, `parsing 'fields' pipe: trailing comma not allowed`)
396+
test(`* | histogram 1s | histogram 1s`, `multiple histograms are not allowed`)
397+
test(`* | histogram 123`, `parsing 'histogram' pipe: can't parse histogram interval`)
396398
}
397399

398400
// edited from the original answer https://stackoverflow.com/a/30230552/11750924

parser/seqql_pipes.go

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"fmt"
55
"strconv"
66
"strings"
7+
"time"
8+
9+
"github.com/ozontech/seq-db/util"
710
)
811

912
type Pipe interface {
@@ -14,6 +17,8 @@ type Pipe interface {
1417
func parsePipes(lex *lexer) ([]Pipe, error) {
1518
// Counter of 'fields' pipes.
1619
fieldFilters := 0
20+
histograms := 0
21+
1722
var pipes []Pipe
1823
for !lex.IsEnd() {
1924
if !lex.IsKeyword("|") {
@@ -29,14 +34,25 @@ func parsePipes(lex *lexer) ([]Pipe, error) {
2934
}
3035
pipes = append(pipes, p)
3136
fieldFilters++
37+
case lex.IsKeyword("histogram"):
38+
p, err := parsePipeHistogram(lex)
39+
if err != nil {
40+
return nil, fmt.Errorf("parsing 'histogram' pipe: %s", err)
41+
}
42+
pipes = append(pipes, p)
43+
histograms++
3244
default:
3345
return nil, fmt.Errorf("unknown pipe: %s", lex.Token)
3446
}
3547

3648
if fieldFilters > 1 {
37-
return nil, fmt.Errorf("multiple field filters is not allowed")
49+
return nil, fmt.Errorf("multiple field filters are not allowed")
50+
}
51+
if histograms > 1 {
52+
return nil, fmt.Errorf("multiple histograms are not allowed")
3853
}
3954
}
55+
4056
return pipes, nil
4157
}
4258

@@ -110,6 +126,51 @@ func parseFieldList(lex *lexer) ([]string, error) {
110126
return fields, nil
111127
}
112128

129+
type PipeHistogram struct {
130+
Interval int64
131+
}
132+
133+
func (h *PipeHistogram) Name() string {
134+
return "histogram"
135+
}
136+
137+
func (h *PipeHistogram) DumpSeqQL(o *strings.Builder) {
138+
o.WriteString("histogram ")
139+
interval := time.Duration(h.Interval) * time.Millisecond
140+
o.WriteString(quoteTokenIfNeeded(interval.String()))
141+
}
142+
143+
func parsePipeHistogram(lex *lexer) (*PipeHistogram, error) {
144+
if !lex.IsKeyword("histogram") {
145+
return nil, fmt.Errorf("missing 'histogram' keyword")
146+
}
147+
148+
lex.Next()
149+
150+
interval, err := parseInterval(lex)
151+
if err != nil {
152+
return nil, fmt.Errorf("can't parse histogram interval")
153+
}
154+
155+
return &PipeHistogram{
156+
Interval: interval,
157+
}, nil
158+
}
159+
160+
func parseInterval(lex *lexer) (int64, error) {
161+
interval, err := parseCompositeTokenReplaceWildcards(lex)
162+
if err != nil {
163+
return 0, err
164+
}
165+
166+
parsed, err := util.ParseDuration(interval)
167+
if err != nil {
168+
return 0, err
169+
}
170+
171+
return parsed.Milliseconds(), nil
172+
}
173+
113174
func quoteTokenIfNeeded(token string) string {
114175
if !needQuoteToken(token) {
115176
return token
@@ -149,7 +210,7 @@ var reservedKeywords = uniqueTokens([]string{
149210
"|",
150211

151212
// Pipe specific keywords.
152-
"fields", "except",
213+
"fields", "except", "histogram",
153214
})
154215

155216
func needQuoteToken(s string) bool {

parser/seqql_pipes_test.go

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,38 +3,45 @@ package parser
33
import (
44
"testing"
55

6+
"github.com/stretchr/testify/assert"
67
"github.com/stretchr/testify/require"
78
)
89

910
func TestParsePipeFields(t *testing.T) {
10-
test := func(q, expected string) {
11-
t.Helper()
12-
query, err := ParseSeqQL(q, nil)
13-
require.NoError(t, err)
14-
require.Equal(t, expected, query.SeqQLString())
15-
}
16-
17-
test("* | fields message,error, level", "* | fields message, error, level")
18-
test("* | fields level", "* | fields level")
19-
test("* | fields level", "* | fields level")
20-
test(`* | fields "_id"`, `* | fields _id`)
21-
test(`* | fields "_\\message\\_"`, `* | fields "_\\message\\_"`)
22-
test(`* | fields "_\\message*"`, `* | fields "_\\message\*"`)
23-
test(`* | fields k8s_namespace`, `* | fields k8s_namespace`)
11+
test(t, "* | fields message,error, level", "* | fields message, error, level")
12+
test(t, "* | fields level", "* | fields level")
13+
test(t, "* | fields level", "* | fields level")
14+
test(t, `* | fields "_id"`, `* | fields _id`)
15+
test(t, `* | fields "_\\message\\_"`, `* | fields "_\\message\\_"`)
16+
test(t, `* | fields "_\\message*"`, `* | fields "_\\message\*"`)
17+
test(t, `* | fields k8s_namespace`, `* | fields k8s_namespace`)
2418
}
2519

2620
func TestParsePipeFieldsExcept(t *testing.T) {
27-
test := func(q, expected string) {
28-
t.Helper()
29-
query, err := ParseSeqQL(q, nil)
30-
require.NoError(t, err)
31-
require.Equal(t, expected, query.SeqQLString())
32-
}
21+
test(t, "* | fields except message,error, level", "* | fields except message, error, level")
22+
test(t, "* | fields except level", "* | fields except level")
23+
test(t, `* | fields except "_id"`, `* | fields except _id`)
24+
test(t, `* | fields except "_\\message\\_"`, `* | fields except "_\\message\\_"`)
25+
test(t, `* | fields except "_\\message*"`, `* | fields except "_\\message\*"`)
26+
test(t, `* | fields except k8s_namespace`, `* | fields except k8s_namespace`)
27+
}
28+
29+
func TestParsePipeHistogram(t *testing.T) {
30+
test(t, `* | histogram 1s`, `* | histogram 1s`)
31+
test(t, `* | histogram 60s`, `* | histogram 1m0s`)
32+
test(t, `* | histogram 1m`, `* | histogram 1m0s`)
33+
test(t, `* | histogram 10m`, `* | histogram 10m0s`)
34+
test(t, `* | histogram 2h`, `* | histogram 2h0m0s`)
35+
}
36+
37+
func TestPipesComposition(t *testing.T) {
38+
test(t, `* | fields level | histogram 1s`, `* | fields level | histogram 1s`)
39+
test(t, `* | histogram 1s | fields level`, `* | histogram 1s | fields level`)
40+
}
3341

34-
test("* | fields except message,error, level", "* | fields except message, error, level")
35-
test("* | fields except level", "* | fields except level")
36-
test(`* | fields except "_id"`, `* | fields except _id`)
37-
test(`* | fields except "_\\message\\_"`, `* | fields except "_\\message\\_"`)
38-
test(`* | fields except "_\\message*"`, `* | fields except "_\\message\*"`)
39-
test(`* | fields except k8s_namespace`, `* | fields except k8s_namespace`)
42+
func test(t *testing.T, q, expected string) {
43+
t.Helper()
44+
query, err := ParseSeqQL(q, nil)
45+
require.NoError(t, err)
46+
assert.Equal(t, expected, query.SeqQLString())
4047
}

proxyapi/grpc_complex_search.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (g *grpcV1) ComplexSearch(
4444
resp.Aggs = makeProtoAggregation(allAggregations)
4545
aggTr.Done()
4646
}
47-
if req.Hist != nil {
47+
if sResp.qpr.Histogram != nil {
4848
histTr := tr.NewChild("histogram")
4949
resp.Hist = makeProtoHistogram(sResp.qpr)
5050
histTr.Done()

storeapi/grpc_search.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (g *GrpcV1) doSearch(
9999
t := time.Now()
100100

101101
parseQueryTr := tr.NewChild("parse query")
102-
ast, err := g.parseQuery(req.Query)
102+
ast, pipes, err := g.parseQuery(req.Query)
103103
if err != nil {
104104
parseQueryTr.Done()
105105
if code, ok := parseStoreError(err); ok {
@@ -125,7 +125,7 @@ func (g *GrpcV1) doSearch(
125125
zap.String("query", req.Query),
126126
)
127127

128-
parseQuery, err := g.parseQuery(g.config.Filter.Query)
128+
parseQuery, _, err := g.parseQuery(g.config.Filter.Query)
129129
if err != nil {
130130
parseQueryTr.Done()
131131
if code, ok := parseStoreError(err); ok {
@@ -152,10 +152,20 @@ func (g *GrpcV1) doSearch(
152152
const millisecondsInSecond = float64(time.Second / time.Millisecond)
153153
metric.SearchRangesSeconds.Observe(float64(to-from) / millisecondsInSecond)
154154

155+
histInterval := req.Interval
156+
157+
// extract parameters from pipes
158+
for _, pipe := range pipes {
159+
p, ok := pipe.(*parser.PipeHistogram)
160+
if ok && histInterval == 0 {
161+
histInterval = p.Interval
162+
}
163+
}
164+
155165
searchParams := processor.SearchParams{
156166
AST: ast,
157167
AggQ: aggQ,
158-
HistInterval: uint64(req.Interval),
168+
HistInterval: uint64(histInterval),
159169
From: from,
160170
To: to,
161171
Limit: limit,
@@ -216,18 +226,23 @@ func (g *GrpcV1) doSearch(
216226
return buildSearchResponse(qpr), nil
217227
}
218228

219-
func (g *GrpcV1) parseQuery(query string) (*parser.ASTNode, error) {
229+
func (g *GrpcV1) parseQuery(query string) (*parser.ASTNode, []parser.Pipe, error) {
220230
if query == "" {
221231
query = seq.TokenAll + ":*"
222232
}
223233

234+
var ast *parser.ASTNode
235+
var pipes []parser.Pipe
236+
224237
seqql, err := parser.ParseSeqQL(query, g.mappingProvider.GetMapping())
225238
if err != nil {
226-
return nil, status.Errorf(codes.InvalidArgument, "can't parse query %q: %v", query, err)
239+
return nil, nil, status.Errorf(codes.InvalidArgument, "can't parse query %q: %v", query, err)
227240
}
228-
ast := seqql.Root
229241

230-
return ast, nil
242+
ast = seqql.Root
243+
pipes = seqql.Pipes
244+
245+
return ast, pipes, nil
231246
}
232247

233248
func (g *GrpcV1) earlierThanOldestFrac(from uint64) bool {

tests/integration_tests/integration_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,6 +1307,18 @@ func (s *IntegrationTestSuite) TestAggNoTotal() {
13071307
histSum += v
13081308
}
13091309
assert.Equal(t, uint64(allDocsNum), histSum, "the sum of the histogram should be equal to the number of all documents")
1310+
1311+
// histogram from pipe
1312+
qpr, _, _, err = env.Search(`service:x* | histogram 60s`, size, setup.NoFetch())
1313+
require.NoError(t, err, "should be no errors")
1314+
assert.Equal(t, uint64(allDocsNum), qpr.Total, "we must scann all docs in withTotal=true mode")
1315+
assert.Equal(t, size, len(qpr.IDs), "we must get only size ids")
1316+
assert.Equal(t, histCnt, len(qpr.Histogram))
1317+
histSum = uint64(0)
1318+
for _, v := range qpr.Histogram {
1319+
histSum += v
1320+
}
1321+
assert.Equal(t, uint64(allDocsNum), histSum, "the sum of the histogram should be equal to the number of all documents")
13101322
}
13111323

13121324
s.T().Run("ActiveFraction", test)

0 commit comments

Comments
 (0)