forked from netobserv/flowlogs-pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathenrich.go
More file actions
144 lines (131 loc) · 4.34 KB
/
enrich.go
File metadata and controls
144 lines (131 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package kubernetes
import (
"strings"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/datasource"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model"
"github.com/sirupsen/logrus"
)
var ds *datasource.Datasource
var infConfig informers.Config
// For testing
func MockInformers() {
infConfig = informers.NewConfig(&api.NetworkTransformKubeConfig{})
ds = &datasource.Datasource{Informers: informers.NewInformersMock()}
}
func InitInformerDatasource(config *api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error {
var err error
infConfig = informers.NewConfig(config)
if ds == nil {
ds, err = datasource.NewInformerDatasource(config.ConfigPath, &infConfig, opMetrics)
}
return err
}
func Enrich(outputEntry config.GenericMap, rule *api.K8sRule) {
ip, ok := outputEntry.LookupString(rule.IPField)
if !ok {
return
}
potentialKeys := infConfig.BuildSecondaryNetworkKeys(outputEntry, rule)
kubeInfo := ds.IndexLookup(potentialKeys, ip)
if kubeInfo == nil {
logrus.Tracef("can't find kubernetes info for keys %v and IP %s", potentialKeys, ip)
return
}
// NETOBSERV-666: avoid putting empty namespaces or Loki aggregation queries will
// differentiate between empty and nil namespaces.
if kubeInfo.Namespace != "" {
outputEntry[rule.OutputKeys.Namespace] = kubeInfo.Namespace
}
outputEntry[rule.OutputKeys.Name] = kubeInfo.Name
outputEntry[rule.OutputKeys.Kind] = kubeInfo.Kind
outputEntry[rule.OutputKeys.OwnerName] = kubeInfo.OwnerName
outputEntry[rule.OutputKeys.OwnerKind] = kubeInfo.OwnerKind
outputEntry[rule.OutputKeys.NetworkName] = kubeInfo.NetworkName
if rule.LabelsPrefix != "" {
for labelKey, labelValue := range kubeInfo.Labels {
outputEntry[rule.LabelsPrefix+"_"+labelKey] = labelValue
}
}
if kubeInfo.HostIP != "" {
outputEntry[rule.OutputKeys.HostIP] = kubeInfo.HostIP
if kubeInfo.HostName != "" {
outputEntry[rule.OutputKeys.HostName] = kubeInfo.HostName
}
}
fillInK8sZone(outputEntry, rule, kubeInfo)
if rule.Assignee == "otel" {
// A few otel-specific names
// TODO: remove them? This is adding quite some redundant content
switch kubeInfo.Kind {
case model.KindNode:
outputEntry[rule.Output+"k8s.node.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.node.uid"] = kubeInfo.UID
case model.KindPod:
outputEntry[rule.Output+"k8s.pod.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.pod.uid"] = kubeInfo.UID
case model.KindService:
outputEntry[rule.Output+"k8s.service.name"] = kubeInfo.Name
outputEntry[rule.Output+"k8s.service.uid"] = kubeInfo.UID
}
}
}
const nodeZoneLabelName = "topology.kubernetes.io/zone"
func fillInK8sZone(outputEntry config.GenericMap, rule *api.K8sRule, kubeInfo *model.ResourceMetaData) {
if !rule.AddZone {
// Nothing to do
return
}
switch kubeInfo.Kind {
case model.KindNode:
zone, ok := kubeInfo.Labels[nodeZoneLabelName]
if ok {
outputEntry[rule.OutputKeys.Zone] = zone
}
return
case model.KindPod:
nodeInfo, err := ds.GetNodeByName(kubeInfo.HostName)
if err != nil {
logrus.WithError(err).Tracef("can't find nodes info for node %v", kubeInfo.HostName)
return
}
if nodeInfo != nil {
zone, ok := nodeInfo.Labels[nodeZoneLabelName]
if ok {
outputEntry[rule.OutputKeys.Zone] = zone
}
}
return
case model.KindService:
// A service is not assigned to a dedicated zone, skipping
return
}
}
func EnrichLayer(outputEntry config.GenericMap, rule *api.K8sInfraRule) {
outputEntry[rule.Output] = "infra"
for _, nsnameFields := range rule.NamespaceNameFields {
if namespace, _ := outputEntry.LookupString(nsnameFields.Namespace); namespace != "" {
name, _ := outputEntry.LookupString(nsnameFields.Name)
if objectIsApp(namespace, name, rule) {
outputEntry[rule.Output] = "app"
return
}
}
}
}
func objectIsApp(namespace, name string, rule *api.K8sInfraRule) bool {
for _, prefix := range rule.InfraPrefixes {
if strings.HasPrefix(namespace, prefix) {
return false
}
}
for _, ref := range rule.InfraRefs {
if namespace == ref.Namespace && name == ref.Name {
return false
}
}
return true
}