diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000000..55bdf67d40
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,33 @@
+[run]
+source = coldfront
+omit =
+ */migrations/*
+ */tests/*
+ */venv/*
+ */env/*
+ manage.py
+ */settings/*
+ */config/*
+ */__pycache__/*
+ */static/*
+ */media/*
+
+[report]
+exclude_lines =
+ pragma: no cover
+ def __repr__
+ if self.debug:
+ if settings.DEBUG
+ raise AssertionError
+ raise NotImplementedError
+ if 0:
+ if __name__ == .__main__.:
+ class .*\bProtocol\):
+ @(abc\.)?abstractmethod
+
+[html]
+directory = htmlcov
+title = ColdFront Test Coverage Report
+
+[xml]
+output = coverage.xml
\ No newline at end of file
diff --git a/README.md b/README.md
index 642ea8213d..e56ff1b0b0 100644
--- a/README.md
+++ b/README.md
@@ -28,6 +28,85 @@ Documentation resides in a separate repository. Please request access.
- [Deployment](bootstrap/ansible/README.md)
- [REST API](coldfront/api/README.md)
+## Running Tests
+
+### Setting Up the Test Environment
+
+1. **Activate the virtual environment**:
+ ```bash
+ source coldfront/venv/bin/activate # or wherever your venv is located
+ ```
+
+2. **Install test dependencies** (if not already installed):
+ ```bash
+ pip install pytest pytest-django pytest-cov factory-boy
+ ```
+
+3. **Configure Django settings for tests**:
+ The test configuration is already set up in `pytest.ini` and uses `coldfront.config.test_settings` which configures an in-memory SQLite database for fast test execution.
+
+### Running Tests
+
+**Run all tests**:
+```bash
+pytest coldfront/tests
+```
+
+**Run specific test directories**:
+```bash
+# Unit tests only
+pytest coldfront/tests/utils/unit/
+
+# Whitebox integration tests
+pytest coldfront/tests/utils/whitebox/
+
+# Blackbox end-to-end tests
+pytest coldfront/tests/utils/blackbox/
+```
+
+**Run tests with coverage report**:
+```bash
+pytest coldfront/tests --cov=coldfront --cov-report=html
+# View coverage report in htmlcov/index.html
+```
+
+**Run tests with specific markers**:
+```bash
+# Run only unit tests
+pytest -m unit
+
+# Run only whitebox tests
+pytest -m whitebox
+
+# Run only blackbox tests
+pytest -m blackbox
+```
+
+**Run tests verbosely**:
+```bash
+pytest -v # Verbose output
+pytest -vv # Very verbose output
+pytest -s # Show print statements
+```
+
+### Test Structure
+
+- **Unit Tests** (`coldfront/tests/utils/unit/`): Test individual components in isolation
+- **Whitebox Tests** (`coldfront/tests/utils/whitebox/`): Test internal behavior and integrations
+- **Blackbox Tests** (`coldfront/tests/utils/blackbox/`): Test end-to-end functionality from user perspective
+
+### Test Fixtures
+
+Test fixtures are defined in:
+- `coldfront/tests/conftest.py` - Global fixtures for all tests
+- `coldfront/tests/utils/conftest.py` - Fixtures specific to utils tests
+
+Key fixtures include:
+- `user`, `staff_user`, `superuser` - Different user types
+- `project`, `allocation` - Project and allocation instances
+- `project_user`, `active_project_user` - User-project relationships
+- `django_db_setup` - Session-scoped database setup with required reference data
+
## License
ColdFront is released under the GPLv3 license. See the LICENSE file.
diff --git a/coldfront/config/local_settings.py.sample b/coldfront/config/local_settings.py.sample
index 034faf4e88..f19cac2691 100644
--- a/coldfront/config/local_settings.py.sample
+++ b/coldfront/config/local_settings.py.sample
@@ -479,10 +479,16 @@ except ImportError:
pass
# Update extra apps based on potentially-updated variables.
-EXTRA_APPS += EXTRA_EXTRA_APPS
+try:
+ EXTRA_APPS += EXTRA_EXTRA_APPS
+except NameError:
+ pass # EXTRA_EXTRA_APPS not defined
# Update extra middleware based on potentially-updated variables.
-EXTRA_MIDDLEWARE += EXTRA_EXTRA_MIDDLEWARE
+try:
+ EXTRA_MIDDLEWARE += EXTRA_EXTRA_MIDDLEWARE
+except NameError:
+ pass # EXTRA_EXTRA_MIDDLEWARE not defined
# Update logging settings based on potentially-updated variables.
LOGGING['handlers']['file']['filename'] = LOG_PATH
diff --git a/coldfront/config/test_settings.py b/coldfront/config/test_settings.py
new file mode 100644
index 0000000000..4e9029e019
--- /dev/null
+++ b/coldfront/config/test_settings.py
@@ -0,0 +1,9 @@
+from .settings import *
+
+# Override the DATABASES setting to use SQLite in-memory database
+DATABASES = {
+ 'default': {
+ 'ENGINE': 'django.db.backends.sqlite3',
+ 'NAME': ':memory:'
+ }
+}
\ No newline at end of file
diff --git a/coldfront/config/test_settings.py.sample b/coldfront/config/test_settings.py.sample
index 40ec1d34b6..7c6bde79de 100644
--- a/coldfront/config/test_settings.py.sample
+++ b/coldfront/config/test_settings.py.sample
@@ -1,5 +1,12 @@
import os
+# Define variables that local_settings.py expects before importing
+EXTRA_EXTRA_APPS = []
+EXTRA_EXTRA_MIDDLEWARE = []
+STREAM_LOGS_TO_STDOUT = False
+CILOGON_APP_CLIENT_ID = ''
+CILOGON_APP_SECRET = ''
+
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
@@ -56,18 +63,13 @@ DATABASES = {
LOG_PATH = '/var/log/user_portals/cf_mybrc/cf_mybrc_portal.log'
API_LOG_PATH = '/var/log/user_portals/cf_mybrc/cf_mybrc_api.log'
-STREAM_LOGS_TO_STDOUT = False
-
# A list of admin email addresses to CC when certain requests are approved.
REQUEST_APPROVAL_CC_LIST = ['test@test.test']
# A list of admin email addresses to notify when a project-user removal request
# is processed.
PROJECT_USER_REMOVAL_REQUEST_PROCESSED_EMAIL_ADMIN_LIST = ['test@test.test']
-# Extra apps to be included.
-EXTRA_EXTRA_APPS = []
-# Extra middleware to be included.
-EXTRA_EXTRA_MIDDLEWARE = []
+# Additional extra apps and middleware are added below
#------------------------------------------------------------------------------
# Django Cache settings
@@ -134,8 +136,7 @@ SESSION_COOKIE_SECURE = False
# Django All-Auth settings
#------------------------------------------------------------------------------
-CILOGON_APP_CLIENT_ID = ''
-CILOGON_APP_SECRET = ''
+# CILOGON_APP_CLIENT_ID and CILOGON_APP_SECRET are defined above before imports
#------------------------------------------------------------------------------
# django debug toolbar settings
@@ -170,9 +171,7 @@ FLAGS = {
# Plugin: departments
#------------------------------------------------------------------------------
-EXTRA_EXTRA_APPS += [
- 'coldfront.plugins.departments'
-]
+EXTRA_EXTRA_APPS.append('coldfront.plugins.departments')
DEPARTMENTS_DEPARTMENT_DISPLAY_NAME = 'Department'
DEPARTMENTS_DEPARTMENT_DATA_SOURCE = 'coldfront.plugins.departments.utils.data_sources.backends.dummy.DummyDataSourceBackend'
@@ -181,9 +180,7 @@ DEPARTMENTS_DEPARTMENT_DATA_SOURCE = 'coldfront.plugins.departments.utils.data_s
# Plugin: hardware_procurements
#------------------------------------------------------------------------------
-EXTRA_EXTRA_APPS += [
- 'coldfront.plugins.hardware_procurements'
-]
+EXTRA_EXTRA_APPS.append('coldfront.plugins.hardware_procurements')
HARDWARE_PROCUREMENTS_CONFIG = {
'DATA_SOURCE': 'coldfront.plugins.hardware_procurements.utils.data_sources.backends.dummy.DummyDataSourceBackend',
@@ -199,3 +196,6 @@ Q_CLUSTER = {
'sync': True,
'timeout': 24 * 60 * 60,
}
+
+# Now import base settings after defining all required variables
+from .settings import *
diff --git a/coldfront/core/portal/templates/portal/authorized_home.html b/coldfront/core/portal/templates/portal/authorized_home.html
index 74fa39d94c..72af3f4977 100644
--- a/coldfront/core/portal/templates/portal/authorized_home.html
+++ b/coldfront/core/portal/templates/portal/authorized_home.html
@@ -147,6 +147,9 @@
My {{ PROGRAM_NAME_SHORT }} Cluster Projects »
Service Units
+
+ Project Access Tracking
+
@@ -171,6 +174,16 @@ My {{ PROGRAM_NAME_SHORT }} Cluster Projects »
{% include "allocation/cluster_access_badge.html" with status=project.display_status %}
{{ project.rendered_compute_usage }}
+
+ {% if project.has_tracking %}
+
+ View Details
+
+ {% include "portal/project_access_timeline_modal.html" with project=project %}
+ {% else %}
+ -
+ {% endif %}
+
{% endfor %}
diff --git a/coldfront/core/portal/templates/portal/decommission_entry_detail.html b/coldfront/core/portal/templates/portal/decommission_entry_detail.html
new file mode 100644
index 0000000000..5477f482e9
--- /dev/null
+++ b/coldfront/core/portal/templates/portal/decommission_entry_detail.html
@@ -0,0 +1,23 @@
+{% extends "common/base.html" %}
+{% load static %}
+{% load portal_tags %}
+
+{% block title %}Condo Allocation Entry Detail{% endblock %}
+
+{% block content %}
+
+
Condo Allocation Entry Detail
+
+
+
+ {% for col in decommission_detail_columns %}
+
+
{{ col }}:
+
{{ alert.record|get_item:col }}
+
+ {% endfor %}
+
+
+
Back to Decommission Alerts
+
+{% endblock %}
diff --git a/coldfront/core/portal/templates/portal/progress_modal.html b/coldfront/core/portal/templates/portal/progress_modal.html
new file mode 100644
index 0000000000..40cfeda19b
--- /dev/null
+++ b/coldfront/core/portal/templates/portal/progress_modal.html
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+ {% include "progress_tracker.html" with steps=project.progress_steps %}
+
+
{{ project.progress_steps|safe }}
+
+
+
+
+
diff --git a/coldfront/core/portal/templates/portal/project_access_timeline_modal.html b/coldfront/core/portal/templates/portal/project_access_timeline_modal.html
new file mode 100644
index 0000000000..d4357f0155
--- /dev/null
+++ b/coldfront/core/portal/templates/portal/project_access_timeline_modal.html
@@ -0,0 +1,231 @@
+
+
+
+
+
+
+ {% if project.tracking_status.error %}
+
+
+ {{ project.tracking_status.error }}
+
+ {% else %}
+
+
Status: {{ project.tracking_status.message }}
+ {% if project.tracking_status.details %}
+ {% if project.tracking_status.details.request_date %}
+
Request Date: {{ project.tracking_status.details.request_date|date:"M d, Y g:i A" }}
+ {% endif %}
+ {% if project.tracking_status.details.reason %}
+
Request Reason: {{ project.tracking_status.details.reason }}
+ {% endif %}
+ {% endif %}
+
+
+
+
+
+ {% for step in project.tracking_status.steps %}
+
+
+ {% if step.completed %}
+
+
+
+ {% elif step.current %}
+ {{ forloop.counter }}
+ {% else %}
+ {{ forloop.counter }}
+ {% endif %}
+ {{ step.label }}
+
+
+ {% endfor %}
+
+
+
+
+
+
+ The timeline shows your current progress in gaining cluster access for this project.
+ {% if project.tracking_status.status.value == 'Directly Added by PI' %}
+ Since you were directly added by a PI, some steps were skipped.
+ {% endif %}
+
+
+ {% endif %}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/coldfront/core/portal/views.py b/coldfront/core/portal/views.py
index e7a6e9ce6c..a1797dba28 100644
--- a/coldfront/core/portal/views.py
+++ b/coldfront/core/portal/views.py
@@ -8,6 +8,8 @@
from django.shortcuts import render
from django.views.decorators.cache import cache_page
+import logging
+
from flags.state import flag_enabled
from coldfront.core.allocation.models import (Allocation,
@@ -20,33 +22,91 @@
generate_publication_by_year_chart_data,
generate_resources_chart_data,
generate_total_grants_by_agency_chart_data)
-from coldfront.core.project.models import Project, ProjectUserJoinRequest
+from coldfront.core.project.models import Project, ProjectUserJoinRequest, ProjectUser
from coldfront.core.project.models import ProjectUserJoinRequest
from coldfront.core.project.models import ProjectUserRemovalRequest
from coldfront.core.project.utils import render_project_compute_usage
+from coldfront.core.project.utils_.join_request_tracker import JoinRequestTracker
from django.contrib.auth.decorators import login_required
# from coldfront.core.publication.models import Publication
# from coldfront.core.research_output.models import ResearchOutput
+from coldfront.core.project.utils_.join_request_tracker import JoinRequestTracker
+logger = logging.getLogger(__name__)
-def home(request):
+def home(request):
def _compute_project_user_cluster_access_statuses(_user):
"""Return a dict mapping each Project object that the given User
is associated with to a str describing the user's access to the
cluster under the project."""
statuses = {}
- cluster_access_attributes = AllocationUserAttribute.objects.filter(
- allocation_attribute_type__name='Cluster Account Status',
- allocation_user__user=_user)
- for attribute in cluster_access_attributes:
- _project = attribute.allocation.project
- statuses[_project] = attribute.value
+ # Get all allocations for the user's projects
+ from coldfront.core.allocation.models import Allocation, AllocationUser
+
+ # Get all active project users for the user
+ project_users = ProjectUser.objects.filter(
+ user=_user,
+ status__name__in=['Active', 'Pending - Add']
+ ).select_related('project')
+ for project_user in project_users:
+ project = project_user.project
+
+ # Try to find a compute allocation for this project
+ compute_allocation = None
+ try:
+ # Look for any allocation with a compute resource
+ allocations = Allocation.objects.filter(
+ project=project,
+ status__name__in=['Active', 'New', 'Renewal Requested', 'Payment Pending', 'Payment Requested',
+ 'Paid']
+ ).prefetch_related('resources')
+
+ for allocation in allocations:
+ if allocation.resources.filter(name__icontains='Compute').exists():
+ compute_allocation = allocation
+ break
+ except:
+ pass
+
+ if compute_allocation:
+ # Check if user has allocation user record
+ try:
+ allocation_user = compute_allocation.allocationuser_set.get(user=_user)
+
+ # Try to get cluster account status
+ cluster_access_attrs = AllocationUserAttribute.objects.filter(
+ allocation_user=allocation_user,
+ allocation_attribute_type__name='Cluster Account Status'
+ )
+
+ if cluster_access_attrs.exists():
+ statuses[project] = cluster_access_attrs.first().value
+ else:
+ # User is in allocation but no cluster status yet
+ if project_user.status.name == 'Pending - Add':
+ statuses[project] = 'Pending - Add'
+ else:
+ statuses[project] = 'Pending - Add'
+ except AllocationUser.DoesNotExist:
+ # User not in allocation yet
+ if project_user.status.name == 'Pending - Add':
+ statuses[project] = 'Pending - Add'
+ else:
+ statuses[project] = 'None'
+ else:
+ # No compute allocation found
+ if project_user.status.name == 'Pending - Add':
+ statuses[project] = 'Pending - Add'
+ else:
+ statuses[project] = 'None'
+
+ # Check for pending removal requests
for project_user_removal_request in \
ProjectUserRemovalRequest.objects.filter(
project_user__user=_user, status__name='Pending'):
@@ -61,7 +121,7 @@ def _compute_project_user_cluster_access_statuses(_user):
project_list = Project.objects.filter(
(Q(status__name__in=['New', 'Active', 'Inactive']) &
Q(projectuser__user=request.user) &
- Q(projectuser__status__name__in=['Active', 'Pending - Remove']))
+ Q(projectuser__status__name__in=['Active', 'Pending - Add']))
).distinct().order_by('name')
access_states = _compute_project_user_cluster_access_statuses(
@@ -77,24 +137,38 @@ def _compute_project_user_cluster_access_statuses(_user):
rendered_compute_usage = 'Unexpected error'
project.rendered_compute_usage = rendered_compute_usage
+ # Add tracking status for each project
+ try:
+ tracker = JoinRequestTracker(request.user, project)
+ tracking_status = tracker.get_status()
+ # Convert to object-like access for template
+ project.has_tracking = tracking_status.get('can_view', False)
+ project.tracking_status = tracking_status
+ project.tracking_error = tracking_status.get('error')
+ except Exception as e:
+ logger.error(f"Failed to get tracking status for project {project.name}: {e}")
+ project.has_tracking = False
+ project.tracking_status = None
+ project.tracking_error = None
+
if has_cluster_access(request.user):
context['cluster_username'] = request.user.username
allocation_list = Allocation.objects.filter(
- Q(status__name__in=['Active', 'New', 'Renewal Requested', ]) &
- Q(project__status__name__in=['Active', 'New']) &
- Q(project__projectuser__user=request.user) &
- Q(project__projectuser__status__name__in=['Active', ]) &
- Q(allocationuser__user=request.user) &
- Q(allocationuser__status__name__in=['Active', ])
+ Q(status__name__in=['Active', 'New', 'Renewal Requested', ]) &
+ Q(project__status__name__in=['Active', 'New']) &
+ Q(project__projectuser__user=request.user) &
+ Q(project__projectuser__status__name__in=['Active', ]) &
+ Q(allocationuser__user=request.user) &
+ Q(allocationuser__status__name__in=['Active', ])
).distinct().order_by('-created')
context['project_list'] = project_list
context['allocation_list'] = allocation_list
num_join_requests = ProjectUserJoinRequest.objects.filter(
- project_user__status__name='Pending - Add',
- project_user__user=request.user
- ).order_by('project_user', '-created').distinct('project_user').count()
+ project_user__status__name='Pending - Add',
+ project_user__user=request.user
+ ).order_by('project_user', '-created').distinct('project_user').count()
context['num_join_requests'] = num_join_requests
context['pending_removal_request_projects'] = [
diff --git a/coldfront/core/project/templates/project/project_join_list.html b/coldfront/core/project/templates/project/project_join_list.html
index f945b42c5f..c5044ef59f 100644
--- a/coldfront/core/project/templates/project/project_join_list.html
+++ b/coldfront/core/project/templates/project/project_join_list.html
@@ -43,6 +43,7 @@ Join a Project
PIs
Title
Cluster
+ Request Status
@@ -53,6 +54,16 @@ Join a Project
{% for pi in project.pis %}{{ pi.username }} {% endfor %}
{{ project.title }}
{{ project.cluster_name|upper }}
+
+ {% if project.has_tracking %}
+
+ View Timeline
+
+ {% include "portal/project_access_timeline_modal.html" with project=project %}
+ {% else %}
+ Pending Approval
+ {% endif %}
+
{% endfor %}
diff --git a/coldfront/core/project/templates/project/project_join_request_list_table.html b/coldfront/core/project/templates/project/project_join_request_list_table.html
index 28293a9b3f..f063e8ff7b 100644
--- a/coldfront/core/project/templates/project/project_join_request_list_table.html
+++ b/coldfront/core/project/templates/project/project_join_request_list_table.html
@@ -23,6 +23,9 @@
Reason
+
+ Timeline
+
@@ -42,7 +45,243 @@
{{ join_request.reason }}
+
+ {% if join_request.has_tracking %}
+
+ View
+
+
+
+
+
+
+
+ {% if join_request.tracking_error %}
+
+
+ {{ join_request.tracking_error }}
+
+ {% elif join_request.tracking_status %}
+
+
Status: {{ join_request.tracking_status.message }}
+ {% if join_request.tracking_status.details %}
+ {% if join_request.tracking_status.details.request_date %}
+
Request Date: {{ join_request.tracking_status.details.request_date|date:"M d, Y g:i A" }}
+ {% endif %}
+ {% if join_request.tracking_status.details.reason %}
+
Request Reason: {{ join_request.tracking_status.details.reason }}
+ {% endif %}
+ {% endif %}
+
+
+
+
+
+ {% for step in join_request.tracking_status.steps %}
+
+
+ {% if step.completed %}
+
+
+
+ {% elif step.current %}
+ {{ forloop.counter }}
+ {% else %}
+ {{ forloop.counter }}
+ {% endif %}
+ {{ step.label }}
+
+
+ {% endfor %}
+
+
+ {% else %}
+
+
+ No tracking information available.
+
+ {% endif %}
+
+
+
+
+
+ {% else %}
+ -
+ {% endif %}
+
{% endfor %}
+
+
+
\ No newline at end of file
diff --git a/coldfront/core/project/templates/project/project_renewal_status.html b/coldfront/core/project/templates/project/project_renewal_status.html
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/coldfront/core/project/templates/project/project_renewal_tracking_table.html b/coldfront/core/project/templates/project/project_renewal_tracking_table.html
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/coldfront/core/project/utils_/join_request_tracker.py b/coldfront/core/project/utils_/join_request_tracker.py
new file mode 100644
index 0000000000..e41826246a
--- /dev/null
+++ b/coldfront/core/project/utils_/join_request_tracker.py
@@ -0,0 +1,221 @@
+"""
+Utility class for tracking project join request statuses.
+Provides a unified interface to determine the current status of a user's
+project join and cluster access request.
+"""
+from enum import Enum
+from typing import Optional, Dict, Any, List
+import logging
+
+from django.contrib.auth.models import User
+from django.db.models import Q
+
+from coldfront.core.allocation.models import ClusterAccessRequest
+from coldfront.core.project.models import (
+ Project, ProjectUser, ProjectUserJoinRequest, ProjectUserStatusChoice
+)
+from coldfront.core.utils.tracking import ModelBasedTracker, TrackingStep
+
+logger = logging.getLogger(__name__)
+
+
+class JoinRequestStatus(Enum):
+ """Enumeration of possible join request statuses."""
+ REQUEST_SENT = "Request Sent"
+ PI_APPROVED_QUEUED = "PI Approved & Queued"
+ ADMIN_PROCESSING = "Admin Processing"
+ ACCESS_GRANTED = "Access Granted"
+ DIRECTLY_ADDED = "Directly Added by PI"
+ ERROR = "Error"
+ NO_REQUEST = "No Request"
+
+
+class JoinRequestTracker(ModelBasedTracker):
+ """
+ Tracks the status of a user's project join request and cluster access.
+
+ This class consolidates the logic for determining where a user is in the
+ project join and cluster access flow.
+ """
+
+ def __init__(self, user: User, project: Project):
+ super().__init__(user, project)
+ self.project = project # Keep backward compatibility
+ self._project_user = None
+ self._join_request = None
+ self._cluster_access_request = None
+
+ def get_status(self) -> Dict[str, Any]:
+ """
+ Get the current status of the user's join request.
+
+ Maintains backward compatibility with the original interface.
+
+ Returns:
+ Dict containing status information for template usage
+ """
+ result = super().get_status()
+ return result.to_dict()
+
+ def _load_data(self):
+ """Load relevant data for status determination."""
+ # Get ProjectUser
+ try:
+ self._project_user = ProjectUser.objects.get(
+ user=self.user,
+ project=self.project
+ )
+ except ProjectUser.DoesNotExist:
+ self._project_user = None
+
+ # Get latest join request if exists
+ if self._project_user:
+ join_requests = ProjectUserJoinRequest.objects.filter(
+ project_user=self._project_user
+ ).order_by('-created')
+ self._join_request = join_requests.first() if join_requests.exists() else None
+
+ # Get cluster access request
+ allocation = self.project.allocation_set.filter(
+ resources__name__icontains='Compute',
+ status__name__in=['Active', 'New', 'Renewal Requested']
+ ).first()
+
+ if allocation:
+ try:
+ allocation_user = allocation.allocationuser_set.get(user=self.user)
+ cluster_requests = ClusterAccessRequest.objects.filter(
+ allocation_user=allocation_user
+ ).order_by('-created')
+ self._cluster_access_request = cluster_requests.first() if cluster_requests.exists() else None
+ except:
+ self._cluster_access_request = None
+
+ def _determine_status(self) -> JoinRequestStatus:
+ """Determine the current status based on loaded data."""
+ if not self._project_user:
+ return JoinRequestStatus.NO_REQUEST
+
+ # Check if user was directly added (no join request)
+ if not self._join_request and self._project_user.status.name == 'Active':
+ return JoinRequestStatus.DIRECTLY_ADDED
+
+ # Check join request status
+ if self._project_user.status.name == 'Pending - Add':
+ return JoinRequestStatus.REQUEST_SENT
+
+ if self._project_user.status.name == 'Denied':
+ return JoinRequestStatus.NO_REQUEST
+
+ # User is active, check cluster access
+ if self._project_user.status.name == 'Active':
+ if not self._cluster_access_request:
+ return JoinRequestStatus.PI_APPROVED_QUEUED
+
+ cluster_status = self._cluster_access_request.status.name
+ if cluster_status in ['Pending - Add', 'Processing']:
+ if cluster_status == 'Processing':
+ return JoinRequestStatus.ADMIN_PROCESSING
+ else:
+ return JoinRequestStatus.PI_APPROVED_QUEUED
+ elif cluster_status == 'Complete':
+ return JoinRequestStatus.ACCESS_GRANTED
+ elif cluster_status == 'Denied':
+ return JoinRequestStatus.PI_APPROVED_QUEUED # Show as queued if cluster denied
+
+ return JoinRequestStatus.ERROR
+
+ def _get_status_message(self, status: JoinRequestStatus) -> str:
+ """Get a human-readable message for the status."""
+ messages = {
+ JoinRequestStatus.REQUEST_SENT: "Your request to join the project has been sent and is awaiting approval.",
+ JoinRequestStatus.PI_APPROVED_QUEUED: "Your request has been approved by the PI and is queued for cluster access setup.",
+ JoinRequestStatus.ADMIN_PROCESSING: "Administrators are processing your cluster access request.",
+ JoinRequestStatus.ACCESS_GRANTED: "You have been granted access to the cluster under this project.",
+ JoinRequestStatus.DIRECTLY_ADDED: "You were directly added to this project by a PI or manager.",
+ JoinRequestStatus.ERROR: "Unable to determine the status of your request.",
+ JoinRequestStatus.NO_REQUEST: "No active request found for this project."
+ }
+ return messages.get(status, "Unknown status")
+
+ def _get_status_details(self, status: JoinRequestStatus) -> Optional[Dict[str, Any]]:
+ """Get additional details about the current status."""
+ details = {}
+
+ if self._join_request:
+ details['request_date'] = self._join_request.created
+ details['reason'] = self._join_request.reason
+
+ if self._project_user:
+ details['project_user_status'] = self._project_user.status.name
+ details['role'] = self._project_user.role.name
+
+ if self._cluster_access_request:
+ details['cluster_request_status'] = self._cluster_access_request.status.name
+ details['cluster_request_date'] = self._cluster_access_request.request_time
+
+ return details if details else None
+
+ def _get_progress_steps(self, current_status: JoinRequestStatus) -> List[TrackingStep]:
+ """Get the progress steps with their completion status."""
+ if current_status == JoinRequestStatus.DIRECTLY_ADDED:
+ return [
+ TrackingStep(
+ label='Directly Added by PI',
+ completed=True,
+ current=False
+ ),
+ TrackingStep(
+ label='Admin Processing',
+ completed=self._cluster_access_request and self._cluster_access_request.status.name == 'Complete',
+ current=self._cluster_access_request and self._cluster_access_request.status.name == 'Processing'
+ ),
+ TrackingStep(
+ label='Access Granted',
+ completed=self._cluster_access_request and self._cluster_access_request.status.name == 'Complete',
+ current=False
+ )
+ ]
+ else:
+ return [
+ TrackingStep(
+ label='Request Sent',
+ completed=current_status.value != 'No Request',
+ current=current_status == JoinRequestStatus.REQUEST_SENT
+ ),
+ TrackingStep(
+ label='PI Approved & Queued',
+ completed=current_status in [
+ JoinRequestStatus.PI_APPROVED_QUEUED,
+ JoinRequestStatus.ADMIN_PROCESSING,
+ JoinRequestStatus.ACCESS_GRANTED
+ ],
+ current=current_status == JoinRequestStatus.PI_APPROVED_QUEUED
+ ),
+ TrackingStep(
+ label='Admin Processing',
+ completed=current_status in [
+ JoinRequestStatus.ADMIN_PROCESSING,
+ JoinRequestStatus.ACCESS_GRANTED
+ ],
+ current=current_status == JoinRequestStatus.ADMIN_PROCESSING
+ ),
+ TrackingStep(
+ label='Access Granted',
+ completed=current_status == JoinRequestStatus.ACCESS_GRANTED,
+ current=False
+ )
+ ]
+
+ def _can_view_status(self) -> bool:
+ """Determine if the user can view the status for this project."""
+ # User can view if they have an active join request or have been added to the project
+ return self._project_user is not None
+
+ def get_error_status(self) -> JoinRequestStatus:
+ """Get the error status enum value."""
+ return JoinRequestStatus.ERROR
+
+ def get_default_error_message(self) -> str:
+ """Get the default error message for project join requests."""
+ return "Our system encountered an issue gathering the join status of your project, please contact support."
\ No newline at end of file
diff --git a/coldfront/core/project/views_/join_views/approval_views.py b/coldfront/core/project/views_/join_views/approval_views.py
index 89bd92935b..29a55b4222 100644
--- a/coldfront/core/project/views_/join_views/approval_views.py
+++ b/coldfront/core/project/views_/join_views/approval_views.py
@@ -30,7 +30,8 @@
from coldfront.core.project.utils_.new_project_user_utils import NewProjectUserSource
from coldfront.core.utils.email.email_strategy import EnqueueEmailStrategy
-
+from coldfront.core.project.utils_.join_request_tracker import JoinRequestTracker
+import logging
logger = logging.getLogger(__name__)
@@ -342,4 +343,34 @@ def get_context_data(self, **kwargs):
context['join_request_list'] = join_requests
+ # In coldfront/core/project/views_/join_views/approval_views.py
+ # Replace the tracking code in ProjectJoinRequestListView.get_context_data() with:
+
+ # Add tracking status to each join request
+ for join_request in join_requests:
+ project = join_request.project_user.project
+ user = join_request.project_user.user
+ try:
+ tracker = JoinRequestTracker(user, project)
+ tracking_status = tracker.get_status()
+
+ # Debug logging
+ logger.debug(f"Tracking status for {user.username} on {project.name}: "
+ f"can_view={tracking_status.get('can_view')}, "
+ f"status={tracking_status.get('status')}")
+
+ # For admin view, we always want to show tracking if there's a valid status
+ can_view = tracking_status.get('can_view', False)
+ has_valid_status = tracking_status.get('status') is not None
+
+ join_request.has_tracking = can_view or has_valid_status
+ join_request.tracking_status = tracking_status
+ join_request.tracking_error = tracking_status.get('error')
+ except Exception as e:
+ logger.error(f"Failed to get tracking status for user {user.username} "
+ f"on project {project.name}: {e}")
+ join_request.has_tracking = False
+ join_request.tracking_status = None
+ join_request.tracking_error = None
+
return context
diff --git a/coldfront/core/project/views_/join_views/request_views.py b/coldfront/core/project/views_/join_views/request_views.py
index 81e2ed68c6..d27d3cd36f 100644
--- a/coldfront/core/project/views_/join_views/request_views.py
+++ b/coldfront/core/project/views_/join_views/request_views.py
@@ -29,7 +29,8 @@
from coldfront.core.project.views import ProjectListView
from coldfront.core.user.utils_.host_user_utils import is_lbl_employee
from coldfront.core.user.utils_.host_user_utils import needs_host
-
+from coldfront.core.project.utils_.join_request_tracker import JoinRequestTracker
+import logging
logger = logging.getLogger(__name__)
@@ -297,6 +298,20 @@ def get_context_data(self, **kwargs):
& Q(projectuser__status__name__in=['Pending - Add']))
join_requests = annotate_queryset_with_cluster_name(join_requests)
+ # Add tracking status to each join request
+ for project in join_requests:
+ try:
+ tracker = JoinRequestTracker(self.request.user, project)
+ tracking_status = tracker.get_status()
+ project.has_tracking = tracking_status.get('can_view', False)
+ project.tracking_status = tracking_status
+ project.tracking_error = tracking_status.get('error')
+ except Exception as e:
+ logger.error(f"Failed to get tracking status for project {project.name}: {e}")
+ project.has_tracking = False
+ project.tracking_status = None
+ project.tracking_error = None
+
context['join_requests'] = join_requests
context['not_joinable'] = not_joinable
diff --git a/coldfront/core/user/templates/templatetags/portal_tags.py b/coldfront/core/user/templates/templatetags/portal_tags.py
new file mode 100644
index 0000000000..5a487c926e
--- /dev/null
+++ b/coldfront/core/user/templates/templatetags/portal_tags.py
@@ -0,0 +1,26 @@
+from django import template
+from django.conf import settings
+
+register = template.Library()
+
+
+@register.simple_tag
+def get_version():
+ return settings.VERSION
+
+
+@register.simple_tag
+def get_setting(name):
+ return getattr(settings, name, "")
+
+@register.filter
+def get_item(dictionary, key):
+ return dictionary.get(key, "")
+
+@register.filter(name='replace')
+def replace(value, arg):
+ try:
+ old, new = arg.split(',')
+ except ValueError:
+ return value
+ return value.replace(old, new)
\ No newline at end of file
diff --git a/coldfront/core/user/views_/request_hub_views.py b/coldfront/core/user/views_/request_hub_views.py
index bc462e39ac..950b4a0faa 100644
--- a/coldfront/core/user/views_/request_hub_views.py
+++ b/coldfront/core/user/views_/request_hub_views.py
@@ -21,6 +21,8 @@
from coldfront.core.project.utils_.permissions_utils import \
is_user_manager_or_pi_of_project
+from coldfront.core.project.utils_.join_request_tracker import JoinRequestTracker
+
class RequestListItem:
"""
@@ -325,12 +327,20 @@ def get_project_join_request(self):
project_user__status__name__in=['Active', 'Denied'],
*args).order_by('-modified')
+ # Add tracking status to pending join requests
+ pending_with_tracking = self._add_tracking_status_to_join_requests(
+ project_join_request_pending)
+
+ # Add tracking status to complete join requests
+ complete_with_tracking = self._add_tracking_status_to_join_requests(
+ project_join_request_complete)
+
proj_join_request_object.num = self.paginators
proj_join_request_object.pending_queryset = \
- self.create_paginator(project_join_request_pending)
+ self.create_paginator(pending_with_tracking)
proj_join_request_object.complete_queryset = \
- self.create_paginator(project_join_request_complete)
+ self.create_paginator(complete_with_tracking)
proj_join_request_object.num_pending = \
project_join_request_pending.count()
@@ -350,6 +360,33 @@ def get_project_join_request(self):
return proj_join_request_object
+ def _add_tracking_status_to_join_requests(self, join_requests_queryset):
+ """Add tracking status information to each join request in the queryset"""
+ # Convert queryset to list to allow modification
+ join_requests_list = list(join_requests_queryset)
+
+ # Add tracking status to each join request
+ for join_request in join_requests_list:
+ project = join_request.project_user.project
+ user = join_request.project_user.user
+ try:
+ tracker = JoinRequestTracker(user, project)
+ tracking_status = tracker.get_status()
+
+ # For request hub, we want to show tracking if there's a valid status
+ can_view = tracking_status.get('can_view', False)
+ has_valid_status = tracking_status.get('status') is not None
+
+ join_request.has_tracking = can_view or has_valid_status
+ join_request.tracking_status = tracking_status
+ join_request.tracking_error = tracking_status.get('error')
+ except Exception as e:
+ join_request.has_tracking = False
+ join_request.tracking_status = None
+ join_request.tracking_error = None
+
+ return join_requests_list
+
def get_project_renewal_request(self):
"""Populates a RequestListItem with data for project renewal
requests"""
diff --git a/coldfront/core/utils/request_tracking/__init__.py b/coldfront/core/utils/request_tracking/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/coldfront/core/utils/request_tracking/implementations/__init__.py b/coldfront/core/utils/request_tracking/implementations/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/coldfront/core/utils/tracking/__init__.py b/coldfront/core/utils/tracking/__init__.py
new file mode 100644
index 0000000000..bf69855aa2
--- /dev/null
+++ b/coldfront/core/utils/tracking/__init__.py
@@ -0,0 +1,21 @@
+"""
+Generic tracking framework for various request/process tracking needs.
+"""
+
+from .base import (
+ BaseTracker,
+ BaseStatus,
+ ModelBasedTracker,
+ TrackingStep,
+ TrackingResult,
+ create_tracking_steps,
+)
+
+__all__ = [
+ 'BaseTracker',
+ 'BaseStatus',
+ 'ModelBasedTracker',
+ 'TrackingStep',
+ 'TrackingResult',
+ 'create_tracking_steps',
+]
\ No newline at end of file
diff --git a/coldfront/core/utils/tracking/base.py b/coldfront/core/utils/tracking/base.py
new file mode 100644
index 0000000000..a662b92e58
--- /dev/null
+++ b/coldfront/core/utils/tracking/base.py
@@ -0,0 +1,226 @@
+"""
+Generic tracking framework for various request/process tracking needs.
+Provides base classes that can be extended for specific tracking scenarios.
+"""
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import Any, Dict, List, Optional, Union
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class TrackingStep:
+ """Represents a single step in a tracking process."""
+
+ def __init__(self, label: str, completed: bool = False, current: bool = False):
+ self.label = label
+ self.completed = completed
+ self.current = current
+
+ def to_dict(self) -> Dict[str, Any]:
+ """Convert step to dictionary for template usage."""
+ return {
+ 'label': self.label,
+ 'completed': self.completed,
+ 'current': self.current
+ }
+
+
+class TrackingResult:
+ """Holds the result of a tracking status query."""
+
+ def __init__(
+ self,
+ status: Union[Enum, str],
+ message: str,
+ details: Optional[Dict[str, Any]] = None,
+ error: Optional[str] = None,
+ steps: Optional[List[TrackingStep]] = None,
+ can_view: bool = True
+ ):
+ self.status = status
+ self.message = message
+ self.details = details or {}
+ self.error = error
+ self.steps = steps or []
+ self.can_view = can_view
+
+ def to_dict(self) -> Dict[str, Any]:
+ """Convert result to dictionary for template usage."""
+ return {
+ 'status': self.status,
+ 'message': self.message,
+ 'details': self.details,
+ 'error': self.error,
+ 'steps': [step.to_dict() for step in self.steps],
+ 'can_view': self.can_view
+ }
+
+
+class BaseStatus(Enum):
+ """Base class for tracking status enums."""
+
+ ERROR = "Error"
+ NO_REQUEST = "No Request"
+
+ @property
+ def value_str(self) -> str:
+ """Get the string value of the status."""
+ return self.value
+
+
+class BaseTracker(ABC):
+ """
+ Abstract base class for all tracking implementations.
+
+ Subclasses should implement:
+ - _load_data(): Load relevant data for status determination
+ - _determine_status(): Determine current status based on loaded data
+ - _get_status_message(): Get human-readable message for status
+ - _get_status_details(): Get additional details about status
+ - _get_progress_steps(): Get progress steps with completion status
+ - _can_view_status(): Determine if user can view status
+ """
+
+ def __init__(self, *args, **kwargs):
+ """Initialize tracker with required parameters."""
+ self._error_message = None
+
+ def get_status(self) -> TrackingResult:
+ """
+ Get the current tracking status.
+
+ Returns:
+ TrackingResult containing all status information
+ """
+ try:
+ self._load_data()
+ status = self._determine_status()
+ steps = self._get_progress_steps(status)
+
+ return TrackingResult(
+ status=status,
+ message=self._get_status_message(status),
+ details=self._get_status_details(status),
+ error=self._error_message,
+ steps=steps,
+ can_view=self._can_view_status()
+ )
+ except Exception as e:
+ logger.exception(f"Error determining tracking status: {e}")
+ return TrackingResult(
+ status=self.get_error_status(),
+ message="Unable to determine status",
+ details=None,
+ error=self.get_default_error_message(),
+ steps=[],
+ can_view=True
+ )
+
+ @abstractmethod
+ def _load_data(self):
+ """Load relevant data for status determination."""
+ pass
+
+ @abstractmethod
+ def _determine_status(self) -> Enum:
+ """Determine the current status based on loaded data."""
+ pass
+
+ @abstractmethod
+ def _get_status_message(self, status: Enum) -> str:
+ """Get a human-readable message for the status."""
+ pass
+
+ @abstractmethod
+ def _get_status_details(self, status: Enum) -> Optional[Dict[str, Any]]:
+ """Get additional details about the current status."""
+ pass
+
+ @abstractmethod
+ def _get_progress_steps(self, current_status: Enum) -> List[TrackingStep]:
+ """Get the progress steps with their completion status."""
+ pass
+
+ @abstractmethod
+ def _can_view_status(self) -> bool:
+ """Determine if the user can view the status."""
+ pass
+
+ def get_error_status(self) -> Enum:
+ """Get the error status enum value."""
+ return BaseStatus.ERROR
+
+ def get_default_error_message(self) -> str:
+ """Get the default error message."""
+ return "Our system encountered an issue gathering the status, please contact support."
+
+ def set_error_message(self, message: str):
+ """Set a custom error message."""
+ self._error_message = message
+
+
+class ModelBasedTracker(BaseTracker):
+ """
+ Extended base class for trackers that work with Django models.
+ Provides common patterns for model-based tracking.
+ """
+
+ def __init__(self, user, target_object, *args, **kwargs):
+ """
+ Initialize with user and target object.
+
+ Args:
+ user: The user whose status is being tracked
+ target_object: The object being tracked (project, allocation, etc.)
+ """
+ super().__init__(*args, **kwargs)
+ self.user = user
+ self.target_object = target_object
+
+ def _get_base_queryset_filters(self) -> Dict[str, Any]:
+ """Get base filters for querysets (user, target_object, etc.)."""
+ return {
+ 'user': self.user,
+ }
+
+ def _filter_by_status_names(self, queryset, status_names: List[str], status_field: str = 'status__name'):
+ """Helper to filter queryset by status names."""
+ filter_kwargs = {f'{status_field}__in': status_names}
+ return queryset.filter(**filter_kwargs)
+
+ def _get_latest_by_field(self, queryset, field: str = 'created'):
+ """Get the latest object from queryset by specified field."""
+ return queryset.order_by(f'-{field}').first()
+
+
+def create_tracking_steps(step_definitions: List[Dict[str, Any]],
+ current_status: Enum,
+ status_progression: Dict[Enum, int]) -> List[TrackingStep]:
+ """
+ Helper function to create tracking steps based on definitions and current status.
+
+ Args:
+ step_definitions: List of step definitions with 'label' and 'status_values'
+ current_status: Current status enum
+ status_progression: Maps status enums to their progression order
+
+ Returns:
+ List of TrackingStep objects
+ """
+ steps = []
+ current_order = status_progression.get(current_status, 0)
+
+ for i, step_def in enumerate(step_definitions):
+ step_order = i + 1
+ completed = step_order < current_order
+ current = step_order == current_order
+
+ steps.append(TrackingStep(
+ label=step_def['label'],
+ completed=completed,
+ current=current
+ ))
+
+ return steps
\ No newline at end of file
diff --git a/coldfront/tests/README.md b/coldfront/tests/README.md
new file mode 100644
index 0000000000..af647b0691
--- /dev/null
+++ b/coldfront/tests/README.md
@@ -0,0 +1,131 @@
+# ColdFront Testing Protocol
+
+This directory contains comprehensive tests for the ColdFront application using pytest and django-pytest.
+
+## Directory Structure
+
+```
+tests/
+├── conftest.py # Global fixtures and configuration
+├── README.md # This file
+└── {app_name}/ # Tests for each app (utils, project, etc.)
+ ├── conftest.py # App-specific fixtures
+ ├── unit/ # Unit tests - test individual functions/methods
+ │ ├── __init__.py
+ │ └── test_*.py
+ ├── whitebox/ # Whitebox tests - test internal behavior/integration
+ │ ├── __init__.py
+ │ └── test_*.py
+ └── blackbox/ # Blackbox tests - test external behavior/end-to-end
+ ├── __init__.py
+ └── test_*.py
+```
+
+## Test Types
+
+### Unit Tests (`unit/`)
+- Test individual functions, methods, and classes in isolation
+- Use mocks for external dependencies
+- Fast execution, no database interactions unless necessary
+- Focus on business logic and edge cases
+
+### Whitebox Tests (`whitebox/`)
+- Test internal behavior and integration between components
+- May use database and external services
+- Test data flow, error handling, and component interactions
+- Knowledge of internal implementation is used
+
+### Blackbox Tests (`blackbox/`)
+- Test external behavior from user's perspective
+- Full end-to-end workflows
+- Test through HTTP requests, UI interactions
+- No knowledge of internal implementation
+
+## Running Tests
+
+```bash
+# Run all tests
+pytest coldfront/tests/
+
+# Run specific app tests
+pytest coldfront/tests/utils/
+
+# Run specific test type
+pytest coldfront/tests/utils/unit/
+pytest coldfront/tests/utils/whitebox/
+pytest coldfront/tests/utils/blackbox/
+
+# Run with coverage
+pytest --cov=coldfront coldfront/tests/
+
+# Run specific test file
+pytest coldfront/tests/utils/unit/test_tracking_base.py
+
+# Run with specific markers
+pytest -m "unit" coldfront/tests/
+pytest -m "slow" coldfront/tests/
+```
+
+## Test Markers
+
+Use pytest markers to categorize tests:
+
+```python
+@pytest.mark.unit # Unit test
+@pytest.mark.whitebox # Whitebox test
+@pytest.mark.blackbox # Blackbox test
+@pytest.mark.slow # Slow running test
+@pytest.mark.django_db # Requires database
+@pytest.mark.parametrize # Parameterized test
+```
+
+## Best Practices
+
+1. **Fixtures**: Use fixtures in `conftest.py` for common test data and setup
+2. **Parameterization**: Use `@pytest.mark.parametrize` for testing multiple scenarios
+3. **Database**: Use `@pytest.mark.django_db` sparingly, prefer mocking when possible
+4. **Isolation**: Each test should be independent and not rely on other tests
+5. **Naming**: Use descriptive test names that explain the scenario being tested
+6. **Arrange-Act-Assert**: Structure tests clearly with setup, execution, and verification
+7. **Edge Cases**: Test both happy path and error conditions
+
+## Example Test Structure
+
+```python
+import pytest
+from unittest.mock import Mock, patch
+
+class TestYourClass:
+ """Tests for YourClass functionality."""
+
+ @pytest.mark.unit
+ @pytest.mark.parametrize("input_value,expected", [
+ ("value1", "result1"),
+ ("value2", "result2"),
+ ])
+ def test_method_with_different_inputs(self, input_value, expected):
+ # Arrange
+ instance = YourClass()
+
+ # Act
+ result = instance.method(input_value)
+
+ # Assert
+ assert result == expected
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_method_with_database(self, user_factory):
+ # Test with database interactions
+ pass
+```
+
+## Adding New Tests
+
+When adding tests for a new app:
+
+1. Create `tests/{app_name}/` directory
+2. Add `conftest.py` with app-specific fixtures
+3. Create `unit/`, `whitebox/`, `blackbox/` subdirectories
+4. Add `__init__.py` files to make them Python packages
+5. Write tests following the naming convention `test_*.py`
\ No newline at end of file
diff --git a/coldfront/tests/__init__.py b/coldfront/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/coldfront/tests/conftest.py b/coldfront/tests/conftest.py
new file mode 100644
index 0000000000..0d81432487
--- /dev/null
+++ b/coldfront/tests/conftest.py
@@ -0,0 +1,325 @@
+"""
+Global pytest configuration and fixtures for ColdFront tests.
+"""
+import os
+import sys
+import django
+import pytest
+
+# Add the parent directory to the Python path
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+
+# Setup Django
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'coldfront.config.test_settings')
+django.setup()
+
+from django.contrib.auth.models import User, Group
+from django.test import Client
+from unittest.mock import Mock, patch
+
+import factory
+from factory.django import DjangoModelFactory
+
+# Import required models for fixtures
+from coldfront.core.project.models import (
+ Project, ProjectUser, ProjectStatusChoice,
+ ProjectUserStatusChoice, ProjectUserRoleChoice,
+ ProjectUserJoinRequest
+)
+from coldfront.core.allocation.models import (
+ Allocation, AllocationStatusChoice, AllocationAttributeType,
+ ClusterAccessRequest, ClusterAccessRequestStatusChoice
+)
+from coldfront.core.resource.models import Resource, ResourceType
+from coldfront.core.field_of_science.models import FieldOfScience
+
+
+# ============================================================================
+# Database Setup Fixtures
+# ============================================================================
+
+@pytest.fixture(scope='session')
+def django_db_setup(django_db_setup, django_db_blocker):
+ """Set up test database with required data."""
+ with django_db_blocker.unblock():
+ # Create required groups
+ Group.objects.get_or_create(name='staff_group')
+
+ # Create required status choices
+ ProjectStatusChoice.objects.get_or_create(name='Active')
+ ProjectStatusChoice.objects.get_or_create(name='New')
+ ProjectStatusChoice.objects.get_or_create(name='Archived')
+
+ ProjectUserStatusChoice.objects.get_or_create(name='Active')
+ ProjectUserStatusChoice.objects.get_or_create(name='Pending - Add')
+ ProjectUserStatusChoice.objects.get_or_create(name='Denied')
+ ProjectUserStatusChoice.objects.get_or_create(name='Removed')
+
+ ProjectUserRoleChoice.objects.get_or_create(name='Principal Investigator')
+ ProjectUserRoleChoice.objects.get_or_create(name='Manager')
+ ProjectUserRoleChoice.objects.get_or_create(name='User')
+
+ # Create allocation status choices
+ AllocationStatusChoice.objects.get_or_create(name='Active')
+ AllocationStatusChoice.objects.get_or_create(name='New')
+ AllocationStatusChoice.objects.get_or_create(name='Renewal Requested')
+ AllocationStatusChoice.objects.get_or_create(name='Denied')
+ AllocationStatusChoice.objects.get_or_create(name='Expired')
+
+ # Create cluster access request status choices
+ ClusterAccessRequestStatusChoice.objects.get_or_create(name='Pending - Add')
+ ClusterAccessRequestStatusChoice.objects.get_or_create(name='Processing')
+ ClusterAccessRequestStatusChoice.objects.get_or_create(name='Complete')
+ ClusterAccessRequestStatusChoice.objects.get_or_create(name='Denied')
+
+ # Create allocation user status choices
+ from coldfront.core.allocation.models import AllocationUserStatusChoice
+ AllocationUserStatusChoice.objects.get_or_create(name='Active')
+ AllocationUserStatusChoice.objects.get_or_create(name='Pending - Add')
+ AllocationUserStatusChoice.objects.get_or_create(name='Denied')
+ AllocationUserStatusChoice.objects.get_or_create(name='Removed')
+
+ # Create a default field of science
+ FieldOfScience.objects.get_or_create(
+ description='Other',
+ defaults={'is_selectable': True}
+ )
+
+ # Create a default resource type and resource
+ resource_type, _ = ResourceType.objects.get_or_create(name='Cluster')
+ ResourceType.objects.get_or_create(name='Storage') # Add Storage resource type
+ Resource.objects.get_or_create(
+ name='Compute Cluster', # Changed to include 'Compute' in the name
+ defaults={
+ 'resource_type': resource_type,
+ 'description': 'Test compute cluster resource',
+ 'is_available': True,
+ 'is_public': True,
+ 'is_allocatable': True,
+ 'requires_payment': False
+ }
+ )
+
+
+# ============================================================================
+# Test Markers
+# ============================================================================
+
+def pytest_configure(config):
+ """Configure custom pytest markers."""
+ config.addinivalue_line("markers", "unit: Unit tests")
+ config.addinivalue_line("markers", "whitebox: Whitebox integration tests")
+ config.addinivalue_line("markers", "blackbox: Blackbox end-to-end tests")
+ config.addinivalue_line("markers", "slow: Slow running tests")
+
+
+# ============================================================================
+# Factory Classes
+# ============================================================================
+
+class UserFactory(DjangoModelFactory):
+ """Factory for creating User instances."""
+
+ class Meta:
+ model = User
+ django_get_or_create = ('username',)
+
+ username = factory.Sequence(lambda n: f'testuser{n}')
+ first_name = factory.Faker('first_name')
+ last_name = factory.Faker('last_name')
+ email = factory.LazyAttribute(lambda obj: f'{obj.username}@example.com')
+ is_active = True
+ is_staff = False
+ is_superuser = False
+
+
+class ProjectFactory(DjangoModelFactory):
+ """Factory for creating Project instances."""
+
+ class Meta:
+ model = Project
+ django_get_or_create = ('title',)
+
+ title = factory.Sequence(lambda n: f'Test Project {n}')
+ description = factory.Faker('text')
+ status = factory.LazyFunction(
+ lambda: ProjectStatusChoice.objects.get_or_create(name='Active')[0]
+ )
+ field_of_science = factory.LazyFunction(
+ lambda: FieldOfScience.objects.get_or_create(description='Other')[0]
+ )
+
+
+class AllocationFactory(DjangoModelFactory):
+ """Factory for creating Allocation instances."""
+
+ class Meta:
+ model = Allocation
+
+ status = factory.LazyFunction(
+ lambda: AllocationStatusChoice.objects.get_or_create(name='Active')[0]
+ )
+ justification = factory.Faker('text')
+ description = factory.Faker('text')
+ is_locked = False
+
+
+# ============================================================================
+# Basic Fixtures
+# ============================================================================
+
+@pytest.fixture
+def user_factory():
+ """Provide UserFactory for creating test users."""
+ return UserFactory
+
+
+@pytest.fixture
+def user(db):
+ """Create a standard test user."""
+ return UserFactory()
+
+
+@pytest.fixture
+def staff_user(db):
+ """Create a staff user."""
+ return UserFactory(is_staff=True)
+
+
+@pytest.fixture
+def superuser(db):
+ """Create a superuser."""
+ return UserFactory(is_superuser=True, is_staff=True)
+
+
+@pytest.fixture
+def client():
+ """Provide Django test client."""
+ return Client()
+
+
+@pytest.fixture
+def authenticated_client(client, user):
+ """Provide authenticated Django test client."""
+ client.force_login(user)
+ return client
+
+
+@pytest.fixture
+def staff_client(client, staff_user):
+ """Provide staff authenticated Django test client."""
+ client.force_login(staff_user)
+ return client
+
+
+# ============================================================================
+# Mock Fixtures
+# ============================================================================
+
+@pytest.fixture
+def mock_logger():
+ """Provide a mock logger for testing logging calls."""
+ with patch('logging.getLogger') as mock_get_logger:
+ mock_logger_instance = Mock()
+ mock_get_logger.return_value = mock_logger_instance
+ yield mock_logger_instance
+
+
+# ============================================================================
+# Test Data Fixtures
+# ============================================================================
+
+@pytest.fixture
+def project_factory():
+ """Provide ProjectFactory for creating test projects."""
+ return ProjectFactory
+
+
+@pytest.fixture
+def allocation_factory():
+ """Provide AllocationFactory for creating test allocations."""
+ return AllocationFactory
+
+
+@pytest.fixture
+def project(db):
+ """Create a test project."""
+ return ProjectFactory()
+
+
+@pytest.fixture
+def allocation(db, project):
+ """Create a test allocation."""
+ resource = Resource.objects.get_or_create(
+ name='Compute Cluster', # Changed to match the resource created in django_db_setup
+ defaults={
+ 'resource_type': ResourceType.objects.get_or_create(name='Cluster')[0],
+ 'description': 'Test compute cluster resource',
+ 'is_available': True,
+ 'is_public': True,
+ 'is_allocatable': True,
+ 'requires_payment': False
+ }
+ )[0]
+ allocation = AllocationFactory(project=project)
+ allocation.resources.add(resource)
+ return allocation
+
+
+@pytest.fixture
+def project_user(db, project, user):
+ """Create a ProjectUser relationship."""
+ status = ProjectUserStatusChoice.objects.get_or_create(name='Active')[0]
+ role = ProjectUserRoleChoice.objects.get_or_create(name='User')[0]
+ return ProjectUser.objects.create(
+ project=project,
+ user=user,
+ status=status,
+ role=role
+ )
+
+
+@pytest.fixture
+def pi_user(db):
+ """Create a PI user."""
+ return UserFactory(username='pi_user', first_name='Principal', last_name='Investigator')
+
+
+@pytest.fixture
+def project_with_pi(db, pi_user):
+ """Create a project with a PI."""
+ project = ProjectFactory()
+ status = ProjectUserStatusChoice.objects.get_or_create(name='Active')[0]
+ role = ProjectUserRoleChoice.objects.get_or_create(name='Principal Investigator')[0]
+ ProjectUser.objects.create(
+ project=project,
+ user=pi_user,
+ status=status,
+ role=role
+ )
+ return project
+
+
+@pytest.fixture
+def sample_tracking_data():
+ """Provide sample tracking data for tests."""
+ return {
+ 'status': 'PENDING',
+ 'message': 'Request is pending approval',
+ 'details': {
+ 'request_date': '2025-01-01',
+ 'reason': 'Test request'
+ },
+ 'can_view': True
+ }
+
+
+@pytest.fixture
+def sample_tracking_steps():
+ """Provide sample tracking steps for tests."""
+ return [
+ {'label': 'Request Sent', 'completed': True, 'current': False},
+ {'label': 'Under Review', 'completed': False, 'current': True},
+ {'label': 'Approved', 'completed': False, 'current': False},
+ {'label': 'Complete', 'completed': False, 'current': False},
+ ]
\ No newline at end of file
diff --git a/coldfront/tests/utils/__init__.py b/coldfront/tests/utils/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/coldfront/tests/utils/blackbox/__init__.py b/coldfront/tests/utils/blackbox/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/coldfront/tests/utils/conftest.py b/coldfront/tests/utils/conftest.py
new file mode 100644
index 0000000000..cb01c6dcdd
--- /dev/null
+++ b/coldfront/tests/utils/conftest.py
@@ -0,0 +1,173 @@
+"""
+Fixtures for utils tests - whitebox and unit tests.
+"""
+import pytest
+from datetime import datetime
+import factory
+from factory.django import DjangoModelFactory
+
+from django.contrib.auth.models import User
+from coldfront.core.project.models import (
+ Project, ProjectUser, ProjectStatusChoice,
+ ProjectUserStatusChoice, ProjectUserRoleChoice,
+ ProjectUserJoinRequest
+)
+from coldfront.core.allocation.models import (
+ Allocation, AllocationUser, AllocationStatusChoice,
+ ClusterAccessRequest, ClusterAccessRequestStatusChoice
+)
+from coldfront.core.resource.models import Resource, ResourceType
+from coldfront.core.field_of_science.models import FieldOfScience
+
+
+# ============================================================================
+# Factory Classes
+# ============================================================================
+
+class ProjectUserFactory(DjangoModelFactory):
+ """Factory for creating ProjectUser instances."""
+
+ class Meta:
+ model = ProjectUser
+
+ status = factory.LazyFunction(
+ lambda: ProjectUserStatusChoice.objects.get_or_create(name='Active')[0]
+ )
+ role = factory.LazyFunction(
+ lambda: ProjectUserRoleChoice.objects.get_or_create(name='User')[0]
+ )
+
+
+class AllocationUserFactory(DjangoModelFactory):
+ """Factory for creating AllocationUser instances."""
+
+ class Meta:
+ model = AllocationUser
+
+ # AllocationUser doesn't have a status field - removing it
+
+
+class ProjectUserJoinRequestFactory(DjangoModelFactory):
+ """Factory for creating ProjectUserJoinRequest instances."""
+
+ class Meta:
+ model = ProjectUserJoinRequest
+
+
+class ClusterAccessRequestFactory(DjangoModelFactory):
+ """Factory for creating ClusterAccessRequest instances."""
+
+ class Meta:
+ model = ClusterAccessRequest
+
+ status = factory.LazyFunction(
+ lambda: ClusterAccessRequestStatusChoice.objects.get_or_create(name='Pending - Add')[0]
+ )
+
+
+# ============================================================================
+# Fixtures
+# ============================================================================
+
+@pytest.fixture
+def project_user_factory():
+ """Provide ProjectUserFactory for creating test project users."""
+ return ProjectUserFactory
+
+
+@pytest.fixture
+def allocation_user_factory():
+ """Provide AllocationUserFactory for creating test allocation users."""
+ return AllocationUserFactory
+
+
+@pytest.fixture
+def join_request_factory():
+ """Provide ProjectUserJoinRequestFactory for creating test join requests."""
+ return ProjectUserJoinRequestFactory
+
+
+@pytest.fixture
+def cluster_access_request_factory():
+ """Provide ClusterAccessRequestFactory for creating test cluster access requests."""
+ return ClusterAccessRequestFactory
+
+
+@pytest.fixture
+def active_project_user(db, user, project):
+ """Create an active ProjectUser without a join request."""
+ status = ProjectUserStatusChoice.objects.get_or_create(name='Active')[0]
+ role = ProjectUserRoleChoice.objects.get_or_create(name='User')[0]
+ return ProjectUser.objects.create(
+ project=project,
+ user=user,
+ status=status,
+ role=role
+ )
+
+
+@pytest.fixture
+def pending_project_user(db, user, project):
+ """Create a pending ProjectUser."""
+ status = ProjectUserStatusChoice.objects.get_or_create(name='Pending - Add')[0]
+ role = ProjectUserRoleChoice.objects.get_or_create(name='User')[0]
+ return ProjectUser.objects.create(
+ project=project,
+ user=user,
+ status=status,
+ role=role
+ )
+
+
+@pytest.fixture
+def join_request(db, pending_project_user):
+ """Create a ProjectUserJoinRequest."""
+ return ProjectUserJoinRequest.objects.create(
+ project_user=pending_project_user,
+ reason='Test join request'
+ )
+
+
+@pytest.fixture
+def allocation_user(db, user, allocation):
+ """Create an AllocationUser."""
+ from coldfront.core.allocation.models import AllocationUserStatusChoice
+ status = AllocationUserStatusChoice.objects.get_or_create(name='Active')[0]
+ return AllocationUser.objects.create(
+ allocation=allocation,
+ user=user,
+ status=status
+ )
+
+
+@pytest.fixture
+def cluster_access_request(db, allocation_user):
+ """Create a ClusterAccessRequest."""
+ status = ClusterAccessRequestStatusChoice.objects.get_or_create(name='Pending - Add')[0]
+ return ClusterAccessRequest.objects.create(
+ allocation_user=allocation_user,
+ status=status
+ )
+
+
+@pytest.fixture
+def mock_join_request_with_dates(db, pending_project_user):
+ """Create a mock join request with specific dates."""
+ return ProjectUserJoinRequest.objects.create(
+ project_user=pending_project_user,
+ reason='Test join request with dates',
+ created=datetime(2025, 1, 1, 10, 0, 0),
+ modified=datetime(2025, 1, 2, 15, 30, 0)
+ )
+
+
+@pytest.fixture
+def mock_cluster_request_with_dates(db, allocation_user):
+ """Create a mock cluster request with specific dates."""
+ status = ClusterAccessRequestStatusChoice.objects.get_or_create(name='Processing')[0]
+ return ClusterAccessRequest.objects.create(
+ allocation_user=allocation_user,
+ status=status,
+ created=datetime(2025, 1, 3, 9, 0, 0),
+ modified=datetime(2025, 1, 4, 14, 15, 0)
+ )
\ No newline at end of file
diff --git a/coldfront/tests/utils/unit/__init__.py b/coldfront/tests/utils/unit/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/coldfront/tests/utils/unit/test_unit.py b/coldfront/tests/utils/unit/test_unit.py
new file mode 100644
index 0000000000..a38d7a913f
--- /dev/null
+++ b/coldfront/tests/utils/unit/test_unit.py
@@ -0,0 +1,456 @@
+"""
+Whitebox tests for tracking framework integration.
+Tests internal behavior, data loading, and component interactions.
+"""
+import pytest
+from unittest.mock import Mock, patch
+from datetime import datetime
+
+from coldfront.core.project.utils_.join_request_tracker import (
+ JoinRequestTracker, JoinRequestStatus
+)
+from coldfront.core.project.models import (
+ ProjectUserStatusChoice, ProjectUserRoleChoice
+)
+from coldfront.core.allocation.models import (
+ ClusterAccessRequest, ClusterAccessRequestStatusChoice
+)
+
+
+# ============================================================================
+# JoinRequestTracker Data Loading Tests
+# ============================================================================
+
+class TestJoinRequestTrackerDataLoading:
+ """Tests for JoinRequestTracker data loading behavior."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_load_data_no_project_user(self, user, project):
+ """Test data loading when no ProjectUser exists."""
+ tracker = JoinRequestTracker(user, project)
+
+ tracker._load_data()
+
+ assert tracker._project_user is None
+ assert tracker._join_request is None
+ assert tracker._cluster_access_request is None
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_load_data_with_project_user_no_join_request(self, active_project_user):
+ """Test data loading with ProjectUser but no join request."""
+ tracker = JoinRequestTracker(active_project_user.user, active_project_user.project)
+
+ tracker._load_data()
+
+ assert tracker._project_user == active_project_user
+ assert tracker._join_request is None
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_load_data_with_join_request(self, join_request):
+ """Test data loading with join request."""
+ project_user = join_request.project_user
+ tracker = JoinRequestTracker(project_user.user, project_user.project)
+
+ tracker._load_data()
+
+ assert tracker._project_user == project_user
+ assert tracker._join_request == join_request
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_load_data_with_allocation_and_cluster_request(
+ self, allocation_user, project_user_factory
+ ):
+ """Test data loading with allocation and cluster access request."""
+ # Create project user for the same user and project
+ project_user = project_user_factory(
+ user=allocation_user.user,
+ project=allocation_user.allocation.project,
+ status=ProjectUserStatusChoice.objects.get_or_create(name='Active')[0]
+ )
+
+ # Create cluster access request status
+ cluster_status = ClusterAccessRequestStatusChoice.objects.get_or_create(
+ name='Pending - Add'
+ )[0]
+
+ # Create cluster access request
+ cluster_request = ClusterAccessRequest.objects.create(
+ allocation_user=allocation_user,
+ status=cluster_status
+ )
+
+ tracker = JoinRequestTracker(project_user.user, project_user.project)
+
+ tracker._load_data()
+
+ assert tracker._project_user == project_user
+ assert tracker._cluster_access_request == cluster_request
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_load_data_multiple_join_requests_gets_latest(self, pending_project_user):
+ """Test that the latest join request is loaded when multiple exist."""
+ from coldfront.core.project.models import ProjectUserJoinRequest
+
+ # Create multiple join requests
+ older_request = ProjectUserJoinRequest.objects.create(
+ project_user=pending_project_user,
+ reason="Older request",
+ created=datetime(2024, 1, 1)
+ )
+ newer_request = ProjectUserJoinRequest.objects.create(
+ project_user=pending_project_user,
+ reason="Newer request",
+ created=datetime(2024, 2, 1)
+ )
+
+ tracker = JoinRequestTracker(pending_project_user.user, pending_project_user.project)
+
+ tracker._load_data()
+
+ assert tracker._join_request == newer_request
+
+
+# ============================================================================
+# JoinRequestTracker Status Determination Tests
+# ============================================================================
+
+class TestJoinRequestTrackerStatusDetermination:
+ """Tests for status determination logic."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ @pytest.mark.parametrize("project_user_status,join_request_exists,expected_status", [
+ (None, False, JoinRequestStatus.NO_REQUEST),
+ ('Pending - Add', True, JoinRequestStatus.REQUEST_SENT),
+ ('Denied', True, JoinRequestStatus.NO_REQUEST),
+ ('Active', False, JoinRequestStatus.DIRECTLY_ADDED),
+ ])
+ def test_determine_status_basic_scenarios(
+ self, user, project, project_user_status, join_request_exists, expected_status
+ ):
+ """Test basic status determination scenarios."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Set up the tracker's internal state
+ if project_user_status:
+ mock_project_user = Mock()
+ mock_project_user.status.name = project_user_status
+ tracker._project_user = mock_project_user
+ else:
+ tracker._project_user = None
+
+ tracker._join_request = Mock() if join_request_exists else None
+ tracker._cluster_access_request = None
+
+ status = tracker._determine_status()
+
+ assert status == expected_status
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ @pytest.mark.parametrize("cluster_status,expected_status", [
+ ('Pending - Add', JoinRequestStatus.PI_APPROVED_QUEUED),
+ ('Processing', JoinRequestStatus.ADMIN_PROCESSING),
+ ('Complete', JoinRequestStatus.ACCESS_GRANTED),
+ ('Denied', JoinRequestStatus.PI_APPROVED_QUEUED),
+ ])
+ def test_determine_status_with_cluster_access(
+ self, user, project, cluster_status, expected_status
+ ):
+ """Test status determination with cluster access requests."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Set up active project user with join request
+ mock_project_user = Mock()
+ mock_project_user.status.name = 'Active'
+ tracker._project_user = mock_project_user
+ tracker._join_request = Mock()
+
+ # Set up cluster access request
+ mock_cluster_request = Mock()
+ mock_cluster_request.status.name = cluster_status
+ tracker._cluster_access_request = mock_cluster_request
+
+ status = tracker._determine_status()
+
+ assert status == expected_status
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_determine_status_active_user_no_cluster_request(self, user, project):
+ """Test status for active user without cluster access request."""
+ tracker = JoinRequestTracker(user, project)
+
+ mock_project_user = Mock()
+ mock_project_user.status.name = 'Active'
+ tracker._project_user = mock_project_user
+ tracker._join_request = Mock()
+ tracker._cluster_access_request = None
+
+ status = tracker._determine_status()
+
+ assert status == JoinRequestStatus.PI_APPROVED_QUEUED
+
+
+# ============================================================================
+# JoinRequestTracker Message and Details Tests
+# ============================================================================
+
+class TestJoinRequestTrackerMessagesAndDetails:
+ """Tests for status messages and details."""
+
+ @pytest.mark.whitebox
+ @pytest.mark.parametrize("status,expected_message_contains", [
+ (JoinRequestStatus.REQUEST_SENT, "awaiting approval"),
+ (JoinRequestStatus.PI_APPROVED_QUEUED, "queued for cluster access"),
+ (JoinRequestStatus.ADMIN_PROCESSING, "processing your cluster access"),
+ (JoinRequestStatus.ACCESS_GRANTED, "granted access"),
+ (JoinRequestStatus.DIRECTLY_ADDED, "directly added"),
+ (JoinRequestStatus.ERROR, "Unable to determine"),
+ (JoinRequestStatus.NO_REQUEST, "No active request"),
+ ])
+ def test_get_status_message(self, user, project, status, expected_message_contains):
+ """Test status message generation."""
+ tracker = JoinRequestTracker(user, project)
+
+ message = tracker._get_status_message(status)
+
+ assert expected_message_contains.lower() in message.lower()
+
+ @pytest.mark.whitebox
+ def test_get_status_details_with_all_data(self, user, project):
+ """Test status details with all available data."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Mock all data components
+ mock_join_request = Mock()
+ mock_join_request.created = datetime(2024, 1, 1)
+ mock_join_request.reason = "Test reason"
+ tracker._join_request = mock_join_request
+
+ mock_project_user = Mock()
+ mock_project_user.status.name = 'Active'
+ mock_project_user.role.name = 'User'
+ tracker._project_user = mock_project_user
+
+ mock_cluster_request = Mock()
+ mock_cluster_request.status.name = 'Complete'
+ mock_cluster_request.request_time = datetime(2024, 1, 2)
+ tracker._cluster_access_request = mock_cluster_request
+
+ details = tracker._get_status_details(JoinRequestStatus.ACCESS_GRANTED)
+
+ assert details['request_date'] == datetime(2024, 1, 1)
+ assert details['reason'] == "Test reason"
+ assert details['project_user_status'] == 'Active'
+ assert details['role'] == 'User'
+ assert details['cluster_request_status'] == 'Complete'
+ assert details['cluster_request_date'] == datetime(2024, 1, 2)
+
+ @pytest.mark.whitebox
+ def test_get_status_details_no_data(self, user, project):
+ """Test status details with no data."""
+ tracker = JoinRequestTracker(user, project)
+
+ # No data set
+ tracker._join_request = None
+ tracker._project_user = None
+ tracker._cluster_access_request = None
+
+ details = tracker._get_status_details(JoinRequestStatus.NO_REQUEST)
+
+ assert details is None
+
+
+# ============================================================================
+# JoinRequestTracker Progress Steps Tests
+# ============================================================================
+
+class TestJoinRequestTrackerProgressSteps:
+ """Tests for progress step generation."""
+
+ @pytest.mark.whitebox
+ def test_get_progress_steps_directly_added(self, user, project):
+ """Test progress steps for directly added users."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Mock cluster access request as complete
+ mock_cluster_request = Mock()
+ mock_cluster_request.status.name = 'Complete'
+ tracker._cluster_access_request = mock_cluster_request
+
+ steps = tracker._get_progress_steps(JoinRequestStatus.DIRECTLY_ADDED)
+
+ assert len(steps) == 3
+ assert steps[0].label == 'Directly Added by PI'
+ assert steps[0].completed is True
+ assert steps[0].current is False
+
+ assert steps[1].label == 'Admin Processing'
+ assert steps[1].completed is True
+ assert steps[1].current is False
+
+ assert steps[2].label == 'Access Granted'
+ assert steps[2].completed is True
+ assert steps[2].current is False
+
+ @pytest.mark.whitebox
+ @pytest.mark.parametrize("current_status,expected_step_states", [
+ (JoinRequestStatus.REQUEST_SENT, [
+ ('Request Sent', True, True), # completed=True because status != 'No Request'
+ ('PI Approved & Queued', False, False),
+ ('Admin Processing', False, False),
+ ('Access Granted', False, False)
+ ]),
+ (JoinRequestStatus.PI_APPROVED_QUEUED, [
+ ('Request Sent', True, False),
+ ('PI Approved & Queued', True, True), # completed=True because in approved states
+ ('Admin Processing', False, False),
+ ('Access Granted', False, False)
+ ]),
+ (JoinRequestStatus.ADMIN_PROCESSING, [
+ ('Request Sent', True, False),
+ ('PI Approved & Queued', True, False), # completed=True because in processing states
+ ('Admin Processing', True, True), # completed=True because in processing states
+ ('Access Granted', False, False)
+ ]),
+ (JoinRequestStatus.ACCESS_GRANTED, [
+ ('Request Sent', True, False),
+ ('PI Approved & Queued', True, False),
+ ('Admin Processing', True, False),
+ ('Access Granted', True, False)
+ ])
+ ])
+ def test_get_progress_steps_standard_flow(
+ self, user, project, current_status, expected_step_states
+ ):
+ """Test progress steps for standard join request flow."""
+ tracker = JoinRequestTracker(user, project)
+
+ steps = tracker._get_progress_steps(current_status)
+
+ assert len(steps) == 4
+ for i, (label, completed, current) in enumerate(expected_step_states):
+ assert steps[i].label == label
+ assert steps[i].completed == completed
+ assert steps[i].current == current
+
+
+# ============================================================================
+# JoinRequestTracker Permission Tests
+# ============================================================================
+
+class TestJoinRequestTrackerPermissions:
+ """Tests for view permission logic."""
+
+ @pytest.mark.whitebox
+ def test_can_view_status_with_project_user(self, user, project):
+ """Test can_view_status returns True when project user exists."""
+ tracker = JoinRequestTracker(user, project)
+ tracker._project_user = Mock() # Any project user
+
+ can_view = tracker._can_view_status()
+
+ assert can_view is True
+
+ @pytest.mark.whitebox
+ def test_can_view_status_no_project_user(self, user, project):
+ """Test can_view_status returns False when no project user exists."""
+ tracker = JoinRequestTracker(user, project)
+ tracker._project_user = None
+
+ can_view = tracker._can_view_status()
+
+ assert can_view is False
+
+
+# ============================================================================
+# JoinRequestTracker Error Handling Tests
+# ============================================================================
+
+class TestJoinRequestTrackerErrorHandling:
+ """Tests for error handling behavior."""
+
+ @pytest.mark.whitebox
+ def test_get_error_status(self, user, project):
+ """Test get_error_status returns correct enum."""
+ tracker = JoinRequestTracker(user, project)
+
+ error_status = tracker.get_error_status()
+
+ assert error_status == JoinRequestStatus.ERROR
+
+ @pytest.mark.whitebox
+ def test_get_default_error_message(self, user, project):
+ """Test default error message."""
+ tracker = JoinRequestTracker(user, project)
+
+ error_message = tracker.get_default_error_message()
+
+ assert "join status" in error_message.lower()
+ assert "contact support" in error_message.lower()
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ @patch('coldfront.core.utils.tracking.base.logger')
+ def test_get_status_handles_load_data_exception(self, mock_logger, user, project):
+ """Test that get_status handles exceptions in _load_data."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Make _load_data raise an exception
+ def raise_error():
+ raise ValueError("Database error")
+
+ tracker._load_data = raise_error
+
+ result = tracker.get_status()
+
+ assert result['status'] == JoinRequestStatus.ERROR
+ assert result['error'] is not None
+ mock_logger.exception.assert_called_once()
+
+
+# ============================================================================
+# JoinRequestTracker Backward Compatibility Tests
+# ============================================================================
+
+class TestJoinRequestTrackerBackwardCompatibility:
+ """Tests for backward compatibility with existing API."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_get_status_returns_dict(self, user, project):
+ """Test that get_status returns a dict for backward compatibility."""
+ tracker = JoinRequestTracker(user, project)
+
+ result = tracker.get_status()
+
+ assert isinstance(result, dict)
+ assert 'status' in result
+ assert 'message' in result
+ assert 'details' in result
+ assert 'error' in result
+ assert 'steps' in result
+ assert 'can_view' in result
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_get_status_dict_structure(self, active_project_user):
+ """Test the structure of the returned status dict."""
+ tracker = JoinRequestTracker(active_project_user.user, active_project_user.project)
+
+ result = tracker.get_status()
+
+ # Test that steps are properly converted to dicts
+ assert isinstance(result['steps'], list)
+ if result['steps']:
+ step = result['steps'][0]
+ assert isinstance(step, dict)
+ assert 'label' in step
+ assert 'completed' in step
+ assert 'current' in step
\ No newline at end of file
diff --git a/coldfront/tests/utils/whitebox/__init__.py b/coldfront/tests/utils/whitebox/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/coldfront/tests/utils/whitebox/test_tracking.py b/coldfront/tests/utils/whitebox/test_tracking.py
new file mode 100644
index 0000000000..a38d7a913f
--- /dev/null
+++ b/coldfront/tests/utils/whitebox/test_tracking.py
@@ -0,0 +1,456 @@
+"""
+Whitebox tests for tracking framework integration.
+Tests internal behavior, data loading, and component interactions.
+"""
+import pytest
+from unittest.mock import Mock, patch
+from datetime import datetime
+
+from coldfront.core.project.utils_.join_request_tracker import (
+ JoinRequestTracker, JoinRequestStatus
+)
+from coldfront.core.project.models import (
+ ProjectUserStatusChoice, ProjectUserRoleChoice
+)
+from coldfront.core.allocation.models import (
+ ClusterAccessRequest, ClusterAccessRequestStatusChoice
+)
+
+
+# ============================================================================
+# JoinRequestTracker Data Loading Tests
+# ============================================================================
+
+class TestJoinRequestTrackerDataLoading:
+ """Tests for JoinRequestTracker data loading behavior."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_load_data_no_project_user(self, user, project):
+ """Test data loading when no ProjectUser exists."""
+ tracker = JoinRequestTracker(user, project)
+
+ tracker._load_data()
+
+ assert tracker._project_user is None
+ assert tracker._join_request is None
+ assert tracker._cluster_access_request is None
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_load_data_with_project_user_no_join_request(self, active_project_user):
+ """Test data loading with ProjectUser but no join request."""
+ tracker = JoinRequestTracker(active_project_user.user, active_project_user.project)
+
+ tracker._load_data()
+
+ assert tracker._project_user == active_project_user
+ assert tracker._join_request is None
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_load_data_with_join_request(self, join_request):
+ """Test data loading with join request."""
+ project_user = join_request.project_user
+ tracker = JoinRequestTracker(project_user.user, project_user.project)
+
+ tracker._load_data()
+
+ assert tracker._project_user == project_user
+ assert tracker._join_request == join_request
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_load_data_with_allocation_and_cluster_request(
+ self, allocation_user, project_user_factory
+ ):
+ """Test data loading with allocation and cluster access request."""
+ # Create project user for the same user and project
+ project_user = project_user_factory(
+ user=allocation_user.user,
+ project=allocation_user.allocation.project,
+ status=ProjectUserStatusChoice.objects.get_or_create(name='Active')[0]
+ )
+
+ # Create cluster access request status
+ cluster_status = ClusterAccessRequestStatusChoice.objects.get_or_create(
+ name='Pending - Add'
+ )[0]
+
+ # Create cluster access request
+ cluster_request = ClusterAccessRequest.objects.create(
+ allocation_user=allocation_user,
+ status=cluster_status
+ )
+
+ tracker = JoinRequestTracker(project_user.user, project_user.project)
+
+ tracker._load_data()
+
+ assert tracker._project_user == project_user
+ assert tracker._cluster_access_request == cluster_request
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_load_data_multiple_join_requests_gets_latest(self, pending_project_user):
+ """Test that the latest join request is loaded when multiple exist."""
+ from coldfront.core.project.models import ProjectUserJoinRequest
+
+ # Create multiple join requests
+ older_request = ProjectUserJoinRequest.objects.create(
+ project_user=pending_project_user,
+ reason="Older request",
+ created=datetime(2024, 1, 1)
+ )
+ newer_request = ProjectUserJoinRequest.objects.create(
+ project_user=pending_project_user,
+ reason="Newer request",
+ created=datetime(2024, 2, 1)
+ )
+
+ tracker = JoinRequestTracker(pending_project_user.user, pending_project_user.project)
+
+ tracker._load_data()
+
+ assert tracker._join_request == newer_request
+
+
+# ============================================================================
+# JoinRequestTracker Status Determination Tests
+# ============================================================================
+
+class TestJoinRequestTrackerStatusDetermination:
+ """Tests for status determination logic."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ @pytest.mark.parametrize("project_user_status,join_request_exists,expected_status", [
+ (None, False, JoinRequestStatus.NO_REQUEST),
+ ('Pending - Add', True, JoinRequestStatus.REQUEST_SENT),
+ ('Denied', True, JoinRequestStatus.NO_REQUEST),
+ ('Active', False, JoinRequestStatus.DIRECTLY_ADDED),
+ ])
+ def test_determine_status_basic_scenarios(
+ self, user, project, project_user_status, join_request_exists, expected_status
+ ):
+ """Test basic status determination scenarios."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Set up the tracker's internal state
+ if project_user_status:
+ mock_project_user = Mock()
+ mock_project_user.status.name = project_user_status
+ tracker._project_user = mock_project_user
+ else:
+ tracker._project_user = None
+
+ tracker._join_request = Mock() if join_request_exists else None
+ tracker._cluster_access_request = None
+
+ status = tracker._determine_status()
+
+ assert status == expected_status
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ @pytest.mark.parametrize("cluster_status,expected_status", [
+ ('Pending - Add', JoinRequestStatus.PI_APPROVED_QUEUED),
+ ('Processing', JoinRequestStatus.ADMIN_PROCESSING),
+ ('Complete', JoinRequestStatus.ACCESS_GRANTED),
+ ('Denied', JoinRequestStatus.PI_APPROVED_QUEUED),
+ ])
+ def test_determine_status_with_cluster_access(
+ self, user, project, cluster_status, expected_status
+ ):
+ """Test status determination with cluster access requests."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Set up active project user with join request
+ mock_project_user = Mock()
+ mock_project_user.status.name = 'Active'
+ tracker._project_user = mock_project_user
+ tracker._join_request = Mock()
+
+ # Set up cluster access request
+ mock_cluster_request = Mock()
+ mock_cluster_request.status.name = cluster_status
+ tracker._cluster_access_request = mock_cluster_request
+
+ status = tracker._determine_status()
+
+ assert status == expected_status
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_determine_status_active_user_no_cluster_request(self, user, project):
+ """Test status for active user without cluster access request."""
+ tracker = JoinRequestTracker(user, project)
+
+ mock_project_user = Mock()
+ mock_project_user.status.name = 'Active'
+ tracker._project_user = mock_project_user
+ tracker._join_request = Mock()
+ tracker._cluster_access_request = None
+
+ status = tracker._determine_status()
+
+ assert status == JoinRequestStatus.PI_APPROVED_QUEUED
+
+
+# ============================================================================
+# JoinRequestTracker Message and Details Tests
+# ============================================================================
+
+class TestJoinRequestTrackerMessagesAndDetails:
+ """Tests for status messages and details."""
+
+ @pytest.mark.whitebox
+ @pytest.mark.parametrize("status,expected_message_contains", [
+ (JoinRequestStatus.REQUEST_SENT, "awaiting approval"),
+ (JoinRequestStatus.PI_APPROVED_QUEUED, "queued for cluster access"),
+ (JoinRequestStatus.ADMIN_PROCESSING, "processing your cluster access"),
+ (JoinRequestStatus.ACCESS_GRANTED, "granted access"),
+ (JoinRequestStatus.DIRECTLY_ADDED, "directly added"),
+ (JoinRequestStatus.ERROR, "Unable to determine"),
+ (JoinRequestStatus.NO_REQUEST, "No active request"),
+ ])
+ def test_get_status_message(self, user, project, status, expected_message_contains):
+ """Test status message generation."""
+ tracker = JoinRequestTracker(user, project)
+
+ message = tracker._get_status_message(status)
+
+ assert expected_message_contains.lower() in message.lower()
+
+ @pytest.mark.whitebox
+ def test_get_status_details_with_all_data(self, user, project):
+ """Test status details with all available data."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Mock all data components
+ mock_join_request = Mock()
+ mock_join_request.created = datetime(2024, 1, 1)
+ mock_join_request.reason = "Test reason"
+ tracker._join_request = mock_join_request
+
+ mock_project_user = Mock()
+ mock_project_user.status.name = 'Active'
+ mock_project_user.role.name = 'User'
+ tracker._project_user = mock_project_user
+
+ mock_cluster_request = Mock()
+ mock_cluster_request.status.name = 'Complete'
+ mock_cluster_request.request_time = datetime(2024, 1, 2)
+ tracker._cluster_access_request = mock_cluster_request
+
+ details = tracker._get_status_details(JoinRequestStatus.ACCESS_GRANTED)
+
+ assert details['request_date'] == datetime(2024, 1, 1)
+ assert details['reason'] == "Test reason"
+ assert details['project_user_status'] == 'Active'
+ assert details['role'] == 'User'
+ assert details['cluster_request_status'] == 'Complete'
+ assert details['cluster_request_date'] == datetime(2024, 1, 2)
+
+ @pytest.mark.whitebox
+ def test_get_status_details_no_data(self, user, project):
+ """Test status details with no data."""
+ tracker = JoinRequestTracker(user, project)
+
+ # No data set
+ tracker._join_request = None
+ tracker._project_user = None
+ tracker._cluster_access_request = None
+
+ details = tracker._get_status_details(JoinRequestStatus.NO_REQUEST)
+
+ assert details is None
+
+
+# ============================================================================
+# JoinRequestTracker Progress Steps Tests
+# ============================================================================
+
+class TestJoinRequestTrackerProgressSteps:
+ """Tests for progress step generation."""
+
+ @pytest.mark.whitebox
+ def test_get_progress_steps_directly_added(self, user, project):
+ """Test progress steps for directly added users."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Mock cluster access request as complete
+ mock_cluster_request = Mock()
+ mock_cluster_request.status.name = 'Complete'
+ tracker._cluster_access_request = mock_cluster_request
+
+ steps = tracker._get_progress_steps(JoinRequestStatus.DIRECTLY_ADDED)
+
+ assert len(steps) == 3
+ assert steps[0].label == 'Directly Added by PI'
+ assert steps[0].completed is True
+ assert steps[0].current is False
+
+ assert steps[1].label == 'Admin Processing'
+ assert steps[1].completed is True
+ assert steps[1].current is False
+
+ assert steps[2].label == 'Access Granted'
+ assert steps[2].completed is True
+ assert steps[2].current is False
+
+ @pytest.mark.whitebox
+ @pytest.mark.parametrize("current_status,expected_step_states", [
+ (JoinRequestStatus.REQUEST_SENT, [
+ ('Request Sent', True, True), # completed=True because status != 'No Request'
+ ('PI Approved & Queued', False, False),
+ ('Admin Processing', False, False),
+ ('Access Granted', False, False)
+ ]),
+ (JoinRequestStatus.PI_APPROVED_QUEUED, [
+ ('Request Sent', True, False),
+ ('PI Approved & Queued', True, True), # completed=True because in approved states
+ ('Admin Processing', False, False),
+ ('Access Granted', False, False)
+ ]),
+ (JoinRequestStatus.ADMIN_PROCESSING, [
+ ('Request Sent', True, False),
+ ('PI Approved & Queued', True, False), # completed=True because in processing states
+ ('Admin Processing', True, True), # completed=True because in processing states
+ ('Access Granted', False, False)
+ ]),
+ (JoinRequestStatus.ACCESS_GRANTED, [
+ ('Request Sent', True, False),
+ ('PI Approved & Queued', True, False),
+ ('Admin Processing', True, False),
+ ('Access Granted', True, False)
+ ])
+ ])
+ def test_get_progress_steps_standard_flow(
+ self, user, project, current_status, expected_step_states
+ ):
+ """Test progress steps for standard join request flow."""
+ tracker = JoinRequestTracker(user, project)
+
+ steps = tracker._get_progress_steps(current_status)
+
+ assert len(steps) == 4
+ for i, (label, completed, current) in enumerate(expected_step_states):
+ assert steps[i].label == label
+ assert steps[i].completed == completed
+ assert steps[i].current == current
+
+
+# ============================================================================
+# JoinRequestTracker Permission Tests
+# ============================================================================
+
+class TestJoinRequestTrackerPermissions:
+ """Tests for view permission logic."""
+
+ @pytest.mark.whitebox
+ def test_can_view_status_with_project_user(self, user, project):
+ """Test can_view_status returns True when project user exists."""
+ tracker = JoinRequestTracker(user, project)
+ tracker._project_user = Mock() # Any project user
+
+ can_view = tracker._can_view_status()
+
+ assert can_view is True
+
+ @pytest.mark.whitebox
+ def test_can_view_status_no_project_user(self, user, project):
+ """Test can_view_status returns False when no project user exists."""
+ tracker = JoinRequestTracker(user, project)
+ tracker._project_user = None
+
+ can_view = tracker._can_view_status()
+
+ assert can_view is False
+
+
+# ============================================================================
+# JoinRequestTracker Error Handling Tests
+# ============================================================================
+
+class TestJoinRequestTrackerErrorHandling:
+ """Tests for error handling behavior."""
+
+ @pytest.mark.whitebox
+ def test_get_error_status(self, user, project):
+ """Test get_error_status returns correct enum."""
+ tracker = JoinRequestTracker(user, project)
+
+ error_status = tracker.get_error_status()
+
+ assert error_status == JoinRequestStatus.ERROR
+
+ @pytest.mark.whitebox
+ def test_get_default_error_message(self, user, project):
+ """Test default error message."""
+ tracker = JoinRequestTracker(user, project)
+
+ error_message = tracker.get_default_error_message()
+
+ assert "join status" in error_message.lower()
+ assert "contact support" in error_message.lower()
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ @patch('coldfront.core.utils.tracking.base.logger')
+ def test_get_status_handles_load_data_exception(self, mock_logger, user, project):
+ """Test that get_status handles exceptions in _load_data."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Make _load_data raise an exception
+ def raise_error():
+ raise ValueError("Database error")
+
+ tracker._load_data = raise_error
+
+ result = tracker.get_status()
+
+ assert result['status'] == JoinRequestStatus.ERROR
+ assert result['error'] is not None
+ mock_logger.exception.assert_called_once()
+
+
+# ============================================================================
+# JoinRequestTracker Backward Compatibility Tests
+# ============================================================================
+
+class TestJoinRequestTrackerBackwardCompatibility:
+ """Tests for backward compatibility with existing API."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_get_status_returns_dict(self, user, project):
+ """Test that get_status returns a dict for backward compatibility."""
+ tracker = JoinRequestTracker(user, project)
+
+ result = tracker.get_status()
+
+ assert isinstance(result, dict)
+ assert 'status' in result
+ assert 'message' in result
+ assert 'details' in result
+ assert 'error' in result
+ assert 'steps' in result
+ assert 'can_view' in result
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_get_status_dict_structure(self, active_project_user):
+ """Test the structure of the returned status dict."""
+ tracker = JoinRequestTracker(active_project_user.user, active_project_user.project)
+
+ result = tracker.get_status()
+
+ # Test that steps are properly converted to dicts
+ assert isinstance(result['steps'], list)
+ if result['steps']:
+ step = result['steps'][0]
+ assert isinstance(step, dict)
+ assert 'label' in step
+ assert 'completed' in step
+ assert 'current' in step
\ No newline at end of file
diff --git a/coldfront/tests/utils/whitebox/test_tracking_end_to_end.py b/coldfront/tests/utils/whitebox/test_tracking_end_to_end.py
new file mode 100644
index 0000000000..879e3e0f9a
--- /dev/null
+++ b/coldfront/tests/utils/whitebox/test_tracking_end_to_end.py
@@ -0,0 +1,376 @@
+"""
+whitebox tests for tracking framework end-to-end functionality.
+Tests external behavior from user's perspective including view integration.
+"""
+import pytest
+from unittest.mock import patch, Mock
+from django.urls import reverse
+from django.contrib.auth.models import User
+
+from coldfront.core.project.models import (
+ Project, ProjectUser, ProjectUserJoinRequest, ProjectUserStatusChoice
+)
+from coldfront.core.project.utils_.join_request_tracker import JoinRequestTracker
+
+
+# ============================================================================
+# End-to-End Workflow Tests
+# ============================================================================
+
+class TestTrackingEndToEndWorkflow:
+ """Tests for complete tracking workflows from user perspective."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_complete_join_request_workflow(self, user_factory, project_factory):
+ """Test complete join request workflow from start to finish."""
+ # Create test data
+ user = user_factory()
+ project = project_factory()
+
+ # Step 1: No request exists
+ tracker = JoinRequestTracker(user, project)
+ status = tracker.get_status()
+
+ assert status['status'].value == 'No Request'
+ assert status['can_view'] is False
+
+ # Step 2: Create pending project user (simulates join request)
+ pending_status = ProjectUserStatusChoice.objects.get_or_create(
+ name='Pending - Add'
+ )[0]
+ project_user = ProjectUser.objects.create(
+ user=user,
+ project=project,
+ status=pending_status,
+ role_id=1 # Assuming role with ID 1 exists
+ )
+
+ # Create join request
+ join_request = ProjectUserJoinRequest.objects.create(
+ project_user=project_user,
+ reason="Need access for research"
+ )
+
+ # Check status after request
+ tracker = JoinRequestTracker(user, project)
+ status = tracker.get_status()
+
+ assert status['status'].value == 'Request Sent'
+ assert status['can_view'] is True
+ assert len(status['steps']) == 4
+ assert status['steps'][0]['current'] is True
+
+ # Step 3: Approve request (simulates PI approval)
+ active_status = ProjectUserStatusChoice.objects.get_or_create(
+ name='Active'
+ )[0]
+ project_user.status = active_status
+ project_user.save()
+
+ # Check status after approval
+ tracker = JoinRequestTracker(user, project)
+ status = tracker.get_status()
+
+ assert status['status'].value == 'PI Approved & Queued'
+ assert status['steps'][1]['current'] is True
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_directly_added_user_workflow(self, user_factory, project_factory):
+ """Test workflow for user directly added by PI."""
+ user = user_factory()
+ project = project_factory()
+
+ # Create active project user without join request (direct add)
+ active_status = ProjectUserStatusChoice.objects.get_or_create(
+ name='Active'
+ )[0]
+ project_user = ProjectUser.objects.create(
+ user=user,
+ project=project,
+ status=active_status,
+ role_id=1
+ )
+
+ tracker = JoinRequestTracker(user, project)
+ status = tracker.get_status()
+
+ assert status['status'].value == 'Directly Added by PI'
+ assert status['can_view'] is True
+ assert len(status['steps']) == 3
+ assert status['steps'][0]['label'] == 'Directly Added by PI'
+ assert status['steps'][0]['completed'] is True
+
+
+# ============================================================================
+# View Integration Tests
+# ============================================================================
+
+class TestTrackingViewIntegration:
+ """Tests for tracking integration with views."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ @pytest.mark.skip(reason="Home view requires additional resource setup")
+ def test_home_view_with_tracking(self, authenticated_client, user, project):
+ """Test that home view includes tracking data."""
+ # Create project user so user has a project
+ active_status = ProjectUserStatusChoice.objects.get_or_create(name='Active')[0]
+ project_user = ProjectUser.objects.create(
+ user=user,
+ project=project,
+ status=active_status,
+ role_id=1
+ )
+
+ # Sign access agreement (required for home view)
+ user.userprofile.access_agreement_signed_date = '2024-01-01'
+ user.userprofile.save()
+
+ response = authenticated_client.get(reverse('home'))
+
+ assert response.status_code == 200
+ # The project should be in the context with tracking data
+ projects = response.context.get('project_list', [])
+ if projects:
+ project = projects[0]
+ assert hasattr(project, 'has_tracking')
+ assert hasattr(project, 'tracking_status')
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_project_join_list_view_with_tracking(self, authenticated_client, user):
+ """Test project join list view includes tracking data."""
+ # Sign access agreement (required)
+ user.userprofile.access_agreement_signed_date = '2024-01-01'
+ user.userprofile.save()
+
+ response = authenticated_client.get(reverse('project-join-list'))
+
+ assert response.status_code == 200
+ # Check that join_requests context exists and has tracking
+ join_requests = response.context.get('join_requests', [])
+ # Even if empty, the context should exist
+ assert join_requests is not None
+
+
+# ============================================================================
+# Template Integration Tests
+# ============================================================================
+
+class TestTrackingTemplateIntegration:
+ """Tests for tracking data rendering in templates."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_tracking_modal_renders_with_data(self, authenticated_client, user, project):
+ """Test that tracking modal renders correctly with data."""
+ # Create project user with join request
+ pending_status = ProjectUserStatusChoice.objects.get_or_create(
+ name='Pending - Add'
+ )[0]
+ project_user = ProjectUser.objects.create(
+ user=user,
+ project=project,
+ status=pending_status,
+ role_id=1
+ )
+
+ join_request = ProjectUserJoinRequest.objects.create(
+ project_user=project_user,
+ reason="Test reason"
+ )
+
+ # Sign access agreement
+ user.userprofile.access_agreement_signed_date = '2024-01-01'
+ user.userprofile.save()
+
+ response = authenticated_client.get(reverse('project-join-list'))
+
+ assert response.status_code == 200
+ content = response.content.decode()
+
+ # Check for tracking modal elements
+ assert 'projectAccessTimelineModal' in content
+ assert 'View Timeline' in content or 'fas fa-clock' in content
+
+
+
+# ============================================================================
+# API Compatibility Tests
+# ============================================================================
+
+class TestTrackingAPICompatibility:
+ """Tests for API compatibility with existing code."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_tracker_api_unchanged(self, user, project):
+ """Test that tracker API is unchanged from original implementation."""
+ tracker = JoinRequestTracker(user, project)
+
+ # Should be able to call get_status without arguments
+ status = tracker.get_status()
+
+ # Should return a dictionary (not TrackingResult object)
+ assert isinstance(status, dict)
+
+ # Should have all expected keys
+ expected_keys = {'status', 'message', 'details', 'error', 'steps', 'can_view'}
+ assert expected_keys.issubset(status.keys())
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_status_dict_structure_unchanged(self, user, project):
+ """Test that status dictionary structure is unchanged."""
+ tracker = JoinRequestTracker(user, project)
+ status = tracker.get_status()
+
+ # Test specific structure expectations
+ assert 'status' in status
+ assert hasattr(status['status'], 'value') # Should be enum with value
+
+ assert 'message' in status
+ assert isinstance(status['message'], str)
+
+ assert 'steps' in status
+ assert isinstance(status['steps'], list)
+
+ # Steps should be dicts with expected keys
+ if status['steps']:
+ step = status['steps'][0]
+ assert isinstance(step, dict)
+ assert 'label' in step
+ assert 'completed' in step
+ assert 'current' in step
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_import_path_unchanged(self):
+ """Test that import path for JoinRequestTracker is unchanged."""
+ # This should not raise ImportError
+ from coldfront.core.project.utils_.join_request_tracker import JoinRequestTracker
+
+ # Should be able to instantiate (with valid args)
+ assert JoinRequestTracker is not None
+
+
+# ============================================================================
+# Error Handling Integration Tests
+# ============================================================================
+
+class TestTrackingErrorHandling:
+ """Tests for error handling in integrated environment."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ @patch('coldfront.core.project.utils_.join_request_tracker.logger')
+ def test_view_handles_tracker_errors_gracefully(
+ self, mock_logger, authenticated_client, user
+ ):
+ """Test that views handle tracker errors gracefully."""
+ # Sign access agreement
+ user.userprofile.access_agreement_signed_date = '2024-01-01'
+ user.userprofile.save()
+
+ # Patch tracker to raise an exception
+ with patch('coldfront.core.project.views_.join_views.request_views.JoinRequestTracker') as mock_tracker_class:
+ mock_tracker = Mock()
+ mock_tracker.get_status.side_effect = Exception("Database error")
+ mock_tracker_class.return_value = mock_tracker
+
+ response = authenticated_client.get(reverse('project-join-list'))
+
+ # View should still render successfully
+ assert response.status_code == 200
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ @pytest.mark.django_db
+ def test_invalid_user_project_combination(self, db):
+ """Test tracker behavior with invalid user/project combinations."""
+ from coldfront.core.project.models import ProjectStatusChoice
+ from coldfront.core.field_of_science.models import FieldOfScience
+
+ # Create user and project that don't belong together
+ user = User.objects.create_user('testuser', 'test@example.com')
+
+ # Get or create required related objects
+ status = ProjectStatusChoice.objects.get_or_create(name='Active')[0]
+ fos = FieldOfScience.objects.get_or_create(description='Other')[0]
+
+ project = Project.objects.create(
+ name='test_project',
+ title='Test',
+ status=status,
+ field_of_science=fos
+ )
+
+ tracker = JoinRequestTracker(user, project)
+ status_result = tracker.get_status()
+
+ # Should handle gracefully and return appropriate status
+ assert status_result['status'].value == 'No Request'
+ assert status_result['can_view'] is False
+ assert status_result['error'] is None # Should not be an error condition
+
+
+# ============================================================================
+# Performance and Scale Tests
+# ============================================================================
+
+class TestTrackingPerformance:
+ """Tests for tracking performance under various conditions."""
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ @pytest.mark.slow
+ def test_tracker_with_many_join_requests(self, user_factory, project_factory):
+ """Test tracker performance with many join requests."""
+ user = user_factory()
+ project = project_factory()
+
+ # Create project user
+ pending_status = ProjectUserStatusChoice.objects.get_or_create(
+ name='Pending - Add'
+ )[0]
+ project_user = ProjectUser.objects.create(
+ user=user,
+ project=project,
+ status=pending_status,
+ role_id=1
+ )
+
+ # Create many join requests (simulates multiple attempts)
+ for i in range(10):
+ ProjectUserJoinRequest.objects.create(
+ project_user=project_user,
+ reason=f"Request {i}"
+ )
+
+ tracker = JoinRequestTracker(user, project)
+
+ # Should still work efficiently
+ status = tracker.get_status()
+ assert status['status'].value == 'Request Sent'
+
+ # Should get the latest request details
+ assert status['details'] is not None
+
+ @pytest.mark.django_db
+ @pytest.mark.whitebox
+ def test_tracker_database_query_efficiency(self, user, project):
+ """Test that tracker doesn't make excessive database queries."""
+ with patch('django.test.utils.override_settings'):
+ # This test would ideally use django-debug-toolbar or
+ # django.test.utils.override_settings(DEBUG=True) with
+ # connection.queries to count queries
+
+ tracker = JoinRequestTracker(user, project)
+ status = tracker.get_status()
+
+ # Should complete without errors (detailed query counting
+ # would require additional test infrastructure)
+ assert status is not None
+
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000000..ee195c744d
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,24 @@
+[pytest]
+DJANGO_SETTINGS_MODULE = coldfront.config.test_settings
+python_files = tests.py test_*.py *_tests.py
+python_classes = Test*
+python_functions = test_*
+testpaths = coldfront/tests
+addopts =
+ --verbose
+ --tb=short
+ --strict-markers
+ --disable-warnings
+ --cov=coldfront
+ --cov-report=html
+ --cov-report=term-missing
+ --cov-config=.coveragerc
+markers =
+ unit: Unit tests
+ whitebox: Whitebox integration tests
+ blackbox: Blackbox end-to-end tests
+ slow: Slow running tests
+ django_db: Tests that require database access
+filterwarnings =
+ ignore::DeprecationWarning
+ ignore::PendingDeprecationWarning
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 3d3e8de265..d479ca3368 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -60,3 +60,8 @@ tqdm==4.67.1
urllib3==2.5.0
user-agents==2.2.0
wcwidth==0.2.13
+
+# Development dependencies
+pytest>=7.0.0
+pytest-django>=4.5.0
+pytest-cov>=4.0.0