-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_session.py
135 lines (102 loc) · 3.99 KB
/
main_session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import secrets
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
import uvicorn
from fastapi import Cookie, Depends, FastAPI, HTTPException, Response, status
from fastapi.security import OAuth2PasswordRequestForm
from passlib.context import CryptContext
from pydantic import BaseModel
# パスワードをハッシュ化するためのキー。本番環境では変更すること。
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# アクセストークンの有効期限(分単位)
ACCESS_TOKEN_EXPIRE_MINUTES = 5
# セッションIDの長さ
SESSION_ID_LENGTH = 64
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
# セッションIDとユーザー名を紐付けるためのデータベース。今回はメモリ上に保存する。
sessions_db: Dict[str, str] = {}
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_session_id() -> str:
while True:
session_id = secrets.token_hex(SESSION_ID_LENGTH)
if session_id not in sessions_db:
return session_id
async def get_current_user(session_id: str = Cookie(None)):
if not session_id or session_id not in sessions_db:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
username = sessions_db[session_id]
user = get_user(fake_users_db, username=username)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/login")
async def login(response: Response, form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(
fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
session_id = create_session_id()
sessions_db[session_id] = user.username
expires = datetime.now(tz=timezone.utc) + \
timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
response.set_cookie(
key="session_id",
value=session_id,
expires=expires,
httponly=True
)
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [{"item_id": "Foo", "owner": current_user.username}]
if __name__ == "__main__":
uvicorn.run("main_session:app", port=8000, reload=True)