diff --git a/brevia/async_jobs.py b/brevia/async_jobs.py index bf8e7fb..79c6bf5 100644 --- a/brevia/async_jobs.py +++ b/brevia/async_jobs.py @@ -4,6 +4,7 @@ from datetime import datetime, timezone from sqlalchemy import BinaryExpression, Column, desc, func, String, text from pydantic import BaseModel as PydanticModel +from sqlalchemy import delete, select from sqlalchemy.dialects.postgresql import JSON, TIMESTAMP, SMALLINT from sqlalchemy.orm import Session, Query from langchain_community.vectorstores.pgembedding import BaseModel @@ -12,6 +13,7 @@ from brevia.utilities.dates import date_filter from brevia.utilities.json_api import query_data_pagination from brevia.utilities.types import load_type +from brevia.utilities.output import LinkedFileOutput MAX_DURATION = 120 # default max duration is 120 min / 2hr MAX_ATTEMPTS = 1 # default max number of attempts @@ -264,3 +266,68 @@ def is_job_available( return False return True + + +def cleanup_async_jobs(before_date: datetime, dry_run: bool): + """ + Remove async jobs created before the specified date. + This function removes async_jobs records from the Brevia database + where the 'created' timestamp is older than the specified date. + + Args: + before_date (datetime): The cutoff date for job deletion. + dry_run (bool): If True, only show what would be deleted without actually + deleting. + """ + # Ensure before_date is timezone-aware + if before_date.tzinfo is None: + before_date = before_date.replace(tzinfo=timezone.utc) + + log = logging.getLogger(__name__) + + with Session(db_connection()) as session: + # First, count how many records would be affected + count_query = select(AsyncJobsStore).where(AsyncJobsStore.created < before_date) + result = session.execute(count_query) + jobs_to_delete = result.scalars().all() + + if not jobs_to_delete: + log.info(f"No async jobs found created before {before_date}") + return 0 + + log.info(f"Found {len(jobs_to_delete)} async jobs created before {before_date}") + + if dry_run: + log.info("DRY RUN - The following jobs would be deleted:") + for job in jobs_to_delete: + status = "completed" if job.completed else "pending" + log.info( + f" - Job UUID: {job.uuid}, Created: {job.created}, " + f"Status: {status}, Service: {job.service}" + ) + return len(jobs_to_delete) + + # Store job UUIDs before deletion for file cleanup + job_uuids = [str(job.uuid) for job in jobs_to_delete] + + # Perform the actual deletion from database + delete_query = delete(AsyncJobsStore).where( + AsyncJobsStore.created < before_date + ) + result = session.execute(delete_query) + session.commit() + + # Clean up files for each deleted job + files_cleanup_errors = 0 + for job_uuid in job_uuids: + try: + file_output = LinkedFileOutput(job_id=job_uuid) + file_output.cleanup_job_files() + except Exception as exc: # pylint: disable=broad-exception-caught + files_cleanup_errors += 1 + log.warning(f"Failed to cleanup files for job {job_uuid}: {exc}") + + if files_cleanup_errors > 0: + log.warning(f"{files_cleanup_errors} jobs had file cleanup errors") + + return result.rowcount diff --git a/brevia/commands.py b/brevia/commands.py index b01dca7..13d7cf4 100644 --- a/brevia/commands.py +++ b/brevia/commands.py @@ -1,11 +1,13 @@ """Utility commands for applications""" import json import sys +from datetime import datetime from os import getcwd, path from logging import config import click from brevia.alembic import current, upgrade, downgrade from brevia.alembic import revision as create_revision +from brevia.async_jobs import cleanup_async_jobs 
 from brevia.index import update_links_documents
 from brevia.utilities import files_import, run_service, collections_io
 from brevia.tokens import create_token
@@ -167,3 +169,36 @@ def update_collection_links(collection: str):
     init_logging()
     num = update_links_documents(collection_name=collection)
     print(f'Updated {num} links documents. Done!')
+
+
+@click.command()
+@click.option(
+    '--before-date',
+    type=click.DateTime(formats=['%Y-%m-%d', '%Y-%m-%d %H:%M:%S']),
+    required=True,
+    help='Remove async jobs created before this date '
+         '(format: YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)'
+)
+@click.option(
+    '--dry-run',
+    is_flag=True,
+    help='Show what would be deleted without actually deleting'
+)
+def cleanup_jobs(before_date: datetime, dry_run: bool):
+    """
+    Remove async jobs and related files created before the specified date.
+    """
+    # Only ask for confirmation if not doing a dry run
+    if not dry_run:
+        if not click.confirm('Are you sure you want to delete async jobs?'):
+            click.echo('Operation cancelled.')
+            return
+    init_logging()
+    num = cleanup_async_jobs(before_date=before_date, dry_run=dry_run)
+
+    if dry_run:
+        jobs_label = 'job' if num == 1 else 'jobs'
+        click.echo(f"Dry run completed. {num} {jobs_label} would be deleted.")
+    elif num == 0:
+        click.echo("No async jobs to delete.")
+    else:
+        click.echo(f"Successfully deleted {num} async jobs.")
diff --git a/brevia/utilities/output.py b/brevia/utilities/output.py
index 0849999..626c685 100644
--- a/brevia/utilities/output.py
+++ b/brevia/utilities/output.py
@@ -1,6 +1,7 @@
 """File output utilities"""
 import tempfile
 import os
+import shutil
 
 from brevia.settings import get_settings
@@ -86,3 +87,66 @@ def write(self, content: str, filename: str):
         os.unlink(output_path)
 
         return self.file_url(filename)
+
+    def _s3_delete_objects(self, bucket_name: str, prefix: str):
+        """
+        Delete all objects in S3 with the specified prefix.
+
+        :param bucket_name: The name of the S3 bucket.
+        :param prefix: The prefix to match objects for deletion.
+        """
+        try:
+            import boto3  # pylint: disable=import-outside-toplevel
+        except ModuleNotFoundError as exc:
+            raise ImportError('Boto3 is not installed!') from exc
+
+        s3 = boto3.client('s3')
+
+        # Collect every object under the prefix; list_objects_v2 returns
+        # at most 1000 keys per call, so follow continuation tokens
+        objects_to_delete = []
+        list_kwargs = {'Bucket': bucket_name, 'Prefix': prefix}
+        while True:
+            response = s3.list_objects_v2(**list_kwargs)
+            objects_to_delete += [
+                {'Key': obj['Key']} for obj in response.get('Contents', [])
+            ]
+            token = response.get('NextContinuationToken')
+            if not token:
+                break
+            list_kwargs['ContinuationToken'] = token
+
+        if not objects_to_delete:
+            return  # No objects found
+
+        # Delete objects in batches (S3 allows max 1000 objects per request)
+        for i in range(0, len(objects_to_delete), 1000):
+            batch = objects_to_delete[i:i + 1000]
+            s3.delete_objects(
+                Bucket=bucket_name,
+                Delete={'Objects': batch}
+            )
+
+    def cleanup_job_files(self):
+        """
+        Remove all files in the job folder, including the folder itself.
+        Handles both local filesystem and S3 storage.
+
+        :raises ValueError: If no job_id is set.
+        """
+        if not self.job_id:
+            raise ValueError("No job_id set. 
Cannot cleanup files without a job_id.") + + base_path = get_settings().file_output_base_path + + if base_path.startswith('s3://'): + # S3 cleanup + bucket_name = base_path.split('/')[2] + base_prefix = '/'.join(base_path.split('/')[3:]).lstrip('/') + + # Build the prefix for this job's files + if base_prefix: + job_prefix = f"{base_prefix}/{self.job_id}/" + else: + job_prefix = f"{self.job_id}/" + + self._s3_delete_objects(bucket_name, job_prefix) + + else: + # Local filesystem cleanup + job_dir = f"{base_path}/{self.job_id}" + + if os.path.exists(job_dir) and os.path.isdir(job_dir): + shutil.rmtree(job_dir) diff --git a/pyproject.toml b/pyproject.toml index 76413ff..7efc884 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ repository = "https://github.com/brevia-ai/brevia" extras = [ "standard" ] [tool.poetry.scripts] + cleanup_jobs = "brevia.commands:cleanup_jobs" create_token = "brevia.commands:create_access_token" create_openapi = "brevia.commands:create_openapi" db_current = "brevia.commands:db_current_cmd" diff --git a/tests/test_async_jobs.py b/tests/test_async_jobs.py index be6d046..6cdac89 100644 --- a/tests/test_async_jobs.py +++ b/tests/test_async_jobs.py @@ -2,12 +2,14 @@ from datetime import datetime, timedelta import time import pytest +from unittest.mock import patch from sqlalchemy.orm import Session from brevia.connection import db_connection from brevia.async_jobs import ( single_job, create_job, complete_job, save_job_result, create_service, lock_job_service, is_job_available, run_job_service, get_jobs, JobsFilter, + cleanup_async_jobs, AsyncJobsStore, ) from brevia.services import BaseService @@ -369,3 +371,235 @@ def test_get_jobs_empty_results(): pagination = result['meta']['pagination'] assert pagination['count'] == 0 + + +def test_cleanup_async_jobs_no_jobs(): + """Test cleanup_async_jobs when no jobs exist to delete""" + # Use a future date to ensure no jobs are found + future_date = datetime.now() + timedelta(days=1) + + # Test dry run with no jobs + result = cleanup_async_jobs(before_date=future_date, dry_run=True) + assert result == 0 + + # Test actual cleanup with no jobs + result = cleanup_async_jobs(before_date=future_date, dry_run=False) + assert result == 0 + + +def test_cleanup_async_jobs_dry_run(): + """Test cleanup_async_jobs dry run functionality""" + # Create some test jobs + service = 'test_cleanup_service' + payload = {'max_duration': 10, 'max_attempts': 1} + + job1 = create_job(service, payload) + job2 = create_job(service, payload) + + # Set created dates to the past for testing + past_date = datetime.now() - timedelta(days=2) + cutoff_date = datetime.now() - timedelta(days=1) + + with Session(db_connection()) as session: + # Update job1 to have an old created date + job1_store = session.get(AsyncJobsStore, job1.uuid) + job1_store.created = past_date + session.add(job1_store) + session.commit() + + # Dry run should return count without deleting + result = cleanup_async_jobs(before_date=cutoff_date, dry_run=True) + assert result == 1 + + # Verify job1 still exists + retrieved_job = single_job(job1.uuid) + assert retrieved_job is not None + + # Verify job2 still exists (should not be affected) + retrieved_job2 = single_job(job2.uuid) + assert retrieved_job2 is not None + + +def test_cleanup_async_jobs_actual_deletion(): + """Test cleanup_async_jobs actual deletion functionality""" + # Create some test jobs + service = 'test_cleanup_service_delete' + payload = {'max_duration': 10, 'max_attempts': 1} + + job1 = create_job(service, 
payload) + job2 = create_job(service, payload) + job3 = create_job(service, payload) + + # Set different created dates + old_date = datetime.now() - timedelta(days=3) + cutoff_date = datetime.now() - timedelta(days=1) + + with Session(db_connection()) as session: + # Update job1 and job2 to have old created dates + job1_store = session.get(AsyncJobsStore, job1.uuid) + job1_store.created = old_date + session.add(job1_store) + + job2_store = session.get(AsyncJobsStore, job2.uuid) + job2_store.created = old_date + session.add(job2_store) + + session.commit() + + # Perform actual cleanup + result = cleanup_async_jobs(before_date=cutoff_date, dry_run=False) + assert result == 2 + + # Verify job1 and job2 are deleted + retrieved_job1 = single_job(job1.uuid) + assert retrieved_job1 is None + + retrieved_job2 = single_job(job2.uuid) + assert retrieved_job2 is None + + # Verify job3 still exists (created recently, not affected) + retrieved_job3 = single_job(job3.uuid) + assert retrieved_job3 is not None + + +def test_cleanup_async_jobs_with_timezone(): + """Test cleanup_async_jobs with timezone-aware datetime""" + from datetime import timezone + + # Create a test job + service = 'test_cleanup_timezone' + payload = {'max_duration': 10, 'max_attempts': 1} + + job = create_job(service, payload) + + # Set created date to the past with timezone + past_date = datetime.now(tz=timezone.utc) - timedelta(days=2) + cutoff_date = datetime.now(tz=timezone.utc) - timedelta(days=1) + + with Session(db_connection()) as session: + job_store = session.get(AsyncJobsStore, job.uuid) + job_store.created = past_date + session.add(job_store) + session.commit() + + # Test with timezone-aware datetime + result = cleanup_async_jobs(before_date=cutoff_date, dry_run=False) + assert result == 1 + + # Verify job is deleted + retrieved_job = single_job(job.uuid) + assert retrieved_job is None + + +@patch('brevia.async_jobs.LinkedFileOutput') +def test_cleanup_async_jobs_with_file_cleanup(mock_linked_file_output): + """Test cleanup_async_jobs calls file cleanup for each deleted job""" + from datetime import timezone + + # Create test jobs + service = 'test_cleanup_with_files' + payload = {'max_duration': 10, 'max_attempts': 1} + + job1 = create_job(service, payload) + job2 = create_job(service, payload) + + # Set created dates to the past + past_date = datetime.now(tz=timezone.utc) - timedelta(days=2) + cutoff_date = datetime.now(tz=timezone.utc) - timedelta(days=1) + + with Session(db_connection()) as session: + # Update both jobs to have old created dates + job1_store = session.get(AsyncJobsStore, job1.uuid) + job1_store.created = past_date + session.add(job1_store) + + job2_store = session.get(AsyncJobsStore, job2.uuid) + job2_store.created = past_date + session.add(job2_store) + + session.commit() + + # Mock the LinkedFileOutput instance and cleanup method + mock_instance = mock_linked_file_output.return_value + mock_instance.cleanup_job_files.return_value = None + + # Perform actual cleanup + result = cleanup_async_jobs(before_date=cutoff_date, dry_run=False) + assert result == 2 + + # Verify LinkedFileOutput was called for each job + assert mock_linked_file_output.call_count == 2 + mock_linked_file_output.assert_any_call(job_id=str(job1.uuid)) + mock_linked_file_output.assert_any_call(job_id=str(job2.uuid)) + + # Verify cleanup_job_files was called for each job + assert mock_instance.cleanup_job_files.call_count == 2 + + # Verify jobs are deleted from database + assert single_job(job1.uuid) is None + assert 
single_job(job2.uuid) is None + + +@patch('brevia.async_jobs.LinkedFileOutput') +def test_cleanup_async_jobs_file_cleanup_error(mock_linked_file_output): + """Test cleanup_async_jobs handles file cleanup errors gracefully""" + from datetime import timezone + + # Create a test job + service = 'test_cleanup_error' + payload = {'max_duration': 10, 'max_attempts': 1} + job = create_job(service, payload) + + # Set created date to the past + past_date = datetime.now(tz=timezone.utc) - timedelta(days=2) + cutoff_date = datetime.now(tz=timezone.utc) - timedelta(days=1) + + with Session(db_connection()) as session: + job_store = session.get(AsyncJobsStore, job.uuid) + job_store.created = past_date + session.add(job_store) + session.commit() + + # Mock the LinkedFileOutput to raise an exception during cleanup + mock_instance = mock_linked_file_output.return_value + mock_instance.cleanup_job_files.side_effect = Exception("File cleanup failed") + + # Perform cleanup - should not fail despite file cleanup error + result = cleanup_async_jobs(before_date=cutoff_date, dry_run=False) + assert result == 1 + + # Verify job is still deleted from database despite file cleanup error + assert single_job(job.uuid) is None + + # Verify cleanup was attempted + mock_linked_file_output.assert_called_once_with(job_id=str(job.uuid)) + mock_instance.cleanup_job_files.assert_called_once() + + +def test_cleanup_async_jobs_dry_run_no_file_cleanup(): + """Test cleanup_async_jobs dry run does not call file cleanup""" + # Create a test job + service = 'test_dry_run_no_cleanup' + payload = {'max_duration': 10, 'max_attempts': 1} + job = create_job(service, payload) + + # Set created date to the past + past_date = datetime.now() - timedelta(days=2) + cutoff_date = datetime.now() - timedelta(days=1) + + with Session(db_connection()) as session: + job_store = session.get(AsyncJobsStore, job.uuid) + job_store.created = past_date + session.add(job_store) + session.commit() + + with patch('brevia.async_jobs.LinkedFileOutput') as mock_linked_file_output: + # Perform dry run + result = cleanup_async_jobs(before_date=cutoff_date, dry_run=True) + assert result == 1 + + # Verify LinkedFileOutput was not called in dry run + mock_linked_file_output.assert_not_called() + + # Verify job still exists after dry run + assert single_job(job.uuid) is not None diff --git a/tests/test_commands.py b/tests/test_commands.py index 8071dcf..ceb27bc 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -17,6 +17,7 @@ create_access_token, create_openapi, update_collection_links, + cleanup_jobs, ) from brevia.collections import create_collection, collection_name_exists from brevia.settings import get_settings @@ -184,3 +185,163 @@ def test_db_revision_cmd_with_autogenerate(): for file_path in migration_files: if exists(file_path): unlink(file_path) + + +def test_cleanup_jobs_dry_run(): + """Test cleanup_jobs command with dry run""" + from datetime import datetime, timedelta + from brevia.async_jobs import create_job, AsyncJobsStore + from brevia.connection import db_connection + from sqlalchemy.orm import Session + + # Create a test job with old date + service = 'test_cleanup_command' + payload = {'max_duration': 10, 'max_attempts': 1} + job = create_job(service, payload) + + # Set created date to the past + past_date = datetime.now() - timedelta(days=2) + + with Session(db_connection()) as session: + job_store = session.get(AsyncJobsStore, job.uuid) + job_store.created = past_date + session.add(job_store) + session.commit() + + runner = 
CliRunner() + cutoff_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') + + # Test dry run + result = runner.invoke(cleanup_jobs, [ + '--before-date', cutoff_date, + '--dry-run' + ]) # No confirmation needed for dry run + + assert result.exit_code == 0 + assert 'Dry run completed' in result.output + + +def test_cleanup_jobs_actual_deletion(): + """Test cleanup_jobs command with actual deletion""" + from datetime import datetime, timedelta + from brevia.async_jobs import create_job, single_job, AsyncJobsStore + from brevia.connection import db_connection + from sqlalchemy.orm import Session + + # Create test jobs + service = 'test_cleanup_command_delete' + payload = {'max_duration': 10, 'max_attempts': 1} + job1 = create_job(service, payload) + job2 = create_job(service, payload) + + # Set one job to have old date + past_date = datetime.now() - timedelta(days=2) + + with Session(db_connection()) as session: + job1_store = session.get(AsyncJobsStore, job1.uuid) + job1_store.created = past_date + session.add(job1_store) + session.commit() + + runner = CliRunner() + cutoff_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') + + # Test actual deletion + result = runner.invoke(cleanup_jobs, [ + '--before-date', cutoff_date, + ], input='y\n') # Provide 'y' input for confirmation + + assert result.exit_code == 0 + assert 'Successfully deleted' in result.output + + # Verify job1 is deleted and job2 still exists + assert single_job(job1.uuid) is None + assert single_job(job2.uuid) is not None + + +def test_cleanup_jobs_no_jobs_to_delete(): + """Test cleanup_jobs command when no jobs need to be deleted""" + from datetime import datetime, timedelta + + runner = CliRunner() + # Use a very old date to ensure no jobs are found + old_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d') + + result = runner.invoke(cleanup_jobs, [ + '--before-date', old_date, + ], input='y\n') # Provide 'y' input for confirmation + + assert result.exit_code == 0 + assert 'No async jobs to delete' in result.output + + +def test_cleanup_jobs_with_datetime(): + """Test cleanup_jobs command with datetime format""" + from datetime import datetime, timedelta + from brevia.async_jobs import create_job, single_job, AsyncJobsStore + from brevia.connection import db_connection + from sqlalchemy.orm import Session + + # Create a test job + service = 'test_cleanup_datetime' + payload = {'max_duration': 10, 'max_attempts': 1} + job = create_job(service, payload) + + # Set created date to the past + past_date = datetime.now() - timedelta(days=2) + + with Session(db_connection()) as session: + job_store = session.get(AsyncJobsStore, job.uuid) + job_store.created = past_date + session.add(job_store) + session.commit() + + runner = CliRunner() + # Use full datetime format + cutoff_datetime = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S') + + result = runner.invoke(cleanup_jobs, [ + '--before-date', cutoff_datetime, + ], input='y\n') # Provide 'y' input for confirmation + + assert result.exit_code == 0 + assert 'Successfully deleted' in result.output + + # Verify job is deleted + assert single_job(job.uuid) is None + + +def test_cleanup_jobs_cancelled_operation(): + """Test cleanup_jobs command when user cancels the operation""" + from datetime import datetime, timedelta + from brevia.async_jobs import create_job, single_job, AsyncJobsStore + from brevia.connection import db_connection + from sqlalchemy.orm import Session + + # Create a test job + service = 'test_cleanup_cancelled' 
+ payload = {'max_duration': 10, 'max_attempts': 1} + job = create_job(service, payload) + + # Set created date to the past + past_date = datetime.now() - timedelta(days=2) + + with Session(db_connection()) as session: + job_store = session.get(AsyncJobsStore, job.uuid) + job_store.created = past_date + session.add(job_store) + session.commit() + + runner = CliRunner() + cutoff_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') + + # Test cancelled operation (user responds 'n') + result = runner.invoke(cleanup_jobs, [ + '--before-date', cutoff_date, + ], input='n\n') # User cancels the operation + + assert result.exit_code == 0 + assert 'Operation cancelled' in result.output + + # Verify job still exists + assert single_job(job.uuid) is not None diff --git a/tests/utilities/test_output.py b/tests/utilities/test_output.py index fab5ee4..58368ff 100644 --- a/tests/utilities/test_output.py +++ b/tests/utilities/test_output.py @@ -83,11 +83,180 @@ def test_s3_upload_method(): output = LinkedFileOutput(job_id='1234') mock_s3 = MagicMock() - import sys - sys.modules['boto3'] = mock_s3 mock_client = MagicMock() mock_s3.client.return_value = mock_client mock_client.upload_file.return_value = None - result = output._s3_upload('test.txt', 'my-bucket', '1234/test.txt') - assert result is None + with patch.dict('sys.modules', {'boto3': mock_s3}): + result = output._s3_upload('test.txt', 'my-bucket', '1234/test.txt') + assert result is None + + +def test_cleanup_job_files_no_job_id(): + """Test cleanup_job_files raises ValueError when no job_id is set.""" + output = LinkedFileOutput() + + with pytest.raises(ValueError) as exc: + output.cleanup_job_files() + assert "No job_id set" in str(exc.value) + + +@patch("brevia.utilities.output.get_settings") +@patch("os.path.exists") +@patch("os.path.isdir") +@patch("shutil.rmtree") +def test_cleanup_job_files_local(mock_rmtree, mock_isdir, mock_exists, mock_settings): + """Test cleanup_job_files method for local filesystem.""" + mock_settings.return_value.file_output_base_path = '/tmp' + mock_exists.return_value = True + mock_isdir.return_value = True + + output = LinkedFileOutput(job_id='1234') + output.cleanup_job_files() + + mock_exists.assert_called_once_with('/tmp/1234') + mock_isdir.assert_called_once_with('/tmp/1234') + mock_rmtree.assert_called_once_with('/tmp/1234') + + +@patch("brevia.utilities.output.get_settings") +@patch("os.path.exists") +def test_cleanup_job_files_local_no_directory(mock_exists, mock_settings): + """Test cleanup_job_files method when local directory doesn't exist.""" + mock_settings.return_value.file_output_base_path = '/tmp' + mock_exists.return_value = False + + output = LinkedFileOutput(job_id='1234') + # Should not raise an error if directory doesn't exist + output.cleanup_job_files() + + mock_exists.assert_called_once_with('/tmp/1234') + + +@patch('brevia.utilities.output.LinkedFileOutput._s3_delete_objects') +@patch("brevia.utilities.output.get_settings") +def test_cleanup_job_files_s3(mock_settings, mock_s3_delete): + """Test cleanup_job_files method for S3 storage.""" + mock_settings.return_value.file_output_base_path = 's3://my-bucket' + mock_s3_delete.return_value = None + + output = LinkedFileOutput(job_id='1234') + output.cleanup_job_files() + + mock_s3_delete.assert_called_once_with('my-bucket', '1234/') + + +@patch('brevia.utilities.output.LinkedFileOutput._s3_delete_objects') +@patch("brevia.utilities.output.get_settings") +def test_cleanup_job_files_s3_with_prefix(mock_settings, mock_s3_delete): + 
"""Test cleanup_job_files method for S3 storage with base prefix.""" + mock_settings.return_value.file_output_base_path = 's3://my-bucket/output/files' + mock_s3_delete.return_value = None + + output = LinkedFileOutput(job_id='1234') + output.cleanup_job_files() + + mock_s3_delete.assert_called_once_with('my-bucket', 'output/files/1234/') + + +def test_s3_delete_objects_import_error(): + """Test the _s3_delete_objects method for ImportError.""" + output = LinkedFileOutput(job_id='1234') + + # Mock the import to raise ModuleNotFoundError + with patch('builtins.__import__') as mock_import: + def side_effect(name, *args): + if name == 'boto3': + raise ModuleNotFoundError("No module named 'boto3'") + return __import__(name, *args) + + mock_import.side_effect = side_effect + + with pytest.raises(ImportError) as exc: + output._s3_delete_objects('my-bucket', '1234/') + assert str(exc.value) == 'Boto3 is not installed!' + + +def test_s3_delete_objects_method(): + """Test the _s3_delete_objects method.""" + output = LinkedFileOutput(job_id='1234') + + mock_s3 = MagicMock() + mock_client = MagicMock() + mock_s3.client.return_value = mock_client + + # Mock the list_objects_v2 response + mock_client.list_objects_v2.return_value = { + 'Contents': [ + {'Key': '1234/file1.txt'}, + {'Key': '1234/file2.txt'}, + {'Key': '1234/subdir/file3.txt'} + ] + } + mock_client.delete_objects.return_value = None + + with patch.dict('sys.modules', {'boto3': mock_s3}): + output._s3_delete_objects('my-bucket', '1234/') + + mock_client.list_objects_v2.assert_called_once_with( + Bucket='my-bucket', Prefix='1234/' + ) + mock_client.delete_objects.assert_called_once_with( + Bucket='my-bucket', + Delete={ + 'Objects': [ + {'Key': '1234/file1.txt'}, + {'Key': '1234/file2.txt'}, + {'Key': '1234/subdir/file3.txt'} + ] + } + ) + + +def test_s3_delete_objects_no_contents(): + """Test the _s3_delete_objects method when no objects are found.""" + output = LinkedFileOutput(job_id='1234') + + mock_s3 = MagicMock() + mock_client = MagicMock() + mock_s3.client.return_value = mock_client + + # Mock the list_objects_v2 response with no contents + mock_client.list_objects_v2.return_value = {} + + with patch.dict('sys.modules', {'boto3': mock_s3}): + # Should not raise an error when no objects are found + output._s3_delete_objects('my-bucket', '1234/') + + mock_client.list_objects_v2.assert_called_once_with( + Bucket='my-bucket', Prefix='1234/' + ) + mock_client.delete_objects.assert_not_called() + + +def test_s3_delete_objects_large_batch(): + """Test the _s3_delete_objects method with more than 1000 objects.""" + output = LinkedFileOutput(job_id='1234') + + mock_s3 = MagicMock() + mock_client = MagicMock() + mock_s3.client.return_value = mock_client + + # Create a list with more than 1000 objects + objects = [{'Key': f'1234/file{i}.txt'} for i in range(1500)] + mock_client.list_objects_v2.return_value = {'Contents': objects} + mock_client.delete_objects.return_value = None + + with patch.dict('sys.modules', {'boto3': mock_s3}): + output._s3_delete_objects('my-bucket', '1234/') + + # Should be called twice due to 1000 object limit per batch + assert mock_client.delete_objects.call_count == 2 + + # First call should have 1000 objects + first_call_args = mock_client.delete_objects.call_args_list[0] + assert len(first_call_args[1]['Delete']['Objects']) == 1000 + + # Second call should have remaining 500 objects + second_call_args = mock_client.delete_objects.call_args_list[1] + assert len(second_call_args[1]['Delete']['Objects']) == 500