Commits (30)
34d0160
✨ feat: add pod status checking to distinguish pending vs running jobs
fMurugi Apr 28, 2026
50a8377
fix: check pod is up and running before setting the job as in_progress
fMurugi Apr 28, 2026
38eb99f
fix: update the status of tune to Failed if the job was never created
fMurugi Apr 28, 2026
f9f8302
feat: add pod status checking to distinguish pending vs running fine-…
fMurugi Apr 29, 2026
daef4a5
refactor(k8s): centralize job status resolution and prevent Pending→I…
fMurugi Apr 29, 2026
2132ab1
♻️refactor(k8s): remove circular import
fMurugi Apr 29, 2026
51df810
fix: mark celery task as complete if job and pod are not in the clust…
fMurugi Apr 29, 2026
3e2b703
♻️ refactor: decouple job submission and fix event loop blocking
fMurugi Apr 29, 2026
032b1c5
fix(bug):ensure the pod is created before we check for pod status
fMurugi Apr 29, 2026
84b7394
fix:replace unknown status with pending
fMurugi Apr 29, 2026
c929fa0
fix:tune fails if job is not created
fMurugi Apr 29, 2026
0ae5786
fix:ensure when no job created the status is Failed
fMurugi Apr 29, 2026
8fb844d
fix:revert to pending instead of automatic fail
fMurugi Apr 29, 2026
031a4f6
Reverting feature range 3e2b through 8fb84; these changes broke funtio…
fMurugi Apr 29, 2026
ea40330
fix:remove circular import
fMurugi Apr 30, 2026
76151db
refactor: make Celery result.get non-blocking using asyncio.to_thread
fMurugi Apr 30, 2026
f295ca3
refactor:remove comments
fMurugi Apr 30, 2026
e2a2c2b
Merge branch 'main' into feat/check-pod-phase
bglar May 5, 2026
a536f2f
refactor: rename functions
fMurugi May 5, 2026
b38de23
Merge branch 'feat/check-pod-phase' of github.com:terrastackai/geospa…
fMurugi May 5, 2026
fce8ecb
refactor:format with black and ruff
fMurugi May 5, 2026
053e025
refactor:formatting
fMurugi May 5, 2026
bd518d0
refactor:rename functions
fMurugi May 6, 2026
ef8ba3c
Merge branch 'main' into feat/check-pod-phase
fMurugi May 6, 2026
4db5d05
refactor:rename functions
fMurugi May 6, 2026
dfd9de2
fix:use pydantic automatic read of env variable and field validator to…
fMurugi May 6, 2026
3dbe783
fix:remove import inside the class
fMurugi May 6, 2026
8f8aae2
refactor:use strings for env var from config
fMurugi May 6, 2026
b02a45c
fix: combine job success and fail into terminal state
fMurugi May 6, 2026
f5007f9
fix:return the terminal state if found
fMurugi May 6, 2026
47 changes: 32 additions & 15 deletions gfmstudio/celery_worker.py
@@ -18,14 +18,14 @@
from gfmstudio.fine_tuning.utils.webhook_event_handlers import (
handle_dataset_factory_webhooks,
handle_fine_tuning_webhooks,
update_tune_status,
)
from gfmstudio.inference.services import (
invoke_cancel_inference_handler,
invoke_tune_upload_handler,
)
from gfmstudio.inference.v2.services import invoke_inference_v2_pipelines_handler
from gfmstudio.log import logger

INF_SERVICE_NAME = "inference_gateway"
FT_SERVICE_NAME = "geoft"
celery_app = Celery(
@@ -55,14 +55,14 @@
)
def deploy_tuning_job_celery_task(**kwargs):
# Inject the monitoring task into kwargs to avoid circular import
kwargs['_monitor_task'] = monitor_k8_job_completion_task
kwargs["_monitor_task"] = monitor_k8_job_completion_task
return asyncio.run(deploy_tuning_job(**kwargs))


@celery_app.task(
name="monitor_k8_job_completion_task",
queue=FT_SERVICE_NAME,
bind=True, # Bind to get access to self for retry
bind=True,
max_retries=30, # Allow many retries
default_retry_delay=30, # Start with 30 seconds
)
@@ -72,31 +72,48 @@ def monitor_k8_job_completion_task(self, ftune_id: str):
max_wait = settings.KJOB_MAX_WAIT_SECONDS or 7200

try:
k8s_job_status, _ = asyncio.run(check_k8s_job_status(ftune_id))
k8s_job_status, _ = asyncio.run(
check_k8s_job_status(ftune_id)
)
except Exception as exc:
if "not found" in str(exc):
# Job not found, consider it done (likely already completed and deleted)
logger.debug(f"{ftune_id}: Job not found, assuming completed and cleaned up")
logger.debug(
f"{ftune_id}: Job not found, assuming completed and cleaned up"
)
return "Completed"
# Unexpected error, retry with exponential backoff
logger.warning(f"{ftune_id}: Error checking job status, will retry: {exc}")
raise self.retry(exc=exc, countdown=min(2 ** self.request.retries * 30, max_wait))

# Handle None status (job not found after retries)
raise self.retry(exc=exc, countdown=min(2**self.request.retries * 30, max_wait))

if k8s_job_status is None:
# Job doesn't exist - either completed and deleted, or never created
logger.debug(f"{ftune_id}: Job status is None, assuming completed and cleaned up")
logger.debug(
f"{ftune_id}: Job status is None, assuming completed and cleaned up"
)
return "Completed"

if k8s_job_status == "Unknown":
logger.info(
f"{ftune_id}: Job status is Unknown (resources deleted), assuming completed and cleaned up"
)
return "Completed"

if k8s_job_status in ["Complete", "Failed"]:
# Job is done
logger.info(f"{ftune_id}: Job finished with status: {k8s_job_status}")
return k8s_job_status


if k8s_job_status == "Running":
try:
asyncio.run(update_tune_status(ftune_id, "In_progress"))
except Exception as e:
logger.warning(f"{ftune_id}: Failed to update status to In_progress: {e}")

# Job still running, retry with exponential backoff
# countdown: 30s, 60s, 120s, 240s, 480s, 960s (max with default 600s setting)
countdown = min(2 ** self.request.retries * 30, max_wait)
logger.info(f"{ftune_id}: Job status={k8s_job_status}, will check again in {countdown}s")
countdown = min(2**self.request.retries * 30, max_wait)
logger.info(
f"{ftune_id}: Job status={k8s_job_status}, will check again in {countdown}s"
)
raise self.retry(countdown=countdown)


Expand All @@ -105,7 +122,7 @@ def monitor_k8_job_completion_task(self, ftune_id: str):
queue=FT_SERVICE_NAME,
)
def deploy_hpo_tuning_celery_task(**kwargs):
kwargs['_monitor_task'] = monitor_k8_job_completion_task
kwargs["_monitor_task"] = monitor_k8_job_completion_task
return asyncio.run(deploy_hpo_tuning_job(**kwargs))


127 changes: 104 additions & 23 deletions gfmstudio/fine_tuning/core/kubernetes.py
@@ -475,15 +475,108 @@ async def monitor_k8_job_completion(ftune_id: str, monitor_task=None):
logger.info(f"{ftune_id}: Scheduled monitoring task for job completion")


async def get_pod_phase(job_name: str) -> str | None:
"""Check the status of a pod associated with a Kubernetes job.

This function checks if the pod is actually running, not just pending.
Useful for determining if a job is truly in progress or just waiting for resources.

Parameters
----------
job_name : str
The Kubernetes job name

Returns
-------
str or None
The pod phase: 'Running', 'Pending', 'Succeeded', 'Failed', or 'Unknown'; None if no pod is found.
Contributor:
This is a great introduction. For easier future maintenance, creating a reference schema class for these statuses would be ideal, ensuring there is only one place to change or update the values in case of a future change.

Something like ...


from enum import Enum

class PodStatusPhase(str, Enum):
    RUNNING = "Running"
    PENDING = "Pending"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"
    NONE = "None"  # adjust to what None is here

Then when returning the outputs from k8s,

    if result:
        status_str = result[0].strip()
        try:
            return PodStatusPhase(status_str)
        except ValueError:
            # Update this to handle what states would error
            return PodStatusPhase.UNKNOWN
    return None

And whenever you use the values in your code, you can reference them directly, e.g.
PodStatusPhase.FAILED
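
For illustration only, a minimal sketch (hypothetical helper, not code in this PR) of how the enum members could replace the raw string comparisons used by the monitor task; it assumes PodStatusPhase as sketched above and that get_pod_phase has been updated to return a PodStatusPhase or None:

    # Hypothetical helper mapping a pod phase to a tune status.
    async def phase_to_tune_status(job_name: str) -> str:
        phase = await get_pod_phase(job_name)
        if phase is None or phase is PodStatusPhase.UNKNOWN:
            return "Completed"  # resources already cleaned up
        if phase in (PodStatusPhase.SUCCEEDED, PodStatusPhase.FAILED):
            return phase.value  # terminal states pass through
        if phase is PodStatusPhase.RUNNING:
            return "In_progress"
        return "Pending"  # still waiting for resources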

"""
try:
await ensure_logged_in(f"kubectl get job --namespace={settings.NAMESPACE}")

# Get pod status using the job-name label
command = [
"kubectl",
"get",
"pods",
"-l",
f"job-name={job_name}",
"-o",
"jsonpath={.items[0].status.phase}",
]

result = await run_subprocess_cmds(command=command)
return result[0].strip() if result and result[0] else None

except Exception as e:
# Handle case where job/pod has been deleted by webhook
logger.debug(f"{job_name}: Error checking pod status (likely deleted): {e}")
return None

async def get_job_conditions(job_name: str) -> str | None:
"""
Get the conditions of a Kubernetes job.

Parameters
----------
job_name : str
The name of the job to check.

Returns
-------
str or None
The type of the job's first condition, or None if the job has no conditions.
"""
try:
cmd = [
"kubectl",
"get",
"job",
job_name,
"-o",
"jsonpath={.status.conditions[0].type}",
]
result = await run_subprocess_cmds(cmd)
return result[0].strip() if result and result[0] else None
except Exception as e:
logger.debug(f"Error checking job conditions: {e}")
return None

async def get_k8s_status(job_name: str) -> str:
fredotieno (Contributor) May 5, 2026:
@fMurugi
should we rename this to something like get_aggregate_job_and_pod_status

"""Get the status of a Kubernetes job.

Parameters
----------
job_name : str
The name of the job to check.

Returns
-------
str
The status of the job.
"""
condition = await get_job_conditions(job_name)
if condition in ["Complete", "Failed"]:
return condition
# Job exists but no terminal condition → check pod
pod_phase = await get_pod_phase(job_name)
if pod_phase:
return pod_phase
return "Unknown"

async def check_k8s_job_status(tune_id: str, retry_label_lookup=True):
fredotieno (Contributor) May 5, 2026:
@fMurugi is there a missing argument here?
check_pod_phase

should we rename the function to check_tuning_task_status

Contributor (Author):
@fredotieno not sure about the argument.
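
For reference, a minimal sketch (an assumption, not part of the current diff) of how the signature could accept the flag that submit_tune_job passes:

    # Hypothetical signature; check_pod_phase does not exist on the function yet.
    async def check_k8s_job_status(
        tune_id: str,
        retry_label_lookup: bool = True,
        check_pod_phase: bool = True,
    ):
        # When check_pod_phase is False, the extra `kubectl get pods` lookup
        # could be skipped and only the job conditions consulted.
        ...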

"""Function to check Kubernetes job status

This function checks both the job status and optionally the pod phase to determine
if a job is truly running or just waiting for resources (pending).

Parameters
----------
tune_id : str
Tune id
retry_label_lookup: bool
Wheather to retry lookup with labels.
Whether to retry lookup with labels.
check_pod_phase: bool
Whether to check the pod phase to distinguish between pending and running states.
bglar marked this conversation as resolved. (Outdated)

Returns
-------
@@ -496,22 +589,10 @@ async def check_k8s_job_status(tune_id: str, retry_label_lookup=True):
# Log in
await ensure_logged_in(f"kubectl get job --namespace={settings.NAMESPACE}")

command = [
"kubectl",
"get",
"job",
kjob_id,
"-o",
"jsonpath={.status.conditions[*].type}",
]

result = await run_subprocess_cmds(command=command)
logger.info(f"kubectl run cmds result: {command} ---> {result}")
# Direct resolution via unified status function
status = await get_k8s_status(kjob_id)

if result and result[0] != "":
# Job has completion status (Complete or Failed)
status = result[0].strip()
logger.info(f"{kjob_id}: Status for job {status}")
if status not in ["Running"]:
return status, kjob_id

else:
@@ -543,7 +624,7 @@ async def check_k8s_job_status(tune_id: str, retry_label_lookup=True):
return "Running", job_name
return result if result else ("Running", job_name)

# Job exists but has no conditions - verify it exists and treat as Running
# Job exists but has no conditions - verify it exists and check pod status
verify_cmd = [
"kubectl",
"get",
@@ -555,13 +636,13 @@ async def check_k8s_job_status(tune_id: str, retry_label_lookup=True):
verify_result = await run_subprocess_cmds(command=verify_cmd)

if verify_result and verify_result[0]:
# Job exists but no status conditions yet - it's running or pending
logger.info(f"{kjob_id}: Job exists but no status conditions yet, treating as Running")
# Job exists but no status conditions yet
# Check if we should verify the pod phase
logger.info(f"{kjob_id}: Job exists but no status yet → Running")
return "Running", kjob_id
else:
# Job doesn't exist at all
logger.warning(f"{kjob_id}: Job not found in cluster")
return None, tune_id
# Job doesn't exist at all
logger.warning(f"{kjob_id}: Job not found in cluster")
return None, tune_id


async def delete_k8s_job_resources(tune_id: str):
47 changes: 38 additions & 9 deletions gfmstudio/fine_tuning/utils/tune_handlers.py
@@ -11,18 +11,19 @@
import string
from typing import Any, Dict, Optional, Tuple

import asyncio
import yaml
from asyncer import asyncify
from fastapi import HTTPException
from jinja2 import BaseLoader, Environment, runtime
from sqlalchemy.orm import Session

from gfmstudio.celery_worker import deploy_tuning_job_celery_task

from gfmstudio.common.api import crud
from gfmstudio.config import settings
from gfmstudio.fine_tuning import schemas
from gfmstudio.fine_tuning.core import object_storage, tunes
from gfmstudio.fine_tuning.core.kubernetes import deploy_tuning_job
from gfmstudio.fine_tuning.core.kubernetes import (deploy_tuning_job, check_k8s_job_status)
from gfmstudio.fine_tuning.core.schema import TuneTemplateParameters
from gfmstudio.fine_tuning.core.tuning_config_utils import (
convert_to_jinja2_compatible_braces,
@@ -39,6 +40,10 @@
)
from gfmstudio.fine_tuning.models import BaseModels, GeoDataset, Tunes, TuneTemplate
from gfmstudio.fine_tuning.utils.geoserver_handlers import convert_to_geoserver_sld
from gfmstudio.celery_worker import deploy_tuning_job_celery_task
from gfmstudio.common.api import crud, utils

tune_crud = crud.ItemCrud(model=Tunes)

logger = logging.getLogger(__name__)

@@ -731,9 +736,8 @@ async def submit_tune_job(
detail = None

try:
if settings.CELERY_TASKS_ENABLED:
# Submit via Celery
deploy_tuning_job_celery_task.apply_async(
if settings.CELERY_TASKS_ENABLED:
result = deploy_tuning_job_celery_task.apply_async(
kwargs={
"ftune_id": tune_id,
"ftune_config_file": config_path,
@@ -742,17 +746,41 @@
},
task_id=tune_id,
)
ftune_job_id = f"kjob-{tune_id}".lower()
status = "In_progress"

try:
job_result = await asyncio.to_thread(result.get, timeout=5)
ftune_job_id, job_status = job_result

if job_status == "Error":
status = "Failed"
else:
ftune_job_id = f"kjob-{tune_id}".lower()
status = "Pending"
except Exception as e:
logger.debug(f"{tune_id}: Job creation in progress: {e}")
ftune_job_id = f"kjob-{tune_id}".lower()
status = "Pending"
else:
# Submit directly
ftune_job_id, updated_status = await deploy_tuning_job(
ftune_id=tune_id,
ftune_config_file=config_path,
ftuning_runtime_image=runtime_image,
tune_type=schemas.TuneOptionEnum.K8_JOB,
)
status = updated_status or "Submitted"

if updated_status == "Error":
status = "Failed"
elif updated_status == "In_progress":
k8s_status, _ = await check_k8s_job_status(tune_id, check_pod_phase=True)

if k8s_status == "Running":
status = "In_progress"
elif k8s_status == "Pending":
status = "Pending"
else:
status = updated_status
else:
status = updated_status or "Submitted"

logger.info(f"Tune job {ftune_job_id} submitted with status: {status}")

@@ -763,3 +791,4 @@
status = "Failed"

return ftune_job_id, status, detail
