From 26bb996a5963c7ad9722b457d269cff6163a67f7 Mon Sep 17 00:00:00 2001 From: Rach <54503978+justrach@users.noreply.github.com> Date: Wed, 30 Oct 2024 21:28:14 -0700 Subject: [PATCH] updated to version 0.1.1, added priority supports for more types of queues as well! --- README.md | 168 +++++++++++++++---- kew/.coverage | Bin 0 -> 53248 bytes kew/README.md | 168 +++++++++++++++---- kew/build_and_publish.py | 27 +++ kew/example.py | 65 ++++++-- kew/kew/__init__.py | 20 ++- kew/kew/exceptions.py | 4 + kew/kew/manager.py | 248 ++++++++++++++++++++-------- kew/kew/models.py | 22 ++- kew/kew/tests/conftest.py | 11 ++ kew/kew/tests/test_queue_manager.py | 222 +++++++++++++++++++++++++ kew/pyproject.toml | 23 ++- kew/test_multiple.py | 63 +++++-- 13 files changed, 862 insertions(+), 179 deletions(-) create mode 100644 kew/.coverage create mode 100755 kew/build_and_publish.py create mode 100644 kew/kew/tests/conftest.py create mode 100644 kew/kew/tests/test_queue_manager.py diff --git a/README.md b/README.md index 4c3e3de..5a02761 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,13 @@ # Kew Task Queue Manager -A flexible and robust asynchronous task queue manager for Python applications. +A flexible and robust asynchronous task queue manager for Python applications with support for multiple priority queues. ## Features +- Multiple named queues with independent configurations +- Priority-based task processing - Asynchronous task execution -- Configurable worker pool +- Configurable worker pools per queue - Task status tracking and monitoring - Automatic cleanup of completed tasks - Thread-safe operations @@ -21,7 +23,7 @@ pip install kew ```python import asyncio -from kew import TaskQueueManager +from kew import TaskQueueManager, QueueConfig, QueuePriority async def example_task(x: int): await asyncio.sleep(1) @@ -29,20 +31,46 @@ async def example_task(x: int): async def main(): # Initialize the task queue manager - manager = TaskQueueManager(max_workers=2) + manager = TaskQueueManager() - # Submit a task - task_info = await manager.submit_task( + # Create queues with different priorities + manager.create_queue(QueueConfig( + name="high_priority", + max_workers=4, + priority=QueuePriority.HIGH + )) + + manager.create_queue(QueueConfig( + name="background", + max_workers=1, + priority=QueuePriority.LOW + )) + + # Submit tasks to different queues + critical_task = await manager.submit_task( task_id="task1", + queue_name="high_priority", task_type="multiplication", task_func=example_task, + priority=QueuePriority.HIGH, x=5 ) - # Wait for result + background_task = await manager.submit_task( + task_id="task2", + queue_name="background", + task_type="multiplication", + task_func=example_task, + priority=QueuePriority.LOW, + x=10 + ) + + # Wait for results await asyncio.sleep(2) - status = manager.get_task_status("task1") - print(f"Result: {status.result}") + high_status = manager.get_task_status("task1") + low_status = manager.get_task_status("task2") + print(f"High Priority Result: {high_status.result}") + print(f"Background Result: {low_status.result}") # Cleanup await manager.shutdown() @@ -51,49 +79,114 @@ if __name__ == "__main__": asyncio.run(main()) ``` -## Advanced Usage +## Queue Management -### Concurrent Task Execution +### Creating Queues ```python -async def main(): - manager = TaskQueueManager(max_workers=2) - - # Submit multiple tasks - tasks = [] - for i in range(5): - task_info = await manager.submit_task( - task_id=f"task{i}", - task_type="example", - task_func=example_task, - x=i - ) 
- tasks.append(task_info) - - # Wait for all tasks to complete - await manager.wait_for_all_tasks() +from kew import QueueConfig, QueuePriority + +# Create a high-priority queue with 4 workers +manager.create_queue(QueueConfig( + name="critical", + max_workers=4, + priority=QueuePriority.HIGH, + max_size=1000, + task_timeout=3600 +)) + +# Create a background queue with 1 worker +manager.create_queue(QueueConfig( + name="background", + max_workers=1, + priority=QueuePriority.LOW +)) +``` + +### Queue Priorities + +- `QueuePriority.HIGH` (1): Critical tasks +- `QueuePriority.MEDIUM` (2): Standard tasks +- `QueuePriority.LOW` (3): Background tasks + +### Queue Monitoring + +```python +# Get queue status +status = manager.get_queue_status("critical") +print(f"Active Tasks: {status['active_tasks']}") +print(f"Queued Tasks: {status['queued_tasks']}") +print(f"Completed Tasks: {status['completed_tasks']}") +``` + +### Queue Operations + +```python +# Wait for specific queue to complete +await manager.wait_for_queue("critical") + +# Clean up old tasks in a queue +manager.cleanup_old_tasks(max_age_hours=24, queue_name="background") +``` + +## Task Management + +### Submitting Tasks + +```python +task_info = await manager.submit_task( + task_id="unique_id", + queue_name="critical", + task_type="example", + task_func=my_async_function, + priority=QueuePriority.HIGH, + *args, + **kwargs +) ``` ### Task Status Monitoring ```python -status = manager.get_task_status("task1") -print(f"Status: {status.status}") +status = manager.get_task_status("unique_id") +print(f"Status: {status.status}") # TaskStatus.QUEUED, PROCESSING, COMPLETED, FAILED +print(f"Queue: {status.queue_name}") +print(f"Priority: {status.priority}") print(f"Result: {status.result}") print(f"Error: {status.error}") ``` +### Waiting for Tasks + +```python +# Wait for specific task +await manager.wait_for_task("task1", timeout=30) + +# Wait for all tasks in a queue +await manager.wait_for_queue("critical", timeout=60) +``` + ## API Reference ### TaskQueueManager -- `__init__(max_workers=2, queue_size=1000, task_timeout=3600)` -- `async submit_task(task_id, task_type, task_func, *args, **kwargs)` +- `__init__()` +- `create_queue(config: QueueConfig)` +- `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)` - `get_task_status(task_id)` +- `get_queue_status(queue_name)` - `async wait_for_task(task_id, timeout=None)` -- `async wait_for_all_tasks(timeout=None)` -- `cleanup_old_tasks(max_age_hours=24)` -- `async shutdown()` +- `async wait_for_queue(queue_name, timeout=None)` +- `cleanup_old_tasks(max_age_hours=24, queue_name=None)` +- `async shutdown(wait=True)` + +### QueueConfig + +- `name: str` +- `max_workers: int` +- `priority: QueuePriority = QueuePriority.MEDIUM` +- `max_size: int = 1000` +- `task_timeout: int = 3600` ### TaskStatus @@ -103,6 +196,13 @@ Enum with states: - `COMPLETED` - `FAILED` +### QueuePriority + +Enum with levels: +- `HIGH` (1) +- `MEDIUM` (2) +- `LOW` (3) + ## Contributing Contributions are welcome! Please feel free to submit a Pull Request. 
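The priority levels documented above are ordinary integers where a lower number means more urgent, and the manager enqueues each task id wrapped in a `PrioritizedItem(priority=..., item=...)`. The snippet below is an illustrative sketch only, assuming a standard `queue.PriorityQueue` underneath; its `PrioritizedItem` dataclass is a stand-in for the manager's internal wrapper, not a public kew API. It shows why `HIGH` (1) tasks leave the queue ahead of `MEDIUM` (2) and `LOW` (3):

```python
import queue
from dataclasses import dataclass, field
from kew import QueuePriority

@dataclass(order=True)
class PrioritizedItem:
    # Stand-in for the manager's internal wrapper: compare on priority only.
    priority: int
    item: str = field(compare=False)

q = queue.PriorityQueue()
q.put(PrioritizedItem(QueuePriority.LOW.value, "nightly_cleanup"))   # 3
q.put(PrioritizedItem(QueuePriority.HIGH.value, "checkout"))         # 1
q.put(PrioritizedItem(QueuePriority.MEDIUM.value, "report"))         # 2

while not q.empty():
    print(q.get().item)  # checkout, report, nightly_cleanup
```
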
diff --git a/kew/.coverage b/kew/.coverage new file mode 100644 index 0000000000000000000000000000000000000000..bc05a2d94aa3a456e411457131f31723ef71f21f GIT binary patch literal 53248 zcmeI)NsJU#7zgmGu3qR_cn~BlPH>(vI82y@2`+JoFbD@C5m7WT3PpAIo0*dCs-~(M zW{kuPn2?wlO^i46%E^l!jEP>n8A*_fdN3i9C?pCl5s84{_tnxp4GfJC1B3rccXciA zy?XCgZ|Ul#p546B4wM{oy{Z++RoVhg*R?fLYMPd(XE!~QDMt(GWQAV!8OyUR=Cz?$ zYTe>_ttfv_H1B@mnf0Sv*2^uUYd5Z!(KflfL(()jC`YB|?v^#>$uYa4r0taLk`>s_ zxC|y#61wiIG7ogLjfOf7I26X`CDSfbtU!%Z5;f1RTHci0p{54H0P)cRH5o)3C=q3k zI~+nTkE8cXPmL*0IVI&sxma$O`#bvgz1EQ{ELx=RS7DhfuSAc|meSCMW>GebQ9>ox zD|7CPp5>G#ls_P?MhQzU<#RHKO6$4R;lPy@+X;*5_;z5sj#QJXR1Z}7iUk~Cv}7W@ znWdq$wsV2DM=|4L&CqE^zCz46cpQ0*GUvDnYcnDrnj(jJcIwq4k-uE8*7)R5jOf6D zJ6p`z;s$P9*lRvFS2&l=^0u7Lgq+W46szvoJL{@(@&{xy=0I2s#~qMd2&w_gI4tkC-{)2atkjxk7KRzJF&}QJBXp+JWC#$hTCQ#^zb7 z4)346-f$~vtt8xvDi!IDg-wQI%jo1#ZJya?h7%0SHa)oD}#bEL7@ zF})<4EA;f}d-Gv6iqBU3l543#$(g)DH=69KD$B2XNk?+EPBMwn>?M;n67Qkn4Cb%GR|2rM|hsy_hKr{^8O{Eew8|Xei|)V<}J-EKdGh$cUcwcO=_0yQIn-UI2r*Y z2P|6@*ebg9AgsT<8dgspdz$%;5@@a`_7ti=3{K7aY1j8{8diDO@*hdI%DxH9=U&L~ zlSS99D9ee)28yx4r(@c_4D)dLsIz{}fB*y_ z009U<00Izz00bZ~HwC&<#!`{|#ABtYQ@oZX~)`y?->D~dubpN1 TaskInfo: + """Submit a task to a specific queue""" with self._lock: if task_id in self.tasks: raise TaskAlreadyExistsError( f"Task with ID {task_id} already exists" ) - task_info = TaskInfo(task_id, task_type) + if queue_name not in self.queues: + raise QueueNotFoundError( + f"Queue {queue_name} not found" + ) + + worker_pool = self.queues[queue_name] + task_info = TaskInfo(task_id, task_type, queue_name, priority.value) + + # Store function and arguments for later execution + task_info._func = task_func + task_info._args = args + task_info._kwargs = kwargs + self.tasks[task_id] = task_info - logger.info(f"Task {task_id} ({task_type}) submitted") - - # Create task coroutine - task = asyncio.create_task(self._execute_task( - task_id, - task_func, - *args, - **kwargs - )) - - # Store the task for later cleanup - task_info._task = task - - return task_info + + # Add to priority queue + worker_pool.queue.put(PrioritizedItem( + priority=priority.value, + item=task_id + )) + + logger.info(f"Task {task_id} submitted to queue {queue_name}") + + return task_info - async def _execute_task( - self, - task_id: str, - task_func: Callable, - *args, - **kwargs - ): + def _handle_task_completion(self, task_id: str, future: asyncio.Future): + """Handle task completion and cleanup""" try: + result = future.result() with self._lock: task_info = self.tasks[task_id] - task_info.status = TaskStatus.PROCESSING - task_info.started_time = datetime.now() - logger.info(f"Starting execution of task {task_id}") - - # Execute the task function - result = await task_func(*args, **kwargs) - - with self._lock: task_info.result = result task_info.status = TaskStatus.COMPLETED task_info.completed_time = datetime.now() logger.info(f"Task {task_id} completed successfully with result: {result}") - - return result - + + # Clean up task + worker_pool = self.queues[task_info.queue_name] + if task_id in worker_pool._tasks: + del worker_pool._tasks[task_id] + except Exception as e: - logger.error(f"Error executing task {task_id}: {str(e)}") + logger.error(f"Task {task_id} failed: {str(e)}") with self._lock: task_info = self.tasks[task_id] task_info.status = TaskStatus.FAILED task_info.error = str(e) task_info.completed_time = datetime.now() - raise + + def get_task_status(self, task_id: str) -> TaskInfo: + """Get status of a specific task""" + with self._lock: + task_info = 
self.tasks.get(task_id) + if not task_info: + raise TaskNotFoundError(f"Task {task_id} not found") + return task_info + + def get_queue_status(self, queue_name: str) -> Dict[str, Any]: + """Get status of a specific queue""" + with self._lock: + if queue_name not in self.queues: + raise QueueNotFoundError(f"Queue {queue_name} not found") + + worker_pool = self.queues[queue_name] + queue_tasks = [ + task for task in self.tasks.values() + if task.queue_name == queue_name + ] + + return { + "name": queue_name, + "max_workers": worker_pool.config.max_workers, + "priority": worker_pool.config.priority.value, + "active_tasks": len([t for t in queue_tasks if t.status == TaskStatus.PROCESSING]), + "queued_tasks": len([t for t in queue_tasks if t.status == TaskStatus.QUEUED]), + "completed_tasks": len([t for t in queue_tasks if t.status == TaskStatus.COMPLETED]), + "failed_tasks": len([t for t in queue_tasks if t.status == TaskStatus.FAILED]) + } async def wait_for_task(self, task_id: str, timeout: Optional[float] = None) -> TaskInfo: """Wait for a specific task to complete""" task_info = self.get_task_status(task_id) - if hasattr(task_info, '_task'): + worker_pool = self.queues[task_info.queue_name] + + if task_id in worker_pool._tasks: try: - await asyncio.wait_for(task_info._task, timeout=timeout) + await asyncio.wait_for(worker_pool._tasks[task_id], timeout=timeout) except asyncio.TimeoutError: logger.warning(f"Task {task_id} timed out after {timeout} seconds") raise + return task_info - async def wait_for_all_tasks(self, timeout: Optional[float] = None): - """Wait for all tasks to complete""" - tasks = [ - task_info._task for task_info in self.tasks.values() - if hasattr(task_info, '_task') and not task_info._task.done() - ] + async def wait_for_queue(self, queue_name: str, timeout: Optional[float] = None): + """Wait for all tasks in a specific queue to complete""" + if queue_name not in self.queues: + raise QueueNotFoundError(f"Queue {queue_name} not found") + + worker_pool = self.queues[queue_name] + tasks = list(worker_pool._tasks.values()) + if tasks: await asyncio.wait(tasks, timeout=timeout) - def get_task_status(self, task_id: str) -> TaskInfo: - with self._lock: - task_info = self.tasks.get(task_id) - if not task_info: - raise TaskNotFoundError(f"Task {task_id} not found") - return task_info - - def get_queue_length(self) -> int: - return self.queue.qsize() - - def cleanup_old_tasks(self, max_age_hours: int = 24): + def cleanup_old_tasks(self, max_age_hours: int = 24, queue_name: Optional[str] = None): + """Clean up completed tasks, optionally for a specific queue""" current_time = datetime.now() cleaned_count = 0 with self._lock: for task_id, task_info in list(self.tasks.items()): + if queue_name and task_info.queue_name != queue_name: + continue + if task_info.completed_time: age = current_time - task_info.completed_time if age.total_seconds() > max_age_hours * 3600: @@ -142,9 +231,22 @@ def cleanup_old_tasks(self, max_age_hours: int = 24): logger.info(f"Cleaned up {cleaned_count} old tasks") - async def shutdown(self): + async def shutdown(self, wait: bool = True): + """Shutdown all queues""" logger.info("Shutting down TaskQueueManager") - self._shutdown = True - # Wait for pending tasks - await self.wait_for_all_tasks(timeout=5.0) - self.executor.shutdown(wait=True) \ No newline at end of file + + # Wait for pending tasks if requested + if wait: + for queue_name, worker_pool in self.queues.items(): + tasks = list(worker_pool._tasks.values()) + if tasks: + try: + await 
asyncio.wait(tasks, timeout=5.0) + except Exception as e: + logger.error(f"Error waiting for tasks in queue {queue_name}: {str(e)}") + + # Shutdown all worker pools + for queue_name, worker_pool in self.queues.items(): + worker_pool._shutdown = True + worker_pool.executor.shutdown(wait=wait) + logger.info(f"Shut down queue {queue_name}") \ No newline at end of file diff --git a/kew/kew/models.py b/kew/kew/models.py index 6c8c2bd..5688188 100644 --- a/kew/kew/models.py +++ b/kew/kew/models.py @@ -1,6 +1,7 @@ from datetime import datetime from enum import Enum from typing import Optional, TypeVar, Generic +from dataclasses import dataclass T = TypeVar('T') # Generic type for task result @@ -10,10 +11,26 @@ class TaskStatus(Enum): COMPLETED = "completed" FAILED = "failed" +class QueuePriority(Enum): + HIGH = 1 + MEDIUM = 2 + LOW = 3 + +@dataclass +class QueueConfig: + """Configuration for a single queue""" + name: str + max_workers: int + priority: QueuePriority = QueuePriority.MEDIUM + max_size: int = 1000 + task_timeout: int = 3600 + class TaskInfo(Generic[T]): - def __init__(self, task_id: str, task_type: str): + def __init__(self, task_id: str, task_type: str, queue_name: str, priority: int): self.task_id = task_id self.task_type = task_type + self.queue_name = queue_name + self.priority = priority self.status = TaskStatus.QUEUED self.queued_time = datetime.now() self.started_time: Optional[datetime] = None @@ -22,10 +39,11 @@ def __init__(self, task_id: str, task_type: str): self.error: Optional[str] = None def to_dict(self): - """Convert TaskInfo to dictionary representation""" return { "task_id": self.task_id, "task_type": self.task_type, + "queue_name": self.queue_name, + "priority": self.priority, "status": self.status.value, "queued_time": self.queued_time.isoformat(), "started_time": self.started_time.isoformat() if self.started_time else None, diff --git a/kew/kew/tests/conftest.py b/kew/kew/tests/conftest.py new file mode 100644 index 0000000..454080e --- /dev/null +++ b/kew/kew/tests/conftest.py @@ -0,0 +1,11 @@ +# kew/tests/conftest.py +import pytest +import asyncio +from kew import TaskQueueManager + +@pytest.fixture +async def manager(): + """Fixture that provides a TaskQueueManager instance""" + manager = TaskQueueManager() + yield manager + await manager.shutdown() diff --git a/kew/kew/tests/test_queue_manager.py b/kew/kew/tests/test_queue_manager.py new file mode 100644 index 0000000..cf3fd20 --- /dev/null +++ b/kew/kew/tests/test_queue_manager.py @@ -0,0 +1,222 @@ +import pytest +import asyncio +import random +from kew import TaskQueueManager, QueueConfig, QueuePriority, TaskStatus + +async def long_task(task_num: int, sleep_time: float) -> dict: + """Simulate a long-running task""" + await asyncio.sleep(sleep_time) + result = sleep_time * 2 + return {"task_num": task_num, "result": result} + +@pytest.mark.asyncio +async def test_single_queue(): + """Test single queue operation""" + manager = TaskQueueManager() + + # Create queue + manager.create_queue(QueueConfig( + name="test_queue", + max_workers=2, + priority=QueuePriority.HIGH + )) + + # Submit task + task_info = await manager.submit_task( + task_id="task1", + queue_name="test_queue", + task_type="test", + task_func=long_task, + priority=QueuePriority.HIGH, + task_num=1, + sleep_time=0.1 + ) + + # Check initial status + status = manager.get_task_status(task_info.task_id) + assert status.queue_name == "test_queue" + + # Wait for completion + await asyncio.sleep(0.2) + + # Check final status + status = 
manager.get_task_status(task_info.task_id) + assert status.status == TaskStatus.COMPLETED + assert status.result["task_num"] == 1 + assert status.result["result"] == 0.2 + + await manager.shutdown() + +@pytest.mark.asyncio +async def test_multiple_queues(): + """Test multiple queues with different priorities""" + manager = TaskQueueManager() + + # Create queues + manager.create_queue(QueueConfig( + name="fast_track", + max_workers=2, + priority=QueuePriority.HIGH + )) + + manager.create_queue(QueueConfig( + name="standard", + max_workers=1, + priority=QueuePriority.LOW + )) + + tasks = [] + + # Submit high-priority tasks + for i in range(2): + sleep_time = 0.1 + task_info = await manager.submit_task( + task_id=f"high_task_{i+1}", + queue_name="fast_track", + task_type="test", + task_func=long_task, + priority=QueuePriority.HIGH, + task_num=i+1, + sleep_time=sleep_time + ) + tasks.append(task_info) + + # Submit low-priority task + task_info = await manager.submit_task( + task_id="low_task_1", + queue_name="standard", + task_type="test", + task_func=long_task, + priority=QueuePriority.LOW, + task_num=3, + sleep_time=0.1 + ) + tasks.append(task_info) + + # Wait for completion + await asyncio.sleep(0.3) + + # Check all tasks completed + for task in tasks: + status = manager.get_task_status(task.task_id) + assert status.status == TaskStatus.COMPLETED + assert status.result is not None + + # Check queue statuses + fast_track_status = manager.get_queue_status("fast_track") + standard_status = manager.get_queue_status("standard") + + assert fast_track_status["completed_tasks"] == 2 + assert standard_status["completed_tasks"] == 1 + + await manager.shutdown() + +@pytest.mark.asyncio +async def test_queue_priorities(): + """Test that high priority tasks complete before low priority ones""" + manager = TaskQueueManager() + + manager.create_queue(QueueConfig( + name="mixed_queue", + max_workers=1, # Single worker to ensure sequential execution + priority=QueuePriority.MEDIUM + )) + + completion_order = [] + + async def tracking_task(priority_level: str): + await asyncio.sleep(0.1) + completion_order.append(priority_level) + return priority_level + + # Submit low priority task first + await manager.submit_task( + task_id="low_priority", + queue_name="mixed_queue", + task_type="test", + task_func=tracking_task, + priority=QueuePriority.LOW, + priority_level="low" + ) + + # Submit high priority task second + await manager.submit_task( + task_id="high_priority", + queue_name="mixed_queue", + task_type="test", + task_func=tracking_task, + priority=QueuePriority.HIGH, + priority_level="high" + ) + + # Wait for completion + await asyncio.sleep(0.3) + + # High priority task should complete first + assert completion_order[0] == "high" + assert completion_order[1] == "low" + + await manager.shutdown() + +@pytest.mark.asyncio +async def test_error_handling(): + """Test error handling in tasks""" + manager = TaskQueueManager() + + manager.create_queue(QueueConfig( + name="test_queue", + max_workers=1 + )) + + async def failing_task(): + await asyncio.sleep(0.1) + raise ValueError("Test error") + + task_info = await manager.submit_task( + task_id="failing_task", + queue_name="test_queue", + task_type="test", + task_func=failing_task, + priority=QueuePriority.MEDIUM + ) + + # Wait for task to fail + await asyncio.sleep(0.2) + + status = manager.get_task_status("failing_task") + assert status.status == TaskStatus.FAILED + assert "Test error" in status.error + + await manager.shutdown() + +@pytest.mark.asyncio +async 
def test_queue_cleanup(): + """Test queue cleanup functionality""" + manager = TaskQueueManager() + + manager.create_queue(QueueConfig( + name="test_queue", + max_workers=1 + )) + + task_info = await manager.submit_task( + task_id="task1", + queue_name="test_queue", + task_type="test", + task_func=long_task, + priority=QueuePriority.MEDIUM, + task_num=1, + sleep_time=0.1 + ) + + # Wait for completion + await asyncio.sleep(0.2) + + # Clean up old tasks + manager.cleanup_old_tasks(max_age_hours=0) + + # Check task was cleaned up + with pytest.raises(Exception): + manager.get_task_status("task1") + + await manager.shutdown() diff --git a/kew/pyproject.toml b/kew/pyproject.toml index a4bfeb9..98e50d2 100644 --- a/kew/pyproject.toml +++ b/kew/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" [project] name = "kew" -version = "0.1.0" +version = "0.1.1" authors = [ { name="Rach Pradhan", email="rach@rachit.ai" }, ] @@ -33,3 +33,24 @@ dependencies = [ [tool.hatch.build.targets.wheel] packages = ["kew"] + +[tool.pytest.ini_options] +asyncio_mode = "strict" +testpaths = ["tests"] +python_files = ["test_*.py"] +addopts = "-v --cov=kew" + +[tool.coverage.run] +source = ["kew"] +omit = ["tests/*", "setup.py"] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "if __name__ == .__main__.:", + "raise NotImplementedError", + "raise ImportError", + "except ImportError:", +] +fail_under = 80 \ No newline at end of file diff --git a/kew/test_multiple.py b/kew/test_multiple.py index d8a29c9..8869f1e 100644 --- a/kew/test_multiple.py +++ b/kew/test_multiple.py @@ -1,7 +1,7 @@ # test_multiple.py import asyncio -from kew import TaskQueueManager, TaskStatus import random +from kew import TaskQueueManager, QueueConfig, QueuePriority, TaskStatus async def long_task(task_num: int, sleep_time: int) -> dict: """Simulate a long-running task""" @@ -12,45 +12,74 @@ async def long_task(task_num: int, sleep_time: int) -> dict: return {"task_num": task_num, "result": result} async def main(): - # Initialize manager with 2 workers (so only 2 tasks can run simultaneously) - manager = TaskQueueManager(max_workers=2) + # Initialize manager + manager = TaskQueueManager() - # Submit 5 tasks with different durations + # Create queues + manager.create_queue(QueueConfig( + name="fast_track", + max_workers=2, + priority=QueuePriority.HIGH + )) + + manager.create_queue(QueueConfig( + name="standard", + max_workers=1, + priority=QueuePriority.LOW + )) + + # Submit tasks to different queues tasks = [] - for i in range(5): - sleep_time = random.randint(3, 7) # Random duration between 3-7 seconds + + # Submit 3 high-priority tasks + for i in range(3): + sleep_time = random.randint(2, 4) + task_info = await manager.submit_task( + task_id=f"high_task_{i+1}", + queue_name="fast_track", + task_type="long_calculation", + task_func=long_task, + priority=QueuePriority.HIGH, + task_num=i+1, + sleep_time=sleep_time + ) + tasks.append(task_info) + + # Submit 2 low-priority tasks + for i in range(2): + sleep_time = random.randint(1, 3) task_info = await manager.submit_task( - task_id=f"task{i+1}", + task_id=f"low_task_{i+1}", + queue_name="standard", task_type="long_calculation", task_func=long_task, + priority=QueuePriority.LOW, task_num=i+1, sleep_time=sleep_time ) tasks.append(task_info) - print(f"Submitted task {i+1}") - # Monitor task progress + # Monitor progress while True: all_completed = True print("\nCurrent status:") for task in tasks: status = manager.get_task_status(task.task_id) - 
print(f"{task.task_id}: {status.status.value} - Result: {status.result}") + print(f"{task.task_id} ({task.queue_name}): {status.status.value} - Result: {status.result}") if status.status not in (TaskStatus.COMPLETED, TaskStatus.FAILED): all_completed = False if all_completed: break - await asyncio.sleep(1) # Wait a second before checking again + await asyncio.sleep(1) - # Final results - print("\nFinal results:") - for task in tasks: - status = manager.get_task_status(task.task_id) - print(f"{task.task_id}: {status.result}") + # Final queue statuses + print("\nFinal Queue Statuses:") + print("Fast Track Queue:", manager.get_queue_status("fast_track")) + print("Standard Queue:", manager.get_queue_status("standard")) - # Properly await shutdown + # Shutdown await manager.shutdown() if __name__ == "__main__":