brevia/async_jobs.py (108 changes: 97 additions & 11 deletions)
@@ -2,12 +2,15 @@
import logging
import time
from datetime import datetime
import sqlalchemy
from sqlalchemy import BinaryExpression, Column, desc, func, String, text
from pydantic import BaseModel as PydanticModel
from sqlalchemy.dialects.postgresql import JSON, TIMESTAMP, SMALLINT
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, Query
from langchain_community.vectorstores.pgembedding import BaseModel
from brevia.connection import db_connection
from brevia.services import BaseService
from brevia.utilities.dates import date_filter
from brevia.utilities.json_api import query_data_pagination
from brevia.utilities.types import load_type

MAX_DURATION = 120 # default max duration is 120 min / 2hr
@@ -19,22 +22,22 @@ class AsyncJobsStore(BaseModel):
""" Async Jobs table """
__tablename__ = "async_jobs"

service = sqlalchemy.Column(sqlalchemy.String(), nullable=False)
payload = sqlalchemy.Column(JSON())
expires = sqlalchemy.Column(TIMESTAMP(timezone=False))
created = sqlalchemy.Column(
service = Column(String(), nullable=False)
payload = Column(JSON())
expires = Column(TIMESTAMP(timezone=False))
created = Column(
TIMESTAMP(timezone=False),
nullable=False,
server_default=sqlalchemy.func.current_timestamp(),
server_default=func.current_timestamp(),
)
completed = sqlalchemy.Column(TIMESTAMP(timezone=False))
locked_until = sqlalchemy.Column(TIMESTAMP(timezone=False))
max_attempts = sqlalchemy.Column(
completed = Column(TIMESTAMP(timezone=False))
locked_until = Column(TIMESTAMP(timezone=False))
max_attempts = Column(
SMALLINT(),
nullable=False,
server_default='1',
)
result = sqlalchemy.Column(JSON(), nullable=True)
result = Column(JSON(), nullable=True)


def single_job(uuid: str) -> (AsyncJobsStore | None):
@@ -43,6 +46,89 @@ def single_job(uuid: str) -> (AsyncJobsStore | None):
return session.get(AsyncJobsStore, uuid)


class JobsFilter(PydanticModel):
""" Jobs filter """
min_date: str | None = None
max_date: str | None = None
service: str | None = None
completed: bool | None = None
page: int = 1
page_size: int = 50


def get_jobs(filter: JobsFilter) -> dict: # pylint: disable=redefined-builtin
"""
Read async jobs with optional filters using pagination data in response.
"""

# Handle date filters - only apply if explicitly provided
filter_min_date = text('1 = 1') # always true by default
filter_max_date = text('1 = 1') # always true by default

if filter.min_date:
min_date = date_filter(filter.min_date, 'min')
filter_min_date = AsyncJobsStore.created >= min_date

if filter.max_date:
max_date = date_filter(filter.max_date, 'max')
filter_max_date = AsyncJobsStore.created <= max_date

filter_service = text('1 = 1') # (default) always true expression
if filter.service:
filter_service = AsyncJobsStore.service == filter.service
filter_completed = text('1 = 1') # (default) always true expression
if filter.completed is not None:
filter_completed = (
AsyncJobsStore.completed.is_not(None)
if filter.completed
else AsyncJobsStore.completed.is_(None)
)

with Session(db_connection()) as session:
query = get_jobs_query(
session=session,
filter_min_date=filter_min_date,
filter_max_date=filter_max_date,
filter_service=filter_service,
filter_completed=filter_completed,
)
result = query_data_pagination(
query=query,
page=filter.page,
page_size=filter.page_size
)
return result


def get_jobs_query(
session: Session,
filter_min_date: BinaryExpression,
filter_max_date: BinaryExpression,
filter_service: BinaryExpression,
filter_completed: BinaryExpression,
) -> Query:
"""
Constructs a SQLAlchemy query to retrieve async jobs based on specified filters.
"""

query = (
session.query(
AsyncJobsStore.uuid,
AsyncJobsStore.service,
AsyncJobsStore.payload,
AsyncJobsStore.expires,
AsyncJobsStore.created,
AsyncJobsStore.completed,
AsyncJobsStore.locked_until,
AsyncJobsStore.max_attempts,
AsyncJobsStore.result,
)
.filter(filter_min_date, filter_max_date, filter_service, filter_completed)
.order_by(desc(AsyncJobsStore.created))
)
return query


def create_job(
service: str,
payload: dict,
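
For reviewers trying the new API from Python, here is a minimal usage sketch of the jobs listing introduced above. Only JobsFilter and get_jobs come from this diff; the service name is a hypothetical example and the exact shape of the returned dict depends on brevia.utilities.json_api.query_data_pagination.

# Minimal usage sketch of the new jobs listing (not part of this diff).
# 'summarize_service' is a hypothetical service name; the returned dict
# shape is whatever query_data_pagination produces (rows plus pagination info).
from brevia.async_jobs import JobsFilter, get_jobs

jobs_filter = JobsFilter(
    min_date='2025-04-20',        # inclusive lower bound on `created`
    service='summarize_service',  # hypothetical service name
    completed=True,               # only jobs with a non-null `completed` timestamp
    page=1,
    page_size=10,
)

result = get_jobs(jobs_filter)    # paginated dict built by query_data_pagination
print(result)
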
brevia/chat_history.py (35 changes: 5 additions & 30 deletions)
@@ -1,7 +1,6 @@
"""Chat history table & utilities"""
from typing import List
import logging
from datetime import datetime, time
from langchain_community.vectorstores.pgembedding import BaseModel, CollectionStore
from pydantic import BaseModel as PydanticModel
import sqlalchemy
@@ -12,6 +11,7 @@
from brevia.connection import db_connection
from brevia.models import load_embeddings
from brevia.settings import get_settings
from brevia.utilities.dates import date_filter
from brevia.utilities.json_api import query_data_pagination
from brevia.utilities.uuid import is_valid_uuid

@@ -145,31 +145,6 @@ class ChatHistoryFilter(PydanticModel):
page_size: int = 50


def get_date_filter(date_str, type_str):
"""
Parses a date string into a datetime object with combined time information.

Args:
date_str (str): A string representing a date in the format 'YYYY-MM-DD'.
None if no specific date is provided.

type_str (str): Indicates whether to create a maximum or minimum date filter.
Valid values are 'max' or 'min'.
"""
max_date = datetime.now()
min_date = datetime.fromtimestamp(0)

if date_str is not None:
parsed_date = datetime.strptime(date_str, '%Y-%m-%d')
if type_str == 'max':
max_date = parsed_date
return datetime.combine(max_date, time.max)
min_date = parsed_date
return datetime.combine(min_date, time.min)

return max_date if type_str == 'max' else min_date


def get_collection_filter(collection_name):
"""
Constructs a filter expression based on the collection name.
@@ -196,8 +171,8 @@ def get_history(filter: ChatHistoryFilter) -> dict:  # pylint: disable=redefined
Read chat history with optional filters using pagination data in response.
"""

min_date = get_date_filter(filter.min_date, 'min')
max_date = get_date_filter(filter.max_date, 'max')
min_date = date_filter(filter.min_date, 'min')
max_date = date_filter(filter.max_date, 'max')
filter_collection = get_collection_filter(filter.collection)
filter_session_id = get_session_filter(filter.session_id)

@@ -223,8 +198,8 @@ def get_history_sessions(filter: ChatHistoryFilter) -> dict:
Read chat history with optional filters using pagination data in response.
"""

min_date = get_date_filter(filter.min_date, 'min')
max_date = get_date_filter(filter.max_date, 'max')
min_date = date_filter(filter.min_date, 'min')
max_date = date_filter(filter.max_date, 'max')
filter_collection = get_collection_filter(filter.collection)

with Session(db_connection()) as session:
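
The removed get_date_filter helper is replaced here by the shared brevia.utilities.dates.date_filter. The sketch below simply restates the deleted logic and assumes the shared helper keeps the same signature and behaviour; its real implementation is not part of this hunk.

# Sketch of the shared date helper, assuming it mirrors the removed
# get_date_filter logic (actual code lives in brevia/utilities/dates.py).
from datetime import datetime, time


def date_filter(date_str: str | None, type_str: str) -> datetime:
    """Parse 'YYYY-MM-DD' into a min- or max-of-day datetime bound."""
    if date_str is not None:
        parsed_date = datetime.strptime(date_str, '%Y-%m-%d')
        # 'max' bounds extend to end of day, 'min' bounds to start of day
        return datetime.combine(
            parsed_date,
            time.max if type_str == 'max' else time.min,
        )
    # No date provided: fall back to "now" for max, the Unix epoch for min
    return datetime.now() if type_str == 'max' else datetime.fromtimestamp(0)
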
brevia/postman/Brevia API.postman_collection.json (161 changes: 126 additions & 35 deletions)
@@ -2163,6 +2163,132 @@
}
]
},
{
"name": "Jobs",
"item": [
{
"name": "jobs - Read single job",
"request": {
"auth": {
"type": "bearer",
"bearer": [
{
"key": "token",
"value": "{{access_token}}",
"type": "string"
}
]
},
"method": "GET",
"header": [
{
"key": "Content-Type",
"value": "application/json",
"type": "text",
"disabled": true
}
],
"url": {
"raw": "{{baseUrl}}/jobs/{{job_id}}",
"host": [
"{{baseUrl}}"
],
"path": [
"jobs",
"{{job_id}}"
]
}
},
"response": []
},
{
"name": "Jobs - Read all jobs",
"request": {
"auth": {
"type": "bearer",
"bearer": [
{
"key": "token",
"value": "{{access_token}}",
"type": "string"
}
]
},
"method": "GET",
"header": [
{
"key": "Content-Type",
"value": "application/json",
"type": "text",
"disabled": true
}
],
"url": {
"raw": "{{baseUrl}}/jobs",
"host": [
"{{baseUrl}}"
],
"path": [
"jobs"
]
}
},
"response": []
},
{
"name": "Jobs - Read jobs filters",
"request": {
"auth": {
"type": "bearer",
"bearer": [
{
"key": "token",
"value": "{{access_token}}",
"type": "string"
}
]
},
"method": "GET",
"header": [
{
"key": "Content-Type",
"value": "application/json",
"type": "text",
"disabled": true
}
],
"url": {
"raw": "{{baseUrl}}/jobs?min_date=2025-04-20&completed=true&page=1&page_size=10",
"host": [
"{{baseUrl}}"
],
"path": [
"jobs"
],
"query": [
{
"key": "min_date",
"value": "2025-04-20"
},
{
"key": "completed",
"value": "true"
},
{
"key": "page",
"value": "1"
},
{
"key": "page_size",
"value": "10"
}
]
}
},
"response": []
}
]
},
{
"name": "upload_analyze - Upload file and analyze using extension",
"event": [
@@ -2277,41 +2403,6 @@
}
},
"response": []
},
{
"name": "jobs - Read single job",
"request": {
"auth": {
"type": "bearer",
"bearer": [
{
"key": "token",
"value": "{{access_token}}",
"type": "string"
}
]
},
"method": "GET",
"header": [
{
"key": "Content-Type",
"value": "application/json",
"type": "text",
"disabled": true
}
],
"url": {
"raw": "{{baseUrl}}/jobs/{{job_id}}",
"host": [
"{{baseUrl}}"
],
"path": [
"jobs",
"{{job_id}}"
]
}
},
"response": []
}
]
}
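
For quick manual testing outside Postman, an equivalent Python call to the filtered jobs endpoint is sketched below; BASE_URL and ACCESS_TOKEN are placeholders for a concrete deployment, and the query parameters simply mirror the "Jobs - Read jobs filters" request above.

# Equivalent of the "Jobs - Read jobs filters" Postman request.
# BASE_URL and ACCESS_TOKEN are placeholders, not values from this PR.
import requests

BASE_URL = 'http://localhost:8000'
ACCESS_TOKEN = '<access_token>'

response = requests.get(
    f'{BASE_URL}/jobs',
    headers={'Authorization': f'Bearer {ACCESS_TOKEN}'},
    params={
        'min_date': '2025-04-20',
        'completed': 'true',
        'page': 1,
        'page_size': 10,
    },
    timeout=30,
)
response.raise_for_status()
print(response.json())
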