-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
80 lines (63 loc) · 2.05 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from functools import wraps
from flask import g, request, jsonify
import sqlite3
import time
# Filesystem path of the SQLite database file that get_db() opens.
# The leading "./" makes it relative to the process's working directory.
DATABASE = "./data.db"
def login_required(f):
    """Decorate a view so it only runs when a user is attached to ``g``.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that returns the literal string ``"No"`` when ``g.user``
        is missing or ``None``; otherwise it calls ``f`` with the original
        arguments and returns its result.
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Read with a default so a request on which ``g.user`` was never
        # assigned is refused the same way as an explicit ``None`` instead of
        # failing with AttributeError.
        if getattr(g, "user", None) is None:
            # NOTE(review): this refusal is a plain "No" body with the default
            # status code, whereas environment_user in this file answers with
            # JSON and 401 -- confirm whether clients depend on this shape.
            return "No"
        return f(*args, **kwargs)

    return decorated_function
def get_db():
    """Return the SQLite connection stored on ``g``, opening it on first use.

    The connection is kept in ``g._database``; when that attribute is absent
    or ``None`` a new connection to ``DATABASE`` is created and stored there.

    Returns:
        The ``sqlite3`` connection object held on ``g``.
    """
    connection = getattr(g, "_database", None)
    if connection is not None:
        return connection

    # Nothing cached yet: connect and remember the connection on ``g``.
    connection = sqlite3.connect(DATABASE)
    g._database = connection
    return connection
def close_connection(exception):
    """Close the SQLite connection stored on ``g``, if there is one.

    Args:
        exception: Not used by this function. Presumably the error passed in
            by Flask's teardown hook -- confirm where this is registered.

    Returns:
        None.
    """
    connection = getattr(g, "_database", None)
    if connection is None:
        # No connection was stored on ``g``; nothing to release.
        return
    connection.close()
def query_db(query, args=(), one=False):
    """Execute ``query`` on the shared connection, commit, and return rows.

    Args:
        query: SQL statement; use ``?`` placeholders for values.
        args: Sequence of values bound to the placeholders.
        one: When true, return only the first row (or ``None`` when the
            statement produced no rows) instead of the full list.

    Returns:
        A list of result rows, or a single row / ``None`` when ``one`` is
        true.
    """
    db = get_db()
    cur = db.execute(query, args)
    try:
        rows = cur.fetchall()
        # Commit after every statement so writes issued through this helper
        # are persisted without a separate call.
        db.commit()
    finally:
        # Release the cursor even when fetching or committing fails.
        cur.close()

    if one:
        return rows[0] if rows else None
    return rows
def environment_user(f):
    """Decorate a view so it requires an ``Authorization: Token <token>`` header.

    The token is looked up in the ``user_token`` table; on a match the
    associated username is stored in ``g.user`` before the view runs.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that:
        * returns an empty body for ``OPTIONS`` requests without checking
          credentials (presumably so CORS preflight requests succeed);
        * returns ``({"error": "Bad credentials"}, 401)`` as JSON when the
          header is missing, does not use the ``Token`` scheme, or carries a
          token that is not in ``user_token``;
        * otherwise calls ``f`` with the original arguments.
    """
    scheme_prefix = "token "

    def bad_credentials():
        # Single definition of the rejection response used by every failure path.
        return jsonify({"error": "Bad credentials"}), 401

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if request.method == "OPTIONS":
            return ""

        auth_header = request.headers.get("Authorization")
        if auth_header is None:
            return bad_credentials()

        # Verify the scheme before slicing. Previously the first six
        # characters were dropped unconditionally, so any other six-character
        # prefix (e.g. "Basic ") was accepted as if it were "Token ".
        # The comparison is case-insensitive so existing "token ..." clients
        # keep working.
        prefix_length = len(scheme_prefix)
        if auth_header[:prefix_length].lower() != scheme_prefix:
            return bad_credentials()
        token = auth_header[prefix_length:]

        row = query_db(
            "select username from user_token where token = ?", (token,), one=True
        )
        if row is None:
            return bad_credentials()

        g.user = row[0]
        return f(*args, **kwargs)

    return decorated_function
def cors(response, delay=1):
    """Add permissive CORS headers to ``response`` and return it.

    Args:
        response: Object exposing a mapping-like ``headers`` attribute that
            supports ``keys()`` and item assignment.
        delay: Seconds to sleep before returning. Defaults to 1, which keeps
            the pause that was previously hard-coded; pass 0 to skip it.

    Returns:
        The same ``response`` object, with its headers mutated in place.
    """
    headers = response.headers

    # Expose every header already present on the response. The right-hand
    # side is evaluated first, so the CORS headers added below are not listed.
    headers["Access-Control-Expose-Headers"] = ", ".join(headers.keys())

    headers["Access-Control-Allow-Origin"] = "*"
    headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS, DELETE, PUT, PATCH"
    headers["Access-Control-Allow-Headers"] = (
        "origin, content-type, accept, authorization"
    )
    # The header value must be the lowercase string "true"; the bool ``True``
    # used before is not that value.
    # NOTE(review): browsers do not honour credentials together with a
    # wildcard Allow-Origin -- confirm whether credentialed requests matter.
    headers["Access-Control-Allow-Credentials"] = "true"
    headers["Access-Control-Max-Age"] = "1209600"  # 14 days, in seconds

    # NOTE(review): every response is delayed by ``delay`` seconds; this looks
    # like deliberate latency simulation -- confirm before using in production.
    if delay and delay > 0:
        time.sleep(delay)
    return response
def catch_all(path):
    """Return an empty response body for any ``path``.

    Args:
        path: Accepted but not used.

    Returns:
        The empty string.
    """
    empty_body = ""
    return empty_body