From bf616f9db5173d5f32cf53e9d034e2c307dda006 Mon Sep 17 00:00:00 2001 From: Kuchenpirat <24235032+Kuchenpirat@users.noreply.github.com> Date: Wed, 22 Jan 2025 17:06:41 +0100 Subject: [PATCH] fix: prevent users from updating their own household privileges (#4928) Co-authored-by: Michael Genson <71845777+michael-genson@users.noreply.github.com> --- .../controller_household_self_service.py | 3 + mealie/routes/users/_helpers.py | 49 +++++++++++-- mealie/routes/users/crud.py | 21 +----- mealie/routes/users/images.py | 2 +- mealie/routes/users/ratings.py | 2 +- .../test_household_permissions.py | 13 ++++ .../user_tests/test_user_crud.py | 71 ++++++++++++++++++- 7 files changed, 133 insertions(+), 28 deletions(-) diff --git a/mealie/routes/households/controller_household_self_service.py b/mealie/routes/households/controller_household_self_service.py index c3a7e0083bc..89b7c711240 100644 --- a/mealie/routes/households/controller_household_self_service.py +++ b/mealie/routes/households/controller_household_self_service.py @@ -76,6 +76,9 @@ def set_member_permissions(self, permissions: SetPermissions): if target_user.household_id != self.household_id: raise HTTPException(status.HTTP_403_FORBIDDEN, detail="User is not a member of this household") + if target_user.id == self.user.id: + raise HTTPException(status.HTTP_403_FORBIDDEN, detail="User is not allowed to change their own permissions") + target_user.can_invite = permissions.can_invite target_user.can_manage = permissions.can_manage target_user.can_manage_household = permissions.can_manage_household diff --git a/mealie/routes/users/_helpers.py b/mealie/routes/users/_helpers.py index 83a5dbb25aa..6b69a18496b 100644 --- a/mealie/routes/users/_helpers.py +++ b/mealie/routes/users/_helpers.py @@ -1,10 +1,49 @@ from fastapi import HTTPException, status from pydantic import UUID4 -from mealie.schema.user.user import PrivateUser +from mealie.schema.response.responses import ErrorResponse +from mealie.schema.user.user import PrivateUser, UserBase +permission_attrs = ["can_invite", "can_manage", "can_manage_household", "can_organize", "admin"] -def assert_user_change_allowed(id: UUID4, current_user: PrivateUser): - if current_user.id != id and not current_user.admin: - # only admins can edit other users - raise HTTPException(status.HTTP_403_FORBIDDEN, detail="NOT_AN_ADMIN") + +def _assert_non_admin_user_change_allowed(user_id: UUID4, current_user: PrivateUser, new_data: UserBase): + if current_user.id != user_id: + # User is trying to edit another user + raise HTTPException(status.HTTP_403_FORBIDDEN, ErrorResponse.respond("User cannot edit other users")) + + if any(getattr(current_user, p) != getattr(new_data, p) for p in permission_attrs): + # User is trying to change their own permissions + raise HTTPException( + status.HTTP_403_FORBIDDEN, + ErrorResponse.respond("User cannot change their own permissions"), + ) + + if current_user.group != new_data.group: + # prevent a regular user from changing their group + raise HTTPException( + status.HTTP_403_FORBIDDEN, ErrorResponse.respond("User doesn't have permission to change their group") + ) + + if current_user.household != new_data.household: + # prevent a regular user from changing their household + raise HTTPException( + status.HTTP_403_FORBIDDEN, + ErrorResponse.respond("User doesn't have permission to change their household"), + ) + + +def assert_user_change_allowed(user_id: UUID4, current_user: PrivateUser, new_data: UserBase): + if not current_user.admin: + _assert_non_admin_user_change_allowed(user_id, current_user, new_data) + return + + if current_user.id != user_id: + raise HTTPException(status.HTTP_403_FORBIDDEN, ErrorResponse.respond("Use the Admin API to update other users")) + + # Admin is trying to edit themselves + if any(getattr(current_user, p) != getattr(new_data, p) for p in permission_attrs): + # prevent an admin from excalating their own permissions + raise HTTPException( + status.HTTP_403_FORBIDDEN, ErrorResponse.respond("Admins can't change their own permissions") + ) diff --git a/mealie/routes/users/crud.py b/mealie/routes/users/crud.py index bdec3a20cbb..a246b6fa3fa 100644 --- a/mealie/routes/users/crud.py +++ b/mealie/routes/users/crud.py @@ -46,13 +46,6 @@ def get_user(self, item_id: UUID4): @admin_router.delete("/{item_id}") def delete_user(self, item_id: UUID4): - """Removes a user from the database. Must be the current user or a super user""" - - assert_user_change_allowed(item_id, self.user) - - if item_id == 1: # TODO: identify super_user - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="SUPER_USER") - self.mixins.delete_one(item_id) @@ -106,19 +99,7 @@ def update_password(self, password_change: ChangePassword): @user_router.put("/{item_id}") def update_user(self, item_id: UUID4, new_data: UserBase): - assert_user_change_allowed(item_id, self.user) - - if not self.user.admin and (new_data.admin or self.user.group != new_data.group): - # prevent a regular user from doing admin tasks on themself - raise HTTPException( - status.HTTP_403_FORBIDDEN, ErrorResponse.respond("User doesn't have permission to change group") - ) - - if self.user.id == item_id and self.user.admin and not new_data.admin: - # prevent an admin from demoting themself - raise HTTPException( - status.HTTP_403_FORBIDDEN, ErrorResponse.respond("User doesn't have permission to change group") - ) + assert_user_change_allowed(item_id, self.user, new_data) try: self.repos.users.update(item_id, new_data.model_dump()) diff --git a/mealie/routes/users/images.py b/mealie/routes/users/images.py index e3d54e8d16b..b472e4b1ff4 100644 --- a/mealie/routes/users/images.py +++ b/mealie/routes/users/images.py @@ -23,7 +23,7 @@ def update_user_image( ): """Updates a User Image""" with get_temporary_path() as temp_path: - assert_user_change_allowed(id, self.user) + assert_user_change_allowed(id, self.user, self.user) temp_img = temp_path.joinpath(profile.filename) with temp_img.open("wb") as buffer: diff --git a/mealie/routes/users/ratings.py b/mealie/routes/users/ratings.py index c378499994e..16adc99e5ef 100644 --- a/mealie/routes/users/ratings.py +++ b/mealie/routes/users/ratings.py @@ -54,7 +54,7 @@ async def get_favorites(self, id: UUID4): @router.post("/{id}/ratings/{slug}") def set_rating(self, id: UUID4, slug: str, data: UserRatingUpdate): """Sets the user's rating for a recipe""" - assert_user_change_allowed(id, self.user) + assert_user_change_allowed(id, self.user, self.user) recipe = self.get_recipe_or_404(slug) user_rating = self.repos.user_ratings.get_by_user_and_recipe(id, recipe.id) diff --git a/tests/integration_tests/user_household_tests/test_household_permissions.py b/tests/integration_tests/user_household_tests/test_household_permissions.py index f467a6d7672..b18b3aa25d2 100644 --- a/tests/integration_tests/user_household_tests/test_household_permissions.py +++ b/tests/integration_tests/user_household_tests/test_household_permissions.py @@ -86,3 +86,16 @@ def test_set_member_permissions_no_user( payload = get_permissions_payload(str(uuid4())) response = api_client.put(api_routes.households_permissions, json=payload, headers=unique_user.token) assert response.status_code == 404 + + +def test_set_own_permissions(api_client: TestClient, unique_user: TestUser): + database = unique_user.repos + + user = database.users.get_one(unique_user.user_id) + assert user + user.can_manage = True + database.users.update(user.id, user) + + form = {"user_id": str(unique_user.user_id), "canOrganize": not user.can_organize} + response = api_client.put(api_routes.households_permissions, json=form, headers=unique_user.token) + assert response.status_code == 403 diff --git a/tests/integration_tests/user_tests/test_user_crud.py b/tests/integration_tests/user_tests/test_user_crud.py index e2c1d3f2b3b..9d4a60e2840 100644 --- a/tests/integration_tests/user_tests/test_user_crud.py +++ b/tests/integration_tests/user_tests/test_user_crud.py @@ -1,8 +1,9 @@ import pytest from fastapi.testclient import TestClient -from tests.utils import TestUser, api_routes +from tests.utils import api_routes from tests.utils.factories import random_email, random_int, random_string +from tests.utils.fixture_schemas import TestUser @pytest.mark.parametrize("use_admin_user", [True, False]) @@ -43,3 +44,71 @@ def test_get_all_users_admin(request: pytest.FixtureRequest, api_client: TestCli response_user_ids = {user["id"] for user in response.json()["items"]} for user_id in user_ids: assert user_id in response_user_ids + + +def test_user_update(api_client: TestClient, unique_user: TestUser, admin_user: TestUser): + response = api_client.get(api_routes.users_self, headers=unique_user.token) + user = response.json() + + # valid request without updates + response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=user, headers=unique_user.token) + assert response.status_code == 200 + + # valid request with updates + tmp_user = user.copy() + tmp_user["email"] = random_email() + tmp_user["full_name"] = random_string() + response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=tmp_user, headers=unique_user.token) + assert response.status_code == 200 + + # test user attempting to update another user + form = {"email": admin_user.email, "full_name": admin_user.full_name} + response = api_client.put(api_routes.users_item_id(admin_user.user_id), json=form, headers=unique_user.token) + assert response.status_code == 403 + + # test user attempting permission changes + permissions = ["canInvite", "canManage", "canManageHousehold", "canOrganize", "advanced", "admin"] + for permission in permissions: + tmp_user = user.copy() + tmp_user[permission] = not user[permission] + response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=form, headers=unique_user.token) + assert response.status_code == 403 + + # test user attempting to change group + tmp_user = user.copy() + tmp_user["group"] = random_string() + response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=tmp_user, headers=unique_user.token) + assert response.status_code == 403 + + # test user attempting to change household + tmp_user = user.copy() + tmp_user["household"] = random_string() + response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=tmp_user, headers=unique_user.token) + assert response.status_code == 403 + + +def test_admin_updates(api_client: TestClient, admin_user: TestUser, unique_user: TestUser): + response = api_client.get(api_routes.users_item_id(unique_user.user_id), headers=admin_user.token) + user = response.json() + response = api_client.get(api_routes.users_item_id(admin_user.user_id), headers=admin_user.token) + admin = response.json() + + # admin updating themselves + tmp_user = admin.copy() + tmp_user["fullName"] = random_string() + response = api_client.put(api_routes.users_item_id(admin_user.user_id), json=tmp_user, headers=admin_user.token) + assert response.status_code == 200 + + # admin updating another user via the normal user route + tmp_user = user.copy() + tmp_user["fullName"] = random_string() + response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=tmp_user, headers=admin_user.token) + assert response.status_code == 403 + + # admin updating their own permissions + permissions = ["canInvite", "canManage", "canManageHousehold", "canOrganize", "admin"] + for permission in permissions: + tmp_user = admin.copy() + tmp_user[permission] = not admin[permission] + response = api_client.put(api_routes.users_item_id(admin_user.user_id), json=tmp_user, headers=admin_user.token) + assert response.status_code == 403