diff --git a/README.md b/README.md index 4c3e3de..5a02761 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,13 @@ # Kew Task Queue Manager -A flexible and robust asynchronous task queue manager for Python applications. +A flexible and robust asynchronous task queue manager for Python applications with support for multiple priority queues. ## Features +- Multiple named queues with independent configurations +- Priority-based task processing - Asynchronous task execution -- Configurable worker pool +- Configurable worker pools per queue - Task status tracking and monitoring - Automatic cleanup of completed tasks - Thread-safe operations @@ -21,7 +23,7 @@ pip install kew ```python import asyncio -from kew import TaskQueueManager +from kew import TaskQueueManager, QueueConfig, QueuePriority async def example_task(x: int): await asyncio.sleep(1) @@ -29,20 +31,46 @@ async def example_task(x: int): async def main(): # Initialize the task queue manager - manager = TaskQueueManager(max_workers=2) + manager = TaskQueueManager() - # Submit a task - task_info = await manager.submit_task( + # Create queues with different priorities + manager.create_queue(QueueConfig( + name="high_priority", + max_workers=4, + priority=QueuePriority.HIGH + )) + + manager.create_queue(QueueConfig( + name="background", + max_workers=1, + priority=QueuePriority.LOW + )) + + # Submit tasks to different queues + critical_task = await manager.submit_task( task_id="task1", + queue_name="high_priority", task_type="multiplication", task_func=example_task, + priority=QueuePriority.HIGH, x=5 ) - # Wait for result + background_task = await manager.submit_task( + task_id="task2", + queue_name="background", + task_type="multiplication", + task_func=example_task, + priority=QueuePriority.LOW, + x=10 + ) + + # Wait for results await asyncio.sleep(2) - status = manager.get_task_status("task1") - print(f"Result: {status.result}") + high_status = manager.get_task_status("task1") + low_status = manager.get_task_status("task2") + print(f"High Priority Result: {high_status.result}") + print(f"Background Result: {low_status.result}") # Cleanup await manager.shutdown() @@ -51,49 +79,114 @@ if __name__ == "__main__": asyncio.run(main()) ``` -## Advanced Usage +## Queue Management -### Concurrent Task Execution +### Creating Queues ```python -async def main(): - manager = TaskQueueManager(max_workers=2) - - # Submit multiple tasks - tasks = [] - for i in range(5): - task_info = await manager.submit_task( - task_id=f"task{i}", - task_type="example", - task_func=example_task, - x=i - ) - tasks.append(task_info) - - # Wait for all tasks to complete - await manager.wait_for_all_tasks() +from kew import QueueConfig, QueuePriority + +# Create a high-priority queue with 4 workers +manager.create_queue(QueueConfig( + name="critical", + max_workers=4, + priority=QueuePriority.HIGH, + max_size=1000, + task_timeout=3600 +)) + +# Create a background queue with 1 worker +manager.create_queue(QueueConfig( + name="background", + max_workers=1, + priority=QueuePriority.LOW +)) +``` + +### Queue Priorities + +- `QueuePriority.HIGH` (1): Critical tasks +- `QueuePriority.MEDIUM` (2): Standard tasks +- `QueuePriority.LOW` (3): Background tasks + +### Queue Monitoring + +```python +# Get queue status +status = manager.get_queue_status("critical") +print(f"Active Tasks: {status['active_tasks']}") +print(f"Queued Tasks: {status['queued_tasks']}") +print(f"Completed Tasks: {status['completed_tasks']}") +``` + +### Queue Operations + +```python +# Wait for specific queue to complete +await manager.wait_for_queue("critical") + +# Clean up old tasks in a queue +manager.cleanup_old_tasks(max_age_hours=24, queue_name="background") +``` + +## Task Management + +### Submitting Tasks + +```python +task_info = await manager.submit_task( + task_id="unique_id", + queue_name="critical", + task_type="example", + task_func=my_async_function, + priority=QueuePriority.HIGH, + *args, + **kwargs +) ``` ### Task Status Monitoring ```python -status = manager.get_task_status("task1") -print(f"Status: {status.status}") +status = manager.get_task_status("unique_id") +print(f"Status: {status.status}") # TaskStatus.QUEUED, PROCESSING, COMPLETED, FAILED +print(f"Queue: {status.queue_name}") +print(f"Priority: {status.priority}") print(f"Result: {status.result}") print(f"Error: {status.error}") ``` +### Waiting for Tasks + +```python +# Wait for specific task +await manager.wait_for_task("task1", timeout=30) + +# Wait for all tasks in a queue +await manager.wait_for_queue("critical", timeout=60) +``` + ## API Reference ### TaskQueueManager -- `__init__(max_workers=2, queue_size=1000, task_timeout=3600)` -- `async submit_task(task_id, task_type, task_func, *args, **kwargs)` +- `__init__()` +- `create_queue(config: QueueConfig)` +- `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)` - `get_task_status(task_id)` +- `get_queue_status(queue_name)` - `async wait_for_task(task_id, timeout=None)` -- `async wait_for_all_tasks(timeout=None)` -- `cleanup_old_tasks(max_age_hours=24)` -- `async shutdown()` +- `async wait_for_queue(queue_name, timeout=None)` +- `cleanup_old_tasks(max_age_hours=24, queue_name=None)` +- `async shutdown(wait=True)` + +### QueueConfig + +- `name: str` +- `max_workers: int` +- `priority: QueuePriority = QueuePriority.MEDIUM` +- `max_size: int = 1000` +- `task_timeout: int = 3600` ### TaskStatus @@ -103,6 +196,13 @@ Enum with states: - `COMPLETED` - `FAILED` +### QueuePriority + +Enum with levels: +- `HIGH` (1) +- `MEDIUM` (2) +- `LOW` (3) + ## Contributing Contributions are welcome! Please feel free to submit a Pull Request. diff --git a/kew/.coverage b/kew/.coverage new file mode 100644 index 0000000..bc05a2d Binary files /dev/null and b/kew/.coverage differ diff --git a/kew/README.md b/kew/README.md index 4c3e3de..5a02761 100644 --- a/kew/README.md +++ b/kew/README.md @@ -1,11 +1,13 @@ # Kew Task Queue Manager -A flexible and robust asynchronous task queue manager for Python applications. +A flexible and robust asynchronous task queue manager for Python applications with support for multiple priority queues. ## Features +- Multiple named queues with independent configurations +- Priority-based task processing - Asynchronous task execution -- Configurable worker pool +- Configurable worker pools per queue - Task status tracking and monitoring - Automatic cleanup of completed tasks - Thread-safe operations @@ -21,7 +23,7 @@ pip install kew ```python import asyncio -from kew import TaskQueueManager +from kew import TaskQueueManager, QueueConfig, QueuePriority async def example_task(x: int): await asyncio.sleep(1) @@ -29,20 +31,46 @@ async def example_task(x: int): async def main(): # Initialize the task queue manager - manager = TaskQueueManager(max_workers=2) + manager = TaskQueueManager() - # Submit a task - task_info = await manager.submit_task( + # Create queues with different priorities + manager.create_queue(QueueConfig( + name="high_priority", + max_workers=4, + priority=QueuePriority.HIGH + )) + + manager.create_queue(QueueConfig( + name="background", + max_workers=1, + priority=QueuePriority.LOW + )) + + # Submit tasks to different queues + critical_task = await manager.submit_task( task_id="task1", + queue_name="high_priority", task_type="multiplication", task_func=example_task, + priority=QueuePriority.HIGH, x=5 ) - # Wait for result + background_task = await manager.submit_task( + task_id="task2", + queue_name="background", + task_type="multiplication", + task_func=example_task, + priority=QueuePriority.LOW, + x=10 + ) + + # Wait for results await asyncio.sleep(2) - status = manager.get_task_status("task1") - print(f"Result: {status.result}") + high_status = manager.get_task_status("task1") + low_status = manager.get_task_status("task2") + print(f"High Priority Result: {high_status.result}") + print(f"Background Result: {low_status.result}") # Cleanup await manager.shutdown() @@ -51,49 +79,114 @@ if __name__ == "__main__": asyncio.run(main()) ``` -## Advanced Usage +## Queue Management -### Concurrent Task Execution +### Creating Queues ```python -async def main(): - manager = TaskQueueManager(max_workers=2) - - # Submit multiple tasks - tasks = [] - for i in range(5): - task_info = await manager.submit_task( - task_id=f"task{i}", - task_type="example", - task_func=example_task, - x=i - ) - tasks.append(task_info) - - # Wait for all tasks to complete - await manager.wait_for_all_tasks() +from kew import QueueConfig, QueuePriority + +# Create a high-priority queue with 4 workers +manager.create_queue(QueueConfig( + name="critical", + max_workers=4, + priority=QueuePriority.HIGH, + max_size=1000, + task_timeout=3600 +)) + +# Create a background queue with 1 worker +manager.create_queue(QueueConfig( + name="background", + max_workers=1, + priority=QueuePriority.LOW +)) +``` + +### Queue Priorities + +- `QueuePriority.HIGH` (1): Critical tasks +- `QueuePriority.MEDIUM` (2): Standard tasks +- `QueuePriority.LOW` (3): Background tasks + +### Queue Monitoring + +```python +# Get queue status +status = manager.get_queue_status("critical") +print(f"Active Tasks: {status['active_tasks']}") +print(f"Queued Tasks: {status['queued_tasks']}") +print(f"Completed Tasks: {status['completed_tasks']}") +``` + +### Queue Operations + +```python +# Wait for specific queue to complete +await manager.wait_for_queue("critical") + +# Clean up old tasks in a queue +manager.cleanup_old_tasks(max_age_hours=24, queue_name="background") +``` + +## Task Management + +### Submitting Tasks + +```python +task_info = await manager.submit_task( + task_id="unique_id", + queue_name="critical", + task_type="example", + task_func=my_async_function, + priority=QueuePriority.HIGH, + *args, + **kwargs +) ``` ### Task Status Monitoring ```python -status = manager.get_task_status("task1") -print(f"Status: {status.status}") +status = manager.get_task_status("unique_id") +print(f"Status: {status.status}") # TaskStatus.QUEUED, PROCESSING, COMPLETED, FAILED +print(f"Queue: {status.queue_name}") +print(f"Priority: {status.priority}") print(f"Result: {status.result}") print(f"Error: {status.error}") ``` +### Waiting for Tasks + +```python +# Wait for specific task +await manager.wait_for_task("task1", timeout=30) + +# Wait for all tasks in a queue +await manager.wait_for_queue("critical", timeout=60) +``` + ## API Reference ### TaskQueueManager -- `__init__(max_workers=2, queue_size=1000, task_timeout=3600)` -- `async submit_task(task_id, task_type, task_func, *args, **kwargs)` +- `__init__()` +- `create_queue(config: QueueConfig)` +- `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)` - `get_task_status(task_id)` +- `get_queue_status(queue_name)` - `async wait_for_task(task_id, timeout=None)` -- `async wait_for_all_tasks(timeout=None)` -- `cleanup_old_tasks(max_age_hours=24)` -- `async shutdown()` +- `async wait_for_queue(queue_name, timeout=None)` +- `cleanup_old_tasks(max_age_hours=24, queue_name=None)` +- `async shutdown(wait=True)` + +### QueueConfig + +- `name: str` +- `max_workers: int` +- `priority: QueuePriority = QueuePriority.MEDIUM` +- `max_size: int = 1000` +- `task_timeout: int = 3600` ### TaskStatus @@ -103,6 +196,13 @@ Enum with states: - `COMPLETED` - `FAILED` +### QueuePriority + +Enum with levels: +- `HIGH` (1) +- `MEDIUM` (2) +- `LOW` (3) + ## Contributing Contributions are welcome! Please feel free to submit a Pull Request. diff --git a/kew/build_and_publish.py b/kew/build_and_publish.py new file mode 100755 index 0000000..1f13ef0 --- /dev/null +++ b/kew/build_and_publish.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 + +import subprocess +import sys + +def run_cmd(cmd): + print(f"Running: {cmd}") + result = subprocess.run(cmd, shell=True) + if result.returncode != 0: + print(f"Error running: {cmd}") + sys.exit(1) + +def main(): + # Run tests + run_cmd("pytest --cov=kew") + + # Build + run_cmd("python -m build") + + # Upload to PyPI/TestPyPI + if "--production" in sys.argv: + run_cmd("twine upload dist/*") + else: + run_cmd("twine upload --repository kew dist/*") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/kew/example.py b/kew/example.py index 4ec13a6..44ca7d9 100644 --- a/kew/example.py +++ b/kew/example.py @@ -1,36 +1,69 @@ -# example.py import asyncio -from kew import TaskQueueManager +from kew import TaskQueueManager, QueueConfig, QueuePriority async def example_task(x: int): await asyncio.sleep(1) return x * 2 async def main(): - # Initialize the task queue manager - manager = TaskQueueManager(max_workers=2) + # Create manager + manager = TaskQueueManager() - # Submit a task - task_info = await manager.submit_task( + # Create queues + manager.create_queue(QueueConfig( + name="critical", + max_workers=4, + priority=QueuePriority.HIGH + )) + + manager.create_queue(QueueConfig( + name="background", + max_workers=1, + priority=QueuePriority.LOW + )) + + # Submit tasks to different queues + task1 = await manager.submit_task( task_id="task1", + queue_name="critical", task_type="multiplication", task_func=example_task, + priority=QueuePriority.HIGH, x=5 ) - # Get task status - status = manager.get_task_status("task1") - print(f"Task status: {status.status}") + task2 = await manager.submit_task( + task_id="task2", + queue_name="background", + task_type="multiplication", + task_func=example_task, + priority=QueuePriority.LOW, + x=10 + ) - # Wait a bit to see the result + # Get initial queue statuses + print("\nInitial Queue Statuses:") + print("Critical Queue:", manager.get_queue_status("critical")) + print("Background Queue:", manager.get_queue_status("background")) + + # Wait for tasks to complete await asyncio.sleep(2) - status = manager.get_task_status("task1") - print(f"Final status: {status.status}") - print(f"Result: {status.result}") - # Cleanup and shutdown - now properly awaited - manager.cleanup_old_tasks() + # Get final queue statuses + print("\nFinal Queue Statuses:") + print("Critical Queue:", manager.get_queue_status("critical")) + print("Background Queue:", manager.get_queue_status("background")) + + # Get task results + task1_status = manager.get_task_status("task1") + task2_status = manager.get_task_status("task2") + + print("\nTask Results:") + print(f"Task 1 (Critical): {task1_status.result}") + print(f"Task 2 (Background): {task2_status.result}") + + # Shutdown await manager.shutdown() if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/kew/kew/__init__.py b/kew/kew/__init__.py index ae4114c..db92961 100644 --- a/kew/kew/__init__.py +++ b/kew/kew/__init__.py @@ -1,5 +1,21 @@ from .manager import TaskQueueManager -from .models import TaskStatus, TaskInfo +from .models import TaskStatus, TaskInfo, QueueConfig, QueuePriority +from .exceptions import ( + TaskQueueError, + TaskAlreadyExistsError, + TaskNotFoundError, + QueueNotFoundError +) __version__ = "0.1.0" -__all__ = ["TaskQueueManager", "TaskStatus", "TaskInfo"] +__all__ = [ + "TaskQueueManager", + "TaskStatus", + "TaskInfo", + "QueueConfig", + "QueuePriority", + "TaskQueueError", + "TaskAlreadyExistsError", + "TaskNotFoundError", + "QueueNotFoundError" +] \ No newline at end of file diff --git a/kew/kew/exceptions.py b/kew/kew/exceptions.py index e858772..d9f8d21 100644 --- a/kew/kew/exceptions.py +++ b/kew/kew/exceptions.py @@ -9,3 +9,7 @@ class TaskAlreadyExistsError(TaskQueueError): class TaskNotFoundError(TaskQueueError): """Raised when attempting to access a non-existent task""" pass + +class QueueNotFoundError(TaskQueueError): + """Raised when attempting to access a non-existent queue""" + pass \ No newline at end of file diff --git a/kew/kew/manager.py b/kew/kew/manager.py index 0f47663..984f607 100644 --- a/kew/kew/manager.py +++ b/kew/kew/manager.py @@ -1,28 +1,42 @@ +# kew/kew/manager.py from concurrent.futures import ThreadPoolExecutor -from queue import Queue +from queue import PriorityQueue, Queue import threading -from typing import Optional, Dict, Any, Callable +from typing import Optional, Dict, Any, Callable, List, Tuple from datetime import datetime import logging import asyncio -from .models import TaskStatus, TaskInfo -from .exceptions import TaskAlreadyExistsError, TaskNotFoundError +from .models import TaskStatus, TaskInfo, QueueConfig, QueuePriority +from .exceptions import TaskAlreadyExistsError, TaskNotFoundError, QueueNotFoundError logger = logging.getLogger(__name__) +class PrioritizedItem: + def __init__(self, priority: int, item: Any): + self.priority = priority + self.item = item + self.timestamp = datetime.now() + + def __lt__(self, other): + if self.priority != other.priority: + return self.priority < other.priority + return self.timestamp < other.timestamp + +class QueueWorkerPool: + """Manages workers for a specific queue""" + def __init__(self, config: QueueConfig): + self.config = config + self.executor = ThreadPoolExecutor(max_workers=config.max_workers) + self.queue: PriorityQueue[PrioritizedItem] = PriorityQueue(maxsize=config.max_size) + self._shutdown = False + self._tasks: Dict[str, asyncio.Task] = {} + class TaskQueueManager: - def __init__( - self, - max_workers: int = 2, - queue_size: int = 1000, - task_timeout: int = 3600 - ): - self.executor = ThreadPoolExecutor(max_workers=max_workers) - self.queue = Queue(maxsize=queue_size) - self.task_timeout = task_timeout + def __init__(self): + """Initialize TaskQueueManager with multiple queue support""" + self.queues: Dict[str, QueueWorkerPool] = {} self.tasks: Dict[str, TaskInfo] = {} self._lock = threading.Lock() - self._shutdown = False self._setup_logging() def _setup_logging(self): @@ -34,106 +48,181 @@ def _setup_logging(self): logger.addHandler(handler) logger.setLevel(logging.INFO) + def create_queue(self, config: QueueConfig): + """Create a new queue with specified configuration""" + with self._lock: + if config.name in self.queues: + raise ValueError(f"Queue {config.name} already exists") + worker_pool = QueueWorkerPool(config) + self.queues[config.name] = worker_pool + logger.info(f"Created queue {config.name} with {config.max_workers} workers") + + # Start queue processor + asyncio.create_task(self._process_queue(config.name)) + + async def _process_queue(self, queue_name: str): + """Process tasks in the queue""" + worker_pool = self.queues[queue_name] + + while not worker_pool._shutdown: + try: + if not worker_pool.queue.empty(): + prioritized_item = worker_pool.queue.get_nowait() + task_id = prioritized_item.item + + with self._lock: + task_info = self.tasks[task_id] + if task_info.status == TaskStatus.QUEUED: + # Execute task + task_info.status = TaskStatus.PROCESSING + task_info.started_time = datetime.now() + logger.info(f"Processing task {task_id} from queue {queue_name}") + + # Create task + task = asyncio.create_task(task_info._func(*task_info._args, **task_info._kwargs)) + worker_pool._tasks[task_id] = task + + # Add completion callback + task.add_done_callback( + lambda f, tid=task_id: self._handle_task_completion(tid, f) + ) + + await asyncio.sleep(0.1) # Small delay to prevent busy waiting + + except Exception as e: + logger.error(f"Error processing queue {queue_name}: {str(e)}") + await asyncio.sleep(1) # Longer delay on error + async def submit_task( self, task_id: str, + queue_name: str, task_type: str, task_func: Callable, + priority: QueuePriority = QueuePriority.MEDIUM, *args, **kwargs ) -> TaskInfo: + """Submit a task to a specific queue""" with self._lock: if task_id in self.tasks: raise TaskAlreadyExistsError( f"Task with ID {task_id} already exists" ) - task_info = TaskInfo(task_id, task_type) + if queue_name not in self.queues: + raise QueueNotFoundError( + f"Queue {queue_name} not found" + ) + + worker_pool = self.queues[queue_name] + task_info = TaskInfo(task_id, task_type, queue_name, priority.value) + + # Store function and arguments for later execution + task_info._func = task_func + task_info._args = args + task_info._kwargs = kwargs + self.tasks[task_id] = task_info - logger.info(f"Task {task_id} ({task_type}) submitted") - - # Create task coroutine - task = asyncio.create_task(self._execute_task( - task_id, - task_func, - *args, - **kwargs - )) - - # Store the task for later cleanup - task_info._task = task - - return task_info + + # Add to priority queue + worker_pool.queue.put(PrioritizedItem( + priority=priority.value, + item=task_id + )) + + logger.info(f"Task {task_id} submitted to queue {queue_name}") + + return task_info - async def _execute_task( - self, - task_id: str, - task_func: Callable, - *args, - **kwargs - ): + def _handle_task_completion(self, task_id: str, future: asyncio.Future): + """Handle task completion and cleanup""" try: + result = future.result() with self._lock: task_info = self.tasks[task_id] - task_info.status = TaskStatus.PROCESSING - task_info.started_time = datetime.now() - logger.info(f"Starting execution of task {task_id}") - - # Execute the task function - result = await task_func(*args, **kwargs) - - with self._lock: task_info.result = result task_info.status = TaskStatus.COMPLETED task_info.completed_time = datetime.now() logger.info(f"Task {task_id} completed successfully with result: {result}") - - return result - + + # Clean up task + worker_pool = self.queues[task_info.queue_name] + if task_id in worker_pool._tasks: + del worker_pool._tasks[task_id] + except Exception as e: - logger.error(f"Error executing task {task_id}: {str(e)}") + logger.error(f"Task {task_id} failed: {str(e)}") with self._lock: task_info = self.tasks[task_id] task_info.status = TaskStatus.FAILED task_info.error = str(e) task_info.completed_time = datetime.now() - raise + + def get_task_status(self, task_id: str) -> TaskInfo: + """Get status of a specific task""" + with self._lock: + task_info = self.tasks.get(task_id) + if not task_info: + raise TaskNotFoundError(f"Task {task_id} not found") + return task_info + + def get_queue_status(self, queue_name: str) -> Dict[str, Any]: + """Get status of a specific queue""" + with self._lock: + if queue_name not in self.queues: + raise QueueNotFoundError(f"Queue {queue_name} not found") + + worker_pool = self.queues[queue_name] + queue_tasks = [ + task for task in self.tasks.values() + if task.queue_name == queue_name + ] + + return { + "name": queue_name, + "max_workers": worker_pool.config.max_workers, + "priority": worker_pool.config.priority.value, + "active_tasks": len([t for t in queue_tasks if t.status == TaskStatus.PROCESSING]), + "queued_tasks": len([t for t in queue_tasks if t.status == TaskStatus.QUEUED]), + "completed_tasks": len([t for t in queue_tasks if t.status == TaskStatus.COMPLETED]), + "failed_tasks": len([t for t in queue_tasks if t.status == TaskStatus.FAILED]) + } async def wait_for_task(self, task_id: str, timeout: Optional[float] = None) -> TaskInfo: """Wait for a specific task to complete""" task_info = self.get_task_status(task_id) - if hasattr(task_info, '_task'): + worker_pool = self.queues[task_info.queue_name] + + if task_id in worker_pool._tasks: try: - await asyncio.wait_for(task_info._task, timeout=timeout) + await asyncio.wait_for(worker_pool._tasks[task_id], timeout=timeout) except asyncio.TimeoutError: logger.warning(f"Task {task_id} timed out after {timeout} seconds") raise + return task_info - async def wait_for_all_tasks(self, timeout: Optional[float] = None): - """Wait for all tasks to complete""" - tasks = [ - task_info._task for task_info in self.tasks.values() - if hasattr(task_info, '_task') and not task_info._task.done() - ] + async def wait_for_queue(self, queue_name: str, timeout: Optional[float] = None): + """Wait for all tasks in a specific queue to complete""" + if queue_name not in self.queues: + raise QueueNotFoundError(f"Queue {queue_name} not found") + + worker_pool = self.queues[queue_name] + tasks = list(worker_pool._tasks.values()) + if tasks: await asyncio.wait(tasks, timeout=timeout) - def get_task_status(self, task_id: str) -> TaskInfo: - with self._lock: - task_info = self.tasks.get(task_id) - if not task_info: - raise TaskNotFoundError(f"Task {task_id} not found") - return task_info - - def get_queue_length(self) -> int: - return self.queue.qsize() - - def cleanup_old_tasks(self, max_age_hours: int = 24): + def cleanup_old_tasks(self, max_age_hours: int = 24, queue_name: Optional[str] = None): + """Clean up completed tasks, optionally for a specific queue""" current_time = datetime.now() cleaned_count = 0 with self._lock: for task_id, task_info in list(self.tasks.items()): + if queue_name and task_info.queue_name != queue_name: + continue + if task_info.completed_time: age = current_time - task_info.completed_time if age.total_seconds() > max_age_hours * 3600: @@ -142,9 +231,22 @@ def cleanup_old_tasks(self, max_age_hours: int = 24): logger.info(f"Cleaned up {cleaned_count} old tasks") - async def shutdown(self): + async def shutdown(self, wait: bool = True): + """Shutdown all queues""" logger.info("Shutting down TaskQueueManager") - self._shutdown = True - # Wait for pending tasks - await self.wait_for_all_tasks(timeout=5.0) - self.executor.shutdown(wait=True) \ No newline at end of file + + # Wait for pending tasks if requested + if wait: + for queue_name, worker_pool in self.queues.items(): + tasks = list(worker_pool._tasks.values()) + if tasks: + try: + await asyncio.wait(tasks, timeout=5.0) + except Exception as e: + logger.error(f"Error waiting for tasks in queue {queue_name}: {str(e)}") + + # Shutdown all worker pools + for queue_name, worker_pool in self.queues.items(): + worker_pool._shutdown = True + worker_pool.executor.shutdown(wait=wait) + logger.info(f"Shut down queue {queue_name}") \ No newline at end of file diff --git a/kew/kew/models.py b/kew/kew/models.py index 6c8c2bd..5688188 100644 --- a/kew/kew/models.py +++ b/kew/kew/models.py @@ -1,6 +1,7 @@ from datetime import datetime from enum import Enum from typing import Optional, TypeVar, Generic +from dataclasses import dataclass T = TypeVar('T') # Generic type for task result @@ -10,10 +11,26 @@ class TaskStatus(Enum): COMPLETED = "completed" FAILED = "failed" +class QueuePriority(Enum): + HIGH = 1 + MEDIUM = 2 + LOW = 3 + +@dataclass +class QueueConfig: + """Configuration for a single queue""" + name: str + max_workers: int + priority: QueuePriority = QueuePriority.MEDIUM + max_size: int = 1000 + task_timeout: int = 3600 + class TaskInfo(Generic[T]): - def __init__(self, task_id: str, task_type: str): + def __init__(self, task_id: str, task_type: str, queue_name: str, priority: int): self.task_id = task_id self.task_type = task_type + self.queue_name = queue_name + self.priority = priority self.status = TaskStatus.QUEUED self.queued_time = datetime.now() self.started_time: Optional[datetime] = None @@ -22,10 +39,11 @@ def __init__(self, task_id: str, task_type: str): self.error: Optional[str] = None def to_dict(self): - """Convert TaskInfo to dictionary representation""" return { "task_id": self.task_id, "task_type": self.task_type, + "queue_name": self.queue_name, + "priority": self.priority, "status": self.status.value, "queued_time": self.queued_time.isoformat(), "started_time": self.started_time.isoformat() if self.started_time else None, diff --git a/kew/kew/tests/conftest.py b/kew/kew/tests/conftest.py new file mode 100644 index 0000000..454080e --- /dev/null +++ b/kew/kew/tests/conftest.py @@ -0,0 +1,11 @@ +# kew/tests/conftest.py +import pytest +import asyncio +from kew import TaskQueueManager + +@pytest.fixture +async def manager(): + """Fixture that provides a TaskQueueManager instance""" + manager = TaskQueueManager() + yield manager + await manager.shutdown() diff --git a/kew/kew/tests/test_queue_manager.py b/kew/kew/tests/test_queue_manager.py new file mode 100644 index 0000000..cf3fd20 --- /dev/null +++ b/kew/kew/tests/test_queue_manager.py @@ -0,0 +1,222 @@ +import pytest +import asyncio +import random +from kew import TaskQueueManager, QueueConfig, QueuePriority, TaskStatus + +async def long_task(task_num: int, sleep_time: float) -> dict: + """Simulate a long-running task""" + await asyncio.sleep(sleep_time) + result = sleep_time * 2 + return {"task_num": task_num, "result": result} + +@pytest.mark.asyncio +async def test_single_queue(): + """Test single queue operation""" + manager = TaskQueueManager() + + # Create queue + manager.create_queue(QueueConfig( + name="test_queue", + max_workers=2, + priority=QueuePriority.HIGH + )) + + # Submit task + task_info = await manager.submit_task( + task_id="task1", + queue_name="test_queue", + task_type="test", + task_func=long_task, + priority=QueuePriority.HIGH, + task_num=1, + sleep_time=0.1 + ) + + # Check initial status + status = manager.get_task_status(task_info.task_id) + assert status.queue_name == "test_queue" + + # Wait for completion + await asyncio.sleep(0.2) + + # Check final status + status = manager.get_task_status(task_info.task_id) + assert status.status == TaskStatus.COMPLETED + assert status.result["task_num"] == 1 + assert status.result["result"] == 0.2 + + await manager.shutdown() + +@pytest.mark.asyncio +async def test_multiple_queues(): + """Test multiple queues with different priorities""" + manager = TaskQueueManager() + + # Create queues + manager.create_queue(QueueConfig( + name="fast_track", + max_workers=2, + priority=QueuePriority.HIGH + )) + + manager.create_queue(QueueConfig( + name="standard", + max_workers=1, + priority=QueuePriority.LOW + )) + + tasks = [] + + # Submit high-priority tasks + for i in range(2): + sleep_time = 0.1 + task_info = await manager.submit_task( + task_id=f"high_task_{i+1}", + queue_name="fast_track", + task_type="test", + task_func=long_task, + priority=QueuePriority.HIGH, + task_num=i+1, + sleep_time=sleep_time + ) + tasks.append(task_info) + + # Submit low-priority task + task_info = await manager.submit_task( + task_id="low_task_1", + queue_name="standard", + task_type="test", + task_func=long_task, + priority=QueuePriority.LOW, + task_num=3, + sleep_time=0.1 + ) + tasks.append(task_info) + + # Wait for completion + await asyncio.sleep(0.3) + + # Check all tasks completed + for task in tasks: + status = manager.get_task_status(task.task_id) + assert status.status == TaskStatus.COMPLETED + assert status.result is not None + + # Check queue statuses + fast_track_status = manager.get_queue_status("fast_track") + standard_status = manager.get_queue_status("standard") + + assert fast_track_status["completed_tasks"] == 2 + assert standard_status["completed_tasks"] == 1 + + await manager.shutdown() + +@pytest.mark.asyncio +async def test_queue_priorities(): + """Test that high priority tasks complete before low priority ones""" + manager = TaskQueueManager() + + manager.create_queue(QueueConfig( + name="mixed_queue", + max_workers=1, # Single worker to ensure sequential execution + priority=QueuePriority.MEDIUM + )) + + completion_order = [] + + async def tracking_task(priority_level: str): + await asyncio.sleep(0.1) + completion_order.append(priority_level) + return priority_level + + # Submit low priority task first + await manager.submit_task( + task_id="low_priority", + queue_name="mixed_queue", + task_type="test", + task_func=tracking_task, + priority=QueuePriority.LOW, + priority_level="low" + ) + + # Submit high priority task second + await manager.submit_task( + task_id="high_priority", + queue_name="mixed_queue", + task_type="test", + task_func=tracking_task, + priority=QueuePriority.HIGH, + priority_level="high" + ) + + # Wait for completion + await asyncio.sleep(0.3) + + # High priority task should complete first + assert completion_order[0] == "high" + assert completion_order[1] == "low" + + await manager.shutdown() + +@pytest.mark.asyncio +async def test_error_handling(): + """Test error handling in tasks""" + manager = TaskQueueManager() + + manager.create_queue(QueueConfig( + name="test_queue", + max_workers=1 + )) + + async def failing_task(): + await asyncio.sleep(0.1) + raise ValueError("Test error") + + task_info = await manager.submit_task( + task_id="failing_task", + queue_name="test_queue", + task_type="test", + task_func=failing_task, + priority=QueuePriority.MEDIUM + ) + + # Wait for task to fail + await asyncio.sleep(0.2) + + status = manager.get_task_status("failing_task") + assert status.status == TaskStatus.FAILED + assert "Test error" in status.error + + await manager.shutdown() + +@pytest.mark.asyncio +async def test_queue_cleanup(): + """Test queue cleanup functionality""" + manager = TaskQueueManager() + + manager.create_queue(QueueConfig( + name="test_queue", + max_workers=1 + )) + + task_info = await manager.submit_task( + task_id="task1", + queue_name="test_queue", + task_type="test", + task_func=long_task, + priority=QueuePriority.MEDIUM, + task_num=1, + sleep_time=0.1 + ) + + # Wait for completion + await asyncio.sleep(0.2) + + # Clean up old tasks + manager.cleanup_old_tasks(max_age_hours=0) + + # Check task was cleaned up + with pytest.raises(Exception): + manager.get_task_status("task1") + + await manager.shutdown() diff --git a/kew/pyproject.toml b/kew/pyproject.toml index a4bfeb9..98e50d2 100644 --- a/kew/pyproject.toml +++ b/kew/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" [project] name = "kew" -version = "0.1.0" +version = "0.1.1" authors = [ { name="Rach Pradhan", email="rach@rachit.ai" }, ] @@ -33,3 +33,24 @@ dependencies = [ [tool.hatch.build.targets.wheel] packages = ["kew"] + +[tool.pytest.ini_options] +asyncio_mode = "strict" +testpaths = ["tests"] +python_files = ["test_*.py"] +addopts = "-v --cov=kew" + +[tool.coverage.run] +source = ["kew"] +omit = ["tests/*", "setup.py"] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "if __name__ == .__main__.:", + "raise NotImplementedError", + "raise ImportError", + "except ImportError:", +] +fail_under = 80 \ No newline at end of file diff --git a/kew/test_multiple.py b/kew/test_multiple.py index d8a29c9..8869f1e 100644 --- a/kew/test_multiple.py +++ b/kew/test_multiple.py @@ -1,7 +1,7 @@ # test_multiple.py import asyncio -from kew import TaskQueueManager, TaskStatus import random +from kew import TaskQueueManager, QueueConfig, QueuePriority, TaskStatus async def long_task(task_num: int, sleep_time: int) -> dict: """Simulate a long-running task""" @@ -12,45 +12,74 @@ async def long_task(task_num: int, sleep_time: int) -> dict: return {"task_num": task_num, "result": result} async def main(): - # Initialize manager with 2 workers (so only 2 tasks can run simultaneously) - manager = TaskQueueManager(max_workers=2) + # Initialize manager + manager = TaskQueueManager() - # Submit 5 tasks with different durations + # Create queues + manager.create_queue(QueueConfig( + name="fast_track", + max_workers=2, + priority=QueuePriority.HIGH + )) + + manager.create_queue(QueueConfig( + name="standard", + max_workers=1, + priority=QueuePriority.LOW + )) + + # Submit tasks to different queues tasks = [] - for i in range(5): - sleep_time = random.randint(3, 7) # Random duration between 3-7 seconds + + # Submit 3 high-priority tasks + for i in range(3): + sleep_time = random.randint(2, 4) + task_info = await manager.submit_task( + task_id=f"high_task_{i+1}", + queue_name="fast_track", + task_type="long_calculation", + task_func=long_task, + priority=QueuePriority.HIGH, + task_num=i+1, + sleep_time=sleep_time + ) + tasks.append(task_info) + + # Submit 2 low-priority tasks + for i in range(2): + sleep_time = random.randint(1, 3) task_info = await manager.submit_task( - task_id=f"task{i+1}", + task_id=f"low_task_{i+1}", + queue_name="standard", task_type="long_calculation", task_func=long_task, + priority=QueuePriority.LOW, task_num=i+1, sleep_time=sleep_time ) tasks.append(task_info) - print(f"Submitted task {i+1}") - # Monitor task progress + # Monitor progress while True: all_completed = True print("\nCurrent status:") for task in tasks: status = manager.get_task_status(task.task_id) - print(f"{task.task_id}: {status.status.value} - Result: {status.result}") + print(f"{task.task_id} ({task.queue_name}): {status.status.value} - Result: {status.result}") if status.status not in (TaskStatus.COMPLETED, TaskStatus.FAILED): all_completed = False if all_completed: break - await asyncio.sleep(1) # Wait a second before checking again + await asyncio.sleep(1) - # Final results - print("\nFinal results:") - for task in tasks: - status = manager.get_task_status(task.task_id) - print(f"{task.task_id}: {status.result}") + # Final queue statuses + print("\nFinal Queue Statuses:") + print("Fast Track Queue:", manager.get_queue_status("fast_track")) + print("Standard Queue:", manager.get_queue_status("standard")) - # Properly await shutdown + # Shutdown await manager.shutdown() if __name__ == "__main__":