2 changes: 1 addition & 1 deletion docs/reference-docs/websockets.md
@@ -18,7 +18,7 @@ The main component is the `WebSocketManager` (WSM) which has the following responsibilities:
3. Provide an interface to pass messages from a backend process (workflow/task)

In a setup with multiple isolated Orchestrator instances the WSM is initialized multiple times as well; therefore, clients can be connected to any arbitrary WSM instance.
-Letting a backend process broadcast messages to all clients thus requires a message broker, for which we use [Redis Pub/Sub](https://redis.io/docs/manual/pubsub).
+Letting a backend process broadcast messages to all clients thus requires a message broker, for which we use [Redis Pub/Sub](https://redis.io/docs/latest/develop/pubsub/).

There are 2 WSM implementations: a `MemoryWebsocketManager` for development/testing, and a `BroadcastWebsocketManager` that connects to Redis. We'll continue to discuss the latter.
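To make the broker pattern concrete, here is a minimal redis-py sketch of a backend process publishing once and a WSM instance relaying; the channel name and message shape are illustrative assumptions, not the actual `BroadcastWebsocketManager` wiring:

```python
import json

from redis import Redis

redis = Redis.from_url("redis://localhost:6379")

# A backend process (workflow/task) publishes a message once...
redis.publish("orchestrator-events", json.dumps({"name": "invalidateCache", "value": "subscriptions"}))

# ...and every WSM instance, subscribed to the same channel, receives it and
# forwards it to its own connected websocket clients.
pubsub = redis.pubsub()
pubsub.subscribe("orchestrator-events")
for message in pubsub.listen():
    if message["type"] == "message":
        print(message["data"])  # each WSM would relay this to its clients
```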

4 changes: 4 additions & 0 deletions orchestrator/api/api_v1/api.py
@@ -22,6 +22,7 @@
    product_blocks,
    products,
    resource_types,
    schedules,
    settings,
    subscription_customer_descriptions,
    subscriptions,
@@ -88,6 +89,9 @@
api_router.include_router(
    ws.router, prefix="/ws", tags=["Core", "Events"]
)  # Auth on the websocket is handled in the Websocket Manager
api_router.include_router(
    schedules.router, prefix="/schedules", tags=["Core", "Schedules"], dependencies=[Depends(authorize)]
)

if llm_settings.SEARCH_ENABLED:
    from orchestrator.api.api_v1.endpoints import search
45 changes: 45 additions & 0 deletions orchestrator/api/api_v1/endpoints/schedules.py
@@ -0,0 +1,45 @@
# Copyright 2019-2025 SURF.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from http import HTTPStatus

from fastapi.routing import APIRouter

from orchestrator.schedules.service import (
    add_create_scheduled_task_to_queue,
    add_delete_scheduled_task_to_queue,
    add_update_scheduled_task_to_queue,
)
from orchestrator.schemas.schedules import APSchedulerJobCreate, APSchedulerJobDelete, APSchedulerJobUpdate

router = APIRouter()


@router.post("/", status_code=HTTPStatus.CREATED, response_model=dict[str, str])
def create_scheduled_task(payload: APSchedulerJobCreate) -> dict[str, str]:
    """Create a scheduled task."""
    add_create_scheduled_task_to_queue(payload)
    return {"message": "Added to Create Queue", "status": "CREATED"}


@router.put("/", status_code=HTTPStatus.OK, response_model=dict[str, str])
def update_scheduled_task(payload: APSchedulerJobUpdate) -> dict[str, str]:
    """Update a scheduled task."""
    add_update_scheduled_task_to_queue(payload)
    return {"message": "Added to Update Queue", "status": "UPDATED"}


@router.delete("/", status_code=HTTPStatus.OK, response_model=dict[str, str])
def delete_scheduled_task(payload: APSchedulerJobDelete) -> dict[str, str]:
    """Delete a scheduled task."""
    add_delete_scheduled_task_to_queue(payload)
    return {"message": "Added to Delete Queue", "status": "DELETED"}
82 changes: 78 additions & 4 deletions orchestrator/cli/scheduler.py
@@ -10,28 +10,55 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import time

import typer
from redis import Redis
from redis.exceptions import ConnectionError as RedisConnectionError

from orchestrator.schedules.scheduler import (
    get_all_scheduler_tasks,
    get_scheduler,
    get_scheduler_task,
)
from orchestrator.schedules.service import (
    SCHEDULER_QUEUE,
    add_create_scheduled_task_to_queue,
    workflow_scheduler_queue,
)
from orchestrator.schemas.schedules import APSchedulerJobCreate
from orchestrator.services.workflows import get_workflow_by_name
from orchestrator.settings import app_settings
from orchestrator.utils.redis_client import create_redis_client

app: typer.Typer = typer.Typer()


@app.command()
def run() -> None:
    """Start the scheduler and loop eternally, consuming scheduled-task items from the Redis queue."""

    def _get_scheduled_task_item_from_queue(redis_conn: Redis) -> tuple[str, bytes] | None:
        """Get an item from the Redis queue for scheduler tasks, or None when the queue is empty."""
        try:
            # BRPOP blocks for at most 1 second, so the surrounding loop stays responsive.
            return redis_conn.brpop(SCHEDULER_QUEUE, timeout=1)
        except RedisConnectionError:
            # Redis is unreachable; back off a little longer before retrying.
            time.sleep(3)
        except Exception:
            time.sleep(1)

        return None

    with get_scheduler() as scheduler_connection:
        redis_connection = create_redis_client(app_settings.CACHE_URI)
        typer.echo("Waiting for scheduled tasks...")
        while True:
            item = _get_scheduled_task_item_from_queue(redis_connection)
            if not item:
                continue

            typer.echo(f"Received scheduled task item: {item}")
            workflow_scheduler_queue(item, scheduler_connection)
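The loop above is the consumer half of a simple Redis work queue. For contrast, a sketch of the producer half, under the assumption that `add_create_scheduled_task_to_queue` JSON-serializes the payload and LPUSHes it onto `SCHEDULER_QUEUE` (the real implementation lives in `orchestrator.schedules.service`):

```python
import json

from redis import Redis

SCHEDULER_QUEUE = "scheduler:queue"  # assumed key name

def add_create_scheduled_task_to_queue_sketch(redis_conn: Redis, payload: dict) -> None:
    # LPUSH on the producer side plus BRPOP on the consumer side forms a
    # FIFO work queue shared across processes.
    redis_conn.lpush(SCHEDULER_QUEUE, json.dumps({"action": "create", "payload": payload}))
```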


@app.command()
def show_schedule() -> None:
@@ -59,3 +86,50 @@ def force(task_id: str) -> None:
    except Exception as e:
        typer.echo(f"Task execution failed: {e}")
        raise typer.Exit(code=1)


@app.command()
def load_initial_schedule() -> None:
    """Load the initial schedule into the scheduler."""
    initial_schedules = [
        {
            "name": "Task Resume Workflows",
            "workflow_name": "task_resume_workflows",
            "workflow_id": "",
            "trigger": "interval",
            "trigger_kwargs": {"hours": 1},
        },
        {
            "name": "Task Clean Up Tasks",
            "workflow_name": "task_clean_up_tasks",
            "workflow_id": "",
            "trigger": "interval",
            "trigger_kwargs": {"hours": 6},
        },
        {
            "name": "Validate Products Pre-conditions",
            "workflow_name": "pre_conditions_check_task_validate_products",
            "workflow_id": "",
            "trigger": "cron",
            "trigger_kwargs": {"hour": 2, "minute": 30},
        },
        {
            "name": "Validate Subscriptions Workflow",
            "workflow_name": "validate_subscriptions_workflow",
            "workflow_id": "",
            "trigger": "cron",
            "trigger_kwargs": {"hour": 0, "minute": 10},
        },
    ]

    for schedule in initial_schedules:
        # enrich with workflow id
        workflow = get_workflow_by_name(schedule.get("workflow_name"))
        if not workflow:
            typer.echo(f"Workflow '{schedule['workflow_name']}' not found. Skipping schedule.")
            continue

        schedule["workflow_id"] = workflow.workflow_id

        typer.echo(f"Initial Schedule: {schedule}")
        add_create_scheduled_task_to_queue(APSchedulerJobCreate(**schedule))  # type: ignore
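For reference, the trigger/trigger_kwargs pairs above translate into plain APScheduler 3.x calls along these lines; this sketches the underlying mechanism, not the orchestrator's actual wiring:

```python
from apscheduler.schedulers.background import BackgroundScheduler

def task_resume_workflows() -> None:  # stand-in for the real workflow starter
    print("resuming workflows")

scheduler = BackgroundScheduler()
# "trigger": "interval", "trigger_kwargs": {"hours": 1}
scheduler.add_job(task_resume_workflows, "interval", hours=1, id="task-resume-workflows")
# "trigger": "cron", "trigger_kwargs": {"hour": 2, "minute": 30}
scheduler.add_job(task_resume_workflows, "cron", hour=2, minute=30, id="nightly-precondition-check")
scheduler.start()
```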
26 changes: 26 additions & 0 deletions orchestrator/db/models.py
@@ -29,9 +29,11 @@
    CheckConstraint,
    Column,
    Enum,
    Float,
    ForeignKey,
    Index,
    Integer,
    LargeBinary,
    PrimaryKeyConstraint,
    Select,
    String,
@@ -796,3 +798,27 @@ class AiSearchIndex(BaseModel):
    content_hash = mapped_column(String(64), nullable=False, index=True)

    __table_args__ = (PrimaryKeyConstraint("entity_id", "path", name="pk_ai_search_index"),)


class APSchedulerJobStoreModel(BaseModel):
    __tablename__ = "apscheduler_jobs"

    id = mapped_column(String(191), primary_key=True)
    next_run_time = mapped_column(Float, nullable=True)
    job_state = mapped_column(LargeBinary, nullable=False)


class WorkflowApschedulerJob(BaseModel):
    __tablename__ = "workflows_apscheduler_jobs"

    workflow_id = mapped_column(
        UUIDType, ForeignKey("workflows.workflow_id", ondelete="CASCADE"), primary_key=True, nullable=False
    )

    # Notice the VARCHAR(512) for schedule_id to accommodate longer IDs, so
    # that if APScheduler changes its ID format in the future, we are covered.
    schedule_id = mapped_column(
        String(512), ForeignKey("apscheduler_jobs.id", ondelete="CASCADE"), primary_key=True, nullable=False
    )

    __table_args__ = (UniqueConstraint("workflow_id", "schedule_id", name="uq_workflow_schedule"),)
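A sketch of querying the new linker table with SQLAlchemy's 2.x-style `select()`; `db.session` is assumed to be the orchestrator's scoped session:

```python
from uuid import UUID

from sqlalchemy import select

from orchestrator.db import db  # assumed scoped-session wrapper
from orchestrator.db.models import WorkflowApschedulerJob

def schedule_ids_for_workflow(workflow_id: UUID) -> list[str]:
    # All APScheduler job ids linked to one workflow.
    rows = db.session.scalars(
        select(WorkflowApschedulerJob.schedule_id).where(WorkflowApschedulerJob.workflow_id == workflow_id)
    )
    return list(rows)
```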
112 changes: 112 additions & 0 deletions (new Alembic migration, revision 961eddbd4c13)
@@ -0,0 +1,112 @@
"""Create linker table workflow_apscheduler.

Revision ID: 961eddbd4c13
Revises: 850dccac3b02
Create Date: 2025-11-18 10:38:57.211087

"""

from uuid import uuid4

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "961eddbd4c13"
down_revision = "850dccac3b02"
branch_labels = None
depends_on = None


workflows = [
{
"name": "pre_conditions_check_task_validate_products",
"description": "Run Pre-Conditions before validate products",
"workflow_id": uuid4(),
"target": "SYSTEM",
},
{
"name": "validate_subscriptions_workflow",
"description": "Validate subscriptions workflow",
"workflow_id": uuid4(),
"target": "SYSTEM",
},
]


def _create_workflows() -> None:
conn = op.get_bind()
for workflow in workflows:
conn.execute(
sa.text(
"INSERT INTO workflows VALUES (:workflow_id, :name, :target, :description, now()) ON CONFLICT DO NOTHING"
),
workflow,
)


def _downgrade_create_workflows() -> None:
    conn = op.get_bind()
    for workflow in workflows:
        conn.execute(sa.text("DELETE FROM workflows WHERE name = :name"), {"name": workflow["name"]})


def _create_apscheduler_jobs_table_if_not_exists() -> None:
    # Check if the apscheduler_jobs table exists and create it only if it does not.
    conn = op.get_bind()
    inspector = sa.inspect(conn)
    if "apscheduler_jobs" not in inspector.get_table_names():
        op.execute(
            sa.text(
                """
                CREATE TABLE apscheduler_jobs
                (
                    id VARCHAR(191) NOT NULL PRIMARY KEY,
                    next_run_time DOUBLE PRECISION,
                    job_state bytea NOT NULL
                );
                """
            )
        )


def _create_workflows_apscheduler_jobs_table() -> None:
    # Notice the VARCHAR(512) for schedule_id to accommodate longer IDs, so
    # that if APScheduler changes its ID format in the future, we are covered.
    op.execute(
        sa.text(
            """
            CREATE TABLE workflows_apscheduler_jobs (
                workflow_id UUID NOT NULL,
                schedule_id VARCHAR(512) NOT NULL,
                PRIMARY KEY (workflow_id, schedule_id),
                CONSTRAINT fk_workflow
                    FOREIGN KEY (workflow_id) REFERENCES public.workflows (workflow_id)
                    ON DELETE CASCADE,
                CONSTRAINT fk_schedule
                    FOREIGN KEY (schedule_id) REFERENCES public.apscheduler_jobs (id)
                    ON DELETE CASCADE,
                CONSTRAINT uq_workflow_schedule UNIQUE (workflow_id, schedule_id)
            );
            """
        )
    )

    op.create_index("ix_workflows_apscheduler_jobs_schedule_id", "workflows_apscheduler_jobs", ["schedule_id"])


def upgrade() -> None:
    _create_apscheduler_jobs_table_if_not_exists()
    _create_workflows_apscheduler_jobs_table()
    _create_workflows()


def downgrade() -> None:
    op.execute(
        sa.text(
            """
            DROP TABLE IF EXISTS workflows_apscheduler_jobs;
            """
        )
    )
    _downgrade_create_workflows()
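One way to exercise the migration locally is through Alembic's Python API, assuming an `alembic.ini` pointed at the orchestrator database (the orchestrator CLI wraps the same machinery):

```python
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")
command.upgrade(cfg, "961eddbd4c13")    # apply up to and including this revision
command.downgrade(cfg, "850dccac3b02")  # roll back to the previous revision
```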
4 changes: 0 additions & 4 deletions orchestrator/schedules/__init__.py
@@ -12,14 +12,10 @@
# limitations under the License.


from orchestrator.schedules.resume_workflows import run_resume_workflows
from orchestrator.schedules.task_vacuum import vacuum_tasks
from orchestrator.schedules.validate_products import validate_products
from orchestrator.schedules.validate_subscriptions import validate_subscriptions

ALL_SCHEDULERS: list = [
    run_resume_workflows,
    vacuum_tasks,
    validate_subscriptions,
    validate_products,
]
21 changes: 0 additions & 21 deletions orchestrator/schedules/resume_workflows.py

This file was deleted.
