From de881dab15c87bdef77ec0808be3ccc1dca7dd9e Mon Sep 17 00:00:00 2001 From: Kartikey Sharma <36632816+KartikeySharma@users.noreply.github.com> Date: Tue, 10 Dec 2024 23:05:35 -0500 Subject: [PATCH] #248 Add UTs (#252) --- .travis.yml | 2 +- src/accounts/tests.py | 121 ++--- src/moderation/tests.py | 714 ++++++++++++++++++++++++++++- src/public_service_finder/tests.py | 218 ++++++++- src/services/repositories.py | 28 +- src/services/tests.py | 191 +++++++- 6 files changed, 1175 insertions(+), 99 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9f0b5ac..63f89a1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ script: - python src/manage.py makemigrations --check --dry-run - black . --check - flake8 . - - PYTHONPATH=. coverage run --source='.' src/manage.py test accounts home public_service_finder services forum + - PYTHONPATH=. coverage run --source='.' src/manage.py test accounts home public_service_finder services forum moderation after_success: - coveralls diff --git a/src/accounts/tests.py b/src/accounts/tests.py index 1964bf1..7e7398e 100644 --- a/src/accounts/tests.py +++ b/src/accounts/tests.py @@ -1,15 +1,16 @@ from unittest.mock import patch -from accounts.backends import EmailOrUsernameBackend -from accounts.forms import ServiceProviderLoginForm, UserLoginForm, UserRegisterForm -from accounts.models import CustomUser -from django.contrib.auth import authenticate -from django.contrib.auth import get_user_model +from django.contrib.auth import authenticate, get_user_model from django.test import TestCase, Client, RequestFactory from django.urls import reverse +from accounts.backends import EmailOrUsernameBackend +from accounts.forms import ServiceProviderLoginForm, UserLoginForm, UserRegisterForm +from accounts.models import CustomUser + +User = get_user_model() + -# ---------- Model Tests ---------- class CustomUserModelTest(TestCase): def setUp(self): self.user = CustomUser.objects.create_user( @@ -60,7 +61,6 @@ def test_invalid_form_password_mismatch(self): # ---------- View Tests ---------- class RegisterViewTest(TestCase): - def test_register_view_post_valid_data(self): """Test POST request with valid data to the registration page.""" response = self.client.post( @@ -120,6 +120,39 @@ def test_login_view_post_valid(self, mock_fetch_items): self.assertRedirects(response, reverse("home")) +class ServiceProviderLoginViewTest(TestCase): + def setUp(self): + self.client = Client() + self.sp_user = CustomUser.objects.create_user( + username="spuser", + email="spuser@example.com", + password="Testpassword123!", + user_type="service_provider", + ) + + def test_service_provider_login_valid(self): + """Test service provider login with valid credentials.""" + response = self.client.post( + reverse("service_provider_login"), + {"email": "spuser@example.com", "password": "Testpassword123!"}, + ) + self.assertEqual(response.status_code, 302) + # Assuming "services:list" is the redirect for service providers: + # Update to your actual service provider dashboard URL if different + self.assertRedirects(response, reverse("services:list")) + + def test_service_provider_login_invalid(self): + """Test service provider login with invalid credentials.""" + response = self.client.post( + reverse("service_provider_login"), + {"email": "wrong@example.com", "password": "wrongpassword"}, + ) + self.assertEqual(response.status_code, 200) + form = response.context.get("form") + self.assertFalse(form.is_valid()) + self.assertIn("Invalid email or password.", form.errors["__all__"]) + + class LogoutViewTest(TestCase): def setUp(self): self.user = CustomUser.objects.create_user( @@ -189,18 +222,6 @@ def test_user_redirects_to_home(self, mock_fetch_items): self.assertEqual(response.status_code, 302) self.assertRedirects(response, reverse("home")) - # def test_service_provider_redirects_to_dashboard(self): - # """Test if a service provider is redirected to the dashboard after registration.""" - # response = self.client.post(reverse("register"), { - # "username": "provider1", - # "email": "provider1@example.com", - # "password1": "Testpassword123!", - # "password2": "Testpassword123!", - # "user_type": "service_provider" - # }) - # self.assertEqual(response.status_code, 302) - # self.assertRedirects(response, reverse("service_provider_dashboard")) - class EmptyRegisterFormTest(TestCase): def test_register_view_post_empty_data(self): @@ -239,22 +260,6 @@ def setUp(self): user_type="user", ) - # def test_register_duplicate_email(self): - # """Test registration with a duplicate email.""" - # response = self.client.post( - # reverse("register"), - # { - # "username": "newuser", - # "email": "duplicate@example.com", - # "password1": "NewPassword123!", - # "password2": "NewPassword123!", - # "user_type": "user", - # }, - # ) - # form = response.context.get("form") - # self.assertIsNotNone(form) - # self.assertFalse(form.is_valid()) - class EmailOrUsernameBackendTest(TestCase): def setUp(self): @@ -319,7 +324,7 @@ def setUp(self): password="testpassword", user_type="service_provider", ) - # Create a user with type "user" and corresponding ServiceSeeker profile + # Create a user with type "user" self.service_seeker_user = CustomUser.objects.create_user( username="seeker", email="seeker@example.com", @@ -336,41 +341,13 @@ def test_profile_view_service_provider(self): def test_profile_view_service_seeker_get(self): """Test that the profile view renders correctly for a user with type 'user'.""" - # Log in as service seeker self.client.login(username="seeker", password="testpassword") - response = self.client.get(reverse("profile_view")) - self.assertEqual(response.status_code, 200) self.assertTemplateUsed(response, "profile_base.html") - - # def test_profile_view_service_seeker_post(self): - # """Test that the profile view updates profile information on a POST request.""" - # # Log in as service seeker - # self.client.login(username="seeker", password="testpassword") - - # # Define new data for the form submission - # form_data = { - # "location_preference": "New Location Value", # Assuming these are IDs of bookmarked services - # } - # response = self.client.post( - # reverse("accounts:profile_view"), - # data=json.dumps(form_data), - # content_type="application/json" # Set JSON content type - # ) - - # # Reload the service seeker profile from the database - # self.service_seeker.refresh_from_db() - - # # Assert redirection and form data update - # self.assertEqual(response.status_code, 302) - # # self.assertRedirects(response, reverse("accounts:profile_view")) - # self.assertEqual(self.service_seeker.location_preference, "New Location Value") - # self.assertQuerysetEqual( - # self.service_seeker.bookmarked_services.all(), - # [1, 2], # Replace with actual objects or IDs you expect - # transform=lambda x: x.id # If you use IDs - # ) + self.assertNotContains( + response, "is_service_provider" + ) # since user is not provider class UserRegisterFormEdgeCaseTest(TestCase): @@ -432,9 +409,9 @@ def test_login_with_username(self): def test_login_with_email(self): """Test login with a valid email and password.""" - request = self.factory.post("/login/") # Simulate a POST request + request = self.factory.post("/login/") form_data = {"username": "testuser@example.com", "password": "ValidPass123!"} - form = UserLoginForm(data=form_data, request=request) # Pass the request object + form = UserLoginForm(data=form_data, request=request) self.assertFalse(form.is_valid()) def test_login_with_invalid_email_or_username(self): @@ -528,7 +505,7 @@ def test_authenticate_with_none_password(self): self.assertIsNone(user) def test_user_not_found_by_username(self): - """Test backend tries email authentication if username doesn't exist.""" + """Test backend tries email if username doesn't exist.""" request = self.factory.post("/login/") user = authenticate( request=request, username="nonexistent", password="TestPass123!" @@ -536,7 +513,7 @@ def test_user_not_found_by_username(self): self.assertIsNone(user) def test_user_not_found_by_username_or_email(self): - """Test authenticate() returns None if both username and email don't exist.""" + """Test returns None if both username and email don't exist.""" request = self.factory.post("/login/") user = authenticate( request=request, username="wrongemail@example.com", password="TestPass123!" @@ -551,7 +528,7 @@ def test_authenticate_with_invalid_password(self): def test_authenticate_with_inactive_user(self): """Test that an inactive user cannot authenticate.""" - self.user.is_active = False # Set user as inactive + self.user.is_active = False self.user.save() request = self.factory.post("/login/") diff --git a/src/moderation/tests.py b/src/moderation/tests.py index f9723b4..7686959 100644 --- a/src/moderation/tests.py +++ b/src/moderation/tests.py @@ -1,13 +1,18 @@ import json +import uuid from datetime import datetime from unittest.mock import patch, MagicMock from django.contrib.auth.models import AnonymousUser from django.test import TestCase, Client from django.urls import reverse +from django.db import IntegrityError -from forum.models import Post +from accounts.models import CustomUser from services.models import ReviewDTO +from forum.models import Post, Comment, Category, Notification +from home.repositories import HomeRepository +from moderation.models import Flag class ModerationViewsTest(TestCase): @@ -141,3 +146,710 @@ def test_review_flag_unauthorized(self, mock_flag_get): ) self.assertEqual(response.status_code, 302) # Should redirect + + +class FlagModelTest(TestCase): + """ + Tests for the Flag model to ensure coverage of model logic, + including get_content_object, save, clean, unique constraints, + and string representation. + """ + + def setUp(self): + self.flagger = CustomUser.objects.create_user( + username="flaggeruser", email="flagger@example.com", password="testpass" + ) + self.author = CustomUser.objects.create_user( + username="authoruser", email="author@example.com", password="authorpass" + ) + + self.category = Category.objects.create(name="Test Category") + + self.post = Post.objects.create( + title="Sample Post", + content="This is a sample forum post.", + author=self.author, + category=self.category, + ) + + self.comment = Comment.objects.create( + content="This is a sample comment.", author=self.author, post=self.post + ) + + self.review_id = str(uuid.uuid4()) + + # Provide all required fields for ReviewDTO. + self.mock_review_dto = ReviewDTO( + review_id=self.review_id, + service_id="test_service", + user_id=str(self.flagger.id), + username=self.author.username, + rating_stars=5, # Use an int here + rating_message="Excellent service", + timestamp=datetime.now().isoformat(), + responseText="", + responded_at="", + ) + + @patch("services.repositories.ReviewRepository.get_review") + def test_flag_for_post(self, mock_get_review): + # For Posts, no call to ReviewRepository should be made + flag = Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=self.flagger, + reason="SPAM", + ) + self.assertEqual(flag.content_title, self.post.title) + self.assertEqual(flag.content_preview, self.post.content) + self.assertEqual(flag.content_author, self.author.username) + self.assertIsNone(flag.content_rating) + mock_get_review.assert_not_called() + + @patch("services.repositories.ReviewRepository.get_review") + def test_flag_for_comment(self, mock_get_review): + # For Comments, no call to ReviewRepository + flag = Flag.objects.create( + content_type="FORUM COMMENT", + object_id=self.comment.id, + flagger=self.flagger, + reason="OFFENSIVE", + ) + self.assertEqual(flag.content_preview, self.comment.content) + self.assertEqual(flag.content_author, self.author.username) + self.assertIsNone(flag.content_rating) + mock_get_review.assert_not_called() + + @patch("services.repositories.ReviewRepository.get_review") + def test_flag_for_review(self, mock_get_review): + mock_get_review.return_value = self.mock_review_dto + flag = Flag.objects.create( + content_type="REVIEW", + object_id=self.review_id, + flagger=self.flagger, + reason="SPAM", + ) + self.assertEqual(flag.content_preview, self.mock_review_dto.rating_message) + self.assertEqual(flag.content_author, self.mock_review_dto.username) + self.assertEqual(flag.content_rating, self.mock_review_dto.rating_stars) + mock_get_review.assert_called_once_with(self.review_id) + + def test_get_content_object_post(self): + flag = Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=self.flagger, + reason="SPAM", + ) + obj = flag.get_content_object() + self.assertEqual(obj, self.post) + + def test_get_content_object_comment(self): + flag = Flag.objects.create( + content_type="FORUM COMMENT", + object_id=self.comment.id, + flagger=self.flagger, + reason="OFFENSIVE", + ) + obj = flag.get_content_object() + self.assertEqual(obj, self.comment) + + @patch("services.repositories.ReviewRepository.get_review") + def test_get_content_object_review(self, mock_get_review): + mock_get_review.return_value = self.mock_review_dto + flag = Flag.objects.create( + content_type="REVIEW", + object_id=self.review_id, + flagger=self.flagger, + reason="SPAM", + ) + obj = flag.get_content_object() + self.assertEqual(obj, self.mock_review_dto) + + def test_get_content_object_nonexistent_post(self): + flag = Flag( + content_type="FORUM POST", + object_id="999999", + flagger=self.flagger, + reason="OTHER", + ) + self.assertIsNone(flag.get_content_object()) + + def test_object_id_always_string(self): + flag = Flag.objects.create( + content_type="FORUM POST", + object_id=12345, + flagger=self.flagger, + reason="SPAM", + ) + self.assertIsInstance(flag.object_id, str) + self.assertEqual(flag.object_id, "12345") + + def test_unique_constraint(self): + Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=self.flagger, + reason="SPAM", + ) + # Attempting to create the same flag again should fail + with self.assertRaises(IntegrityError): + Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=self.flagger, + reason="SPAM", + ) + + def test_ordering(self): + f1 = Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=self.flagger, + reason="SPAM", + ) + f2 = Flag.objects.create( + content_type="FORUM COMMENT", + object_id=self.comment.id, + flagger=self.flagger, + reason="OFFENSIVE", + ) + flags = list(Flag.objects.all()) + # f2 created after f1, so should appear first due to "-created_at" ordering + self.assertEqual(flags[0], f2) + self.assertEqual(flags[1], f1) + + def test_str_representation(self): + flag = Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=self.flagger, + reason="SPAM", + ) + expected_str = f"Flag by {self.flagger.username} on FORUM POST (PENDING)" + self.assertEqual(str(flag), expected_str) + + +class ModerationViewsAdditionalTest(TestCase): + """ + Additional tests for moderation views including: + - Creating flags for different content types + - Reviewing flags as admin + - Checking flag status + """ + + def setUp(self): + """Set up test environment before each test""" + self.client = Client() + + # Create regular user and admin user + self.regular_user = CustomUser.objects.create_user( + username="regularuser", email="regular@example.com", password="regularpass" + ) + self.admin_user = CustomUser.objects.create_user( + username="adminuser", + email="admin@example.com", + password="adminpass", + is_superuser=True, + ) + + # Ensure regular_user is not a superuser + self.regular_user.is_superuser = False + self.regular_user.save() + + # Create category, post, and comment + self.category = Category.objects.create(name="General") + self.post = Post.objects.create( + title="Test Post", + content="Content of the test post.", + author=self.regular_user, + category=self.category, + ) + self.comment = Comment.objects.create( + content="Test comment.", author=self.regular_user, post=self.post + ) + + # Create a ReviewDTO instance + self.review_id = str(uuid.uuid4()) + self.review_dto = ReviewDTO( + review_id=self.review_id, + service_id="service_test", + user_id=str(self.regular_user.id), + username=self.regular_user.username, + rating_stars=4, + rating_message="Good service.", + timestamp=datetime.now().isoformat(), + responseText="", + responded_at="", + ) + + def login_regular_user(self): + """Helper method to log in as regular user""" + self.client.login(username="regularuser", password="regularpass") + + def login_admin_user(self): + """Helper method to log in as admin user""" + self.client.login(username="adminuser", password="adminpass") + + @patch("services.repositories.ReviewRepository.get_review") + def test_create_flag_post_success(self, mock_get_review): + """Test successful flag creation for a forum post""" + mock_get_review.return_value = None # Not used for posts + + self.login_regular_user() + + response = self.client.post( + reverse("moderation:create_flag"), + { + "content_type": "FORUM POST", + "object_id": self.post.id, + "reason": "SPAM", + "explanation": "This post is spam.", + }, + ) + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["success"]) + self.assertEqual(data["message"], "Content has been flagged for review") + + # Verify the flag was created + flag = Flag.objects.get( + content_type="FORUM POST", object_id=self.post.id, flagger=self.regular_user + ) + self.assertEqual(flag.reason, "SPAM") + self.assertEqual(flag.explanation, "This post is spam.") + + # Verify notifications were created for admins + notifications = Notification.objects.filter( + recipient=self.admin_user, notification_type="flag_admin", is_read=False + ) + self.assertTrue(notifications.exists()) + + @patch("services.repositories.ReviewRepository.get_review") + def test_create_flag_comment_success(self, mock_get_review): + """Test successful flag creation for a forum comment""" + mock_get_review.return_value = None # Not used for comments + + self.login_regular_user() + + response = self.client.post( + reverse("moderation:create_flag"), + { + "content_type": "FORUM COMMENT", + "object_id": self.comment.id, + "reason": "OFFENSIVE", + "explanation": "This comment is offensive.", + }, + ) + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["success"]) + self.assertEqual(data["message"], "Content has been flagged for review") + + # Verify the flag was created + flag = Flag.objects.get( + content_type="FORUM COMMENT", + object_id=self.comment.id, + flagger=self.regular_user, + ) + self.assertEqual(flag.reason, "OFFENSIVE") + self.assertEqual(flag.explanation, "This comment is offensive.") + + # Verify notifications were created for admins + notifications = Notification.objects.filter( + recipient=self.admin_user, notification_type="flag_admin", is_read=False + ) + self.assertTrue(notifications.exists()) + + @patch("services.repositories.ReviewRepository.get_review") + def test_create_flag_review_success(self, mock_get_review): + """Test successful flag creation for a review""" + mock_get_review.return_value = self.review_dto + + self.login_regular_user() + + response = self.client.post( + reverse("moderation:create_flag"), + { + "content_type": "REVIEW", + "object_id": self.review_id, + "reason": "MISINFORMATION", + "explanation": "This review contains false information.", + }, + ) + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["success"]) + self.assertEqual(data["message"], "Content has been flagged for review") + + # Verify the flag was created + flag = Flag.objects.get( + content_type="REVIEW", object_id=self.review_id, flagger=self.regular_user + ) + self.assertEqual(flag.reason, "MISINFORMATION") + self.assertEqual(flag.explanation, "This review contains false information.") + self.assertEqual(flag.content_rating, self.review_dto.rating_stars) + + # Verify notifications were created for admins + notifications = Notification.objects.filter( + recipient=self.admin_user, notification_type="flag_admin", is_read=False + ) + self.assertTrue(notifications.exists()) + + def test_create_flag_invalid_content_type(self): + """Test flag creation with invalid content type""" + self.login_regular_user() + + response = self.client.post( + reverse("moderation:create_flag"), + { + "content_type": "INVALID_TYPE", + "object_id": "test123", + "reason": "SPAM", + "explanation": "Invalid content type test.", + }, + ) + + self.assertEqual(response.status_code, 400) + data = json.loads(response.content) + self.assertIn("Invalid content type", data["error"]) + + @patch("moderation.views.Post.objects.get") + def test_create_flag_nonexistent_post(self, mock_post_get): + """Test flag creation for a nonexistent forum post""" + mock_post_get.side_effect = Post.DoesNotExist + + self.login_regular_user() + + response = self.client.post( + reverse("moderation:create_flag"), + { + "content_type": "FORUM POST", + "object_id": "nonexistent_post", + "reason": "SPAM", + "explanation": "Nonexistent post test.", + }, + ) + + self.assertEqual(response.status_code, 404) + data = json.loads(response.content) + self.assertIn("Post not found", data["error"]) + + @patch("services.repositories.ReviewRepository.get_review") + def test_create_flag_nonexistent_review(self, mock_get_review): + """Test flag creation for a nonexistent review""" + mock_get_review.return_value = None + + self.login_regular_user() + + response = self.client.post( + reverse("moderation:create_flag"), + { + "content_type": "REVIEW", + "object_id": "nonexistent_review", + "reason": "SPAM", + "explanation": "Nonexistent review test.", + }, + ) + + self.assertEqual(response.status_code, 404) + data = json.loads(response.content) + self.assertIn("Review not found", data["error"]) + + def test_create_flag_duplicate_flag(self): + """Test that a user cannot flag the same content twice""" + # First flag creation + Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=self.regular_user, + reason="SPAM", + explanation="First flag.", + ) + + self.login_regular_user() + + # Attempt to create the same flag again + response = self.client.post( + reverse("moderation:create_flag"), + { + "content_type": "FORUM POST", + "object_id": self.post.id, + "reason": "SPAM", + "explanation": "Second flag attempt.", + }, + ) + + self.assertEqual(response.status_code, 400) + data = json.loads(response.content) + self.assertIn("You have already flagged this content", data["error"]) + + def test_create_flag_unauthenticated(self): + """Test that unauthenticated users cannot create flags""" + response = self.client.post( + reverse("moderation:create_flag"), + { + "content_type": "FORUM POST", + "object_id": self.post.id, + "reason": "SPAM", + "explanation": "Unauthenticated flag attempt.", + }, + ) + + self.assertEqual(response.status_code, 302) # Redirect to login + self.assertTrue("/login/" in response.url) + + @patch("services.repositories.ReviewRepository.get_review") + def test_review_flag_dismiss(self, mock_get_review): + """Test admin dismissing a flag""" + mock_get_review.return_value = self.review_dto + + # Create a flag + flag = Flag.objects.create( + content_type="REVIEW", + object_id=self.review_id, + flagger=self.regular_user, + reason="SPAM", + explanation="Flag to dismiss.", + ) + + self.login_admin_user() + + response = self.client.post( + reverse("moderation:review_flag", args=[flag.id]), {"action": "dismiss"} + ) + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["success"]) + self.assertEqual(data["status"], "DISMISSED") + + # Refresh flag from DB + flag.refresh_from_db() + self.assertEqual(flag.status, "DISMISSED") + self.assertEqual(flag.reviewed_by, self.admin_user) + + # Verify notification to flagger + notification = Notification.objects.get( + recipient=self.regular_user, + notification_type="flag_reviewed", + message="Your flag has been reviewed and dismissed", + ) + self.assertIsNotNone(notification) + + @patch("services.repositories.ReviewRepository.get_review") + def test_review_flag_revoke(self, mock_get_review): + """Test admin revoking a flag and deleting the content""" + mock_get_review.return_value = self.review_dto + + # Create a flag + flag = Flag.objects.create( + content_type="REVIEW", + object_id=self.review_id, + flagger=self.regular_user, + reason="SPAM", + explanation="Flag to revoke.", + ) + + self.login_admin_user() + + # Mock HomeRepository.delete_review + with patch.object(HomeRepository, "delete_review") as mock_delete_review: + response = self.client.post( + reverse("moderation:review_flag", args=[flag.id]), {"action": "revoke"} + ) + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["success"]) + self.assertEqual(data["status"], "REVOKED") + + # Refresh flag from DB + flag.refresh_from_db() + self.assertEqual(flag.status, "REVOKED") + self.assertEqual(flag.reviewed_by, self.admin_user) + + # Verify content is deleted + # Assuming HomeRepository.delete_review deletes the review + mock_delete_review.assert_called_once_with(self.review_id) + + # Verify notification to flagger + notification = Notification.objects.get( + recipient=self.regular_user, + notification_type="flag_reviewed", + message="Your flag has been reviewed and accepted", + ) + self.assertIsNotNone(notification) + + def test_review_flag_invalid_action(self): + """Test admin reviewing a flag with an invalid action""" + # Create a flag + flag = Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=self.regular_user, + reason="SPAM", + explanation="Flag with invalid action.", + ) + + self.login_admin_user() + + response = self.client.post( + reverse("moderation:review_flag", args=[flag.id]), + {"action": "invalid_action"}, + ) + + self.assertEqual(response.status_code, 400) + data = json.loads(response.content) + self.assertIn("Invalid action", data["error"]) + + def test_review_flag_invalid_flag_id(self): + """Test admin reviewing a nonexistent flag""" + self.login_admin_user() + + response = self.client.post( + reverse("moderation:review_flag", args=[999]), # Assuming 999 doesn't exist + {"action": "dismiss"}, + ) + + self.assertEqual(response.status_code, 404) + # Forbidden + + def test_review_flag_unauthenticated(self): + """Test that unauthenticated users cannot review flags""" + # Create a flag + flag = Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=self.regular_user, + reason="SPAM", + explanation="Flag by unauthenticated user.", + ) + + response = self.client.post( + reverse("moderation:review_flag", args=[flag.id]), {"action": "dismiss"} + ) + + self.assertEqual(response.status_code, 302) # Redirect to login + self.assertTrue("/login/" in response.url) + + def test_check_flag_status_user_has_flagged(self): + """Test checking flag status when user has flagged the content""" + # Create a flag + Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=self.regular_user, + reason="SPAM", + explanation="User has flagged this post.", + ) + + self.login_regular_user() + + response = self.client.get( + reverse( + "moderation:check_flag_status", + kwargs={ + "content_type": "FORUM POST", + "object_id": self.post.id, + }, + ) + ) + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertTrue(data["userHasFlagged"]) + self.assertTrue(data["hasPendingFlags"]) + self.assertEqual(data["pendingFlagsCount"], 1) + + def test_check_flag_status_user_has_not_flagged(self): + """Test checking flag status when user has not flagged the content""" + # Create a flag by another user + other_user = CustomUser.objects.create_user( + username="otheruser", email="other@example.com", password="otherpass" + ) + Flag.objects.create( + content_type="FORUM POST", + object_id=self.post.id, + flagger=other_user, + reason="SPAM", + explanation="Another user has flagged this post.", + ) + + self.login_regular_user() + + response = self.client.get( + reverse( + "moderation:check_flag_status", + kwargs={ + "content_type": "FORUM POST", + "object_id": self.post.id, + }, + ) + ) + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertFalse(data["userHasFlagged"]) + self.assertTrue(data["hasPendingFlags"]) + self.assertEqual(data["pendingFlagsCount"], 1) + + def test_check_flag_status_no_pending_flags(self): + """Test checking flag status when there are no pending flags""" + self.login_regular_user() + + response = self.client.get( + reverse( + "moderation:check_flag_status", + kwargs={ + "content_type": "FORUM POST", + "object_id": self.post.id, + }, + ) + ) + + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertFalse(data["userHasFlagged"]) + self.assertFalse(data["hasPendingFlags"]) + self.assertEqual(data["pendingFlagsCount"], 0) + + def test_check_flag_status_invalid_content_type(self): + """Test checking flag status with an invalid content type""" + self.login_regular_user() + + response = self.client.get( + reverse( + "moderation:check_flag_status", + kwargs={ + "content_type": "INVALID_TYPE", + "object_id": "test123", + }, + ) + ) + + # Depending on implementation, this might return an error or simply false + # Here, assuming it returns a 200 with flags info + self.assertEqual(response.status_code, 200) + data = json.loads(response.content) + self.assertFalse(data["userHasFlagged"]) + self.assertFalse(data["hasPendingFlags"]) + self.assertEqual(data["pendingFlagsCount"], 0) + + def test_check_flag_status_unauthenticated(self): + """Test that unauthenticated users cannot check flag status""" + response = self.client.get( + reverse( + "moderation:check_flag_status", + kwargs={ + "content_type": "FORUM POST", + "object_id": self.post.id, + }, + ) + ) + + self.assertEqual(response.status_code, 302) # Redirect to login + self.assertTrue("/login/" in response.url) diff --git a/src/public_service_finder/tests.py b/src/public_service_finder/tests.py index aa4574c..4102746 100644 --- a/src/public_service_finder/tests.py +++ b/src/public_service_finder/tests.py @@ -1,17 +1,32 @@ +import uuid from unittest.mock import patch - -from allauth.socialaccount.models import SocialApp from django.test import TestCase, Client from django.urls import reverse +from django.contrib.auth import get_user_model +from allauth.socialaccount.models import SocialApp +from django.contrib.contenttypes.models import ContentType -from accounts.models import CustomUser +from moderation.models import Flag +from public_service_finder.utils.enums.service_status import ServiceStatus +from services.repositories import ServiceRepository + +User = get_user_model() class RootRedirectViewTest(TestCase): def setUp(self): self.client = Client() - self.user = CustomUser.objects.create_user( - username="testuser", password="testpass123" + self.user = User.objects.create_user( + username="testuser", + password="testpass123", + user_type="normal_user", + email="testuser@example.com", + ) + self.service_provider = User.objects.create_user( + username="serviceuser", + password="testpass123", + user_type="service_provider", + email="serviceuser@example.com", ) patcher = patch("allauth.socialaccount.models.SocialApp.objects.get") self.mock_get = patcher.start() @@ -20,7 +35,198 @@ def setUp(self): ) self.addCleanup(patcher.stop) - def test_root_redirect_authenticated(self): + def test_root_redirect_unauthenticated(self): + # Unauthenticated user should be redirected to "home" + response = self.client.get(reverse("root_redirect")) + self.assertRedirects(response, reverse("home")) + + def test_root_redirect_authenticated_normal_user(self): self.client.login(username="testuser", password="testpass123") response = self.client.get(reverse("root_redirect")) self.assertRedirects(response, reverse("home")) + + def test_root_redirect_authenticated_service_provider(self): + self.client.login(username="serviceuser", password="testpass123") + response = self.client.get(reverse("root_redirect")) + # For service_provider users, redirect to services:list + self.assertRedirects(response, reverse("services:list")) + + +class AdminOnlyViewNewListingsTest(TestCase): + def setUp(self): + self.client = Client() + self.superuser = User.objects.create_superuser( + username="adminuser", + password="adminpass123", + user_type="normal_user", + email="adminuser@example.com", + ) + self.regular_user = User.objects.create_user( + username="regularuser", + password="testpass123", + user_type="normal_user", + email="regularuser@example.com", + ) + + def test_admin_only_view_not_logged_in(self): + # Should redirect to login page due to @login_required + response = self.client.get(reverse("admin_only_view_new_listings")) + self.assertNotEqual(response.status_code, 200) + self.assertIn("/accounts/login/", response.url) + + def test_admin_only_view_non_superuser(self): + # Logged in as non-superuser should return 403 + self.client.login(username="regularuser", password="testpass123") + response = self.client.get(reverse("admin_only_view_new_listings")) + self.assertEqual(response.status_code, 403) + + @patch.object(ServiceRepository, "get_pending_approval_services", return_value=[]) + def test_admin_only_view_superuser_no_flags_no_services(self, mock_services): + # Logged in as superuser with no pending services and no flags + self.client.login(username="adminuser", password="adminpass123") + response = self.client.get(reverse("admin_only_view_new_listings")) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "admin_only.html") + self.assertEqual(len(response.context["pending_services"]), 0) + self.assertEqual(len(response.context["flags"]), 0) + + @patch.object( + ServiceRepository, + "get_pending_approval_services", + return_value=[{"id": str(uuid.uuid4()), "name": "Test Service"}], + ) + def test_admin_only_view_superuser_with_services(self, mock_services): + # Logged in as superuser with some pending services, but no flags + self.client.login(username="adminuser", password="adminpass123") + response = self.client.get(reverse("admin_only_view_new_listings")) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "admin_only.html") + self.assertEqual(len(response.context["pending_services"]), 1) + self.assertEqual(len(response.context["flags"]), 0) + + @patch.object(ServiceRepository, "get_pending_approval_services", return_value=[]) + def test_admin_only_view_superuser_with_flags(self, mock_services): + # Create some pending flags + self.client.login(username="adminuser", password="adminpass123") + flagger = User.objects.create_user( + username="flagger", password="flagpass", email="flagger@example.com" + ) + content_type = ContentType.objects.get_for_model(User) + + Flag.objects.create( + content_type=content_type, + object_id=flagger.id, + content_preview="Some preview", + content_title="Title", + content_rating=5, + content_author="Author", + created_at="2024-01-01T10:00:00Z", + flagger=flagger, + reason="Test reason", + explanation="Test explanation", + status="PENDING", + ) + + response = self.client.get(reverse("admin_only_view_new_listings")) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "admin_only.html") + self.assertEqual(len(response.context["flags"]), 1) + flag_group = response.context["flags"][0] + self.assertEqual(flag_group["content_preview"], "Some preview") + self.assertEqual(flag_group["flag_count"], 1) + self.assertEqual(len(flag_group["flags"]), 1) + + +class AdminUpdateListingTest(TestCase): + def setUp(self): + self.client = Client() + self.service_id = uuid.uuid4() # Use a UUID to match the URL pattern + self.superuser = User.objects.create_superuser( + username="adminuser", + password="adminpass123", + user_type="normal_user", + email="adminuser@example.com", + ) + self.regular_user = User.objects.create_user( + username="regularuser2", + password="testpass123", + user_type="normal_user", + email="regularuser2@example.com", + ) + + def test_admin_update_not_logged_in(self): + # Not logged in: should redirect to login + response = self.client.get( + reverse("admin_update_listing", args=[self.service_id]) + ) + self.assertNotEqual(response.status_code, 200) + self.assertIn("/accounts/login/", response.url) + + def test_admin_update_not_superuser(self): + # Logged in as non-superuser should return 403 on POST and GET + self.client.login(username="regularuser2", password="testpass123") + response = self.client.post( + reverse("admin_update_listing", args=[self.service_id]), + {"status": "approve"}, + ) + self.assertEqual(response.status_code, 403) + response = self.client.get( + reverse("admin_update_listing", args=[self.service_id]) + ) + self.assertEqual(response.status_code, 403) + + def test_admin_update_get_request_superuser(self): + # GET request should redirect back to listings page + self.client.login(username="adminuser", password="adminpass123") + response = self.client.get( + reverse("admin_update_listing", args=[self.service_id]) + ) + self.assertRedirects(response, reverse("admin_only_view_new_listings")) + + @patch.object(ServiceRepository, "update_service_status") + def test_admin_update_post_approve_superuser(self, mock_update): + self.client.login(username="adminuser", password="adminpass123") + response = self.client.post( + reverse("admin_update_listing", args=[self.service_id]), + {"status": "approve"}, + ) + self.assertRedirects(response, reverse("admin_only_view_new_listings")) + mock_update.assert_called_once_with( + self.service_id, ServiceStatus.APPROVED.value + ) + + @patch.object(ServiceRepository, "update_service_status") + def test_admin_update_post_reject_superuser(self, mock_update): + self.client.login(username="adminuser", password="adminpass123") + response = self.client.post( + reverse("admin_update_listing", args=[self.service_id]), + {"status": "reject"}, + ) + self.assertRedirects(response, reverse("admin_only_view_new_listings")) + mock_update.assert_called_once_with( + self.service_id, ServiceStatus.REJECTED.value + ) + + @patch.object(ServiceRepository, "update_service_status") + def test_admin_update_post_invalid_status_superuser(self, mock_update): + self.client.login(username="adminuser", password="adminpass123") + response = self.client.post( + reverse("admin_update_listing", args=[self.service_id]), + {"status": "invalid"}, + ) + self.assertRedirects(response, reverse("admin_only_view_new_listings")) + mock_update.assert_not_called() + + @patch.object( + ServiceRepository, + "update_service_status", + side_effect=Exception("Test Exception"), + ) + def test_admin_update_post_exception_handling(self, mock_update): + self.client.login(username="adminuser", password="adminpass123") + response = self.client.post( + reverse("admin_update_listing", args=[self.service_id]), + {"status": "approve"}, + ) + self.assertRedirects(response, reverse("admin_only_view_new_listings")) + mock_update.assert_called_once() diff --git a/src/services/repositories.py b/src/services/repositories.py index e325c42..b1c4abf 100644 --- a/src/services/repositories.py +++ b/src/services/repositories.py @@ -27,7 +27,7 @@ def create_service(self, service_dto: ServiceDTO): log.info(f"Persisted service: {item} to DynamoDB") return service_dto except ClientError as e: - log.error(f"Error creating service: {e.responseText['Error']['Message']}") + log.error(f"Error creating service: {e.response['Error']['Message']}") return None def get_services_by_provider(self, provider_id: int) -> list[ServiceDTO]: @@ -43,7 +43,7 @@ def get_services_by_provider(self, provider_id: int) -> list[ServiceDTO]: log.debug(f"Fetched {len(services)} services for provider {provider_id}") return services except ClientError as e: - log.error(f"Error fetching services: {e.responseText['Error']['Message']}") + log.error(f"Error fetching services: {e.response['Error']['Message']}") return [] def get_service(self, service_id: str) -> ServiceDTO | None: @@ -55,7 +55,7 @@ def get_service(self, service_id: str) -> ServiceDTO | None: return ServiceDTO.from_dynamodb_item(item) if item else None except ClientError as e: log.error( - f"Error fetching service {service_id}: {e.responseText['Error']['Message']}" + f"Error fetching service {service_id}: {e.response['Error']['Message']}" ) return None @@ -80,7 +80,7 @@ def update_service(self, service_dto: ServiceDTO) -> ServiceDTO | None: response = self.table.put_item(Item=item) return service_dto if response else None except ClientError as e: - print(e.response["Error"]["Message"]) + log.error(f"Error updating service: {e.response['Error']['Message']}") return None def delete_service(self, service_id: str) -> bool: @@ -88,23 +88,23 @@ def delete_service(self, service_id: str) -> bool: self.table.delete_item(Key={"Id": service_id}) return True except ClientError as e: - print(e.response["Error"]["Message"]) + log.error( + f"Error deleting service {service_id}: {e.response['Error']['Message']}" + ) return False def update_service_status(self, service_id: str, new_status: str) -> bool: try: - # Update service status service_id_str = str(service_id) response = self.table.update_item( Key={"Id": service_id_str}, UpdateExpression="SET ServiceStatus = :new_status", ExpressionAttributeValues={":new_status": new_status}, - ConditionExpression="attribute_exists(Id)", # Ensure item exists + ConditionExpression="attribute_exists(Id)", ReturnValues="UPDATED_NEW", ) - print("response: " + response) + print("response:", response) - # Logging successful update log.info( f"Updated ServiceStatus for service ID {service_id} to {new_status}" ) @@ -116,9 +116,7 @@ def update_service_status(self, service_id: str, new_status: str) -> bool: log.error( f"Error updating service status for ID {service_id}: {e.response['Error']['Message']}" ) - return False - except Exception as e: log.error( f"Unexpected error updating service status for ID {service_id}, exception: {e}" @@ -142,7 +140,7 @@ def get_review(self, review_id: str) -> ReviewDTO | None: return ReviewDTO.from_dynamodb_item(item) if item else None except ClientError as e: log.error( - f"Error fetching review {review_id}: {e.responseText['Error']['Message']}" + f"Error fetching review {review_id}: {e.response['Error']['Message']}" ) return None @@ -158,9 +156,7 @@ def respond_to_review(self, review_id: str, response_text: str) -> bool: self.table.update_item( Key={"ReviewId": review_id}, UpdateExpression="SET #response_text = :responseText, RespondedAt = :responded_at", - ExpressionAttributeNames={ - "#response_text": "ResponseText" - }, # Alias for reserved keyword + ExpressionAttributeNames={"#response_text": "ResponseText"}, ExpressionAttributeValues={ ":responseText": response_text, ":responded_at": current_time, @@ -178,7 +174,7 @@ def get_reviews_for_service(self, service_id: str) -> List[ReviewDTO]: """Retrieve all reviews for a given service ID.""" try: response = self.table.query( - IndexName="ServiceIdIndex", # Ensure this index exists in your DynamoDB table + IndexName="ServiceIdIndex", KeyConditionExpression=Key("ServiceId").eq(service_id), ) items = response.get("Items", []) diff --git a/src/services/tests.py b/src/services/tests.py index a1aad63..7d82c5e 100644 --- a/src/services/tests.py +++ b/src/services/tests.py @@ -1247,9 +1247,6 @@ def test_user_analytics_view_no_metrics(self, mock_compute_metrics): mock_compute_metrics.assert_called_once_with(str(self.service_provider.id)) -# tests.py (continued) - - class DTOModelTests(TestCase): def test_service_dto_from_dynamodb_item_with_service_status_prefix(self): # ServiceStatus with prefix @@ -1542,3 +1539,191 @@ def test_get_review_client_error(self, mock_boto_resource): review = self.review_repo.get_review(self.review_id) self.assertIsNone(review) # mock_table.get_item.assert_called_once_with(Key={"ReviewId": self.review_id}) + + +class ServiceRepositoryMoreTests(TestCase): + def setUp(self): + # Start patching + self.patcher = patch("services.repositories.boto3.resource") + self.mock_boto_resource = self.patcher.start() + self.mock_table = MagicMock() + self.mock_boto_resource.return_value.Table.return_value = self.mock_table + + self.service_repo = ServiceRepository() + self.service_id = str(uuid.uuid4()) + self.new_status = ServiceStatus.PENDING_APPROVAL.value + self.sample_service = ServiceDTO( + id=str(uuid.uuid4()), + name="Test Service", + address="123 Test St", + category="Mental Health Center", + provider_id="test_provider_id", + latitude=Decimal("40.7128"), + longitude=Decimal("-74.0060"), + ratings=Decimal("4.5"), + description={"hours": "9-5"}, + service_created_timestamp="2022-01-01T12:00:00Z", + service_status=ServiceStatus.APPROVED.value, + service_approved_timestamp="2022-01-01T12:00:00Z", + is_active=True, + ) + + def tearDown(self): + self.patcher.stop() + + def test_get_pending_approval_services_success(self): + self.mock_table.scan.return_value = { + "Items": [ + { + "Id": "pending-service-id", + "Name": "Pending Service", + "Address": "456 Pending Road", + "Category": "FOOD", + "ProviderId": "provider123", + "Lat": "40.7128", + "Log": "-74.0060", + "Ratings": "3.5", + "Description": {"notes": "This is pending."}, + "CreatedTimestamp": "2022-02-01T12:00:00Z", + "ServiceStatus": "PENDING_APPROVAL", + "ApprovedTimestamp": "", + "IsActive": False, + } + ] + } + pending_services = self.service_repo.get_pending_approval_services() + self.assertEqual(len(pending_services), 1) + self.assertEqual( + pending_services[0].service_status, ServiceStatus.PENDING_APPROVAL.value + ) + self.mock_table.scan.assert_called_once() + + def test_get_pending_approval_services_client_error(self): + error_response = { + "Error": {"Code": "InternalServerError", "Message": "Error scanning."} + } + self.mock_table.scan.side_effect = ClientError(error_response, "Scan") + pending_services = self.service_repo.get_pending_approval_services() + self.assertEqual(len(pending_services), 0) + self.mock_table.scan.assert_called_once() + + def test_update_service_error_handling(self): + # Simulate ClientError + error_response = { + "Error": { + "Code": "ConditionalCheckFailedException", + "Message": "Item not found.", + } + } + self.mock_table.put_item.side_effect = ClientError(error_response, "PutItem") + updated_service = self.sample_service + updated_service.name = "Updated Name" + result = self.service_repo.update_service(updated_service) + self.assertIsNone(result) + self.mock_table.put_item.assert_called_once() + + def test_delete_service_error_handling(self): + error_response = { + "Error": {"Code": "AccessDeniedException", "Message": "No access."} + } + self.mock_table.delete_item.side_effect = ClientError( + error_response, "DeleteItem" + ) + result = self.service_repo.delete_service(self.sample_service.id) + self.assertFalse(result) + self.mock_table.delete_item.assert_called_once() + + def test_create_service_error_handling(self): + error_response = { + "Error": { + "Code": "ProvisionedThroughputExceededException", + "Message": "Throughput exceeded.", + } + } + self.mock_table.put_item.side_effect = ClientError(error_response, "PutItem") + result = self.service_repo.create_service(self.sample_service) + self.assertIsNone(result) + self.mock_table.put_item.assert_called_once() + + +class ReviewRepositoryMoreTests(TestCase): + def setUp(self): + self.patcher = patch("services.repositories.boto3.resource") + self.mock_boto_resource = self.patcher.start() + self.mock_table = MagicMock() + self.mock_boto_resource.return_value.Table.return_value = self.mock_table + + self.review_repo = ReviewRepository() + self.review_id = str(uuid.uuid4()) + self.sample_review = ReviewDTO( + review_id=self.review_id, + service_id="service123", + user_id="user123", + username="reviewer", + rating_stars=5, + rating_message="Excellent service", + timestamp="2022-01-01T12:00:00Z", + ) + + def tearDown(self): + self.patcher.stop() + + def test_respond_to_review_success(self): + self.mock_table.update_item.return_value = {} + result = self.review_repo.respond_to_review( + self.review_id, "We appreciate your feedback!" + ) + self.assertTrue(result) + self.mock_table.update_item.assert_called_once() + + def test_respond_to_review_error_handling(self): + error_response = { + "Error": { + "Code": "ResourceNotFoundException", + "Message": "Review not found.", + } + } + self.mock_table.update_item.side_effect = ClientError( + error_response, "UpdateItem" + ) + result = self.review_repo.respond_to_review(self.review_id, "Response text") + self.assertFalse(result) + self.mock_table.update_item.assert_called_once() + + def test_get_reviews_for_service_success(self): + self.mock_table.query.return_value = { + "Items": [ + { + "ReviewId": "r1", + "ServiceId": "service123", + "UserId": "userABC", + "Username": "testuser", + "RatingStars": "4", + "RatingMessage": "Good", + "Timestamp": "2022-02-01T12:00:00Z", + }, + { + "ReviewId": "r2", + "ServiceId": "service123", + "UserId": "userXYZ", + "Username": "anotheruser", + "RatingStars": "5", + "RatingMessage": "Excellent", + "Timestamp": "2022-02-02T12:00:00Z", + }, + ] + } + reviews = self.review_repo.get_reviews_for_service("service123") + self.assertEqual(len(reviews), 2) + self.assertEqual(reviews[0].rating_message, "Good") + self.assertEqual(reviews[1].rating_message, "Excellent") + self.mock_table.query.assert_called_once() + + def test_get_reviews_for_service_client_error(self): + error_response = { + "Error": {"Code": "InternalServerError", "Message": "Something went wrong."} + } + self.mock_table.query.side_effect = ClientError(error_response, "Query") + reviews = self.review_repo.get_reviews_for_service("service123") + self.assertEqual(len(reviews), 0) + self.mock_table.query.assert_called_once()