From 06f9d737d522923380608c8f1f2821cea0ed3752 Mon Sep 17 00:00:00 2001 From: moonlightnexus Date: Tue, 26 Mar 2024 16:59:39 +0530 Subject: [PATCH] RBAC full update modified: .gitignore new file: .vscode/settings.json modified: setup.py modified: trustauthx/authlite.py --- .gitignore | 1 + .vscode/settings.json | 3 + setup.py | 2 +- trustauthx/authlite.py | 301 +++++++++++++++++++++++++++++++++++++++-- 4 files changed, 293 insertions(+), 14 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.gitignore b/.gitignore index 117763b..9aa0bc6 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__/ *.py[cod] *$py.class +*.ipynb # C extensions *.so diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..b881eff --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.analysis.autoImportCompletions": true +} \ No newline at end of file diff --git a/setup.py b/setup.py index 9ae1574..4386560 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name='trustauthx', - version='0.6.0', + version='0.6.2', description='Official connector SDK for TrustAuthx', long_description=long_description, long_description_content_type='text/markdown', # This is important! diff --git a/trustauthx/authlite.py b/trustauthx/authlite.py index 59ffa64..2ad50c3 100644 --- a/trustauthx/authlite.py +++ b/trustauthx/authlite.py @@ -3,10 +3,275 @@ from jose import JWTError, jwt from jose.constants import ALGORITHMS import json - -# authx/authlite.py - -class AuthLiteClient: +import sqlite3 +from dataclasses import dataclass, asdict +from typing import List, Dict + +@dataclass +class Permission: + name: str + value: str + +@dataclass +class Role: + org_id: str + rol_id: str + name: str + permissions: List[Permission] + +class _EdgeDBRoleQuery: + def __init__(self, roles, in_memory=True): + self.in_memory = in_memory + if self.in_memory: + self.roles = {role_id: permissions for role in roles for role_id, permissions in role.items()} + else: + self.conn = sqlite3.connect(':memory:') # replace ':memory:' with your database path + self.cursor = self.conn.cursor() + self.cursor.execute(""" + CREATE TABLE IF NOT EXISTS roles ( + role_id TEXT PRIMARY KEY, + permissions TEXT + ) + """) + for role in roles: + for role_id, permissions in role.items(): + self.cursor.execute("INSERT INTO roles VALUES (?, ?)", (role_id, permissions)) + self.conn.commit() + + def query(self, role_id=None, permission_key=None): + if self.in_memory: + if role_id and permission_key: + return self.roles.get(role_id, {}).get(permission_key, None) + elif role_id: + return self.roles.get(role_id, None) + elif permission_key: + return {role_id: permissions[permission_key] for role_id, permissions in self.roles.items() if permission_key in permissions} + else: + return self.roles + else: + if role_id and permission_key: + self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + permissions = self.cursor.fetchone() + if permissions: + return permissions[0].get(permission_key, None) + elif role_id: + self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + return self.cursor.fetchone() + elif permission_key: + self.cursor.execute("SELECT * FROM roles") + return {role_id: permissions[permission_key] for role_id, permissions in self.cursor.fetchall() if permission_key in permissions} + else: + self.cursor.execute("SELECT * FROM roles") + return self.cursor.fetchall() + + def validate(self, role_id, permission_key, permission_val): + if self.in_memory: + return self.roles.get(role_id, {}).get(permission_key, None) == permission_val + else: + self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + permissions = self.cursor.fetchone() + if permissions: + return permissions[0].get(permission_key, None) == permission_val + + def count_roles(self): + if self.in_memory: + return len(self.roles) + else: + self.cursor.execute("SELECT COUNT(*) FROM roles") + return self.cursor.fetchone()[0] + +class _Roles(_EdgeDBRoleQuery): + def __init__(self, roles, org_id, api_key, signed_key, secret_key, API_BASE_URL, InMemory=True): + self.org_id = org_id + self.api_key = api_key + self._secret_key = secret_key + self.signed_key = signed_key + self.API_BASE_URL = API_BASE_URL + super().__init__(roles, in_memory=InMemory) + print(self.roles) + + def get_all_roles(self): + """[ + { + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_gCD_ebc6f7715bb14554", + "name": "string", + "permissions": [ + { + "user": "administration" + } + ] + }, + { + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_Ahy_f51d73ff656545e5", + "name": "string", + "permissions": [] + }, + { + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_rce_474ae9e59b3d49ce", + "name": "string", + "permissions": [ + { + "user": "administration" + }, + { + "viewer": "administration" + }, + { + "maintainer": "administration" + } + ] + } +]""" + url = f'{self.API_BASE_URL}/rbac/role' + headers = {'accept': 'application/json'} + params = { + 'org_id': f'{self.org_id}', + 'api_key': f'{self.api_key}', + 'signed_key': f'{self._secret_key}' + } + response = requests.get(url, headers=headers, params=params) + roles = [Role(**role_data) for role_data in response.json()] + return roles + + def add_role(self, name, **Permission): + """{ + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_rce_474ae9e59b3d49ce", + "name": "string", + "permissions": [ + { + "user": "administration" + }, + { + "viewer": "administration" + }, + { + "maintainer": "administration" + } + ] +}""" + url = f'{self.API_BASE_URL}/rbac/role' + headers = { + 'accept': 'application/json', + 'Content-Type': 'application/json' + } + params = { + 'org_id': f'{self.org_id}', + 'api_key': f'{self.api_key}', + 'signed_key': f'{self._secret_key}' + } + permissions = [{k: v} for k, v in Permission.items()] + data = { + "org_id": f'{self.org_id}', + "name": name, + "permissions": permissions + } + response = requests.post(url, headers=headers, params=params, data=json.dumps(data)) + return response.json() + + def delete_role(self, rol_id): + + """{ + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_YHV_78ae9006bcaa4c77", + "name": "string", + "permissions": [ + { + "user": "administration" + }, + { + "viewer": "administration" + }, + { + "maintainer": "administration" + } + ] +}""" + url = f'{self.API_BASE_URL}/rbac/role' + headers = { + 'accept': 'application/json', + 'Content-Type': 'application/json' + } + params = { + 'org_id': f'{self.org_id}', + 'api_key': f'{self.api_key}', + 'signed_key': f'{self._secret_key}' + } + data = { + "org_id": f'{self.org_id}', + "rol_id": rol_id + } + response = requests.delete(url, headers=headers, params=params, data=json.dumps(data)) + return response.json() + + def add_permission(self, rol_id, **Permission): + """{ + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_rce_474ae9e59b3d49ce", + "permissions": [ + { + "any": "view" ##only return added content + } + ] +}""" + url = f'{self.API_BASE_URL}/rbac/permission' + headers = { + 'accept': 'application/json', + 'Content-Type': 'application/json' + } + params = { + 'org_id': f'{self.org_id}', + 'api_key': f'{self.api_key}', + 'signed_key': f'{self._secret_key}' + } + permissions = [{k: v} for k, v in Permission.items()] + data = { + "org_id": f'{self.org_id}', + "rol_id": rol_id, + "permissions": permissions + } + response = requests.post(url, headers=headers, params=params, data=json.dumps(data)) + return response.json() + + def delete_permission(self, rol_id, **Permission): + """{ + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_rce_474ae9e59b3d49ce", + "permissions": [ + { + "user": "administration" + }, + { + "viewer": "administration" + }, + { + "maintainer": "administration" + } + ] +}""" #return full + url = f'{self.API_BASE_URL}/rbac/permission' + headers = { + 'accept': 'application/json', + 'Content-Type': 'application/json' + } + params = { + 'org_id': f'{self.org_id}', + 'api_key': f'{self.api_key}', + 'signed_key': f'{self._secret_key}' + } + permissions = [{k: v} for k, v in Permission.items()] + data = { + "org_id": f'{self.org_id}', + "rol_id": rol_id, + "permissions": permissions + } + response = requests.delete(url, headers=headers, params=params, data=json.dumps(data)) + return response.json() + +class AuthLiteClient(): """ AuthLiteClient is a Python client for the TrustAuthX authentication service. @@ -63,7 +328,7 @@ class TokenCheck: refresh:str state:bool - def __init__(self, api_key, secret_key, org_id=None): + def __init__(self, api_key, secret_key, org_id=None, API_BASE_URL="https://api.trustauthx.com", in_memory=True): """ Initializes an AuthLiteClient instance. @@ -81,6 +346,11 @@ def __init__(self, api_key, secret_key, org_id=None): self.api_key = api_key self.org_id = org_id self.signed_key = self.jwt_encode(key=self.secret_key, data={"api_key":self.api_key}) + self.API_BASE_URL = API_BASE_URL + self.Roles: _Roles = _Roles(roles=self._set_edge_roles(), org_id=self.org_id, + api_key=self.api_key, signed_key=self.signed_key, + secret_key=self.secret_key, API_BASE_URL=self.API_BASE_URL, + InMemory=in_memory) def generate_url(self) -> str: """ @@ -115,7 +385,7 @@ def generate_edit_user_url(self, access_token, url) -> str: 'signed_key': self.signed_key, 'url':url } - url = "https://api.trustauthx.com/api/user/me/settings/" + url = f"{self.API_BASE_URL}/api/user/me/settings/" req = requests.Request('GET', url, params=params, headers=headers).prepare() return req.url @@ -132,7 +402,7 @@ def re_auth(self, code): Raises: HTTPError: If the request fails with an HTTP error status code. """ - url = "https://api.trustauthx.com/api/user/me/widget/re-auth/token" + url = f"{self.API_BASE_URL}/api/user/me/widget/re-auth/token" params = { "code": code, 'api_key': self.api_key, @@ -167,7 +437,7 @@ def get_user(self, token) -> dict: HTTPError: If the request fails with an HTTP error status code. """ # Validate the given authentication token - url = 'https://api.trustauthx.com/api/user/me/auth/data' + url = f'{self.API_BASE_URL}/api/user/me/auth/data' headers = {'accept': 'application/json'} params = { 'UserToken': token, @@ -203,7 +473,7 @@ def get_user_data(self, AccessToken) -> dict: """ # Validate the given authentication token """returns a dict containing 'access_token', 'refresh_token', 'img', 'sub'""" - url = 'https://api.trustauthx.com/api/user/me/data' + url = f'{self.API_BASE_URL}/api/user/me/data' headers = {'accept': 'application/json'} params = { 'AccessToken': AccessToken, @@ -234,7 +504,7 @@ def get_access_token_from_refresh_token(self, refresh_token): HTTPError: If the request fails with an HTTP error status code. """ # Store the given authentication token - url = 'https://api.trustauthx.com/api/user/me/access/token/' + url = f'{self.API_BASE_URL}/api/user/me/access/token/' headers = {'accept': 'application/json'} params = { 'RefreshToken': refresh_token, @@ -260,7 +530,7 @@ def validate_access_token(self, access_token) -> bool: bool: True if the access token is valid, False otherwise. """ # Store the given authentication token - url = 'https://api.trustauthx.com/api/user/me/auth/validate/token' + url = f'{self.API_BASE_URL}/api/user/me/auth/validate/token' headers = {'accept': 'application/json'} params = { 'AccessToken': access_token, @@ -286,7 +556,7 @@ def revoke_token(self,AccessToken:str=None, RefreshToken:str = None, revoke_all_ HTTPError: If the request fails with an HTTP error status code. AttributeError: If neither AccessToken nor RefreshToken is provided. """ - url = 'https://api.trustauthx.com/api/user/me/token/' + url = f'{self.API_BASE_URL}/api/user/me/token/' headers = {'accept': 'application/json'} if not AccessToken and not RefreshToken:raise AttributeError("must provide either AccessToken or RefreshToken") tt=True if AccessToken else False @@ -333,4 +603,9 @@ def validate_token_set(self, access_token, refresh_token) -> TokenCheck: d.refresh = refresh_token return d except: - raise HTTPError('both tokens are invalid login again') \ No newline at end of file + raise HTTPError('both tokens are invalid login again') + + def _set_edge_roles(self) -> list: + return [] + +