Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions backend/clickhouse/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
modelInputs "github.com/highlight-run/highlight/backend/private-graph/graph/model"
)

func getLogsPayload(logs []*modelInputs.LogEdge, limit uint64) *modelInputs.LogsPayload {
hasNextPage := uint64(len(logs)) == limit+1
func getLogsPayload(logs []*modelInputs.LogEdge) *modelInputs.LogsPayload {
hasNextPage := len(logs) == Limit+1

var endCursor string
if hasNextPage {
logs = logs[:limit]
logs = logs[:Limit]
endCursor = logs[len(logs)-1].Cursor
}

Expand Down
132 changes: 81 additions & 51 deletions backend/clickhouse/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"strings"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/google/uuid"
modelInputs "github.com/highlight-run/highlight/backend/private-graph/graph/model"
"github.com/huandu/go-sqlbuilder"
flat "github.com/nqd/flat"
e "github.com/pkg/errors"
)
Expand Down Expand Up @@ -71,22 +71,22 @@ func (client *Client) BatchWriteLogRows(ctx context.Context, logRows []*LogRow)
return batch.Send()
}

const Limit uint64 = 100
const Limit int = 100

func (client *Client) ReadLogs(ctx context.Context, projectID int, params modelInputs.LogsParamsInput, after *string) (*modelInputs.LogsPayload, error) {
query := makeSelectQuery("Timestamp, UUID, SeverityText, Body, LogAttributes", projectID, params, after)
query = query.OrderBy("Timestamp DESC, UUID DESC").Limit(Limit + 1)
sb, err := makeSelectBuilder("Timestamp, UUID, SeverityText, Body, LogAttributes", projectID, params, after)
if err != nil {
return nil, err
}

sql, args, err := query.ToSql()
sb.OrderBy("Timestamp DESC, UUID DESC").Limit(Limit + 1)

sql, args := sb.Build()
if err != nil {
return nil, err
}

rows, err := client.conn.Query(
ctx,
sql,
args...,
)
rows, err := client.conn.Query(ctx, sql, args...)

if err != nil {
return nil, err
Expand Down Expand Up @@ -118,16 +118,17 @@ func (client *Client) ReadLogs(ctx context.Context, projectID int, params modelI
}
rows.Close()

return getLogsPayload(logs, Limit), rows.Err()
return getLogsPayload(logs), rows.Err()
}

func (client *Client) ReadLogsTotalCount(ctx context.Context, projectID int, params modelInputs.LogsParamsInput) (uint64, error) {
query := makeSelectQuery("COUNT(*)", projectID, params, nil)
sql, args, err := query.ToSql()
sb, err := makeSelectBuilder("COUNT(*)", projectID, params, nil)
if err != nil {
return 0, err
}

sql, args := sb.Build()

var count uint64
err = client.conn.QueryRow(
ctx,
Expand All @@ -139,18 +140,15 @@ func (client *Client) ReadLogsTotalCount(ctx context.Context, projectID int, par
}

func (client *Client) LogsKeys(ctx context.Context, projectID int) ([]*modelInputs.LogKey, error) {
query := sq.Select("arrayJoin(LogAttributes.keys) as key, count() as cnt").
sb := sqlbuilder.NewSelectBuilder()
sb.Select("arrayJoin(LogAttributes.keys) as key, count() as cnt").
From("logs").
Where(sq.Eq{"ProjectId": projectID}).
Where(sb.Equal("ProjectId", projectID)).
GroupBy("key").
OrderBy("cnt DESC").
Limit(50)

sql, args, err := query.ToSql()

if err != nil {
return nil, err
}
sql, args := sb.Build()

rows, err := client.conn.Query(ctx, sql, args...)

Expand All @@ -174,30 +172,41 @@ func (client *Client) LogsKeys(ctx context.Context, projectID int) ([]*modelInpu
})
}

keys = append(keys, &modelInputs.LogKey{
Name: "level",
Type: modelInputs.LogKeyTypeString,
})

rows.Close()
return keys, rows.Err()

}

func (client *Client) LogsKeyValues(ctx context.Context, projectID int, keyName string) ([]string, error) {
query := sq.Select("LogAttributes[?] as value, count() as cnt").
From("logs").
Where(sq.Eq{"ProjectId": projectID}).
Where("mapContains(LogAttributes, ?)", keyName).
GroupBy("value").
OrderBy("cnt DESC").
Limit(50)

sql, args, err := query.ToSql()

// Injects `keyName` into LogAttributes[?]
argsWithKeyName := append([]interface{}{keyName}, args...)

if err != nil {
return nil, err
sb := sqlbuilder.NewSelectBuilder()

// TODO(et) - this is going to be a mess when we add other reserved keys like Trace. Clean this up when that happens.
if keyName == "level" {
sb.Select("SeverityText level, count() as cnt").
From("logs").
Where(sb.Equal("ProjectId", projectID)).
Where(sb.NotEqual("level", "")).
GroupBy("level").
OrderBy("cnt DESC").
Limit(50)
} else {
sb.Select("LogAttributes [" + sb.Var(keyName) + "] as value, count() as cnt").
From("logs").
Where(sb.Equal("ProjectId", projectID)).
Where("mapContains(LogAttributes, " + sb.Var(keyName) + ")").
GroupBy("value").
OrderBy("cnt DESC").
Limit(50)
}

rows, err := client.conn.Query(ctx, sql, argsWithKeyName...)
sql, args := sb.Build()

rows, err := client.conn.Query(ctx, sql, args...)

if err != nil {
return nil, err
Expand Down Expand Up @@ -257,47 +266,61 @@ func makeSeverityText(severityText string) modelInputs.SeverityText {
}
}

func makeSelectQuery(selectStr string, projectID int, params modelInputs.LogsParamsInput, after *string) sq.SelectBuilder {
query := sq.Select(selectStr).
func makeSelectBuilder(selectStr string, projectID int, params modelInputs.LogsParamsInput, after *string) (*sqlbuilder.SelectBuilder, error) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select(selectStr).
From("logs").
Where(sq.Eq{"ProjectId": projectID})
Where(sb.Equal("ProjectId", projectID))

if after != nil && len(*after) > 1 {
timestamp, uuid, err := decodeCursor(*after)
if err != nil {
fmt.Print("error decoding cursor")
return nil, err
}

// See https://dba.stackexchange.com/a/206811
query = query.Where("toUInt64(toDateTime(Timestamp)) <= ?", uint64(timestamp.Unix())).
Where("(toUInt64(toDateTime(Timestamp)) < ? OR UUID < ?)", uint64(timestamp.Unix()), uuid)
sb.Where(sb.LessEqualThan("toUInt64(toDateTime(Timestamp))", uint64(timestamp.Unix()))).
Where(
sb.Or(
sb.LessThan("toUInt64(toDateTime(Timestamp))", uint64(timestamp.Unix())),
sb.LessThan("UUID", uuid),
),
)

} else {
query = query.Where(sq.LtOrEq{"toUInt64(toDateTime(Timestamp))": uint64(params.DateRange.EndDate.Unix())}).
Where(sq.GtOrEq{"toUInt64(toDateTime(Timestamp))": uint64(params.DateRange.StartDate.Unix())})
sb.Where(sb.LessEqualThan("toUInt64(toDateTime(Timestamp))", uint64(params.DateRange.EndDate.Unix()))).
Where(sb.GreaterEqualThan("toUInt64(toDateTime(Timestamp))", uint64(params.DateRange.StartDate.Unix())))
}

filters := makeFilters(params.Query)

if len(filters.body) > 0 {
query = query.Where(sq.ILike{"Body": filters.body})
sb.Where("Body ILIKE" + sb.Var(filters.body))
}

if len(filters.level) > 0 {
if strings.Contains(filters.level, "%") {
sb.Where(sb.Like("SeverityText", filters.level))
} else {
sb.Where(sb.Equal("SeverityText", filters.level))
}
}

for key, value := range filters.attributes {
column := fmt.Sprintf("LogAttributes['%s']", key)
if strings.Contains(value, "%") {
query = query.Where(sq.Like{column: value})

sb.Where(sb.Like(column, value))
} else {
query = query.Where(sq.Eq{column: value})
sb.Where(sb.Equal(column, value))
}
}

return query
return sb, nil
}

type filters struct {
body string
level string
attributes map[string]string
}

Expand All @@ -319,8 +342,15 @@ func makeFilters(query string) filters {
}
filters.body = filters.body + body
} else if len(parts) == 2 {
wildcardValue := strings.ReplaceAll(parts[1], "*", "%")
filters.attributes[parts[0]] = wildcardValue
key, value := parts[0], parts[1]

wildcardValue := strings.ReplaceAll(value, "*", "%")

if key == "level" {
filters.level = wildcardValue
} else {
filters.attributes[key] = wildcardValue
}
}
}

Expand Down
104 changes: 103 additions & 1 deletion backend/clickhouse/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestReadLogsHasNextPage(t *testing.T) {
now := time.Now()
var rows []*LogRow

for i := uint64(1); i <= Limit; i++ { // 100 is a hardcoded limit
for i := 1; i <= Limit; i++ { // 100 is a hardcoded limit
rows = append(rows, &LogRow{
LogRowPrimaryAttrs: LogRowPrimaryAttrs{
Timestamp: now,
Expand Down Expand Up @@ -266,6 +266,57 @@ func TestReadLogsWithKeyFilter(t *testing.T) {
assert.Len(t, payload.Edges, 1)
}

func TestReadLogsWithLevelFilter(t *testing.T) {
ctx := context.Background()
client := setup(t)
defer teardown(client)

now := time.Now()
rows := []*LogRow{
{
LogRowPrimaryAttrs: LogRowPrimaryAttrs{
Timestamp: now,
ProjectId: 1,
},
SeverityText: "INFO",
},
{
LogRowPrimaryAttrs: LogRowPrimaryAttrs{
Timestamp: now,
ProjectId: 1,
},
LogAttributes: map[string]string{
"level": "WARN",
},
},
}

assert.NoError(t, client.BatchWriteLogRows(ctx, rows))

payload, err := client.ReadLogs(ctx, 1, modelInputs.LogsParamsInput{
DateRange: makeDateWithinRange(now),
Query: "level:INFO",
}, nil)
assert.NoError(t, err)
assert.Len(t, payload.Edges, 1)
assert.Equal(t, modelInputs.SeverityText("INFO"), payload.Edges[0].Node.SeverityText)

payload, err = client.ReadLogs(ctx, 1, modelInputs.LogsParamsInput{
DateRange: makeDateWithinRange(now),
Query: "level:*NF*",
}, nil)
assert.NoError(t, err)
assert.Len(t, payload.Edges, 1)
assert.Equal(t, modelInputs.SeverityText("INFO"), payload.Edges[0].Node.SeverityText)

payload, err = client.ReadLogs(ctx, 1, modelInputs.LogsParamsInput{
DateRange: makeDateWithinRange(now),
Query: "level:WARN",
}, nil)
assert.NoError(t, err)
assert.Len(t, payload.Edges, 0)
}

func TestLogsKeys(t *testing.T) {
ctx := context.Background()
client := setup(t)
Expand Down Expand Up @@ -302,6 +353,12 @@ func TestLogsKeys(t *testing.T) {
Name: "user_id",
Type: modelInputs.LogKeyTypeString,
},

// Non-custom keys ranked lower
{
Name: "level",
Type: modelInputs.LogKeyTypeString,
},
}
assert.Equal(t, expected, keys)
}
Expand Down Expand Up @@ -372,6 +429,51 @@ func TestLogKeyValues(t *testing.T) {
assert.Equal(t, expected, values)
}

func TestLogKeyValuesLevel(t *testing.T) {
ctx := context.Background()
client := setup(t)
defer teardown(client)

rows := []*LogRow{
{
LogRowPrimaryAttrs: LogRowPrimaryAttrs{
Timestamp: time.Now(),
ProjectId: 1,
},
SeverityText: "INFO",
},
{
LogRowPrimaryAttrs: LogRowPrimaryAttrs{
Timestamp: time.Now(),
ProjectId: 1,
},
SeverityText: "WARN",
},
{
LogRowPrimaryAttrs: LogRowPrimaryAttrs{
Timestamp: time.Now(),
ProjectId: 1,
},
SeverityText: "INFO",
},
{
LogRowPrimaryAttrs: LogRowPrimaryAttrs{
Timestamp: time.Now(),
ProjectId: 1,
},
LogAttributes: map[string]string{"level": "FATAL"}, // should be skipped in the output
},
}

assert.NoError(t, client.BatchWriteLogRows(ctx, rows))

values, err := client.LogsKeyValues(ctx, 1, "level")
assert.NoError(t, err)

expected := []string{"INFO", "WARN"}
assert.Equal(t, expected, values)
}

func TestExpandJSON(t *testing.T) {
var tests = []struct {
logAttributes map[string]string
Expand Down
Loading