Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOP-21482] - implement unit tests for KeycloakAuthProvider #133

Merged
merged 5 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .env.local
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@ export SYNCMASTER__CRYPTO_KEY=UBgPTioFrtH2unlC4XFDiGf5sYfzbdSf_VgiUSaQc94=
# Postgres
export SYNCMASTER__DATABASE__URL=postgresql+asyncpg://syncmaster:changeme@localhost:5432/syncmaster

# Auth
# Keycloack Auth
export SYNCMASTER__AUTH__SERVER_URL=http://keycloak:8080/
export SYNCMASTER__AUTH__REALM_NAME=manually_created
export SYNCMASTER__AUTH__CLIENT_ID=manually_created
export SYNCMASTER__AUTH__CLIENT_SECRET=generated_by_keycloak
export SYNCMASTER__AUTH__REDIRECT_URI=http://localhost:8000/auth/callback
export SYNCMASTER__AUTH__SCOPE=email
export SYNCMASTER__AUTH__PROVIDER=syncmaster.backend.providers.auth.keycloak_provider.KeycloakAuthProvider

# Dummy Auth
export SYNCMASTER__AUTH__PROVIDER=syncmaster.backend.providers.auth.dummy_provider.DummyAuthProvider
export SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=secret

Expand Down
45 changes: 32 additions & 13 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ onetl = {extras = ["spark", "s3", "hdfs"], version = "^0.12.0"}
faker = ">=28.4.1,<34.0.0"
coverage = "^7.6.1"
gevent = "^24.2.1"
responses = "*"

[tool.poetry.group.dev.dependencies]
mypy = "^1.11.2"
Expand Down
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ def event_loop():
loop.close()


@pytest.fixture(scope="session")
def settings():
return Settings()
@pytest.fixture(scope="session", params=[{}])
def settings(request: pytest.FixtureRequest) -> Settings:
return Settings.parse_obj(request.param)


@pytest.fixture(scope="session")
Expand Down
Empty file.
Empty file.
118 changes: 118 additions & 0 deletions tests/test_unit/test_auth/mocks/keycloak.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import json
from base64 import b64encode

import responses
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from itsdangerous import TimestampSigner
from jose import jwt

# copied from .env.docker as backend tries to send requests to corresponding
KEYCLOAK_CONFIG = {
"server_url": "http://keycloak:8080",
dolfinus marked this conversation as resolved.
Show resolved Hide resolved
"realm_name": "manually_created",
"redirect_uri": "http://localhost:8000/v1/auth/callback",
"client_secret": "generated_by_keycloak",
"scope": "email",
"client_id": "test-client",
}
# create private & public keys to emulate Keycloak signing
PRIVATE_KEY = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
PRIVATE_PEM = PRIVATE_KEY.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
PUBLIC_KEY = PRIVATE_KEY.public_key()
dolfinus marked this conversation as resolved.
Show resolved Hide resolved


def get_public_key_pem(public_key):
public_pem = public_key.public_bytes(
encoding=Encoding.PEM,
format=PublicFormat.SubjectPublicKeyInfo,
)
public_pem_str = public_pem.decode("utf-8")
public_pem_str = public_pem_str.replace("-----BEGIN PUBLIC KEY-----\n", "")
public_pem_str = public_pem_str.replace("-----END PUBLIC KEY-----\n", "")
public_pem_str = public_pem_str.replace("\n", "")
return public_pem_str


def create_session_cookie(payload: dict, session_secret_key: str) -> str:
access_token = jwt.encode(payload, PRIVATE_PEM, algorithm="RS256")
refresh_token = "mock_refresh_token"
session_data = {
"access_token": access_token,
"refresh_token": refresh_token,
}

signer = TimestampSigner(session_secret_key)
json_bytes = json.dumps(session_data).encode("utf-8")
base64_bytes = b64encode(json_bytes)
signed_data = signer.sign(base64_bytes)
return signed_data.decode("utf-8")


def mock_keycloak_well_known(responses_mock):
server_url = KEYCLOAK_CONFIG.get("server_url")
realm_name = KEYCLOAK_CONFIG.get("realm_name")
well_known_url = f"{server_url}/realms/{realm_name}/.well-known/openid-configuration"

responses_mock.add(
responses.GET,
well_known_url,
json={
"authorization_endpoint": f"{server_url}/realms/{realm_name}/protocol/openid-connect/auth",
"token_endpoint": f"{server_url}/realms/{realm_name}/protocol/openid-connect/token",
"userinfo_endpoint": f"{server_url}/realms/{realm_name}/protocol/openid-connect/userinfo",
"end_session_endpoint": f"{server_url}/realms/{realm_name}/protocol/openid-connect/logout",
"jwks_uri": f"{server_url}/realms/{realm_name}/protocol/openid-connect/certs",
"issuer": f"{server_url}/realms/{realm_name}",
},
status=200,
content_type="application/json",
)


def mock_keycloak_token_endpoint(responses_mock, access_token: str, refresh_token: str):
server_url = KEYCLOAK_CONFIG.get("server_url")
realm_name = KEYCLOAK_CONFIG.get("realm_name")
token_url = f"{server_url}/realms/{realm_name}/protocol/openid-connect/token"

responses_mock.add(
responses.POST,
token_url,
body=json.dumps(
{
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"expires_in": 3600,
},
),
status=200,
content_type="application/json",
)


def mock_keycloak_realm(responses_mock):
server_url = KEYCLOAK_CONFIG.get("server_url")
realm_name = KEYCLOAK_CONFIG.get("realm_name")
realm_url = f"{server_url}/realms/{realm_name}"

responses_mock.add(
responses.GET,
realm_url,
json={
"realm": realm_name,
"public_key": get_public_key_pem(PUBLIC_KEY),
"token-service": f"{server_url}/realms/{realm_name}/protocol/openid-connect/token",
"account-service": f"{server_url}/realms/{realm_name}/account",
},
status=200,
content_type="application/json",
)
Loading
Loading