diff --git a/src/Agent.Listener/JobDispatcher.cs b/src/Agent.Listener/JobDispatcher.cs index b22acc3e6d..25eb155b7b 100644 --- a/src/Agent.Listener/JobDispatcher.cs +++ b/src/Agent.Listener/JobDispatcher.cs @@ -383,7 +383,6 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDisp var jobRequestCancellationToken = newJobDispatch.WorkerCancellationTokenSource.Token; var workerCancelTimeoutKillToken = newJobDispatch.WorkerCancelTimeoutKillTokenSource.Token; - var workerFlushLogsTimeoutToken = newJobDispatch.WorkerFlushLogsTimeoutTokenSource.Token; var term = HostContext.GetService(); term.WriteLine(StringUtil.Loc("RunningJob", DateTime.UtcNow, message.JobDisplayName)); @@ -451,7 +450,6 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDisp var featureFlagProvider = HostContext.GetService(); var newMaskerAndRegexesFeatureFlagStatus = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.EnableNewMaskerAndRegexes", Trace); var enhancedLoggingFlag = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.UseEnhancedLogging", Trace); - var environment = new Dictionary(); if (newMaskerAndRegexesFeatureFlagStatus?.EffectiveState == "On") { @@ -736,19 +734,24 @@ await processChannel.SendAsync( } Trace.Info($"Waiting for worker to exit gracefully for job: {message.JobId}"); + // wait worker to exit + // if worker doesn't exit within timeout, then kill worker. + var exitTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken)); - // Wait for worker to complete within the original timeout - var gracefulExitTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerFlushLogsTimeoutToken)); - - if (gracefulExitTask != workerProcessTask) + // worker haven't exit within cancellation timeout. + if (exitTask != workerProcessTask) { - // Original timeout expired, handle with timeout log flushing if enabled - await HandleWorkerTimeoutAsync( - message.JobId, - processChannel, - workerProcessTask, - workerProcessCancelTokenSource, - workerCancelTimeoutKillToken); + Trace.Info($"worker process for job {message.JobId} haven't exit within cancellation timout, kill running worker."); + workerProcessCancelTokenSource.Cancel(); + try + { + await workerProcessTask; + Trace.Info("Worker process forceful termination completed"); + } + catch (OperationCanceledException) + { + Trace.Info("worker process has been killed."); + } } else { @@ -1067,7 +1070,6 @@ private class WorkerDispatcher : IDisposable public TaskCompletionSource MetadataSource { get; set; } public CancellationTokenSource WorkerCancellationTokenSource { get; private set; } public CancellationTokenSource WorkerCancelTimeoutKillTokenSource { get; private set; } - public CancellationTokenSource WorkerFlushLogsTimeoutTokenSource { get; private set; } private readonly object _lock = new object(); const int maxValueInMinutes = 35790; // 35790 * 60 * 1000 = 2147400000 @@ -1078,19 +1080,18 @@ public WorkerDispatcher(Guid jobId, long requestId) { JobId = jobId; RequestId = requestId; - WorkerCancellationTokenSource = new CancellationTokenSource(); WorkerCancelTimeoutKillTokenSource = new CancellationTokenSource(); - WorkerFlushLogsTimeoutTokenSource = new CancellationTokenSource(); + WorkerCancellationTokenSource = new CancellationTokenSource(); MetadataSource = new TaskCompletionSource(); } public bool Cancel(TimeSpan timeout) { - if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null && WorkerFlushLogsTimeoutTokenSource != null) + if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null) { lock (_lock) { - if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null && WorkerFlushLogsTimeoutTokenSource != null) + if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null) { WorkerCancellationTokenSource.Cancel(); @@ -1106,12 +1107,7 @@ public bool Cancel(TimeSpan timeout) timeout = TimeSpan.FromMinutes(maxValueInMinutes); } - // Use the original timeout for worker execution (no flush signal beforehand) - WorkerFlushLogsTimeoutTokenSource.CancelAfter(timeout.Subtract(TimeSpan.FromSeconds(15))); - - // Set kill timeout to original timeout + 1 minute for log flushing - TimeSpan killTimeout = timeout.Add(TimeSpan.FromMinutes(1)); - WorkerCancelTimeoutKillTokenSource.CancelAfter(killTimeout); + WorkerCancelTimeoutKillTokenSource.CancelAfter(timeout.Subtract(TimeSpan.FromSeconds(15))); return true; } } @@ -1143,7 +1139,7 @@ private void Dispose(bool disposing) { if (disposing) { - if (WorkerCancellationTokenSource != null || WorkerCancelTimeoutKillTokenSource != null || WorkerFlushLogsTimeoutTokenSource != null) + if (WorkerCancellationTokenSource != null || WorkerCancelTimeoutKillTokenSource != null) { lock (_lock) { @@ -1158,66 +1154,10 @@ private void Dispose(bool disposing) WorkerCancelTimeoutKillTokenSource.Dispose(); WorkerCancelTimeoutKillTokenSource = null; } - - if (WorkerFlushLogsTimeoutTokenSource != null) - { - WorkerFlushLogsTimeoutTokenSource.Dispose(); - WorkerFlushLogsTimeoutTokenSource = null; - } } } } } } - - private async Task HandleWorkerTimeoutAsync( - Guid jobId, - IProcessChannel processChannel, - Task workerProcessTask, - CancellationTokenSource workerProcessCancelTokenSource, - CancellationToken workerCancelTimeoutKillToken) - { - Trace.Info($"Worker process for job {jobId} hasn't completed within original timeout, sending flush logs request and waiting 1 minute before forceful kill."); - try - { - // Send special flush logs request to worker - using (var csSendFlush = new CancellationTokenSource(_channelTimeout)) - { - await processChannel.SendAsync( - messageType: MessageType.FlushLogsRequest, - body: string.Empty, - cancellationToken: csSendFlush.Token); - } - Trace.Info("Flush logs request sent to worker, waiting 1 minute for log flushing before forceful kill."); - } - catch (Exception ex) - { - Trace.Warning($"Failed to send flush logs request to worker: {ex.Message}"); - Trace.Warning(ex.ToString()); - } - - // Now wait for the additional 1 minute log flushing period - try - { - await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken)); - - if (!workerProcessTask.IsCompleted) - { - // Worker still hasn't exited after 1 minute log flushing period, force kill - Trace.Info($"Worker process for job {jobId} hasn't exited after 1 minute log flushing period, proceeding to forceful kill."); - workerProcessCancelTokenSource.Cancel(); - await workerProcessTask; - Trace.Info("Worker process forceful termination completed"); - } - else - { - Trace.Info("Worker process exited gracefully after flush logs signal"); - } - } - catch (OperationCanceledException) - { - Trace.Info("worker process has been killed."); - } - } } } diff --git a/src/Agent.Sdk/Knob/AgentKnobs.cs b/src/Agent.Sdk/Knob/AgentKnobs.cs index 2ee55de97c..77f17c7f85 100644 --- a/src/Agent.Sdk/Knob/AgentKnobs.cs +++ b/src/Agent.Sdk/Knob/AgentKnobs.cs @@ -709,14 +709,6 @@ public class AgentKnobs new EnvironmentKnobSource("AZP_ENABLE_NEW_MASKER_AND_REGEXES"), new BuiltInDefaultKnobSource("false")); - public static readonly Knob EnableTimeoutLogFlushing = new Knob( - nameof(EnableTimeoutLogFlushing), - "If true, enables timeout log flushing where worker gets 1 minute to flush logs after job timeout before force kill.", - new PipelineFeatureSource("EnableTimeoutLogFlushing"), - new RuntimeKnobSource("AZP_ENABLE_TIMEOUT_LOG_FLUSHING"), - new EnvironmentKnobSource("AZP_ENABLE_TIMEOUT_LOG_FLUSHING"), - new BuiltInDefaultKnobSource("false")); - public static readonly Knob SendSecretMaskerTelemetry = new Knob( nameof(SendSecretMaskerTelemetry), "If true, the agent will send telemetry about secret masking", diff --git a/src/Agent.Worker/JobRunner.cs b/src/Agent.Worker/JobRunner.cs index d5152f6025..7d9968807f 100644 --- a/src/Agent.Worker/JobRunner.cs +++ b/src/Agent.Worker/JobRunner.cs @@ -103,7 +103,6 @@ public async Task RunAsync(Pipelines.AgentJobRequestMessage message, IExecutionContext jobContext = null; CancellationTokenRegistration? agentShutdownRegistration = null; - CancellationTokenRegistration? workerTimeoutRegistration = null; VssConnection taskConnection = null; VssConnection legacyTaskConnection = null; IResourceMetricsManager resourceDiagnosticManager = null; @@ -160,13 +159,6 @@ public async Task RunAsync(Pipelines.AgentJobRequestMessage message, jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = errorMessage }); }); - // Register for worker timeout cancellation - similar to agent shutdown - workerTimeoutRegistration = HostContext.WorkerShutdownForTimeout.Register(() => - { - Trace.Warning($"Worker shutdown for timeout triggered [JobId:{message.JobId}]"); - jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = "Job cancelled due to worker timeout." }); - }); - // Validate directory permissions. string workDirectory = HostContext.GetDirectory(WellKnownDirectory.Work); Trace.Info($"Validating directory permissions for: '{workDirectory}'"); @@ -462,12 +454,6 @@ public async Task RunAsync(Pipelines.AgentJobRequestMessage message, agentShutdownRegistration = null; } - if (workerTimeoutRegistration != null) - { - workerTimeoutRegistration.Value.Dispose(); - workerTimeoutRegistration = null; - } - legacyTaskConnection?.Dispose(); taskConnection?.Dispose(); jobConnection?.Dispose(); @@ -753,4 +739,4 @@ private void PublishTelemetry(IExecutionContext context, string area, String fea } } } -} \ No newline at end of file +} diff --git a/src/Agent.Worker/StepsRunner.cs b/src/Agent.Worker/StepsRunner.cs index e91a6f9877..b8a5fcdb74 100644 --- a/src/Agent.Worker/StepsRunner.cs +++ b/src/Agent.Worker/StepsRunner.cs @@ -146,14 +146,6 @@ public async Task RunAsync(IExecutionContext jobContext, IList steps) conditionReTestResult = false; Trace.Info($"Condition re-evaluation skipped [Step:'{step.DisplayName}', Reason:AgentShutdown]"); } - else if (AgentKnobs.EnableTimeoutLogFlushing.GetValue(step.ExecutionContext).AsBoolean() && - HostContext.WorkerShutdownForTimeout.IsCancellationRequested) - { - jobContext.Result = TaskResult.Canceled; - jobContext.Variables.Agent_JobStatus = jobContext.Result; - conditionReTestResult = false; - Trace.Info($"Condition re-evaluation skipped [Step:'{step.DisplayName}', Reason:WorkerTimeout]"); - } else { try @@ -211,14 +203,6 @@ public async Task RunAsync(IExecutionContext jobContext, IList steps) conditionResult = false; Trace.Info($"Condition evaluation skipped due to agent shutdown: '{step.DisplayName}'"); } - else if (AgentKnobs.EnableTimeoutLogFlushing.GetValue(step.ExecutionContext).AsBoolean() && - HostContext.WorkerShutdownForTimeout.IsCancellationRequested) - { - jobContext.Result = TaskResult.Canceled; - jobContext.Variables.Agent_JobStatus = jobContext.Result; - conditionResult = false; - Trace.Info($"Condition evaluation skipped due to worker timeout: '{step.DisplayName}'"); - } else { try @@ -256,8 +240,8 @@ public async Task RunAsync(IExecutionContext jobContext, IList steps) else { Trace.Info($"RunStepAsync execution initiated for step: '{step.DisplayName}'"); - // Run the step with worker timeout integration. - await RunStepWithTimeoutAsync(step, jobContext.CancellationToken); + // Run the step. + await RunStepAsync(step, jobContext.CancellationToken); Trace.Info($"RunStepAsync execution completed for step: '{step.DisplayName}' - Result: {step.ExecutionContext.Result}"); } } @@ -296,41 +280,6 @@ public async Task RunAsync(IExecutionContext jobContext, IList steps) } } - private async Task RunStepWithTimeoutAsync(IStep step, CancellationToken jobCancellationToken) - { - Trace.Info($"Individual step execution initiated: '{step.DisplayName}'"); - - // Check if timeout log flushing feature is enabled - bool timeoutLogFlushingEnabled = AgentKnobs.EnableTimeoutLogFlushing.GetValue(step.ExecutionContext).AsBoolean(); - - // Register for worker timeout to cancel the step only if timeout log flushing is enabled - CancellationTokenRegistration? workerTimeoutRegistration = null; - if (timeoutLogFlushingEnabled && !HostContext.WorkerShutdownForTimeout.IsCancellationRequested) - { - workerTimeoutRegistration = HostContext.WorkerShutdownForTimeout.Register(() => - { - Trace.Warning($"Worker timeout detected during step execution: '{step.DisplayName}' - cancelling step"); - step.ExecutionContext.Error("Step cancelled due to worker timeout"); - step.ExecutionContext.CancelToken(); - }); - Trace.Info($"Worker timeout registration active for step: '{step.DisplayName}'"); - } - - try - { - await RunStepAsync(step, jobCancellationToken); - } - finally - { - // Dispose worker timeout registration - if (workerTimeoutRegistration != null) - { - workerTimeoutRegistration.Value.Dispose(); - Trace.Info($"Worker timeout registration disposed for step: '{step.DisplayName}'"); - } - } - } - private async Task RunStepAsync(IStep step, CancellationToken jobCancellationToken) { Trace.Info($"Individual step execution initiated: '{step.DisplayName}'"); @@ -453,7 +402,7 @@ private async Task RunStepAsync(IStep step, CancellationToken jobCancellationTok Trace.Info($"Step result merged with command result - Step: {step.DisplayName}, CommandResult:{step.ExecutionContext.CommandResult} FinalResult: {step.ExecutionContext.Result}"); } - // Fixup the step result if ContinueOnError. + // Fixup the step result if ContinueOnError. if (step.ExecutionContext.Result == TaskResult.Failed && step.ContinueOnError) { step.ExecutionContext.Result = TaskResult.SucceededWithIssues; diff --git a/src/Agent.Worker/Worker.cs b/src/Agent.Worker/Worker.cs index 48168779ec..4c5c615c9a 100644 --- a/src/Agent.Worker/Worker.cs +++ b/src/Agent.Worker/Worker.cs @@ -114,6 +114,7 @@ public async Task RunAsync(string pipeIn, string pipeOut) { case MessageType.CancelRequest: Trace.Info("Job cancellation request received - initiating graceful job termination"); + cancel = true; jobRequestCancellationToken.Cancel(); // Expire the host cancellation token. break; case MessageType.AgentShutdown: @@ -132,10 +133,6 @@ public async Task RunAsync(string pipeIn, string pipeOut) jobRunner.UpdateMetadata(metadataMessage); Trace.Info("Job metadata update processed successfully"); break; - case MessageType.FlushLogsRequest: - Trace.Info("FlushLogsRequest received in main message loop"); - HostContext.ShutdownWorkerForTimeout(); - break; default: throw new ArgumentOutOfRangeException(nameof(channelMessage.MessageType), channelMessage.MessageType, nameof(channelMessage.MessageType)); } diff --git a/src/Microsoft.VisualStudio.Services.Agent/HostContext.cs b/src/Microsoft.VisualStudio.Services.Agent/HostContext.cs index 5c561c9dab..2e666d0183 100644 --- a/src/Microsoft.VisualStudio.Services.Agent/HostContext.cs +++ b/src/Microsoft.VisualStudio.Services.Agent/HostContext.cs @@ -46,9 +46,6 @@ public interface IHostContext : IDisposable, IKnobValueContext void WritePerfCounter(string counter); void EnableHttpTrace(); ContainerInfo CreateContainerInfo(Pipelines.ContainerResource container, Boolean isJobContainer = true); - // Added for flush logs support - CancellationToken WorkerShutdownForTimeout { get; } - void ShutdownWorkerForTimeout(); } public enum StartupType @@ -77,7 +74,6 @@ public class HostContext : EventListener, IObserver, IObserv private ILoggedSecretMasker _secretMasker; private readonly ProductInfoHeaderValue _userAgent = new ProductInfoHeaderValue($"VstsAgentCore-{BuildConstants.AgentPackage.PackageName}", BuildConstants.AgentPackage.Version); private CancellationTokenSource _agentShutdownTokenSource = new CancellationTokenSource(); - private CancellationTokenSource _workerShutdownForTimeout = new CancellationTokenSource(); private object _perfLock = new object(); private Tracing _trace; private Tracing _vssTrace; @@ -91,8 +87,6 @@ public class HostContext : EventListener, IObserver, IObserv private HostType _hostType; public event EventHandler Unloading; public CancellationToken AgentShutdownToken => _agentShutdownTokenSource.Token; - public CancellationToken WorkerShutdownForTimeout => _workerShutdownForTimeout.Token; - public ShutdownReason AgentShutdownReason { get; private set; } public ILoggedSecretMasker SecretMasker => _secretMasker; public ProductInfoHeaderValue UserAgent => _userAgent; @@ -571,12 +565,6 @@ public void ShutdownAgent(ShutdownReason reason) AgentShutdownReason = reason; _agentShutdownTokenSource.Cancel(); } - - public void ShutdownWorkerForTimeout() - { - _trace.Info($"Worker will be shutdown"); - _workerShutdownForTimeout.Cancel(); - } public ContainerInfo CreateContainerInfo(Pipelines.ContainerResource container, Boolean isJobContainer = true) { @@ -704,9 +692,6 @@ protected virtual void Dispose(bool disposing) _agentShutdownTokenSource?.Dispose(); _agentShutdownTokenSource = null; - _workerShutdownForTimeout?.Dispose(); - _workerShutdownForTimeout = null; - base.Dispose(); } } diff --git a/src/Microsoft.VisualStudio.Services.Agent/HostTraceListener.cs b/src/Microsoft.VisualStudio.Services.Agent/HostTraceListener.cs index bb32475aac..3e2a9b58b3 100644 --- a/src/Microsoft.VisualStudio.Services.Agent/HostTraceListener.cs +++ b/src/Microsoft.VisualStudio.Services.Agent/HostTraceListener.cs @@ -23,7 +23,6 @@ public sealed class HostTraceListener : TextWriterTraceListener private int _retentionDays; private bool _diagErrorDetected = false; private string _logFilePath; - private bool _disposed = false; public HostTraceListener(string logFileDirectory, string logFilePrefix, int pageSizeLimit, int retentionDays) : base() @@ -57,8 +56,8 @@ public HostTraceListener(string logFile) ArgUtil.NotNullOrEmpty(logFile, nameof(logFile)); _logFilePath = logFile; Directory.CreateDirectory(Path.GetDirectoryName(_logFilePath)); - // Use StreamWriter constructor that handles FileStream internally - Writer = new StreamWriter(_logFilePath, append: false, Encoding.UTF8, bufferSize: 4096); + Stream logStream = new FileStream(_logFilePath, FileMode.Create, FileAccess.ReadWrite, FileShare.Read, bufferSize: 4096); + Writer = new StreamWriter(logStream); } // Copied and modified slightly from .Net Core source code. Modification was required to make it compile. @@ -91,17 +90,14 @@ public override void WriteLine(string message) if (_currentPageSize > _pageSizeLimit) { Flush(); - if (Writer != null && !_disposed) + if (Writer != null) { Writer.Dispose(); Writer = null; } - if (!_disposed) - { - Writer = CreatePageLogWriter(); - _currentPageSize = 0; - } + Writer = CreatePageLogWriter(); + _currentPageSize = 0; } } @@ -202,31 +198,17 @@ private StreamWriter CreatePageLogWriter() string fileName = StringUtil.Format(_logFileNamingPattern, _logFilePrefix, DateTime.UtcNow); _logFilePath = Path.Combine(_logFileDirectory, fileName); - - // Use StreamWriter constructor that handles FileStream internally - // This eliminates the dual resource management issue + Stream logStream; if (File.Exists(_logFilePath)) { - return new StreamWriter(_logFilePath, append: true, Encoding.UTF8, bufferSize: 4096); + logStream = new FileStream(_logFilePath, FileMode.Append, FileAccess.Write, FileShare.Read, bufferSize: 4096); } else { - return new StreamWriter(_logFilePath, append: false, Encoding.UTF8, bufferSize: 4096); + logStream = new FileStream(_logFilePath, FileMode.Create, FileAccess.ReadWrite, FileShare.Read, bufferSize: 4096); } - } - protected override void Dispose(bool disposing) - { - if (!_disposed && disposing) - { - _disposed = true; - - // Safely dispose the current writer if it exists - // No exception handling needed - we control the state - Writer?.Dispose(); - Writer = null; - } - base.Dispose(disposing); + return new StreamWriter(logStream); } } } \ No newline at end of file diff --git a/src/Microsoft.VisualStudio.Services.Agent/Logging.cs b/src/Microsoft.VisualStudio.Services.Agent/Logging.cs index 045c630fbb..5af4f6e8b9 100644 --- a/src/Microsoft.VisualStudio.Services.Agent/Logging.cs +++ b/src/Microsoft.VisualStudio.Services.Agent/Logging.cs @@ -29,6 +29,7 @@ public class PagingLogger : AgentService, IPagingLogger, IDisposable private Guid _timelineId; private Guid _timelineRecordId; private string _pageId; + private FileStream _pageData; private StreamWriter _pageWriter; private int _byteCount; private int _pageCount; @@ -108,12 +109,7 @@ public void Write(string message) public void End() { - // Prevent multiple disposal attempts - only call EndPage if writer still exists - // This is important because both End() and Dispose() can be called during cleanup - if (_pageWriter != null) - { - EndPage(); - } + EndPage(); } private void Create() @@ -126,35 +122,21 @@ private void NewPage() EndPage(); _byteCount = 0; _dataFileName = Path.Combine(_pagesFolder, $"{_pageId}_{++_pageCount}.log"); - // Create StreamWriter directly with file path - it will handle the FileStream internally - _pageWriter = new StreamWriter(_dataFileName, append: false, System.Text.Encoding.UTF8); + _pageData = new FileStream(_dataFileName, FileMode.CreateNew); + _pageWriter = new StreamWriter(_pageData, System.Text.Encoding.UTF8); } private void EndPage() { if (_pageWriter != null) { - // StreamWriter manages the underlying file handle across all platforms - // This avoids platform-specific disposal timing issues (like "Bad file descriptor" on macOS) - try - { - _pageWriter.Flush(); - } - catch (ObjectDisposedException) - { - // StreamWriter was already disposed - this is safe to ignore - // Can happen during shutdown or cleanup scenarios - } - catch (IOException) - { - // File handle may be invalid (e.g., "Bad file descriptor" on POSIX systems) - // This can happen if the underlying file was closed externally - // Safe to ignore as we're disposing anyway - } - + _pageWriter.Flush(); + _pageData.Flush(); + //The StreamWriter object calls Dispose() on the provided Stream object when StreamWriter.Dispose is called. _pageWriter.Dispose(); _pageWriter = null; - + _pageData.Dispose(); + _pageData = null; _jobServerQueue.QueueFileUpload(_timelineId, _timelineRecordId, "DistributedTask.Core.Log", "CustomToolLog", _dataFileName, true); } } @@ -166,10 +148,8 @@ public void Dispose() protected virtual void Dispose(bool disposing) { - if (disposing && _pageWriter != null) + if (disposing) { - // Only call EndPage if we haven't already disposed the writer - // This prevents double-disposal which causes "Bad file descriptor" on macOS/Linux EndPage(); } } diff --git a/src/Microsoft.VisualStudio.Services.Agent/ProcessChannel.cs b/src/Microsoft.VisualStudio.Services.Agent/ProcessChannel.cs index f6251938eb..7b308e5aea 100644 --- a/src/Microsoft.VisualStudio.Services.Agent/ProcessChannel.cs +++ b/src/Microsoft.VisualStudio.Services.Agent/ProcessChannel.cs @@ -22,7 +22,6 @@ public enum MessageType AgentShutdown = 3, OperatingSystemShutdown = 4, JobMetadataUpdate = 5, - FlushLogsRequest = 9999, } public struct WorkerMessage diff --git a/src/Microsoft.VisualStudio.Services.Agent/TraceManager.cs b/src/Microsoft.VisualStudio.Services.Agent/TraceManager.cs index f80061a34c..8f0709a19f 100644 --- a/src/Microsoft.VisualStudio.Services.Agent/TraceManager.cs +++ b/src/Microsoft.VisualStudio.Services.Agent/TraceManager.cs @@ -103,9 +103,6 @@ private void Dispose(bool disposing) } _sources.Clear(); - - // Dispose the HostTraceListener to prevent "Bad file descriptor" errors on POSIX systems - _hostTraceListener?.Dispose(); } private ITracingProxy CreateTracingProxy(string name) diff --git a/src/Microsoft.VisualStudio.Services.Agent/TracingProxy.cs b/src/Microsoft.VisualStudio.Services.Agent/TracingProxy.cs index 7d69bce13a..28d7c085a7 100644 --- a/src/Microsoft.VisualStudio.Services.Agent/TracingProxy.cs +++ b/src/Microsoft.VisualStudio.Services.Agent/TracingProxy.cs @@ -60,6 +60,7 @@ public override void Info(string message, [CallerMemberName] string operation = var inner = _inner; if (inner is null) { + base.Info(message, operation); return; } inner.Info(message, operation); @@ -70,6 +71,7 @@ public override void Info(object item, [CallerMemberName] string operation = "") var inner = _inner; if (inner is null) { + base.Info(item, operation); return; } inner.Info(item, operation); @@ -80,6 +82,7 @@ public override void Error(Exception exception, [CallerMemberName] string operat var inner = _inner; if (inner is null) { + base.Error(exception, operation); return; } inner.Error(exception, operation); @@ -90,6 +93,7 @@ public override void Error(string message, [CallerMemberName] string operation = var inner = _inner; if (inner is null) { + base.Error(message, operation); return; } inner.Error(message, operation); @@ -100,6 +104,7 @@ public override void Warning(string message, [CallerMemberName] string operation var inner = _inner; if (inner is null) { + base.Warning(message, operation); return; } inner.Warning(message, operation); @@ -110,6 +115,7 @@ public override void Verbose(string message, [CallerMemberName] string operation var inner = _inner; if (inner is null) { + base.Verbose(message, operation); return; } inner.Verbose(message, operation); @@ -120,6 +126,7 @@ public override void Verbose(object item, [CallerMemberName] string operation = var inner = _inner; if (inner is null) { + base.Verbose(item, operation); return; } inner.Verbose(item, operation); @@ -130,6 +137,7 @@ public override void Entering([CallerMemberName] string name = "") var inner = _inner; if (inner is null) { + base.Entering(name); return; } inner.Entering(name); @@ -140,6 +148,7 @@ public override void Leaving([CallerMemberName] string name = "") var inner = _inner; if (inner is null) { + base.Leaving(name); return; } inner.Leaving(name); @@ -150,7 +159,7 @@ public override IDisposable EnteringWithDuration([CallerMemberName] string name var inner = _inner; if (inner is null) { - return new NoOpDisposable(); + return base.EnteringWithDuration(name); } return inner.EnteringWithDuration(name); } @@ -161,11 +170,4 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } } - - internal sealed class NoOpDisposable : IDisposable - { - public void Dispose() - { - } - } } diff --git a/src/Test/L0/Container/DockerCommandManagerL0.cs b/src/Test/L0/Container/DockerCommandManagerL0.cs index 499152ca55..059a50c88d 100644 --- a/src/Test/L0/Container/DockerCommandManagerL0.cs +++ b/src/Test/L0/Container/DockerCommandManagerL0.cs @@ -8,7 +8,6 @@ using Moq; using System; using System.Collections.Generic; -using System.IO; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -22,7 +21,6 @@ public sealed class DockerCommandManagerL0 private readonly Mock _ec; private readonly Mock _configurationStore; private readonly Mock _jobServerQueue; - private readonly Mock _hostContext; public DockerCommandManagerL0() { @@ -30,31 +28,12 @@ public DockerCommandManagerL0() _ec = new Mock(); _configurationStore = new Mock(); _jobServerQueue = new Mock(); - _hostContext = new Mock(); - - // Setup basic host context functionality - _hostContext.Setup(x => x.GetTrace(It.IsAny())).Returns((Tracing)null); // Setup basic configuration store mocks _configurationStore.Setup(x => x.IsConfigured()).Returns(true); _configurationStore.Setup(x => x.GetSettings()).Returns(new AgentSettings()); } - private bool IsDockerAvailable() - { - // Check if Docker is available - try - { - WhichUtil.Which("docker", true); - return true; - } - catch (FileNotFoundException) - { - // Docker not available - return false; - } - } - private DockerCommandManager CreateDockerCommandManager() { var dockerManager = new DockerCommandManager(); @@ -131,7 +110,6 @@ private void SetupEnvironmentVariables(string dockerActionRetries, string checkB [Trait("SkipOn", "darwin")] public async Task DockerStart_WithCheckBeforeRetryFalse_UsesStandardRetryLogic() { - if (!IsDockerAvailable()) return; // Arrange var containerId = "test-container-id"; var exitCode = 0; @@ -187,8 +165,6 @@ public async Task DockerStart_WithCheckBeforeRetryFalse_UsesStandardRetryLogic() [Trait("SkipOn", "darwin")] public async Task DockerStart_WithCheckBeforeRetryTrue_ContainerAlreadyRunning_ReturnsSuccess() { - if (!IsDockerAvailable()) return; - // Arrange var containerId = "test-container-id"; @@ -245,8 +221,6 @@ public async Task DockerStart_WithCheckBeforeRetryTrue_ContainerAlreadyRunning_R [Trait("SkipOn", "darwin")] public async Task DockerStart_WithCheckBeforeRetryTrue_StartSucceedsFirstAttempt_ReturnsSuccess() { - if (!IsDockerAvailable()) return; - // Arrange var containerId = "test-container-id"; @@ -294,8 +268,6 @@ public async Task DockerStart_WithCheckBeforeRetryTrue_StartSucceedsFirstAttempt [Trait("SkipOn", "darwin")] public async Task DockerStart_WithCheckBeforeRetryTrue_AllRetriesFail_ReturnsFailure() { - if (!IsDockerAvailable()) return; - // Arrange var containerId = "test-container-id"; @@ -355,7 +327,6 @@ public async Task DockerStart_WithCheckBeforeRetryTrue_AllRetriesFail_ReturnsFai [Trait("SkipOn", "darwin")] public async Task DockerStart_WithCheckBeforeRetryTrue_NoRetriesEnabled_FailsImmediately() { - if (!IsDockerAvailable()) return; // Arrange var containerId = "test-container-id"; @@ -415,8 +386,6 @@ public async Task DockerStart_WithCheckBeforeRetryTrue_NoRetriesEnabled_FailsImm [Trait("SkipOn", "darwin")] public async Task DockerStart_WithCheckBeforeRetryTrue_RetriesWithBackoff() { - if (!IsDockerAvailable()) return; - // Arrange var containerId = "test-container-id"; diff --git a/src/Test/L0/EnhancedTracingL0.cs b/src/Test/L0/EnhancedTracingL0.cs index 723949ea11..8201b3aa52 100644 --- a/src/Test/L0/EnhancedTracingL0.cs +++ b/src/Test/L0/EnhancedTracingL0.cs @@ -43,5 +43,27 @@ private static string ReadAll(string path) Task.Delay(25).Wait(); return File.Exists(path) ? File.ReadAllText(path) : string.Empty; } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Tracing")] + public void Correlation_Is_Formatted_In_Enhanced_Log() + { + var (mgr, path, trace, masker, listener) = Create("CorrFmt"); + try + { + trace.Info("hello world", operation: "Op1"); + } + finally + { + mgr.Dispose(); + listener.Dispose(); + } + + var content = ReadAll(path); + // Depending on implementation, correlation may or may not be present here. + Assert.Contains("[Op1]", content); + Assert.Contains("hello world", content); + } } } diff --git a/src/Test/L0/Listener/JobDispatcherL0.cs b/src/Test/L0/Listener/JobDispatcherL0.cs index bd716381fd..0393448c02 100644 --- a/src/Test/L0/Listener/JobDispatcherL0.cs +++ b/src/Test/L0/Listener/JobDispatcherL0.cs @@ -507,84 +507,5 @@ public async void DispatchesOneTimeJobRequest() Assert.True(jobDispatcher.RunOnceJobCompleted.Task.Result, "JobDispatcher should set task complete token to 'TRUE' for one time agent."); } } - - [Fact] - [Trait("Level", "L0")] - [Trait("Category", "Agent")] - public async Task HandleWorkerTimeoutAsync_AlwaysSendsFlushLogsRequest() - { - using (TestHostContext hc = new TestHostContext(this)) - { - // Arrange - var jobDispatcher = new JobDispatcher(); - _configurationStore.Setup(x => x.GetSettings()).Returns(new AgentSettings() { PoolId = 1 }); - hc.SetSingleton(_configurationStore.Object); - hc.SetSingleton(_agentServer.Object); - hc.SetSingleton(_processInvoker.Object); - hc.SetSingleton(_processChannel.Object); - hc.SetSingleton(_featureFlagProvider.Object); - - jobDispatcher.Initialize(hc); - - var message = CreateJobRequestMessage(); - var workerProcessTask = Task.FromResult(0); - using var workerProcessCancelTokenSource = new CancellationTokenSource(); - using var workerCancelTimeoutTokenSource = new CancellationTokenSource(); - var workerCancelTimeoutKillToken = workerCancelTimeoutTokenSource.Token; - - _processChannel.Setup(x => x.SendAsync( - MessageType.FlushLogsRequest, - string.Empty, - It.IsAny())) - .Returns(Task.CompletedTask); - - // Use reflection to access the private HandleWorkerTimeoutAsync method - var method = typeof(JobDispatcher).GetMethod("HandleWorkerTimeoutAsync", - BindingFlags.NonPublic | BindingFlags.Instance); - Assert.NotNull(method); - - // Act - var task = (Task)method.Invoke(jobDispatcher, new object[] { - message.JobId, - _processChannel.Object, - workerProcessTask, - workerProcessCancelTokenSource, - workerCancelTimeoutKillToken - }); - - await task; - - // Assert - HandleWorkerTimeoutAsync always sends FlushLogsRequest - _processChannel.Verify(x => x.SendAsync( - MessageType.FlushLogsRequest, - string.Empty, - It.IsAny()), Times.Once); - } - } - - // Note: HandleWorkerTimeoutAsync always sends FlushLogsRequest when called. - // The timeout log flushing feature control happens at a higher level - // determining whether HandleWorkerTimeoutAsync is called at all. - - [Fact] - [Trait("Level", "L0")] - [Trait("Category", "Agent")] - public void JobDispatcher_HasHandleWorkerTimeoutAsyncMethod() - { - // Arrange & Act - var method = typeof(JobDispatcher).GetMethod("HandleWorkerTimeoutAsync", - BindingFlags.NonPublic | BindingFlags.Instance); - - // Assert - Verify that the timeout log flushing method exists - Assert.NotNull(method); - - var parameters = method.GetParameters(); - Assert.Equal(5, parameters.Length); - Assert.Equal("jobId", parameters[0].Name); - Assert.Equal("processChannel", parameters[1].Name); - Assert.Equal("workerProcessTask", parameters[2].Name); - Assert.Equal("workerProcessCancelTokenSource", parameters[3].Name); - Assert.Equal("workerCancelTimeoutKillToken", parameters[4].Name); - } } } diff --git a/src/Test/L0/TestHostContext.cs b/src/Test/L0/TestHostContext.cs index 2be5acaaed..d438c5bb5c 100644 --- a/src/Test/L0/TestHostContext.cs +++ b/src/Test/L0/TestHostContext.cs @@ -28,7 +28,6 @@ public sealed class TestHostContext : IHostContext, IDisposable private readonly Terminal _term; private readonly ILoggedSecretMasker _secretMasker; private CancellationTokenSource _agentShutdownTokenSource = new CancellationTokenSource(); - private CancellationTokenSource _workerShutdownForTimeoutTokenSource = new CancellationTokenSource(); private string _suiteName; private string _testName; private Tracing _trace; @@ -37,7 +36,6 @@ public sealed class TestHostContext : IHostContext, IDisposable private StartupType _startupType; public event EventHandler Unloading; public CancellationToken AgentShutdownToken => _agentShutdownTokenSource.Token; - public CancellationToken WorkerShutdownForTimeout => _workerShutdownForTimeoutTokenSource.Token; public ShutdownReason AgentShutdownReason { get; private set; } public ILoggedSecretMasker SecretMasker => _secretMasker; @@ -479,11 +477,6 @@ public void ShutdownAgent(ShutdownReason reason) _agentShutdownTokenSource.Cancel(); } - public void ShutdownWorkerForTimeout() - { - _workerShutdownForTimeoutTokenSource.Cancel(); - } - public void WritePerfCounter(string counter) { } @@ -496,8 +489,7 @@ public void EnableHttpTrace() string IKnobValueContext.GetVariableValueOrDefault(string variableName) { - // Return null for unknown variables to allow knob fallback to other sources - return null; + throw new NotSupportedException("Method not supported for Microsoft.VisualStudio.Services.Agent.Tests.TestHostContext"); } IScopedEnvironment IKnobValueContext.GetScopedEnvironment() @@ -525,7 +517,6 @@ private void Dispose(bool disposing) _trace?.Dispose(); _secretMasker?.Dispose(); _agentShutdownTokenSource?.Dispose(); - _workerShutdownForTimeoutTokenSource?.Dispose(); try { Directory.Delete(_tempDirectoryRoot); diff --git a/src/Test/L0/Worker/StepsRunnerL0.cs b/src/Test/L0/Worker/StepsRunnerL0.cs index 722576c479..91f9f1726a 100644 --- a/src/Test/L0/Worker/StepsRunnerL0.cs +++ b/src/Test/L0/Worker/StepsRunnerL0.cs @@ -14,7 +14,6 @@ using Microsoft.TeamFoundation.DistributedTask.Expressions; using Pipelines = Microsoft.TeamFoundation.DistributedTask.Pipelines; using Agent.Sdk; -using Agent.Sdk.Knob; namespace Microsoft.VisualStudio.Services.Agent.Tests.Worker { @@ -483,139 +482,5 @@ private string FormatSteps(IEnumerable> steps) x.Object.ContinueOnError, x.Object.Enabled))); } - - [Fact] - [Trait("Level", "L0")] - [Trait("Category", "Worker")] - public async Task RunAsync_WhenTimeoutLogFlushingEnabled_RegistersWorkerShutdownForTimeout() - { - using (TestHostContext hc = CreateTestContext()) - { - // Arrange - Set environment variable before creating context - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", "true"); - - try - { - var step1 = CreateStep(TaskResult.Succeeded, ExpressionManager.Succeeded); - List steps = new List() { step1.Object }; - - // Mock timeout scenario by making the step run longer than timeout - step1.Setup(x => x.RunAsync()).Returns(async () => - { - // Simulate a long-running step that will timeout - await Task.Delay(100); // Reduced delay for test performance - return TaskResult.Succeeded; - }); - - _ec.Setup(x => x.CancellationToken).Returns(new System.Threading.CancellationToken()); - - // Act - await _stepsRunner.RunAsync(_ec.Object, steps); - - // Verify that timeout log flushing condition includes WorkerShutdownForTimeout - // This is tested indirectly by ensuring the environment variable was set correctly - // We just verify the test completed successfully since the StepsRunner would fail if knob access failed - Assert.True(true); // Test passes if no exception was thrown during execution - } - finally - { - // Cleanup - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - } - } - } - - [Fact] - [Trait("Level", "L0")] - [Trait("Category", "Worker")] - public async Task RunAsync_WhenTimeoutLogFlushingDisabled_DoesNotRegisterWorkerShutdownForTimeout() - { - using (TestHostContext hc = CreateTestContext()) - { - // Arrange - Set environment variable before creating context - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", "false"); - - try - { - var step1 = CreateStep(TaskResult.Succeeded, ExpressionManager.Succeeded); - List steps = new List() { step1.Object }; - - _ec.Setup(x => x.CancellationToken).Returns(new System.Threading.CancellationToken()); - - // Act - await _stepsRunner.RunAsync(_ec.Object, steps); - - // Verify that timeout log flushing is disabled - // This is tested indirectly by ensuring the test completes successfully - Assert.True(true); // Test passes if no exception was thrown during execution - } - finally - { - // Cleanup - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - } - } - } - - [Fact] - [Trait("Level", "L0")] - [Trait("Category", "Worker")] - public async Task RunAsync_WhenTimeoutLogFlushingNotSet_DefaultsToDisabled() - { - using (TestHostContext hc = CreateTestContext()) - { - // Arrange - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - - try - { - var step1 = CreateStep(TaskResult.Succeeded, ExpressionManager.Succeeded); - List steps = new List() { step1.Object }; - - _ec.Setup(x => x.CancellationToken).Returns(new System.Threading.CancellationToken()); - - // Act - await _stepsRunner.RunAsync(_ec.Object, steps); - - // Verify that timeout log flushing defaults to disabled - // This is tested indirectly by ensuring the test completes successfully - Assert.True(true); // Test passes if no exception was thrown during execution - } - finally - { - // Ensure cleanup - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - } - } - } - - [Fact] - [Trait("Level", "L0")] - [Trait("Category", "Worker")] - public async Task WorkerShutdownForTimeout_WhenTriggered_SetsCorrectState() - { - using (TestHostContext hc = CreateTestContext()) - { - // Arrange - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", "true"); - - var step1 = CreateStep(TaskResult.Succeeded, ExpressionManager.Succeeded); - List steps = new List() { step1.Object }; - - // Simulate WorkerShutdownForTimeout being triggered - hc.ShutdownWorkerForTimeout(); - - _ec.Setup(x => x.CancellationToken).Returns(new System.Threading.CancellationToken()); - - // Act - await _stepsRunner.RunAsync(_ec.Object, steps); - - // Verify that WorkerShutdownForTimeout was triggered - Assert.True(hc.WorkerShutdownForTimeout.IsCancellationRequested); - - // Cleanup - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - } - } } } diff --git a/src/Test/L0/Worker/WorkerL0.cs b/src/Test/L0/Worker/WorkerL0.cs index a343eab422..8cca55561d 100644 --- a/src/Test/L0/Worker/WorkerL0.cs +++ b/src/Test/L0/Worker/WorkerL0.cs @@ -153,16 +153,7 @@ public async void DispatchCancellation() var workerMessages = new Queue(arWorkerMessages); _processChannel.Setup(x => x.ReceiveAsync(It.IsAny())) - .Returns(() => - { - if (workerMessages.Count > 0) - { - return Task.FromResult(workerMessages.Dequeue()); - } - // Return a task that will never complete to avoid queue empty exception - var tcs = new TaskCompletionSource(); - return tcs.Task; - }); + .Returns(() => Task.FromResult(workerMessages.Dequeue())); _jobRunner.Setup(x => x.RunAsync(It.IsAny(), It.IsAny())) .Returns( async (Pipelines.AgentJobRequestMessage jm, CancellationToken ct) => @@ -378,272 +369,5 @@ private bool IsMessageIdentical(Pipelines.AgentJobRequestMessage source, Pipelin return true; } - - [Fact] - [Trait("Level", "L0")] - [Trait("Category", "Worker")] - public async void FlushLogsRequest_WhenFeatureEnabled_TriggersWorkerTimeout() - { - // Arrange - using (var hc = new TestHostContext(this)) - { - // Set the timeout log flushing feature flag environment variable - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", "true"); - - var worker = new Agent.Worker.Worker(); - worker.Initialize(hc); - - hc.SetSingleton(_proxy.Object); - hc.SetSingleton(_cert.Object); - hc.EnqueueInstance(_processChannel.Object); - hc.EnqueueInstance(_jobRunner.Object); - - var jobMessage = CreateJobRequestMessage("job1"); - var callCount = 0; - var jobStarted = new TaskCompletionSource(); - - _processChannel.Setup(x => x.ReceiveAsync(It.IsAny())) - .Returns(async () => - { - callCount++; - if (callCount == 1) - { - // First call - return the job request - return new WorkerMessage - { - Body = JsonUtility.ToString(jobMessage), - MessageType = MessageType.NewJobRequest - }; - } - else if (callCount == 2) - { - // Second call - wait for job to start, then return FlushLogsRequest - await jobStarted.Task.ConfigureAwait(false); - await Task.Delay(50); // Give job a moment to start - return new WorkerMessage - { - Body = "", - MessageType = MessageType.FlushLogsRequest - }; - } - else - { - // Subsequent calls - return CancelRequest to avoid blocking - await Task.Delay(10); - return new WorkerMessage { MessageType = MessageType.CancelRequest, Body = "" }; - } - }); - - _jobRunner.Setup(x => x.RunAsync(It.IsAny(), It.IsAny())) - .Returns(async (Pipelines.AgentJobRequestMessage msg, CancellationToken ct) => - { - // Signal that the job has started - jobStarted.SetResult(true); - - // Run long enough to allow FlushLogsRequest to be processed - // Use a loop with cancellation token support to be more realistic - for (int i = 0; i < 100; i++) - { - if (ct.IsCancellationRequested || hc.WorkerShutdownForTimeout.IsCancellationRequested) - { - break; - } - await Task.Delay(50, CancellationToken.None); // Don't use ct to avoid cancellation race - } - - return TaskResult.Succeeded; - }); - - // Act - var result = await worker.RunAsync("pipeIn", "pipeOut"); - - // Assert - // When feature is enabled, worker should process FlushLogsRequest and complete normally - Assert.Equal(100, result); // TaskResult.Succeeded translates to return code 100 - - // Verify that ShutdownWorkerForTimeout was called by checking if the token is cancelled - Assert.True(hc.WorkerShutdownForTimeout.IsCancellationRequested); - - // Cleanup - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - } - } - - [Fact] - [Trait("Level", "L0")] - [Trait("Category", "Worker")] - public async void FlushLogsRequest_WhenFeatureDisabled_IgnoresRequest() - { - // Arrange - using (var hc = new TestHostContext(this)) - { - // Ensure the timeout log flushing feature flag environment variable is not set - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", "false"); - - var worker = new Agent.Worker.Worker(); - worker.Initialize(hc); - - hc.SetSingleton(_proxy.Object); - hc.SetSingleton(_cert.Object); - hc.EnqueueInstance(_processChannel.Object); - hc.EnqueueInstance(_jobRunner.Object); - - var jobMessage = CreateJobRequestMessage("job1"); - var callCount = 0; - var jobStarted = new TaskCompletionSource(); - - _processChannel.Setup(x => x.ReceiveAsync(It.IsAny())) - .Returns(async () => - { - callCount++; - if (callCount == 1) - { - // First call - return the job request - return new WorkerMessage - { - Body = JsonUtility.ToString(jobMessage), - MessageType = MessageType.NewJobRequest - }; - } - else if (callCount == 2) - { - // Second call - wait for job to start, then return FlushLogsRequest - await jobStarted.Task.ConfigureAwait(false); - await Task.Delay(50); // Give job a moment to start - return new WorkerMessage - { - Body = "", - MessageType = MessageType.FlushLogsRequest - }; - } - else - { - // Subsequent calls - return CancelRequest to avoid blocking - await Task.Delay(10); - return new WorkerMessage { MessageType = MessageType.CancelRequest, Body = "" }; - } - }); - - _jobRunner.Setup(x => x.RunAsync(It.IsAny(), It.IsAny())) - .Returns(async (Pipelines.AgentJobRequestMessage jm, CancellationToken ct) => - { - // Signal that the job has started - jobStarted.SetResult(true); - - // Run long enough to allow FlushLogsRequest to be processed - // Use a loop with cancellation token support to be more realistic - for (int i = 0; i < 100; i++) - { - if (ct.IsCancellationRequested || hc.WorkerShutdownForTimeout.IsCancellationRequested) - { - break; - } - await Task.Delay(50, CancellationToken.None); // Don't use ct to avoid cancellation race - } - - return TaskResult.Succeeded; - }); - - // Act - var result = await worker.RunAsync("pipeIn", "pipeOut"); - - // Assert - // When feature is disabled, FlushLogsRequest still triggers worker shutdown (simplified implementation) - Assert.Equal(100, result); // TaskResult.Succeeded translates to return code 100 - - // Verify that ShutdownWorkerForTimeout was called (always called now regardless of feature flag) - Assert.True(hc.WorkerShutdownForTimeout.IsCancellationRequested); - - // Cleanup - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - } - } - - [Fact] - [Trait("Level", "L0")] - [Trait("Category", "Worker")] - public async void FlushLogsRequest_WhenFeatureNotSet_DefaultsToDisabled() - { - // Arrange - using (var hc = new TestHostContext(this)) - { - // Ensure the timeout log flushing feature flag environment variable is not set - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - - var worker = new Agent.Worker.Worker(); - worker.Initialize(hc); - - hc.SetSingleton(_proxy.Object); - hc.SetSingleton(_cert.Object); - hc.EnqueueInstance(_processChannel.Object); - hc.EnqueueInstance(_jobRunner.Object); - - var jobMessage = CreateJobRequestMessage("job1"); - var callCount = 0; - var jobStarted = new TaskCompletionSource(); - - _processChannel.Setup(x => x.ReceiveAsync(It.IsAny())) - .Returns(async () => - { - callCount++; - if (callCount == 1) - { - // First call - return the job request - return new WorkerMessage - { - Body = JsonUtility.ToString(jobMessage), - MessageType = MessageType.NewJobRequest - }; - } - else if (callCount == 2) - { - // Second call - wait for job to start, then return FlushLogsRequest - await jobStarted.Task.ConfigureAwait(false); - await Task.Delay(50); // Give job a moment to start - return new WorkerMessage - { - Body = "", - MessageType = MessageType.FlushLogsRequest - }; - } - else - { - // Subsequent calls - return CancelRequest to avoid blocking - await Task.Delay(10); - return new WorkerMessage { MessageType = MessageType.CancelRequest, Body = "" }; - } - }); - - _jobRunner.Setup(x => x.RunAsync(It.IsAny(), It.IsAny())) - .Returns(async (Pipelines.AgentJobRequestMessage jm, CancellationToken ct) => - { - // Signal that the job has started - jobStarted.SetResult(true); - - // Run long enough to allow FlushLogsRequest to be processed - // Use a loop with cancellation token support to be more realistic - for (int i = 0; i < 100; i++) - { - if (ct.IsCancellationRequested || hc.WorkerShutdownForTimeout.IsCancellationRequested) - { - break; - } - await Task.Delay(50, CancellationToken.None); // Don't use ct to avoid cancellation race - } - - return TaskResult.Succeeded; - }); - - // Act - var result = await worker.RunAsync("pipeIn", "pipeOut"); - - // Assert - // When feature is not set (defaults to disabled), FlushLogsRequest still triggers worker shutdown (simplified implementation) - Assert.Equal(100, result); // TaskResult.Succeeded translates to return code 100 - - // Verify that ShutdownWorkerForTimeout was called (always called now regardless of feature flag) - Assert.True(hc.WorkerShutdownForTimeout.IsCancellationRequested); - } - } } } diff --git a/src/Test/L1/Worker/TimeoutLogFlushingL1Tests.cs b/src/Test/L1/Worker/TimeoutLogFlushingL1Tests.cs deleted file mode 100644 index 98c951d762..0000000000 --- a/src/Test/L1/Worker/TimeoutLogFlushingL1Tests.cs +++ /dev/null @@ -1,240 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using Microsoft.TeamFoundation.DistributedTask.WebApi; -using System; -using System.Runtime.InteropServices; -using System.Threading.Tasks; -using Xunit; - -namespace Microsoft.VisualStudio.Services.Agent.Tests.L1.Worker -{ - [Collection("Worker L1 Tests")] - public class TimeoutLogFlushingL1Tests : L1TestBase - { - [Fact] - [Trait("Level", "L1")] - [Trait("Category", "Worker")] - public async Task TestTimeoutLogFlushingEnabled_JobCompletesSuccessfully() - { - try - { - // Arrange - SetupL1(); - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", "true"); - - var message = LoadTemplateMessage(); - message.Steps.Clear(); - - message.Steps.Add(CreateScriptTask("echo Testing timeout log flushing functionality")); - - // Act - var results = await RunWorker(message); - - // Assert - Assert.Equal(TaskResult.Succeeded, results.Result); - Assert.Equal(100, results.ReturnCode); - } - finally - { - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - } - } - - [Fact] - [Trait("Level", "L1")] - [Trait("Category", "Worker")] - public async Task TestTimeoutLogFlushingNotSet_DefaultsToDisabled() - { - try - { - // Arrange - SetupL1(); - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - - var message = LoadTemplateMessage(); - message.Steps.Clear(); - - message.Steps.Add(CreateScriptTask("echo Testing default timeout log flushing behavior")); - - // Act - var results = await RunWorker(message); - - // Assert - When timeout log flushing is not set, job should succeed normally - // This test verifies the default behavior when the environment variable is unset - Assert.Equal(TaskResult.Succeeded, results.Result); - Assert.False(results.TimedOut); - } - finally - { - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - } - } - - [Fact] - [Trait("Level", "L1")] - [Trait("Category", "Worker")] - public async Task TestTimeoutLogFlushingWithSingleStep_CompletesSuccessfully() - { - try - { - // Arrange - SetupL1(); - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", "true"); - - var message = LoadTemplateMessage(); - message.Steps.Clear(); - - // Use cross-platform script task (works on Windows, macOS, and Linux) - message.Steps.Add(CreateScriptTask("echo Testing timeout log flushing with single step")); - - // Act - var results = await RunWorker(message); - - // Assert - Assert.Equal(TaskResult.Succeeded, results.Result); - Assert.Equal(100, results.ReturnCode); - } - finally - { - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - } - } - - [Fact] - [Trait("Level", "L1")] - [Trait("Category", "Worker")] - public async Task TestTimeoutLogFlushingEnvironmentVariableValues_HandlesVariousInputs() - { - var testCases = new[] { "true", "TRUE", "True", "1", "false", "FALSE", "False", "0", "" }; - - // Setup once before all test cases - SetupL1(); - - foreach (var testValue in testCases) - { - try - { - // Arrange - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", testValue); - - var message = LoadTemplateMessage(); - message.Steps.Clear(); - - message.Steps.Add(CreateScriptTask($"echo \"Testing with env value: {testValue}\"")); - - // Act - var results = await RunWorker(message); - - // Assert - Assert.Equal(TaskResult.Succeeded, results.Result); - Assert.Equal(100, results.ReturnCode); - } - finally - { - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - } - } - } - - [Fact] - [Trait("Level", "L1")] - [Trait("Category", "Worker")] - [Trait("SkipOn", "darwin")] - public async Task TestTimeoutLogFlushingEnabled_JobTimesOutWithExpectedResult() - { - try - { - // Arrange - SetupL1(); - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", "true"); - - // Set a very short job timeout (5 seconds) to force timeout - JobTimeout = TimeSpan.FromSeconds(5); - - var message = LoadTemplateMessage(); - message.Steps.Clear(); - - // Add a script task that runs longer than the timeout - // Use reliable commands that will definitely take more than 5 seconds - if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - { - message.Steps.Add(CreateScriptTask("powershell -Command \"Start-Sleep -Seconds 10\"")); - } - else - { - if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) - { - message.Steps.Add(CreateScriptTask("/bin/bash -c 'sleep 10'")); - } - else - { - message.Steps.Add(CreateScriptTask("/bin/sleep 10")); - } - } - - // Act - var results = await RunWorker(message); - - // Assert - Job should timeout and have TimedOut = true - Assert.True(results.TimedOut, "Job should have timed out"); - } - finally - { - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - // Reset JobTimeout to default - JobTimeout = TimeSpan.FromSeconds(100); - } - } - - [Fact] - [Trait("Level", "L1")] - [Trait("Category", "Worker")] - [Trait("SkipOn", "darwin")] - public async Task TestTimeoutLogFlushingDisabled_JobTimesOutWithExpectedResult() - { - try - { - // Arrange - SetupL1(); - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", "false"); - - // Set a very short job timeout (5 seconds) to force timeout - JobTimeout = TimeSpan.FromSeconds(5); - - var message = LoadTemplateMessage(); - message.Steps.Clear(); - - // Add a script task that runs longer than the timeout (sleep for 10 seconds, timeout is 5 seconds) - if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - { - message.Steps.Add(CreateScriptTask("powershell -Command \"Start-Sleep -Seconds 10\"")); - } - else - { - if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) - { - message.Steps.Add(CreateScriptTask("/bin/bash -c 'sleep 10'")); - } - else - { - message.Steps.Add(CreateScriptTask("/bin/sleep 10")); - } - } - - // Act - var results = await RunWorker(message); - - // Assert - Job should timeout and have TimedOut = true - Assert.True(results.TimedOut, "Job should have timed out"); - - } - finally - { - Environment.SetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING", null); - // Reset JobTimeout to default - JobTimeout = TimeSpan.FromSeconds(100); - } - } - } -} \ No newline at end of file