Skip to content

Commit

Permalink
chore: Cleanup code and remove refresh token for now.
Browse files Browse the repository at this point in the history
  • Loading branch information
sn3ek committed Feb 4, 2025
1 parent 602dfce commit cabff90
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 18 deletions.
7 changes: 0 additions & 7 deletions examples/07_auth0_with_device_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,6 @@ async def main():
access_token = await auth0_device_auth.poll_token_endpoint()
print(f"Access Token: {access_token}")

# Optionally, refresh the access token using the refresh token
if auth0_device_auth.refresh_token:
new_access_token = auth0_device_auth.refresh_access_token(
auth0_device_auth.refresh_token
)
print(f"New Access Token: {new_access_token}")

except Exception as e:
print(f"An error occurred: {e}")

Expand Down
26 changes: 15 additions & 11 deletions nova/auth/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pydantic import BaseModel, Field, ValidationError


class DeviceCodeInfo(BaseModel):
class Auth0DeviceCodeInfo(BaseModel):
"""
Model to store device code information.
Expand All @@ -22,7 +22,7 @@ class DeviceCodeInfo(BaseModel):
interval: int = Field(default=5)


class TokenInfo(BaseModel):
class Auth0TokenInfo(BaseModel):
"""
Model to store token information.
Expand Down Expand Up @@ -92,40 +92,41 @@ def __init__(self, auth0_domain: str, auth0_client_id: str, auth0_audience: str)
self.auth0_client_id = auth0_client_id
self.auth0_audience = auth0_audience
self.headers = {"content-type": "application/x-www-form-urlencoded"}
self.device_code_info: DeviceCodeInfo | None = None
self.device_code_info: Auth0DeviceCodeInfo | None = None
self.interval = 5
self.attempts = 10

def request_device_code(self):
"""
Requests a device code from Auth0.
Returns:
DeviceCodeInfo: The device code information.
Auth0DeviceCodeInfo: The device code information.
Raises:
Exception: If there is an error requesting the device code.
"""
device_code_url = f"https://{self.auth0_domain}/oauth/device/code"
data = {
"client_id": self.auth0_client_id,
"scope": "openid profile email offline_access",
"scope": "openid profile email",
"audience": self.auth0_audience,
}

response = httpx.post(device_code_url, headers=self.headers, data=data)
if response.status_code == 200:
self.device_code_info = DeviceCodeInfo(**response.json())
self.device_code_info = Auth0DeviceCodeInfo(**response.json())
self.interval = self.device_code_info.interval
return self.device_code_info
else:
raise Exception("Error requesting device code:", response.json())

def get_device_code_info(self) -> DeviceCodeInfo | None:
def get_device_code_info(self) -> Auth0DeviceCodeInfo | None:
"""
Returns the device code information.
Returns:
DeviceCodeInfo | None: The device code information.
Auth0DeviceCodeInfo | None: The device code information.
"""
return self.device_code_info

Expand Down Expand Up @@ -166,11 +167,12 @@ async def poll_token_endpoint(self):
}

async with httpx.AsyncClient() as client:
while True:
while self.attempts > 0:
self.attempts = self.attempts - 1
token_response = await client.post(token_url, headers=self.headers, data=token_data)
if token_response.status_code == 200:
# If the response status is 200, it means the access token is successfully obtained.
token_info = TokenInfo(**token_response.json())
token_info = Auth0TokenInfo(**token_response.json())
self.refresh_token = token_info.refresh_token
return token_info.access_token
elif token_response.status_code == 400:
Expand All @@ -189,6 +191,7 @@ async def poll_token_endpoint(self):
else:
# For other status codes, raise an exception with the error details.
raise Exception("Error:", token_response.status_code, token_response.json())
raise Exception("Error: It was not able to authenticate. Please try again.")

def refresh_access_token(self, refresh_token: str):
"""
Expand All @@ -214,8 +217,9 @@ def refresh_access_token(self, refresh_token: str):
}

response = httpx.post(token_url, headers=self.headers, data=token_data)

if response.status_code == 200:
token_info = TokenInfo(**response.json())
token_info = Auth0TokenInfo(**response.json())
return token_info.access_token
else:
raise Exception("Error refreshing access token:", response.json())

0 comments on commit cabff90

Please sign in to comment.