-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathapp.py
144 lines (121 loc) · 4.92 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from flask import Flask, jsonify, request, Response
from flask_sqlalchemy import SQLAlchemy
import jwt
from jwt.exceptions import DecodeError, MissingRequiredClaimError, InvalidKeyError
import json
from base64 import b64decode
import datetime
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
app.config['SECRET_KEY_HMAC'] = 'secret'
with open('public.pem','r') as f:
app.config['PUBLIC_KEY_RSA'] = f.read()
db = SQLAlchemy(app)
class User(db.Model):
'''
Using SQLAlchemy to generate a SQLite DB. This is a very minimal user table with username and password.
The username:password combinations used in the default DB is "admin:admin123", "guest:guest123"
'''
id = db.Column(db.Integer, primary_key = True)
username = db.Column(db.String(80), unique = True)
password = db.Column(db.String(80), unique = True)
def __init__(self, username, password):
self.username = username
self.password = password
def __repr__(self):
return "<User {0}>".format(self.username)
def get_exp_date():
exp_date = datetime.datetime.utcnow() + datetime.timedelta(minutes = 1)
return exp_date
def verify_jwt(token):
try:
decoded = jwt.decode(token, app.config['SECRET_KEY_HMAC'], verify=True, issuer = 'we45', leeway=10, algorithms=['HS256'])
print("JWT Token from API: {0}".format(decoded))
return True
except DecodeError:
print("Error in decoding token")
return False
except MissingRequiredClaimError as e:
print('Claim required is missing: {0}'.format(e))
return False
def verify_rsa_jwt(token):
try:
decoded = jwt.decode(token, app.config['PUBLIC_KEY_RSA'])
print("JWT Token from API: {0}".format(decoded))
return True
except DecodeError:
print("Error in decoding Token")
return False
except InvalidKeyError as key:
print(key)
return False
def insecure_verify(token):
decoded = jwt.decode(token, verify = False)
print decoded
return True
@app.route('/login', methods = ['POST'])
def login():
'''
You will need to authenticate to this URI first. You will need to pass a JSON body with a username and password key.
If you enter a valid username and password, a JWT token is returned in the HTTP Response in the Authorization header.
This token can be used for subsequent requests.
'''
try:
content = request.json
username = content['username']
password = content['password']
auth_user = User.query.filter_by(username = username, password = password).first()
if auth_user:
auth_token = jwt.encode({'user': username, 'exp': get_exp_date(), 'nbf': datetime.datetime.utcnow(), 'iss': 'we45', 'iat': datetime.datetime.utcnow()}, app.config['SECRET_KEY_HMAC'], algorithm='HS256')
resp = Response(json.dumps({'Hello': username}))
resp.headers['Authorization'] = "{0}".format(auth_token)
resp.status_code = 200
resp.mimetype = 'application/json'
return resp
else:
return jsonify({'Error': 'No User here...'}),404
except:
return jsonify({'Error': 'Unable to recognize Input'}),404
@app.route('/auth', methods = ["GET"])
def protected_page():
'''
You will need to pass a valid JWT in the HTTP Authorization Header to get a valid response from this URL.
The Token validates the signature, expiration and issuer claims. The
'''
token = request.headers.get('Authorization')
if not token:
return jsonify({'Error': 'No Token in Request'}), 404
else:
if not verify_jwt(token):
return jsonify({'Error': 'Token cannot be validated'}),404
else:
return jsonify({'Hello': 'This is an authenicated response'}),200
@app.route('/insecure_auth', methods = ["GET"])
def insecure_page():
'''
This function does not verify the JWT. Hence, you can pass any JWT and it will accept it as valid.
'''
token = request.headers.get('Authorization')
if not token:
return jsonify({'Error': 'No Token in Request'}), 404
else:
if not insecure_verify(token):
return jsonify({'Error': 'Token cannot be validated'}),404
else:
return jsonify({'Hello': 'This is an authenicated response'}),200
@app.route('/rsa_auth', methods = ["GET"])
def rsa_page():
'''
This function errors out when you throw a JWT that is signed with a public key. In this case, we are using the
key from the public.pem file.
'''
token = request.headers.get('Authorization')
if not token:
return jsonify({'Error': 'No Token in Request'}), 404
else:
if not verify_rsa_jwt(token):
return jsonify({'Error': 'Token cannot be validated'}),404
else:
return jsonify({'Hello': 'This is an authenicated response'}),200
if __name__ == '__main__':
app.run(debug=True)