diff --git a/src/Agent.Listener/Agent.cs b/src/Agent.Listener/Agent.cs index d5c5afb375..8dc2f15ff0 100644 --- a/src/Agent.Listener/Agent.cs +++ b/src/Agent.Listener/Agent.cs @@ -371,6 +371,10 @@ private async Task InitializeRuntimeFeatures() var enhancedLoggingFlag = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.UseEnhancedLogging", Trace); bool enhancedLoggingEnabled = string.Equals(enhancedLoggingFlag?.EffectiveState, "On", StringComparison.OrdinalIgnoreCase); + // Check enhanced worker crash handling feature flag + var enhancedWorkerCrashHandlingFlag = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.EnhancedWorkerCrashHandling", Trace); + bool enhancedWorkerCrashHandlingEnabled = string.Equals(enhancedWorkerCrashHandlingFlag?.EffectiveState, "On", StringComparison.OrdinalIgnoreCase); + Trace.Info($"Enhanced logging feature flag is {(enhancedLoggingEnabled ? "enabled" : "disabled")}"); // Set the result on TraceManager - this automatically switches all trace sources traceManager.SetEnhancedLoggingEnabled(enhancedLoggingEnabled); @@ -378,6 +382,10 @@ private async Task InitializeRuntimeFeatures() // Ensure child processes (worker/plugin) pick up enhanced logging via knob Environment.SetEnvironmentVariable("AZP_USE_ENHANCED_LOGGING", enhancedLoggingEnabled ? "true" : null); + Trace.Info($"Enhanced worker crash handling feature flag is {(enhancedWorkerCrashHandlingEnabled ? "enabled" : "disabled")}"); + // Ensure child processes (worker/plugin) pick up enhanced crash handling via knob + Environment.SetEnvironmentVariable("AZP_ENHANCED_WORKER_CRASH_HANDLING", enhancedWorkerCrashHandlingEnabled ? "true" : null); + Trace.Info("Runtime features initialization completed successfully"); } catch (Exception ex) diff --git a/src/Agent.Listener/JobDispatcher.cs b/src/Agent.Listener/JobDispatcher.cs index b22acc3e6d..29f2c3d3ad 100644 --- a/src/Agent.Listener/JobDispatcher.cs +++ b/src/Agent.Listener/JobDispatcher.cs @@ -628,6 +628,10 @@ await processChannel.SendAsync( detailInfo = string.Join(Environment.NewLine, workerOutput); Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result."); await LogWorkerProcessUnhandledException(message, detailInfo, agentCertManager.SkipServerCertificateValidation); + + // Publish worker crash telemetry for Kusto analysis + var telemetryPublisher = HostContext.GetService(); + await telemetryPublisher.PublishWorkerCrashTelemetryAsync(HostContext, message.JobId, returnCode); } TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode); @@ -641,8 +645,20 @@ await processChannel.SendAsync( await renewJobRequest; Trace.Info($"Job request completion initiated - Completing job request for job: {message.JobId}"); - // complete job request - await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo); + + if (ShouldUseEnhancedCrashHandling(message, returnCode)) + { + // Direct plan event reporting for Plan v8+ worker crashes + await ReportJobCompletionEventAsync(message, result, agentCertManager.SkipServerCertificateValidation); + Trace.Info("Plan event reporting executed successfully for worker crash"); + } + else + { + // Standard completion for Plan v7 or normal Plan v8+ scenarios, or when enhanced handling is disabled + await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo); + Trace.Info("Standard completion executed successfully"); + } + Trace.Info("Job request completion completed"); // print out unhandled exception happened in worker after we complete job request. @@ -971,55 +987,146 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest throw new AggregateException(exceptions); } - // log an error issue to job level timeline record - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")] - private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false) + // Determines if enhanced crash handling should be used for Plan v8+ worker crashes + private bool ShouldUseEnhancedCrashHandling(Pipelines.AgentJobRequestMessage message, int returnCode) { - try + if (!AgentKnobs.EnhancedWorkerCrashHandling.GetValue(UtilKnobValueContext.Instance()).AsBoolean()) + return false; + + bool isPlanV8Plus = PlanUtil.GetFeatures(message.Plan).HasFlag(PlanFeatures.JobCompletedPlanEvent); + bool isWorkerCrash = !TaskResultUtil.IsValidReturnCode(returnCode); + + return isPlanV8Plus && isWorkerCrash; + } + + // Creates a job server connection with proper URL normalization for OnPremises servers + private async Task CreateJobServerConnectionAsync(Pipelines.AgentJobRequestMessage message, bool skipServerCertificateValidation = false) + { + Trace.Info("Creating job server connection"); + + var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection)); + ArgUtil.NotNull(systemConnection, nameof(systemConnection)); + + var jobServer = HostContext.GetService(); + VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); + Uri jobServerUrl = systemConnection.Url; + + Trace.Verbose($"Initial connection details [JobId:{message.JobId}, OriginalUrl:{jobServerUrl}]"); + + // Make sure SystemConnection Url match Config Url base for OnPremises server + if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) || + string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase)) { - var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection)); - ArgUtil.NotNull(systemConnection, nameof(systemConnection)); + try + { + Uri urlResult = null; + Uri configUri = new Uri(_agentSetting.ServerUrl); + if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out urlResult)) + { + //replace the schema and host portion of messageUri with the host from the + //server URI (which was set at config time) + Trace.Info($"URL replacement for OnPremises server - Original: {jobServerUrl}, New: {urlResult}"); + jobServerUrl = urlResult; + } + } + catch (InvalidOperationException ex) + { + Trace.Error(ex); + } + catch (UriFormatException ex) + { + Trace.Error(ex); + } + } + + var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation); + await jobServer.ConnectAsync(jobConnection); + Trace.Info($"Job server connection established successfully"); + + return jobConnection; + } - var jobServer = HostContext.GetService(); - VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); - Uri jobServerUrl = systemConnection.Url; + // Reports job completion to server via plan event (similar to how worker reports) + // Used for Plan v8+ scenarios where listener needs to notify server of job completion + private async Task ReportJobCompletionEventAsync(Pipelines.AgentJobRequestMessage message, TaskResult result, bool skipServerCertificateValidation = false) + { + Trace.Info($"Plan event reporting initiated - Sending job completion event to server"); - // Make sure SystemConnection Url match Config Url base for OnPremises server - if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) || - string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase)) + try + { + using (var jobConnection = await CreateJobServerConnectionAsync(message, skipServerCertificateValidation)) { - try + var jobServer = HostContext.GetService(); + // Create job completed event (similar to worker) + var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result); + + // Send plan event with retry logic (similar to worker pattern) + int retryLimit = 5; + var exceptions = new List(); + + while (retryLimit-- > 0) { - Uri result = null; - Uri configUri = new Uri(_agentSetting.ServerUrl); - if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out result)) + try { - //replace the schema and host portion of messageUri with the host from the - //server URI (which was set at config time) - jobServerUrl = result; + await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None); + Trace.Info($"Plan event reporting completed successfully [JobId:{message.JobId}, Result:{result}]"); + return; } + catch (TaskOrchestrationPlanNotFoundException ex) + { + Trace.Error($"TaskOrchestrationPlanNotFoundException during plan event reporting for job {message.JobId}"); + Trace.Error(ex); + return; // No point retrying + } + catch (TaskOrchestrationPlanSecurityException ex) + { + Trace.Error($"TaskOrchestrationPlanSecurityException during plan event reporting for job {message.JobId}"); + Trace.Error(ex); + return; // No point retrying + } + catch (Exception ex) + { + Trace.Error(ex); + exceptions.Add(ex); + } + + // delay 5 seconds before next retry + Trace.Info($"Plan event reporting retry delay - Waiting 5 seconds before retry {5 - retryLimit}/5"); + await Task.Delay(TimeSpan.FromSeconds(5)); } - catch (InvalidOperationException ex) - { - //cannot parse the Uri - not a fatal error - Trace.Error(ex); - } - catch (UriFormatException ex) + + // If we get here, all retries failed + Trace.Warning($"Plan event reporting failed after all retries [JobId:{message.JobId}, TotalExceptions:{exceptions.Count}]"); + foreach (var ex in exceptions) { - //cannot parse the Uri - not a fatal error Trace.Error(ex); } } + } + catch (Exception ex) + { + Trace.Error("Critical error during plan event reporting setup"); + Trace.Error(ex); + } + } - var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation); - await jobServer.ConnectAsync(jobConnection); - var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); - ArgUtil.NotNull(timeline, nameof(timeline)); - TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job"); - ArgUtil.NotNull(jobRecord, nameof(jobRecord)); - jobRecord.ErrorCount++; - jobRecord.Issues.Add(new Issue() { Type = IssueType.Error, Message = errorMessage }); - await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None); + // log an error issue to job level timeline record + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")] + private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false) + { + try + { + using (var jobConnection = await CreateJobServerConnectionAsync(message, skipServerCertificateValidation)) + { + var jobServer = HostContext.GetService(); + var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); + ArgUtil.NotNull(timeline, nameof(timeline)); + TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job"); + ArgUtil.NotNull(jobRecord, nameof(jobRecord)); + jobRecord.ErrorCount++; + jobRecord.Issues.Add(new Issue() { Type = IssueType.Error, Message = errorMessage }); + await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None); + } } catch (SocketException ex) { diff --git a/src/Agent.Listener/Telemetry/WorkerCrashTelemetryPublisher.cs b/src/Agent.Listener/Telemetry/WorkerCrashTelemetryPublisher.cs new file mode 100644 index 0000000000..21fc772e3c --- /dev/null +++ b/src/Agent.Listener/Telemetry/WorkerCrashTelemetryPublisher.cs @@ -0,0 +1,49 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.VisualStudio.Services.Agent.Util; +using Microsoft.TeamFoundation.DistributedTask.WebApi; +using Newtonsoft.Json; + +namespace Microsoft.VisualStudio.Services.Agent.Listener.Telemetry +{ + [ServiceLocator(Default = typeof(WorkerCrashTelemetryPublisher))] + public interface IWorkerCrashTelemetryPublisher : IAgentService + { + Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, int exitCode); + } + + public sealed class WorkerCrashTelemetryPublisher : AgentService, IWorkerCrashTelemetryPublisher + { + public async Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, int exitCode) + { + try + { + var telemetryPublisher = hostContext.GetService(); + + var telemetryData = new Dictionary + { + ["JobId"] = jobId.ToString(), + ["ExitCode"] = exitCode.ToString() + }; + + var command = new Command("telemetry", "publish") + { + Data = JsonConvert.SerializeObject(telemetryData) + }; + command.Properties.Add("area", "AzurePipelinesAgent"); + command.Properties.Add("feature", "WorkerCrash"); + + await telemetryPublisher.PublishEvent(hostContext, command); + Trace.Info($"Published worker crash telemetry for job {jobId} with exit code {exitCode}"); + } + catch (Exception ex) + { + Trace.Warning($"Failed to publish worker crash telemetry: {ex}"); + } + } + } +} \ No newline at end of file diff --git a/src/Agent.Sdk/Knob/AgentKnobs.cs b/src/Agent.Sdk/Knob/AgentKnobs.cs index 70341346bb..a2d91b845b 100644 --- a/src/Agent.Sdk/Knob/AgentKnobs.cs +++ b/src/Agent.Sdk/Knob/AgentKnobs.cs @@ -715,6 +715,12 @@ public class AgentKnobs new EnvironmentKnobSource("FAIL_JOB_WHEN_AGENT_DIES"), new BuiltInDefaultKnobSource("false")); + public static readonly Knob EnhancedWorkerCrashHandling = new Knob( + nameof(EnhancedWorkerCrashHandling), + "If true, enables enhanced worker crash handling with forced completion for Plan v8+ scenarios where worker crashes cannot send completion events", + new EnvironmentKnobSource("AZP_ENHANCED_WORKER_CRASH_HANDLING"), + new BuiltInDefaultKnobSource("false")); + public static readonly Knob AllowWorkDirectoryRepositories = new Knob( nameof(AllowWorkDirectoryRepositories), "Allows repositories to be checked out below work directory level on self hosted agents.",