Skip to content
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
34d0160
✨ feat: add pod status checking to distinfuish pending vs running jobs
fMurugi Apr 28, 2026
50a8377
fix: check pod is up and running before setting the job as in_progress
fMurugi Apr 28, 2026
38eb99f
fix: update the status of tune to Failed if the job was never create
fMurugi Apr 28, 2026
f9f8302
feat: add pod status checking to distinguish pending vs running fine-…
fMurugi Apr 29, 2026
daef4a5
refactor(k8s): centralize job status resolution and prevent Pending→I…
fMurugi Apr 29, 2026
2132ab1
♻️refactor(k8s): remove circular import
fMurugi Apr 29, 2026
51df810
fix: mark celery task as complete if job and pod are not in the clust…
fMurugi Apr 29, 2026
3e2b703
♻️ refactor: decouple job submission and fix event loop blocking
fMurugi Apr 29, 2026
032b1c5
fix(bug):ensure the pod is created before we check for pod status
fMurugi Apr 29, 2026
84b7394
fix:replace unkown status to pending
fMurugi Apr 29, 2026
c929fa0
fix:tune fails if job is not created
fMurugi Apr 29, 2026
0ae5786
fix:ensure when no job created the status is Failed
fMurugi Apr 29, 2026
8fb844d
fix:revert back to pending instead of automaci fail
fMurugi Apr 29, 2026
031a4f6
Reverting feature range 3e2b through 8fb84 these changes broke funtio…
fMurugi Apr 29, 2026
ea40330
fix:remove circular importation
fMurugi Apr 30, 2026
76151db
refactor: make Celery result.get non-blocking using asyncio.to_thread
fMurugi Apr 30, 2026
f295ca3
refactor:remove comments
fMurugi Apr 30, 2026
e2a2c2b
Merge branch 'main' into feat/check-pod-phase
bglar May 5, 2026
a536f2f
refactor: rename functions
fMurugi May 5, 2026
b38de23
Merge branch 'feat/check-pod-phase' of github.com:terrastackai/geospa…
fMurugi May 5, 2026
fce8ecb
reafctor:format with black and ruff
fMurugi May 5, 2026
053e025
refactor:formatting
fMurugi May 5, 2026
bd518d0
refactor:rename functions
fMurugi May 6, 2026
ef8ba3c
Merge branch 'main' into feat/check-pod-phase
fMurugi May 6, 2026
4db5d05
refactor:rename functions
fMurugi May 6, 2026
dfd9de2
fix:use pydantic automatic read of env variable and feildvalidator to…
fMurugi May 6, 2026
3dbe783
fix:remove import inside the class
fMurugi May 6, 2026
8f8aae2
refactor:use strings for env var from config
fMurugi May 6, 2026
b02a45c
fix: combine job success and fail into terminal state
fMurugi May 6, 2026
f5007f9
fix:return the terminal state if found
fMurugi May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions gfmstudio/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
from gfmstudio.amo.utils import invoke_model_offboarding_handler
from gfmstudio.config import settings
from gfmstudio.fine_tuning.core.kubernetes import (
check_k8s_job_status,
check_tuning_task_status,
deploy_hpo_tuning_job,
deploy_tuning_job,
)
from gfmstudio.fine_tuning.utils.webhook_event_handlers import (
handle_dataset_factory_webhooks,
handle_fine_tuning_webhooks,
update_tune_status,
)
from gfmstudio.inference.services import (
invoke_cancel_inference_handler,
Expand Down Expand Up @@ -55,14 +56,14 @@
)
def deploy_tuning_job_celery_task(**kwargs):
# Inject the monitoring task into kwargs to avoid circular import
kwargs['_monitor_task'] = monitor_k8_job_completion_task
kwargs["_monitor_task"] = monitor_k8_job_completion_task
return asyncio.run(deploy_tuning_job(**kwargs))


@celery_app.task(
name="monitor_k8_job_completion_task",
queue=FT_SERVICE_NAME,
bind=True, # Bind to get access to self for retry
bind=True,
max_retries=30, # Allow many retries
default_retry_delay=30, # Start with 30 seconds
)
Expand All @@ -72,31 +73,46 @@ def monitor_k8_job_completion_task(self, ftune_id: str):
max_wait = settings.KJOB_MAX_WAIT_SECONDS or 7200

try:
k8s_job_status, _ = asyncio.run(check_k8s_job_status(ftune_id))
k8s_job_status, _ = asyncio.run(check_tuning_task_status(ftune_id))
except Exception as exc:
if "not found" in str(exc):
# Job not found, consider it done (likely already completed and deleted)
logger.debug(f"{ftune_id}: Job not found, assuming completed and cleaned up")
logger.debug(
f"{ftune_id}: Job not found, assuming completed and cleaned up"
)
return "Completed"
# Unexpected error, retry with exponential backoff
logger.warning(f"{ftune_id}: Error checking job status, will retry: {exc}")
raise self.retry(exc=exc, countdown=min(2 ** self.request.retries * 30, max_wait))

# Handle None status (job not found after retries)
raise self.retry(exc=exc, countdown=min(2**self.request.retries * 30, max_wait))

if k8s_job_status is None:
# Job doesn't exist - either completed and deleted, or never created
logger.debug(f"{ftune_id}: Job status is None, assuming completed and cleaned up")
logger.debug(
f"{ftune_id}: Job status is None, assuming completed and cleaned up"
)
return "Completed"

if k8s_job_status == "Unknown":
logger.info(
f"{ftune_id}: Job status is Unknown (resources deleted), assuming completed and cleaned up"
)
return "Completed"

if k8s_job_status in ["Complete", "Failed"]:
# Job is done
logger.info(f"{ftune_id}: Job finished with status: {k8s_job_status}")
return k8s_job_status


if k8s_job_status == "Running":
try:
asyncio.run(update_tune_status(ftune_id, "In_progress"))
except Exception as e:
logger.warning(f"{ftune_id}: Failed to update status to In_progress: {e}")

# Job still running, retry with exponential backoff
# countdown: 30s, 60s, 120s, 240s, 480s, 960s (max with default 600s setting)
countdown = min(2 ** self.request.retries * 30, max_wait)
logger.info(f"{ftune_id}: Job status={k8s_job_status}, will check again in {countdown}s")
countdown = min(2**self.request.retries * 30, max_wait)
logger.info(
f"{ftune_id}: Job status={k8s_job_status}, will check again in {countdown}s"
)
raise self.retry(countdown=countdown)


Expand All @@ -105,7 +121,7 @@ def monitor_k8_job_completion_task(self, ftune_id: str):
queue=FT_SERVICE_NAME,
)
def deploy_hpo_tuning_celery_task(**kwargs):
kwargs['_monitor_task'] = monitor_k8_job_completion_task
kwargs["_monitor_task"] = monitor_k8_job_completion_task
return asyncio.run(deploy_hpo_tuning_job(**kwargs))


Expand Down
Loading
Loading