Skip to content

Commit

Permalink
updated to version 0.1.1, added priority supports for more types of q…
Browse files Browse the repository at this point in the history
…ueues as well!
  • Loading branch information
justrach committed Oct 31, 2024
1 parent e9f40cb commit 26bb996
Show file tree
Hide file tree
Showing 13 changed files with 862 additions and 179 deletions.
168 changes: 134 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Kew Task Queue Manager

A flexible and robust asynchronous task queue manager for Python applications.
A flexible and robust asynchronous task queue manager for Python applications with support for multiple priority queues.

## Features

- Multiple named queues with independent configurations
- Priority-based task processing
- Asynchronous task execution
- Configurable worker pool
- Configurable worker pools per queue
- Task status tracking and monitoring
- Automatic cleanup of completed tasks
- Thread-safe operations
Expand All @@ -21,28 +23,54 @@ pip install kew

```python
import asyncio
from kew import TaskQueueManager
from kew import TaskQueueManager, QueueConfig, QueuePriority

async def example_task(x: int):
await asyncio.sleep(1)
return x * 2

async def main():
# Initialize the task queue manager
manager = TaskQueueManager(max_workers=2)
manager = TaskQueueManager()

# Submit a task
task_info = await manager.submit_task(
# Create queues with different priorities
manager.create_queue(QueueConfig(
name="high_priority",
max_workers=4,
priority=QueuePriority.HIGH
))

manager.create_queue(QueueConfig(
name="background",
max_workers=1,
priority=QueuePriority.LOW
))

# Submit tasks to different queues
critical_task = await manager.submit_task(
task_id="task1",
queue_name="high_priority",
task_type="multiplication",
task_func=example_task,
priority=QueuePriority.HIGH,
x=5
)

# Wait for result
background_task = await manager.submit_task(
task_id="task2",
queue_name="background",
task_type="multiplication",
task_func=example_task,
priority=QueuePriority.LOW,
x=10
)

# Wait for results
await asyncio.sleep(2)
status = manager.get_task_status("task1")
print(f"Result: {status.result}")
high_status = manager.get_task_status("task1")
low_status = manager.get_task_status("task2")
print(f"High Priority Result: {high_status.result}")
print(f"Background Result: {low_status.result}")

# Cleanup
await manager.shutdown()
Expand All @@ -51,49 +79,114 @@ if __name__ == "__main__":
asyncio.run(main())
```

## Advanced Usage
## Queue Management

### Concurrent Task Execution
### Creating Queues

```python
async def main():
manager = TaskQueueManager(max_workers=2)

# Submit multiple tasks
tasks = []
for i in range(5):
task_info = await manager.submit_task(
task_id=f"task{i}",
task_type="example",
task_func=example_task,
x=i
)
tasks.append(task_info)

# Wait for all tasks to complete
await manager.wait_for_all_tasks()
from kew import QueueConfig, QueuePriority

# Create a high-priority queue with 4 workers
manager.create_queue(QueueConfig(
name="critical",
max_workers=4,
priority=QueuePriority.HIGH,
max_size=1000,
task_timeout=3600
))

# Create a background queue with 1 worker
manager.create_queue(QueueConfig(
name="background",
max_workers=1,
priority=QueuePriority.LOW
))
```

### Queue Priorities

- `QueuePriority.HIGH` (1): Critical tasks
- `QueuePriority.MEDIUM` (2): Standard tasks
- `QueuePriority.LOW` (3): Background tasks

### Queue Monitoring

```python
# Get queue status
status = manager.get_queue_status("critical")
print(f"Active Tasks: {status['active_tasks']}")
print(f"Queued Tasks: {status['queued_tasks']}")
print(f"Completed Tasks: {status['completed_tasks']}")
```

### Queue Operations

```python
# Wait for specific queue to complete
await manager.wait_for_queue("critical")

# Clean up old tasks in a queue
manager.cleanup_old_tasks(max_age_hours=24, queue_name="background")
```

## Task Management

### Submitting Tasks

```python
task_info = await manager.submit_task(
task_id="unique_id",
queue_name="critical",
task_type="example",
task_func=my_async_function,
priority=QueuePriority.HIGH,
*args,
**kwargs
)
```

### Task Status Monitoring

```python
status = manager.get_task_status("task1")
print(f"Status: {status.status}")
status = manager.get_task_status("unique_id")
print(f"Status: {status.status}") # TaskStatus.QUEUED, PROCESSING, COMPLETED, FAILED
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")
```

### Waiting for Tasks

```python
# Wait for specific task
await manager.wait_for_task("task1", timeout=30)

# Wait for all tasks in a queue
await manager.wait_for_queue("critical", timeout=60)
```

## API Reference

### TaskQueueManager

- `__init__(max_workers=2, queue_size=1000, task_timeout=3600)`
- `async submit_task(task_id, task_type, task_func, *args, **kwargs)`
- `__init__()`
- `create_queue(config: QueueConfig)`
- `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)`
- `get_task_status(task_id)`
- `get_queue_status(queue_name)`
- `async wait_for_task(task_id, timeout=None)`
- `async wait_for_all_tasks(timeout=None)`
- `cleanup_old_tasks(max_age_hours=24)`
- `async shutdown()`
- `async wait_for_queue(queue_name, timeout=None)`
- `cleanup_old_tasks(max_age_hours=24, queue_name=None)`
- `async shutdown(wait=True)`

### QueueConfig

- `name: str`
- `max_workers: int`
- `priority: QueuePriority = QueuePriority.MEDIUM`
- `max_size: int = 1000`
- `task_timeout: int = 3600`

### TaskStatus

Expand All @@ -103,6 +196,13 @@ Enum with states:
- `COMPLETED`
- `FAILED`

### QueuePriority

Enum with levels:
- `HIGH` (1)
- `MEDIUM` (2)
- `LOW` (3)

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
Expand Down
Binary file added kew/.coverage
Binary file not shown.
Loading

0 comments on commit 26bb996

Please sign in to comment.