-
Notifications
You must be signed in to change notification settings - Fork 100
Expand file tree
/
Copy pathgroup_validation.py
More file actions
59 lines (50 loc) · 2.05 KB
/
group_validation.py
File metadata and controls
59 lines (50 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from flask import request, Blueprint
import sys, traceback
import json
import os
from webhook.request_helper import validate, responses
# Blueprint on which the group-validation webhook route is registered.
bp = Blueprint("group-webhook", __name__)

# Groups whose name starts with this prefix are the ones the webhook guards.
group_prefix = os.getenv("GROUP_VALIDATION_PREFIX", "osd-sre-")

# Raw, comma-separated list of the groups whose members may act on guarded
# groups. Kept as a module attribute for backward compatibility.
admin_group = os.getenv("GROUP_VALIDATION_ADMIN_GROUP", "osd-sre-admins,osd-sre-cluster-admins")

# Parsed admin group names. Whitespace around each name is trimmed and empty
# entries are dropped, so values such as "a, b" or "a,b," still match the
# real group names.
admin_groups = [name.strip() for name in admin_group.split(",") if name.strip()]
@bp.route('/group-validation', methods=['POST'])
def handle_request():
    """Decide whether an admission request on a group is allowed.

    Rules, applied in order:

    * If ``validate.validate_request_structure`` rejects the JSON body, or
      raises while checking it, ``responses.response_invalid()`` is returned.
    * The ``kube:admin`` user is always allowed.
    * For a group whose name starts with ``group_prefix``, the request is
      allowed only when the user is a member of at least one group listed in
      ``admin_groups``; otherwise it is denied.
    * Requests on any other group are allowed.
    * If the expected attributes cannot be read from the body, the error is
      printed to stdout and ``responses.response_invalid()`` is returned.

    Setting the ``DEBUG_GROUP_VALIDATION`` environment variable to ``"True"``
    prints the request body, the action taken and the response to stdout.

    Returns:
        The value produced by the matching ``responses.response_*`` helper.
    """
    debug = os.getenv("DEBUG_GROUP_VALIDATION", "False") == "True"

    if debug:
        print("REQUEST BODY => {}".format(request.json))

    # Any failure while parsing or checking the body means "invalid".
    # Catch Exception rather than using a bare except so that SystemExit and
    # KeyboardInterrupt are not swallowed.
    try:
        valid = validate.validate_request_structure(request.json)
    except Exception:
        valid = False

    if not valid:
        return responses.response_invalid()

    try:
        body_dict = request.json['request']
        group_name = body_dict['object']['metadata']['name']
        userinfo = body_dict['userInfo']

        if userinfo['username'] == "kube:admin":
            # kubeadmin can do anything
            if debug:
                print("Performing action: {} in {} group for kube:admin".format(body_dict['operation'],group_name))
            return responses.response_allow(req=body_dict)

        if group_name.startswith(group_prefix):
            if debug:
                print("Performing action: {} in {} group".format(body_dict['operation'],group_name))
            # A user without any group membership may arrive with the
            # "groups" field absent or null; treat that as "no groups" so the
            # request gets an explicit deny instead of an "invalid" response.
            user_groups = userinfo.get('groups') or []
            if not set(admin_groups).isdisjoint(user_groups):
                response_body = responses.response_allow(req=body_dict,msg="{} group {}".format(body_dict['operation'], group_name))
            else:
                deny_msg = "User not authorized to {} group {}".format(body_dict['operation'],group_name)
                response_body = responses.response_deny(req=body_dict,msg=deny_msg)
        else:
            # Groups outside the guarded prefix are not restricted here.
            response_body = responses.response_allow(req=body_dict)

        if debug:
            print("Response body => {}".format(response_body))
        return response_body
    except Exception:
        # Boundary handler: report the problem on stdout and reject the
        # request rather than letting the error escape the view.
        print("Exception when trying to access attributes. Request body: {}".format(request.json))
        print("Backtrace:")
        print("-"*60)
        traceback.print_exc(file=sys.stdout)
        return responses.response_invalid()