gecko-71/GThreadPoolManager

GThreadPoolManager

GThreadPoolManager is a thread pool manager for Delphi applications that enables background task execution with support for priorities, categorization, cancellation, and monitoring. The code is designed with thread safety and task lifecycle control in mind.

Features

  • Priority-based task queuing — supports five levels: Low, Normal, High, Critical, and Custom.
  • Task categories — allows organizing tasks into predefined and custom groups (e.g., HTTP, FileUpload, FileDownload, etc.).
  • Fixed thread pool size — no automatic scaling; the number of worker threads is static or manually configured.
  • Task statistics and monitoring — includes tracking of queued, running, and completed tasks.
  • Task cancellation and timeouts — supports manual cancellation and execution time limits for tasks.
  • Memory management with UserData ownership — handles user-provided data with optional ownership and cleanup.
  • Integrated logging — logs events, warnings, and errors using a configurable logging module.
  • Thread-safe design — uses synchronization primitives (like TCriticalSection) to ensure safe concurrent access.
  • Graceful shutdown — allows active tasks to finish before the manager shuts down.

Quick Start

Basic Usage

uses GThreadPoolManager, Logger, System.SysUtils;

var
  Pool: TThreadPoolManager;
  Logger: THttpLogger;
begin
  // Create logger and pool
  Logger := THttpLogger.Create('app.log');
  Pool := TThreadPoolManager.Create(Logger);

  try
    // Submit a simple task
    Pool.SubmitTask(
      TTaskProc(procedure
      begin
        // Your task code here
        Sleep(1000);
        WriteLn('Task completed!');
      end),
      tpNormal,  // Priority
      tcCustom,  // Category
      'My first task'  // Description
    );

    // Wait for all tasks to complete
    Pool.WaitForAll(5000); // 5 second timeout

  finally
    Pool.Stop(True); // Graceful shutdown
    Pool.Free;
    Logger.Free;
  end;
end.

Priority-based Tasks

// High priority task (executed first)
Pool.SubmitTask(
  TTaskProc(procedure
  begin
    WriteLn('Critical operation executing...');
    ProcessCriticalData;
  end),
  tpHigh,
  tcMaintenance,
  'Critical system maintenance'
);

// Normal priority task
Pool.SubmitTask(
  procedure
  begin
    WriteLn('Regular processing...');
    ProcessRegularData;
  end,
  tpNormal,
  tcCustom,
  'Regular data processing'
);

// Low priority task (executed last)
Pool.SubmitTask(
  TTaskProc(procedure
  begin
    WriteLn('Background cleanup...');
    CleanupTempFiles;
  end),
  tpLow,
  tcCleanup,
  'Background file cleanup'
);
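
The README does not describe how TTaskQueue orders tasks internally. As a rough illustration of priority-based dequeuing in general, the sketch below keeps one FIFO queue per level and always drains the highest non-empty level first. TDemoPriority, Queues, and TryDequeue are hypothetical names used only for this example, and where tpCustom would rank is not covered here.

```delphi
program PriorityDemo;
{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Generics.Collections;

type
  // Hypothetical enum for this sketch; not the library's TTaskPriority.
  TDemoPriority = (dpLow, dpNormal, dpHigh, dpCritical);

var
  Queues: array[TDemoPriority] of TQueue<string>;
  P: TDemoPriority;
  TaskName: string;

// Returns the next task name, scanning from the highest level down.
// Tasks within one level keep their submission (FIFO) order.
function TryDequeue(out Name: string): Boolean;
var
  Level: TDemoPriority;
begin
  for Level := High(TDemoPriority) downto Low(TDemoPriority) do
    if Queues[Level].Count > 0 then
    begin
      Name := Queues[Level].Dequeue;
      Exit(True);
    end;
  Result := False;
end;

begin
  for P := Low(TDemoPriority) to High(TDemoPriority) do
    Queues[P] := TQueue<string>.Create;
  try
    Queues[dpLow].Enqueue('cleanup');
    Queues[dpNormal].Enqueue('report');
    Queues[dpHigh].Enqueue('maintenance');
    Queues[dpNormal].Enqueue('export');

    // Prints: maintenance, report, export, cleanup (one per line)
    while TryDequeue(TaskName) do
      WriteLn(TaskName);
  finally
    for P := Low(TDemoPriority) to High(TDemoPriority) do
      Queues[P].Free;
  end;
end.
```

This single-threaded sketch leaves out locking; a real queue shared by worker threads would need to guard both Enqueue and Dequeue.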

Task with Callback

var
  Task: TPooledTask;
  // Pool: an existing TThreadPoolManager instance, created as in Basic Usage
begin
  Task := Pool.SubmitTaskWithCallback(
    // Main task procedure
    procedure
    begin
      // Simulate file upload
      Sleep(2000);
      // Upload file logic here
    end,
    
    // Completion callback
    procedure(ATask: TPooledTask; ASuccess: Boolean; const AError: string)
    begin
      if ASuccess then
        WriteLn('Upload completed successfully!')
      else
        WriteLn('Upload failed: ' + AError);
    end,
    
    tpNormal,
    tcFileUpload,
    'Upload user-avatar.jpg'
  );
end.

HTTP Request Simulation

procedure ProcessHTTPRequests;
var
  i: Integer;
  CompletedCount: Integer;
  Lock: TCriticalSection;
  // Pool: an existing TThreadPoolManager instance, created as in Basic Usage
begin
  CompletedCount := 0;
  Lock := TCriticalSection.Create;
  
  try
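    // Configure pool for HTTP workload.
    // The Features list describes the pool size as fixed, so check that
    // auto-scaling is supported by your version before relying on these calls.
    Pool.SetMinMaxThreads(4, 12);
    Pool.EnableAutoScaling(True);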
    
    // Submit 100 HTTP request tasks
    for i := 1 to 100 do
    begin
      Pool.SubmitTask(
        TTaskProc(procedure
        begin
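          // Simulate HTTP request processing.
          // Anonymous methods capture variables by reference, so reading the loop
          // variable i in here would not reliably yield this task's request number.
          var ProcessingTime := Random(200) + 50; // 50-249 ms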
          
          Sleep(ProcessingTime);
          
          // Thread-safe counter increment
          Lock.Enter;
          try
            Inc(CompletedCount);
            if CompletedCount mod 10 = 0 then
              WriteLn(Format('Processed %d/100 requests', [CompletedCount]));
          finally
            Lock.Leave;
          end;
        end),
        tpNormal,
        tcHTTPRequest,
        Format('HTTP Request #%d', [i])
      );
    end;
    
    // Wait for all requests to complete
    Pool.WaitForAll(30000);
    WriteLn(Format('All %d HTTP requests completed!', [CompletedCount]));
    
  finally
    Lock.Free;
  end;
end;
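
Delphi anonymous methods capture variables rather than values, so a task created inside a for loop cannot rely on the loop variable still holding the value from the iteration that submitted it. One common workaround is a factory function whose by-value parameter gives each closure its own copy. The sketch below shows this using only the RTL; MakeRequestProc is a hypothetical helper and not part of GThreadPoolManager.

```delphi
program CaptureDemo;
{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Generics.Collections;

// Each call has its own RequestId parameter, so the returned closure
// keeps the value it was created with.
function MakeRequestProc(RequestId: Integer): TProc;
begin
  Result :=
    procedure
    begin
      WriteLn(Format('Handling request #%d', [RequestId]));
    end;
end;

var
  Procs: TList<TProc>;
  P: TProc;
  i: Integer;
begin
  Procs := TList<TProc>.Create;
  try
    // Build the closures first, run them later, as a task queue would.
    for i := 1 to 3 do
      Procs.Add(MakeRequestProc(i));

    // Prints "Handling request #1", "#2", "#3" in that order.
    for P in Procs do
      P();
  finally
    Procs.Free;
  end;
end.
```

To apply the same idea with the pool, the factory would presumably return TTaskProc instead of TProc, assuming TTaskProc is a parameterless anonymous procedure type as the casts in the examples suggest.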

Task Cancellation

var
  Task: TPooledTask;
  TaskId: Integer;
  // Pool: an existing TThreadPoolManager instance, created as in Basic Usage
begin
  // Submit long-running task
  Task := Pool.SubmitTask(
    procedure
    begin
      var i: Integer;
      for i := 1 to 100 do
      begin
        Sleep(100); // Simulate work
        // Task will be cancelled during execution
      end;
    end,
    tpNormal,
    tcCustom,
    'Long running task'
  );
  
  TaskId := Task.ID;
  
  // Cancel task after 2 seconds
  Sleep(2000);
  if Pool.CancelTask(TaskId) then
    WriteLn('Task cancelled successfully')
  else
    WriteLn('Task could not be cancelled (it may already have completed)');
end;
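
The example above does not show how a running task notices that it has been cancelled, and this README does not document that mechanism. As a general illustration of cooperative cancellation, the sketch below uses a plain TThread and a TEvent from the RTL: the worker checks the event between work steps and stops once it is signalled. CancelRequested, Worker, and Steps are names invented for this sketch, which is not the library's implementation.

```delphi
program CancelDemo;
{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

var
  CancelRequested: TEvent;
  Worker: TThread;
  Steps: Integer;
begin
  // Manual-reset event, initially not signalled.
  CancelRequested := TEvent.Create(nil, True, False, '');
  try
    Steps := 0;
    Worker := TThread.CreateAnonymousThread(
      procedure
      var
        i: Integer;
      begin
        for i := 1 to 100 do
        begin
          // WaitFor acts as both the 100 ms "work" delay and the
          // cancellation check: it returns early once the event is set.
          if CancelRequested.WaitFor(100) = wrSignaled then
            Break;
          Inc(Steps);
        end;
      end);
    Worker.FreeOnTerminate := False; // we wait for and free it ourselves
    Worker.Start;

    Sleep(500);               // let the worker run for a while
    CancelRequested.SetEvent; // request cancellation
    Worker.WaitFor;           // the worker exits at its next check
    Worker.Free;

    WriteLn(Format('Stopped after %d of 100 steps', [Steps]));
  finally
    CancelRequested.Free;
  end;
end.
```

The design point is that the worker decides where it is safe to stop; a loop that never checks a cancellation signal can only be interrupted between tasks, not in the middle of one.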

Configuration

var
  Config: TPoolConfig;
begin
  // Custom configuration
  Config.MinThreads := 2;
  Config.MaxThreads := 16;
  Config.MaxQueueSize := 1000;
  Config.ThreadIdleTimeout := 30000;      // 30 seconds
  Config.DefaultTaskTimeout := 60000;     // 60 seconds
  Config.EnableStatistics := True;
  Config.EnableAutoScaling := True;
  Config.AutoScaleThreshold := 0.75;      // Scale at 75% utilization
  Config.LoggingEnabled := True;
  Config.CleanupInterval := 60000;        // 60 seconds
  
  Pool.Configure(Config);
end;

Statistics Monitoring

procedure ShowStatistics;
var
  Stats: TPoolStatistics;
begin
  Stats := Pool.GetStatistics;
  
  WriteLn('=== Thread Pool Statistics ===');
  WriteLn(Format('Active Tasks: %d', [Stats.ActiveTasks]));
  WriteLn(Format('Queued Tasks: %d', [Stats.QueuedTasks]));
  WriteLn(Format('Completed: %d', [Stats.CompletedTasks]));
  WriteLn(Format('Errors: %d', [Stats.ErrorTasks]));
  WriteLn(Format('Cancelled: %d', [Stats.CancelledTasks]));
  WriteLn(Format('Peak Active: %d', [Stats.PeakActiveCount]));
  WriteLn(Format('Avg Execution Time: %.2f ms', [Stats.AverageExecutionTime * 1000]));
  WriteLn(Format('Throughput: %.2f tasks/sec', [Stats.TasksPerSecond]));
  WriteLn(Format('Memory Usage: %.2f MB', [Stats.MemoryUsage / (1024 * 1024)]));
  WriteLn(Format('Uptime: %.2f hours', [Stats.UptimeSeconds / 3600]));
  
  // Priority distribution
  WriteLn('--- Priority Distribution ---');
  WriteLn(Format('High Priority: %d', [Stats.TasksByPriority[tpHigh]]));
  WriteLn(Format('Normal Priority: %d', [Stats.TasksByPriority[tpNormal]]));
  WriteLn(Format('Low Priority: %d', [Stats.TasksByPriority[tpLow]]));
  
  // Category distribution
  WriteLn('--- Category Distribution ---');
  WriteLn(Format('HTTP Requests: %d', [Stats.TasksByCategory[tcHTTPRequest]]));
  WriteLn(Format('File Uploads: %d', [Stats.TasksByCategory[tcFileUpload]]));
  WriteLn(Format('Custom Tasks: %d', [Stats.TasksByCategory[tcCustom]]));
end;

Error Handling

var
  Task: TPooledTask;
  // Pool: an existing TThreadPoolManager instance, created as in Basic Usage
begin
  Task := Pool.SubmitTask(
    procedure
    begin
      try
        // Risky operation
        RiskyDatabaseOperation;
      except
        on E: Exception do
        begin
          // Error will be captured automatically
          raise Exception.Create('Database operation failed: ' + E.Message);
        end;
      end;
    end,
    tpHigh,
    tcMaintenance,
    'Database maintenance'
  );
  
  // Wait for task and check result
  if Pool.WaitForTask(Task.ID, 10000) then
  begin
    if Task.Status = tsError then
      WriteLn('Task failed: ' + Task.LastError)
    else
      WriteLn('Task completed successfully');
  end
  else
    WriteLn('Task timed out');
end;
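
Re-raising with a new Exception, as in the task body above, discards the class of the original error. If that information matters, the RTL's Exception.RaiseOuterException keeps the original as InnerException. The sketch below shows the pattern on its own; RiskyOperation and DoMaintenance are placeholder routines, and whether the pool's LastError reports inner exceptions is not stated in this README.

```delphi
program WrapDemo;
{$APPTYPE CONSOLE}

uses
  System.SysUtils;

// Placeholder for an operation that can fail.
procedure RiskyOperation;
begin
  raise EInOutError.Create('disk not ready');
end;

procedure DoMaintenance;
begin
  try
    RiskyOperation;
  except
    // Wraps the exception currently being handled; it becomes
    // InnerException of the new one.
    Exception.RaiseOuterException(Exception.Create('Maintenance failed'));
  end;
end;

begin
  try
    DoMaintenance;
  except
    on E: Exception do
    begin
      WriteLn(E.Message);
      if E.InnerException <> nil then
        WriteLn('Caused by ', E.InnerException.ClassName, ': ',
          E.InnerException.Message);
    end;
  end;
end.
```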

API Reference

Core Classes

  • TThreadPoolManager - Main thread pool manager
  • TPooledTask - Individual task with metadata
  • TTaskQueue - Priority-based task queue
  • TWorkerThread - Worker thread implementation
  • TMaintenanceThread - Background maintenance operations

Task Priorities

TTaskPriority = (tpLow, tpNormal, tpHigh, tpCritical, tpCustom);

Task Categories

TTaskCategory = (tcHTTPRequest, tcFileUpload, tcFileDownload, 
                 tcCleanup, tcMaintenance, tcCustom);

Task Status

TTaskStatus = (tsQueued, tsRunning, tsCompleted, tsError, tsCancelled);

Requirements

  • Delphi 12.3+
  • Windows (uses Windows API for memory monitoring)
  • VCL or Console application

Installation

  1. Clone the repository
  2. Add GThreadPoolManager.pas and Logger.pas to your project
  3. Add units to your uses clause
  4. Create and configure your thread pool

GUI Test Application

The repository includes a complete GUI test application (GThreadPoolGUITest) that demonstrates:

  • Real-time statistics visualization
  • Interactive testing of all features
  • Performance benchmarking
  • Memory usage monitoring
  • Live logging and progress tracking

Performance

Typical performance characteristics:

  • Throughput: 1000+ tasks/second (depends on task complexity)
  • Latency: Sub-millisecond task queuing
  • Memory: Minimal overhead per task (~200 bytes)
  • Scaling: Linear performance up to CPU core count

Thread Safety

All operations are thread-safe:

  • Task submission and cancellation
  • Statistics collection
  • Queue operations
  • Configuration changes
  • Logging operations
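
Thread-safe pool operations do not protect data that your own task code shares between tasks; that still needs its own synchronization, as the critical section in the HTTP example shows. For a simple counter, an atomic increment is a lighter alternative. The sketch below demonstrates this with TInterlocked and plain RTL threads rather than the pool; Counter and Threads are names invented for the example.

```delphi
program CounterDemo;
{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

var
  Counter: Integer;
  Threads: array[0..3] of TThread;
  i: Integer;
begin
  Counter := 0;

  // Start four threads that each increment the shared counter 1000 times.
  for i := Low(Threads) to High(Threads) do
  begin
    Threads[i] := TThread.CreateAnonymousThread(
      procedure
      var
        n: Integer;
      begin
        for n := 1 to 1000 do
          TInterlocked.Increment(Counter); // atomic, no lock needed
      end);
    Threads[i].FreeOnTerminate := False; // we wait for and free them below
    Threads[i].Start;
  end;

  for i := Low(Threads) to High(Threads) do
  begin
    Threads[i].WaitFor;
    Threads[i].Free;
  end;

  WriteLn(Counter); // prints 4000
end.
```

A critical section remains the better fit when several values must change together, for example incrementing a counter and printing progress in one step.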

License

This project is released under the MIT License. Feel free to use it in commercial and non-commercial projects.

Contributing

Contributions are welcome! Please feel free to submit pull requests, report bugs, or suggest new features.


Created for high-performance Delphi applications requiring efficient task processing and thread management.