Skip to content

Commit

Permalink
Add permission checks for admin to read and write in any `cod_SIAPE_i…
Browse files Browse the repository at this point in the history
…nstituidora` (#104)

* Add test for admin create pe in another unit

* Checks that user is admin in test

* Add test admin create status_participante

in a unit other than their own

* Solve linting warning singleton comparison

* Add test for admin create pt in another unit

* Use for tests user that is not actual admin

and adjust the unit to a different unit

* Fix organizational unit in test

* Implement is_admin check in endpoint permissions

* Solve linting warning

* Solve linting warning

* Format with black

* Add fixtures to fix test

* Add fixture for creating a pt in unit 3

* Adjust fixtures to recreate pe at every test

* Add tests for creating a pt in different unit

* Change user in test to not admin user

* Implement permission check to read pt

* Add docstrings to tests

* Add tests for creating pe in different unit

* Implement access control for reading plano_entrega

* Add missing docstrings and make summaries uniform

* Fix test to use a normal user and add example data

* Add assert equal in get pe

* Add helper functions for comparing status part

* Compare status part content in test

* Formatting and remove debug prints

* Create example_part_unidade_3 fixture

* Implement permission check in GET status_part

* Replace fixture used in URL

* Rename test

* Add test add part in different unit as admin
  • Loading branch information
augusto-herrmann authored Jan 31, 2024
1 parent d8a9d57 commit d586819
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 63 deletions.
80 changes: 46 additions & 34 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@

@app.on_event("startup")
async def on_startup():
"""Executa as rotinas de inicialização da API."""
await create_db_and_tables()
await crud_auth.init_user_admin()


@app.get("/", include_in_schema=False)
async def docs_redirect(accept: Union[str, None] = Header(default="text/html")):
async def docs_redirect(
accept: Union[str, None] = Header(default="text/html")
) -> RedirectResponse:
"""
Redireciona para a documentação da API.
"""
Expand All @@ -66,14 +69,16 @@ async def docs_redirect(accept: Union[str, None] = Header(default="text/html")):

@app.post(
"/token",
summary="Autentica na api-pgd",
summary="Autentica na API.",
response_model=schemas.Token,
tags=["Auth"],
)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: DbContextManager = Depends(DbContextManager),
):
) -> dict:
"""Realiza o login na API usando as credenciais de acesso, obtendo um
token de acesso."""
try:
schemas.UsersInputSchema(email=form_data.username)
except Exception as exception:
Expand Down Expand Up @@ -101,7 +106,7 @@ async def login_for_access_token(

@app.get(
"/users",
summary="Consulta usuários da api-pgd",
summary="Lista usuários da API.",
tags=["Auth"],
)
async def get_users(
Expand All @@ -111,12 +116,13 @@ async def get_users(
],
db: DbContextManager = Depends(DbContextManager),
) -> list[schemas.UsersGetSchema]:
"""Obtém a lista de usuários da API."""
return await crud_auth.get_all_users(db)


@app.put(
"/user/{email}",
summary="Cria ou edita usuário na api-pgd",
summary="Cria ou altera usuário na API.",
tags=["Auth"],
)
async def create_or_update_user(
Expand All @@ -127,6 +133,8 @@ async def create_or_update_user(
email: str,
db: DbContextManager = Depends(DbContextManager),
) -> dict:
"""Cria um usuário da API ou atualiza os seus dados cadastrais."""

# Validações

# ## url
Expand Down Expand Up @@ -166,7 +174,7 @@ async def create_or_update_user(

@app.get(
"/user/{email}",
summary="Consulta usuários da api-pgd",
summary="Consulta um usuário da API.",
tags=["Auth"],
)
async def get_user(
Expand All @@ -177,6 +185,9 @@ async def get_user(
email: str,
db: DbContextManager = Depends(DbContextManager),
) -> schemas.UsersGetSchema:
"""Retorna os dados cadastrais do usuário da API especificado pelo
e-mail informado."""

user = await crud_auth.get_user(db, email)

if user:
Expand All @@ -189,13 +200,16 @@ async def get_user(

@app.post(
"/user/forgot_password/{email}",
summary="Recuperação de Acesso",
summary="Solicita recuperação de acesso à API.",
tags=["Auth"],
)
async def forgot_password(
email: str,
db: DbContextManager = Depends(DbContextManager),
) -> schemas.UsersInputSchema:
"""Dispara o processo de recuperação de senha, enviando um token de
redefinição de senha ao e-mail informado no cadastro do usuário."""

user = await crud_auth.get_user(db, email)

if user:
Expand All @@ -213,7 +227,7 @@ async def forgot_password(

@app.get(
"/user/reset_password/",
summary="Criar nova senha a partir do token de acesso",
summary="Criar nova senha a partir do token de acesso.",
tags=["Auth"],
)
async def reset_password(
Expand Down Expand Up @@ -243,13 +257,22 @@ async def reset_password(
)
async def get_plano_entrega(
user: Annotated[schemas.UsersSchema, Depends(crud_auth.get_current_active_user)],
cod_SIAPE_instituidora: int,
id_plano_entrega_unidade: int,
db: DbContextManager = Depends(DbContextManager),
):
"Consulta o plano de entregas com o código especificado."

# Validações de permissão
if (cod_SIAPE_instituidora != user.cod_SIAPE_instituidora) and not user.is_admin:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Usuário não tem permissão na cod_SIAPE_instituidora informada",
)

db_plano_entrega = await crud.get_plano_entregas(
db_session=db,
cod_SIAPE_instituidora=user.cod_SIAPE_instituidora,
cod_SIAPE_instituidora=cod_SIAPE_instituidora,
id_plano_entrega_unidade=id_plano_entrega_unidade,
)
if not db_plano_entrega:
Expand Down Expand Up @@ -278,12 +301,7 @@ async def create_or_update_plano_entregas(
plano de entregas por um novo com os dados informados."""

# Validações de permissão
if (
cod_SIAPE_instituidora
!= user.cod_SIAPE_instituidora
# TODO: Dar acesso ao superusuário em todas as unidades.
# and "all:write" not in access_token_info["permissions"]
):
if (cod_SIAPE_instituidora != user.cod_SIAPE_instituidora) and not user.is_admin:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Usuário não tem permissão na cod_SIAPE_instituidora informada",
Expand Down Expand Up @@ -366,13 +384,22 @@ async def create_or_update_plano_entregas(
)
async def get_plano_trabalho(
user: Annotated[schemas.UsersSchema, Depends(crud_auth.get_current_active_user)],
cod_SIAPE_instituidora: int,
id_plano_trabalho_participante: int,
db: DbContextManager = Depends(DbContextManager),
):
"Consulta o plano de trabalho com o código especificado."

# Validações de permissão
if (cod_SIAPE_instituidora != user.cod_SIAPE_instituidora) and not user.is_admin:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Usuário não tem permissão na cod_SIAPE_instituidora informada",
)

db_plano_trabalho = await crud.get_plano_trabalho(
db_session=db,
cod_SIAPE_instituidora=user.cod_SIAPE_instituidora,
cod_SIAPE_instituidora=cod_SIAPE_instituidora,
id_plano_trabalho_participante=id_plano_trabalho_participante,
)
if not db_plano_trabalho:
Expand Down Expand Up @@ -401,12 +428,7 @@ async def create_or_update_plano_trabalho(
plano de trabalho por um novo com os dados informados."""

# Validações de permissão
if (
cod_SIAPE_instituidora
!= user.cod_SIAPE_instituidora
# TODO: Dar acesso ao superusuário em todas as unidades.
# and "all:write" not in access_token_info["permissions"]
):
if (cod_SIAPE_instituidora != user.cod_SIAPE_instituidora) and not user.is_admin:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Usuário não tem permissão na cod_SIAPE_instituidora informada",
Expand Down Expand Up @@ -499,12 +521,7 @@ async def get_status_participante(
"Consulta o status do participante a partir da matricula SIAPE."

# Validações de permissão
if (
cod_SIAPE_instituidora
!= user.cod_SIAPE_instituidora
# TODO: Dar acesso ao superusuário em todas as unidades.
# and "all:write" not in access_token_info["permissions"]
):
if (cod_SIAPE_instituidora != user.cod_SIAPE_instituidora) and not user.is_admin:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Usuário não tem permissão na cod_SIAPE_instituidora informada",
Expand Down Expand Up @@ -540,12 +557,7 @@ async def create_status_participante(
"""Envia um ou mais status de Programa de Gestão de um participante."""

# Validações de permissão
if (
cod_SIAPE_instituidora
!= user.cod_SIAPE_instituidora
# TODO: Dar acesso ao superusuário em todas as unidades.
# and "all:write" not in access_token_info["permissions"]
):
if (cod_SIAPE_instituidora != user.cod_SIAPE_instituidora) and not user.is_admin:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Usuário não tem permissão na cod_SIAPE_instituidora informada",
Expand Down
58 changes: 53 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,23 @@ def example_pe(
)


@pytest.fixture()
def example_pe_unidade_3(
client: httpx.Client,
input_pe: dict,
user1_credentials: dict,
header_usr_1: dict,
):
"""Cria um Plano de Entrega como exemplo."""
input_pe["cod_SIAPE_instituidora"] = 3
client.put(
f"/organizacao/{input_pe['cod_SIAPE_instituidora']}"
f"/plano_entregas/{input_pe['id_plano_entrega_unidade']}",
json=input_pe,
headers=header_usr_1,
)


@pytest.fixture()
def example_pt(
client: httpx.Client, input_pt: dict, user1_credentials: dict, header_usr_1: dict
Expand All @@ -241,15 +258,46 @@ def example_pt(
)


@pytest.fixture()
def example_pt_unidade_3(
client: httpx.Client,
input_pt: dict,
header_admin: dict,
):
"""Cria um Plano de Trabalho do Participante como exemplo."""
input_pt["cod_SIAPE_instituidora"] = 3
client.put(
f"/organizacao/{input_pt['cod_SIAPE_instituidora']}"
f"/plano_trabalho/{input_pt['id_plano_trabalho_participante']}",
json=input_pt,
headers=header_admin,
)


@pytest.fixture()
def example_part(
client: httpx.Client, input_part: dict, user1_credentials: dict, header_usr_1: dict
client: httpx.Client, input_part: dict, header_admin: dict
):
"""Cria um exemplo de status de participante."""
client.put(
f"/organizacao/{user1_credentials['cod_SIAPE_instituidora']}"
f"/organizacao/{input_part['cod_SIAPE_instituidora']}"
f"/participante/{input_part['cpf_participante']}",
json={"lista_status": [input_part]},
headers=header_usr_1,
headers=header_admin,
)


@pytest.fixture()
def example_part_unidade_3(
client: httpx.Client, input_part: dict, header_admin: dict
):
"""Cria um exemplo de status de participante na unidade 3."""
input_part["cod_SIAPE_instituidora"] = 3
client.put(
f"/organizacao/{input_part['cod_SIAPE_instituidora']}"
f"/participante/{input_part['cpf_participante']}",
json={"lista_status": [input_part]},
headers=header_admin,
)


Expand All @@ -269,7 +317,7 @@ def truncate_participantes():


@pytest.fixture(scope="module", name="truncate_users")
def fixture_truncate_users(admin_credentials: dict):
def fixture_truncate_users(admin_credentials: dict): # pylint: disable=unused-argument
truncate_user()
asyncio.get_event_loop().run_until_complete(init_user_admin())

Expand Down Expand Up @@ -309,7 +357,7 @@ def header_not_logged_in() -> dict:

@pytest.fixture(scope="module", name="header_admin")
def fixture_header_admin(
register_admin, admin_credentials: dict # pylint: disable=unused-argument
admin_credentials: dict, # pylint: disable=unused-argument
) -> dict:
"""Authenticate in the API as an admin and return a dict with bearer
header parameter to be passed to API's requests."""
Expand Down
Loading

0 comments on commit d586819

Please sign in to comment.