Merge pull request #71 from moleculemaker/add-reactionminer-backend-job
feat: add ReactionMiner backend job, skip duplicate emails, fix casing for novoStoic in emails
bodom0015 authored Jan 29, 2025
2 parents 7bc2edf + 71b7f28 commit 26143c8
Showing 13 changed files with 279 additions and 7 deletions.
41 changes: 41 additions & 0 deletions app/cfg/config.yaml
@@ -108,6 +108,47 @@ kubernetes_jobs:
        subPath: 'weights'
        claimName: 'mmli-clean-job-weights'

  # Config for running ReactionMiner job
  reactionminer:
    image: "moleculemaker/reactionminer:latest"
    imagePullPolicy: "Always"
    env:
      - name: HF_TOKEN_PATH
        value: '/root/.cache/token'
      - name: GROBID_SERVER
        value: 'localhost'
    volumes:
      - name: 'shared-storage'
        mountPath: '/workspace/10test/'
        subPath: 'uws/jobs/reactionminer/JOB_ID/in'
        claimName: 'mmli-clean-job-weights'
      - name: 'shared-storage'
        mountPath: '/workspace/extraction/results_filtered/'
        subPath: 'uws/jobs/reactionminer/JOB_ID/out'
        claimName: 'mmli-clean-job-weights'
      - mountPath: '/root/.cache/huggingface/hub'
        name: 'shared-storage'
        subPath: 'uws/jobs/reactionminer/.cache'
        claimName: 'mmli-clean-job-weights'

    # Create a Secret named "huggingface-token" holding the HuggingFace API token
    # Format:
    #   apiVersion: v1
    #   kind: Secret
    #   data:
    #     token: YOUR_API_TOKEN  # base64-encoded
    secrets:
      - mountPath: '/root/.cache/token'
        name: 'token'
        subPath: 'token'
        readOnly: 'true'
        secretName: huggingface-token
        # "itemList" takes the place of "items"
        # This avoids colliding with the built-in dict.items() method during Jinja2 rendering
        itemList:
          - key: token
            path: token

  # Config for running SOMN job
  somn:
    image: "ianrinehart/somn:1.1"
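The "itemList" naming above exists because the job template (app/services/templates/job.tpl.yaml, below) is rendered with Jinja2, where attribute lookup tries Python attributes before dict keys, so a key named "items" would be shadowed by the built-in dict.items method. A minimal sketch, not from the repo, demonstrating the collision:

from jinja2 import Template

# A dict key named "items" is shadowed by dict.items during Jinja2
# attribute lookup, so the template sees the bound method instead:
sec = {"items": [{"key": "token", "path": "token"}]}
print(Template("{{ sec.items }}").render(sec=sec))
# -> <built-in method items of dict object at 0x...>

# Renaming the key sidesteps the collision:
sec = {"itemList": [{"key": "token", "path": "token"}]}
print(Template("{{ sec.itemList }}").render(sec=sec))
# -> [{'key': 'token', 'path': 'token'}]

(The template could also write sec['items'] explicitly; renaming the key keeps the template loops uniform.)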
1 change: 1 addition & 0 deletions app/models/enums.py
@@ -10,6 +10,7 @@ class JobType(str, Enum):
    NOVOSTOIC_PATHWAYS = 'novostoic-pathways'
    NOVOSTOIC_ENZRANK = 'novostoic-enzrank'
    NOVOSTOIC_DGPREDICTOR = 'novostoic-dgpredictor'
    REACTIONMINER = 'reactionminer'
    SOMN = 'somn'
    DEFAULT = 'defaults'

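Because JobType mixes str into the Enum, plain strings compare equal to enum members, which is what the bucket_name checks in app/routers/files.py below rely on. A small sketch of that behavior:

from enum import Enum

class JobType(str, Enum):
    REACTIONMINER = 'reactionminer'

# A raw path parameter compares equal to the enum member:
assert 'reactionminer' == JobType.REACTIONMINER
# ...and can be converted explicitly when needed:
assert JobType('reactionminer') is JobType.REACTIONMINER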
4 changes: 4 additions & 0 deletions app/routers/files.py
@@ -23,6 +23,7 @@
from services.minio_service import MinIOService
from services.chemscraper_service import ChemScraperService
from services.aceretro_service import ACERetroService
from services.reactionminer_service import ReactionMinerService


from typing import Optional
@@ -83,6 +84,9 @@ async def get_results(bucket_name: str, job_id: str, service: MinIOService = Dep
        print("Getting novostoic-dgpredictor job result")
        return await NovostoicService.dgPredictorResultPostProcess(bucket_name, job_id, service, db)

    elif bucket_name == JobType.REACTIONMINER:
        return await ReactionMinerService.resultPostProcess(bucket_name, job_id, service, db)

    elif bucket_name == JobType.SOMN:
        return await SomnService.resultPostProcess(bucket_name, job_id, service, db)

10 changes: 9 additions & 1 deletion app/routers/job.py
@@ -6,6 +6,7 @@
import uuid
import csv
import io
import traceback

from typing import List

@@ -71,6 +72,10 @@ async def create_job(
    command = ''
    environment = []

    # Mount in secrets/volumes at runtime
    volumes = []
    secrets = []

    if job_type == JobType.DEFAULT:
        command = app_config['kubernetes_jobs'][job_type]['command']
        #command = f'ls -al /uws/jobs/{job_type}/{job_id}'
@@ -115,7 +120,9 @@

        command = f"python entrypoint.py --job_id {job_id}"
        # Job is created at end of function

    elif job_type == JobType.REACTIONMINER:
        log.debug(f'Running ReactionMiner job: {job_id}')
        environment = app_config['kubernetes_jobs']['reactionminer']['env']
    elif job_type == JobType.SOMN:
        # Build up example_request.csv from user input, upload to MinIO?
        job_config = json.loads(job_info.replace('\"', '"'))
@@ -231,6 +238,7 @@ async def create_job(
        kubejob_service.create_job(job_type=job_type, job_id=job_id, run_id=run_id, image_name=image_name, command=command, environment=environment)
    except Exception as ex:
        log.error("Failed to create Job: " + str(ex))
        log.error(traceback.format_exc())
        raise HTTPException(status_code=400, detail="Failed to create Job: " + str(ex))

    else:
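Note that the REACTIONMINER branch sets no command, so the container runs its image's default entrypoint; only the env block from config.yaml is passed through. After this branch, `environment` holds the name/value pairs defined in the config above:

# Contents of `environment` for a ReactionMiner job, as loaded from
# kubernetes_jobs.reactionminer.env in app/cfg/config.yaml:
environment = [
    {'name': 'HF_TOKEN_PATH', 'value': '/root/.cache/token'},
    {'name': 'GROBID_SERVER', 'value': 'localhost'},
]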
51 changes: 45 additions & 6 deletions app/services/kubejob_service.py
@@ -24,6 +24,7 @@
import sqlalchemy as db

from services.email_service import EmailService
from services.minio_service import MinIOService

log = get_logger(__name__)

@@ -118,7 +119,9 @@ def __init__(self):
        #self.connection.run_sync(SQLModel.metadata.create_all)
        self.metadata = db.MetaData()
        self.jobs = []

        self.email_service = EmailService()
        self.minio_service = MinIOService()

        self.stream = None
        self.logger.info('Starting KubeWatcher')
@@ -132,7 +135,7 @@ def send_notification_email(self, updated_job, new_phase):
            novostoic_frontend_url = app_config['novostoic_frontend_url']
            if job_type == JobType.NOVOSTOIC_PATHWAYS:
                results_url = f'{novostoic_frontend_url}/pathway-search/result/{updated_job.job_id}'
                job_type_name = 'NovoStoic'
                job_type_name = 'novoStoic'
            elif job_type == JobType.NOVOSTOIC_OPTSTOIC:
                results_url = f'{novostoic_frontend_url}/overall-stoichiometry/result/{updated_job.job_id}'
                job_type_name = 'OptStoic'
@@ -142,6 +145,8 @@
            elif job_type == JobType.NOVOSTOIC_DGPREDICTOR:
                results_url = f'{novostoic_frontend_url}/thermodynamical-feasibility/result/{updated_job.job_id}'
                job_type_name = 'dGPredictor'
            else:
                raise ValueError(f"Unrecognized novoStoic subjob type {job_type} not in existing Job Types {JobType}")
        elif job_type == JobType.SOMN:
            somn_frontend_url = app_config['somn_frontend_url']
            results_url = f'{somn_frontend_url}/results/{updated_job.job_id}'
@@ -157,26 +162,54 @@
        elif job_type == JobType.ACERETRO:
            aceretro_frontend_url = app_config['aceretro_frontend_url']
            results_url = f'{aceretro_frontend_url}/results/{updated_job.job_id}'
            job_type_name = 'ACERETRO'
            job_type_name = 'ACERetro'
        elif job_type == JobType.REACTIONMINER:
            reactionminer_frontend_url = app_config['reactionminer_frontend_url']
            results_url = f'{reactionminer_frontend_url}/results/{updated_job.job_id}'
            job_type_name = 'ReactionMiner'
        else:
            raise ValueError(f"Unrecognized job type {job_type} not in existing Job Types {JobType}")

        job_id = updated_job.job_id

        # Send email notification about success/failure
        if new_phase == JobStatus.COMPLETED and updated_job.email:
        if new_phase == JobStatus.COMPLETED and updated_job.email and self.should_send_email(job_type, job_id):
            try:
                self.email_service.send_email(updated_job.email,
                                              f'''Result for your {job_type_name} Job ({updated_job.job_id}) is ready''',
                                              f'''Result for your {job_type_name} Job ({job_id}) is ready''',
                                              f'''The result for your {job_type_name} Job is available at {results_url}''')
                self.mark_email_as_sent(job_type, job_id, success=True)
            except Exception as e:
                log.error(f'Failed to send email notification on success: {str(e)}')
        elif new_phase == JobStatus.ERROR and updated_job.email:
        elif new_phase == JobStatus.ERROR and updated_job.email and self.should_send_email(job_type, job_id):
            try:
                self.email_service.send_email(updated_job.email,
                                              f'''{job_type_name} Job ({updated_job.job_id}) failed''',
                                              f'''{job_type_name} Job ({job_id}) failed''',
                                              f'''An error occurred in computing the result for your {job_type_name} job.''')
                self.mark_email_as_sent(job_type, job_id, success=False)
            except Exception as e:
                log.error(f'Failed to send email notification on failure: {str(e)}')

    def should_send_email(self, job_type, job_id):
        # Check if an email has already been sent for this job;
        # if so, a marker file should exist in MinIO
        minio_bucket_name = job_type
        minio_check_path = f'{job_id}/email-sent'
        if self.minio_service.check_file_exists(minio_bucket_name, minio_check_path):
            log.debug(f'Skipped sending email for {job_id}: email has already been sent for this job')
            return False
        return True

    def mark_email_as_sent(self, job_type, job_id, success):
        minio_bucket_name = job_type
        minio_check_path = f'{job_id}/email-sent'
        if success:
            # Job completed successfully, email was sent indicating success
            self.minio_service.upload_file(minio_bucket_name, minio_check_path, 'success')
        else:
            # Job failed with an error, email was sent indicating error
            self.minio_service.upload_file(minio_bucket_name, minio_check_path, 'error')

    def run(self):
        # Ignore kube-system namespace
        # TODO: Parameterize this?
@@ -528,6 +561,11 @@ def create_job(job_type, job_id, run_id=None, image_name=None, command=None, own
    for volume in app_config['kubernetes_jobs'][job_type]['volumes']:
        all_volumes.append(volume)

    # Include secrets, if necessary (e.g. ReactionMiner for HuggingFace API token)
    secrets = []
    if 'secrets' in app_config['kubernetes_jobs'][job_type]:
        secrets = app_config['kubernetes_jobs'][job_type]['secrets']

    jobCompleteApiUrl = f'''{app_config['server']['protocol']}://{os.path.join(
        app_config['server']['hostName'],
        app_config['server']['basePath'],
@@ -572,6 +610,7 @@ def create_job(job_type, job_id, run_id=None, image_name=None, command=None, own
        securityContext=app_config['kubernetes_jobs'][job_type]['securityContext'] if 'securityContext' in app_config['kubernetes_jobs'][job_type] else None,
        workingVolume=app_config['kubernetes_jobs']['defaults']['workingVolume'],
        volumes=all_volumes,
        secrets=secrets,
        resources=app_config['kubernetes_jobs'][job_type]['resources'] if 'resources' in app_config['kubernetes_jobs'][job_type] else app_config['kubernetes_jobs']['defaults']['resources'],
        # apiToken=config['jwt']['hs256Secret'],
        apiToken='dummy',
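should_send_email calls MinIOService.check_file_exists, which is not part of this diff. A hypothetical sketch of that helper, assuming it wraps the MinIO client's stat_object call:

from minio.error import S3Error

def check_file_exists(self, bucket_name, object_name):
    # Hypothetical: stat_object raises S3Error (e.g. NoSuchKey) when the
    # object is absent, so a successful stat means the marker exists.
    try:
        self.client.stat_object(bucket_name, object_name)
        return True
    except S3Error:
        return False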
7 changes: 7 additions & 0 deletions app/services/minio_service.py
@@ -30,6 +30,10 @@ def get_file(self, bucket_name, object_name):
            log.error("Error: ", err)

    def upload_file(self, bucket_name, object_name, file_content):
        # If file_content is a string, first convert to a byte stream
        if isinstance(file_content, str):
            file_content = file_content.encode('utf-8')

        try:
            # Upload file to the specified bucket
            result = self.client.put_object(
@@ -44,6 +48,9 @@ def upload_file(self, bucket_name, object_name, file_content):
        except S3Error as err:
            log.error("Error: ", err)
            return False

    def list_files(self, bucket_name, path, recursive=False):
        return self.client.list_objects(bucket_name, prefix=path, recursive=recursive)

    def get_file_urls(self, bucket_name, path):
        try:
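Together with upload_file, the new list_files helper backs the email dedupe marker above. A usage sketch (bucket contents and job id are hypothetical):

from services.minio_service import MinIOService

service = MinIOService()
# Write the marker that mark_email_as_sent would create...
service.upload_file('reactionminer', 'JOB123/email-sent', 'success')
# ...then list objects under the job prefix:
for obj in service.list_files('reactionminer', 'JOB123/', recursive=True):
    print(obj.object_name)  # -> JOB123/email-sent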
45 changes: 45 additions & 0 deletions app/services/reactionminer_service.py
@@ -0,0 +1,45 @@
import json
import os
import time

from fastapi import HTTPException
from sqlmodel.ext.asyncio.session import AsyncSession

from config import log
from models.enums import JobStatus
from services.minio_service import MinIOService


class ReactionMinerService:
    def __init__(self, db) -> None:
        self.db = db

    async def update_job_phase(self, jobObject, phase: JobStatus):
        jobObject.phase = phase
        if phase == JobStatus.PROCESSING:
            jobObject.time_start = int(time.time())
        else:
            jobObject.time_end = int(time.time())
        self.db.add(jobObject)
        await self.db.commit()

    @staticmethod
    async def resultPostProcess(bucket_name: str, job_id: str, service: MinIOService, db: AsyncSession):
        """
        Inputs stored in MinIO (bucket: reactionminer): /{job_id}/in/[name].pdf
        Outputs stored in MinIO (bucket: reactionminer): /{job_id}/out/[name].json
        """
        folder_path = f"/{job_id}/out/"
        objects = service.list_files(bucket_name, folder_path)

        # Iterate over the output folder and parse each file into a dictionary
        content = {}
        for obj in objects:
            file_name = os.path.basename(obj.object_name)
            content[file_name] = json.loads(service.get_file(bucket_name=bucket_name, object_name=obj.object_name))

        # Raise 404 if no output files were found; otherwise return the contents
        if not content:
            raise HTTPException(status_code=404, detail="No output files were found")

        return content
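The endpoint therefore returns every JSON output for a job as one dict keyed by file name. An illustrative sketch (job id and file names hypothetical):

from services.reactionminer_service import ReactionMinerService

async def fetch_results(service, db):
    # Given outputs at reactionminer/JOB123/out/paper1.json and paper2.json,
    # resultPostProcess returns their parsed contents keyed by file name.
    content = await ReactionMinerService.resultPostProcess(
        bucket_name='reactionminer', job_id='JOB123', service=service, db=db)
    print(sorted(content))  # -> ['paper1.json', 'paper2.json']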
18 changes: 18 additions & 0 deletions app/services/templates/job.tpl.yaml
@@ -56,6 +56,18 @@ spec:
        - name: "shared-storage"
          persistentVolumeClaim:
            claimName: "{{ workingVolume.claimName }}"
        {%- for sec in secrets %}
        - name: "{{ sec.name }}"
          secret:
            secretName: "{{ sec.secretName }}"
            {%- if sec.itemList %}
            items:
            {%- for secretItem in sec.itemList %}
              - key: "{{ secretItem.key }}"
                path: "{{ secretItem.path }}"
            {%- endfor %}
            {%- endif %}
        {%- endfor %}
      initContainers:
        - name: init
          image: moleculemaker/mmli-backend:kubejob
@@ -157,6 +169,12 @@ spec:
              subPath: "{{ volume.subPath | replace('JOB_ID',jobId) }}"
              readOnly: {{ volume.readOnly }}
            {%- endfor %}
            {%- for sec in secrets %}
            - name: "{{ sec.name }}"
              mountPath: "{{ sec.mountPath }}"
              readOnly: true
              subPath: "{{ sec.subPath }}"
            {%- endfor %}
      containers:
        - name: post-job
          image: "moleculemaker/mmli-backend:kubejob"
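To see how the secrets config flows through this template, here is a standalone sketch, not from the repo, that renders just the new volumes stanza with the ReactionMiner secrets entry from app/cfg/config.yaml:

from jinja2 import Template

# Simplified copy of the new secrets loop from job.tpl.yaml:
TPL = """volumes:
{%- for sec in secrets %}
  - name: "{{ sec.name }}"
    secret:
      secretName: "{{ sec.secretName }}"
{%- if sec.itemList %}
      items:
{%- for secretItem in sec.itemList %}
        - key: "{{ secretItem.key }}"
          path: "{{ secretItem.path }}"
{%- endfor %}
{%- endif %}
{%- endfor %}"""

secrets = [{
    'name': 'token',
    'secretName': 'huggingface-token',
    'mountPath': '/root/.cache/token',
    'subPath': 'token',
    'itemList': [{'key': 'token', 'path': 'token'}],
}]

print(Template(TPL).render(secrets=secrets))
# volumes:
#   - name: "token"
#     secret:
#       secretName: "huggingface-token"
#       items:
#         - key: "token"
#           path: "token"

The volumeMounts loop works the same way, and volume subPaths additionally pass through replace('JOB_ID', jobId) so each job gets its own directory on the shared claim.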
6 changes: 6 additions & 0 deletions chart/pvc-explorer.pod.yaml
@@ -12,6 +12,8 @@ spec:
      volumeMounts:
        - mountPath: "/app/data"
          name: data
        - mountPath: "/nfs"
          name: nfs
  volumes:
    - name: data
      persistentVolumeClaim:
@@ -23,3 +25,7 @@ spec:

        # staging
        claimName: mmli-clean-job-weights
    - name: nfs
      nfs:
        path: /taiga/ncsa/radiant/bbfp/mmli1/
        server: taiga-nfs.ncsa.illinois.edu
4 changes: 4 additions & 0 deletions chart/values.local.yaml
@@ -18,6 +18,10 @@ controller:
    novostoic_frontend_url: "http://localhost:4200"
    somn_frontend_url: "http://localhost:4200"
    chemscraper_frontend_url: "http://localhost:4200"
    clean_frontend_url: "http://localhost:4200"
    molli_frontend_url: "http://localhost:4200"
    aceretro_frontend_url: "http://localhost:4200"
    reactionminer_frontend_url: "http://localhost:4200"

postgresql:
  enabled: true