forked from EliuX/flask-opa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflask_opa.py
207 lines (166 loc) · 6.11 KB
/
flask_opa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""
Flask Extension for OPA
"""
import requests
from flask.app import Flask
__version__ = "1.0.0"
class OPAException(Exception):
"""Exception evaluating a request in OPA"""
def __init__(self, message):
super().__init__(message)
class OPAUnexpectedException(OPAException):
"""Unexpected error evaluating the request in OPA"""
def __init__(self, message='Unexpected error'):
super().__init__(message)
class AccessDeniedException(OPAException):
"""OPA Denied the request"""
def __init__(self, message='Denied'):
super().__init__(message)
class OPAServerUnavailableException(OPAException):
"""When it cannot connect to the OPA Server"""
def __init__(self, message='OPA Server unavailable'):
super().__init__(message)
class OPA(object):
def __init__(self,
app: Flask,
input_function,
url: str = None,
allow_function=None,
wait_time: int = 20000):
super(OPA, self).__init__()
self._app = app
self._pep = {}
self._input_function = input_function
self._allow_function = allow_function or self.default_allow_function
self._deny_on_opa_fail = app.config.get('OPA_DENY_ON_FAIL', True)
self._url = url or app.config.get('OPA_URL')
self._wait_time = wait_time or app.config.get('OPA_WAIT_TIME')
if self._app.config.get('OPA_SECURED', False):
self.secured()
@staticmethod
def secure(*args, **kwargs):
return OPA(*args, **kwargs).secured()
def secured(self,
url=None,
input_function=None,
allow_function=None):
"""Secure app"""
if self.check_authorization not in self._app.before_request_funcs:
self._url = url or self._url
self._allow_function = allow_function or self._allow_function
self._input_function = input_function or self._input_function
if self._url and self._input_function and self._allow_function:
self._app.before_request(self.check_authorization)
else:
raise ValueError("Invalid OPA configuration")
return self
def check_authorization(self):
input = self.input
url = self.url
try:
response = self.query_opa(url, input)
if response is not None:
self.check_opa_response(response)
except OPAException as e:
if self.deny_on_opa_fail:
raise e
def query_opa(self, url, input):
self._app.logger.debug("%s query: %s. content: %s",
self.app, url, input)
try:
return requests.post(url, json=input, timeout=self.wait_time)
except requests.exceptions.ConnectionError as e:
if self.deny_on_opa_fail:
raise OPAServerUnavailableException(str(e))
def check_opa_response(self, response):
if response.status_code != 200:
opa_error = "OPA status code: {}. content: {}".format(
response.status_code, str(response)
)
self._app.logger.error(opa_error)
raise OPAUnexpectedException(opa_error)
resp_json = response.json()
self._app.logger.debug(" => %s", resp_json)
if not self.allow_function(resp_json):
raise AccessDeniedException()
return resp_json
def __call__(self, name: str, url: str,
input_function=None,
allow_function=None):
"""Creates a PEP"""
return PEP(self, name, url, input_function, allow_function)
@property
def pep(self):
return self._pep
@property
def url(self):
return self._url
@url.setter
def url(self, value):
self._url = value
@property
def deny_on_opa_fail(self):
return self._deny_on_opa_fail
@deny_on_opa_fail.setter
def deny_on_opa_fail(self, value):
self._deny_on_opa_fail = value
@property
def input(self):
return self.input_function()
@property
def input_function(self):
return self._input_function
@property
def allow_function(self):
return self._allow_function
@property
def app(self):
return self._app
@property
def wait_time(self):
return self._wait_time
@wait_time.setter
def wait_time(self, value):
self._wait_time = value
@classmethod
def default_allow_function(cls, response_json):
return response_json.get('result', False)
class PEP(OPA):
"""Class to handle Policy Enforcement Points"""
def __init__(self,
opa: OPA,
name: str,
url: str,
input_function=None,
allow_function=None,
deny_on_opa_fail: bool = False):
super(OPA, self).__init__()
self._app = opa.app
opa.pep[name] = self
self._url = url
self._input_function = input_function or opa.input_function
self._allow_function = allow_function or opa.allow_function
self._deny_on_opa_fail = deny_on_opa_fail or False
self._wait_time = opa.wait_time
self._name = name or "PEP"
if not (self._app and self._url and
self._input_function and self._allow_function):
raise ValueError("Invalid Police Enforcement Point configuration")
def check_authorization(self, *args, **kwargs):
_input = self.input(*args, **kwargs)
response = self.query_opa(self.url, _input)
if response is not None:
self.check_opa_response(response)
def __call__(self, f):
def secure_function(*args, **kwargs):
try:
self.check_authorization(*args, **kwargs)
return f(*args, **kwargs)
except OPAException as e:
if self.deny_on_opa_fail:
raise e
return secure_function
def input(self, *args, **kwargs):
return self._input_function(*args, **kwargs)
def __str__(self):
return "<{}>".format(self._name)