67 changes: 67 additions & 0 deletions brevia/async_jobs.py
@@ -4,6 +4,7 @@
from datetime import datetime, timezone
from sqlalchemy import BinaryExpression, Column, desc, func, String, text
from pydantic import BaseModel as PydanticModel
from sqlalchemy import delete, select
from sqlalchemy.dialects.postgresql import JSON, TIMESTAMP, SMALLINT
from sqlalchemy.orm import Session, Query
from langchain_community.vectorstores.pgembedding import BaseModel
@@ -12,6 +13,7 @@
from brevia.utilities.dates import date_filter
from brevia.utilities.json_api import query_data_pagination
from brevia.utilities.types import load_type
from brevia.utilities.output import LinkedFileOutput

MAX_DURATION = 120 # default max duration is 120 min / 2hr
MAX_ATTEMPTS = 1 # default max number of attempts
@@ -264,3 +266,68 @@ def is_job_available(
        return False

    return True


def cleanup_async_jobs(before_date: datetime, dry_run: bool) -> int:
    """
    Remove async_jobs records from the Brevia database whose 'created'
    timestamp is older than the specified date, along with their files.

    Args:
        before_date (datetime): The cutoff date for job deletion.
        dry_run (bool): If True, only show what would be deleted without
            actually deleting.

    Returns:
        int: The number of jobs deleted, or that would be deleted in dry-run mode.
    """
    # Ensure before_date is timezone-aware
    if before_date.tzinfo is None:
        before_date = before_date.replace(tzinfo=timezone.utc)

    log = logging.getLogger(__name__)

    with Session(db_connection()) as session:
        # First, count how many records would be affected
        count_query = select(AsyncJobsStore).where(AsyncJobsStore.created < before_date)
        result = session.execute(count_query)
        jobs_to_delete = result.scalars().all()

        if not jobs_to_delete:
            log.info(f"No async jobs found created before {before_date}")
            return 0

        log.info(f"Found {len(jobs_to_delete)} async jobs created before {before_date}")

        if dry_run:
            log.info("DRY RUN - The following jobs would be deleted:")
            for job in jobs_to_delete:
                status = "completed" if job.completed else "pending"
                log.info(
                    f" - Job UUID: {job.uuid}, Created: {job.created}, "
                    f"Status: {status}, Service: {job.service}"
                )
            return len(jobs_to_delete)

        # Store job UUIDs before deletion for file cleanup
        job_uuids = [str(job.uuid) for job in jobs_to_delete]

        # Perform the actual deletion from database
        delete_query = delete(AsyncJobsStore).where(
            AsyncJobsStore.created < before_date
        )
        result = session.execute(delete_query)
        session.commit()

        # Clean up files for each deleted job
        files_cleanup_errors = 0
        for job_uuid in job_uuids:
            try:
                file_output = LinkedFileOutput(job_id=job_uuid)
                file_output.cleanup_job_files()
            except Exception as exc:  # pylint: disable=broad-exception-caught
                files_cleanup_errors += 1
                log.warning(f"Failed to cleanup files for job {job_uuid}: {exc}")

        if files_cleanup_errors > 0:
            log.warning(f"{files_cleanup_errors} jobs had file cleanup errors")

        return result.rowcount
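
Note: a minimal usage sketch of the new helper, assuming a configured database connection; the 30-day retention window below is illustrative, not part of the change:

from datetime import datetime, timedelta, timezone
from brevia.async_jobs import cleanup_async_jobs

# Preview what a 30-day retention policy would remove, then apply it
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
would_delete = cleanup_async_jobs(before_date=cutoff, dry_run=True)
deleted = cleanup_async_jobs(before_date=cutoff, dry_run=False)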
35 changes: 35 additions & 0 deletions brevia/commands.py
@@ -1,11 +1,13 @@
"""Utility commands for applications"""
import json
import sys
from datetime import datetime
from os import getcwd, path
from logging import config
import click
from brevia.alembic import current, upgrade, downgrade
from brevia.alembic import revision as create_revision
from brevia.async_jobs import cleanup_async_jobs
from brevia.index import update_links_documents
from brevia.utilities import files_import, run_service, collections_io
from brevia.tokens import create_token
@@ -167,3 +169,36 @@ def update_collection_links(collection: str):
    init_logging()
    num = update_links_documents(collection_name=collection)
    print(f'Updated {num} links documents. Done!')


@click.command()
@click.option(
    '--before-date',
    type=click.DateTime(formats=['%Y-%m-%d', '%Y-%m-%d %H:%M:%S']),
    required=True,
    help='Remove async jobs created before this date '
         '(format: YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)'
)
@click.option(
    '--dry-run',
    is_flag=True,
    help='Show what would be deleted without actually deleting'
)
def cleanup_jobs(before_date: datetime, dry_run: bool):
    """
    Remove async jobs and related files created before the specified date.
    """
    # Only ask for confirmation if not doing a dry run
    if not dry_run:
        if not click.confirm('Are you sure you want to delete async jobs?'):
            click.echo('Operation cancelled.')
            return
    init_logging()
    num = cleanup_async_jobs(before_date=before_date, dry_run=dry_run)

    if dry_run:
        click.echo(
            f"Dry run completed. "
            f"{num} {'job' if num == 1 else 'jobs'} would be deleted."
        )
    elif num == 0:
        click.echo("No async jobs to delete.")
    else:
        click.echo(f"Successfully deleted {num} async jobs.")
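
Note: the command can be exercised in-process via Click's test runner — a minimal sketch, assuming the package is importable and the database is reachable; --dry-run also skips the confirmation prompt:

from click.testing import CliRunner
from brevia.commands import cleanup_jobs

# Invoke the CLI without a shell; the date is parsed by click.DateTime
runner = CliRunner()
result = runner.invoke(cleanup_jobs, ['--before-date', '2024-01-01', '--dry-run'])
print(result.output)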
64 changes: 64 additions & 0 deletions brevia/utilities/output.py
@@ -1,6 +1,7 @@
"""File output utilities"""
import tempfile
import os
import shutil
from brevia.settings import get_settings


@@ -86,3 +87,66 @@ def write(self, content: str, filename: str):
            os.unlink(output_path)

        return self.file_url(filename)

    def _s3_delete_objects(self, bucket_name: str, prefix: str):
        """
        Delete all objects in S3 with the specified prefix.

        :param bucket_name: The name of the S3 bucket.
        :param prefix: The prefix to match objects for deletion.
        """
        try:
            import boto3  # pylint: disable=import-outside-toplevel
        except ModuleNotFoundError as exc:
            raise ImportError('Boto3 is not installed!') from exc

        s3 = boto3.client('s3')

        # Paginate the listing: list_objects_v2 returns at most 1000 keys
        # per call, which also matches the 1000-object limit accepted by
        # a single delete_objects request
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            contents = page.get('Contents', [])
            if not contents:
                continue  # No objects found on this page

            s3.delete_objects(
                Bucket=bucket_name,
                Delete={'Objects': [{'Key': obj['Key']} for obj in contents]}
            )

    def cleanup_job_files(self):
        """
        Remove all files in the job folder, including the folder itself.
        Handles both local filesystem and S3 storage.

        :raises ValueError: If no job_id is set.
        """
        if not self.job_id:
            raise ValueError("No job_id set. Cannot cleanup files without a job_id.")

        base_path = get_settings().file_output_base_path

        if base_path.startswith('s3://'):
            # S3 cleanup: extract bucket and base prefix from the URI,
            # stripping slashes so a trailing '/' cannot produce '//' keys
            bucket_name = base_path.split('/')[2]
            base_prefix = '/'.join(base_path.split('/')[3:]).strip('/')

            # Build the prefix for this job's files
            if base_prefix:
                job_prefix = f"{base_prefix}/{self.job_id}/"
            else:
                job_prefix = f"{self.job_id}/"

            self._s3_delete_objects(bucket_name, job_prefix)

        else:
            # Local filesystem cleanup (isdir is False for missing paths)
            job_dir = f"{base_path}/{self.job_id}"

            if os.path.isdir(job_dir):
                shutil.rmtree(job_dir)
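
Note: a minimal sketch of how the new method is meant to be driven; the job UUID below is hypothetical — in cleanup_async_jobs it comes from the deleted async_jobs records:

from brevia.utilities.output import LinkedFileOutput

# Removes <file_output_base_path>/<job_id> on local storage, or every
# object under the matching prefix when the base path starts with s3://
output = LinkedFileOutput(job_id='00000000-0000-0000-0000-000000000000')
output.cleanup_job_files()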
1 change: 1 addition & 0 deletions pyproject.toml
@@ -41,6 +41,7 @@ repository = "https://github.com/brevia-ai/brevia"
extras = [ "standard" ]

[tool.poetry.scripts]
cleanup_jobs = "brevia.commands:cleanup_jobs"
create_token = "brevia.commands:create_access_token"
create_openapi = "brevia.commands:create_openapi"
db_current = "brevia.commands:db_current_cmd"
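
Note: with this entry the command is installed as a console script next to the existing ones, so in a standard Poetry setup it can be run as, e.g., poetry run cleanup_jobs --before-date 2024-01-01 --dry-run.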