Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 113 additions & 2 deletions src/Agent.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ await processChannel.SendAsync(
detailInfo = string.Join(Environment.NewLine, workerOutput);
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
await LogWorkerProcessUnhandledException(message, detailInfo, agentCertManager.SkipServerCertificateValidation);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we have sendEvent to server method here as well, as here only we are logging if worker is terminated due to unhandled exception

// Publish worker crash telemetry for Kusto analysis
var telemetryPublisher = HostContext.GetService<IWorkerCrashTelemetryPublisher>();
await telemetryPublisher.PublishWorkerCrashTelemetryAsync(HostContext, message.JobId, message.JobId, returnCode);
}

TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
Expand All @@ -641,8 +645,40 @@ await processChannel.SendAsync(
await renewJobRequest;

Trace.Info($"Job request completion initiated - Completing job request for job: {message.JobId}");
// complete job request
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);

// Check if enhanced crash handling is enabled via agent knob
bool enhancedworkercrashhandlingenabled = AgentKnobs.EnhancedWorkerCrashHandling.GetValue(UtilKnobValueContext.Instance()).AsBoolean();
Trace.Info($"Enhanced worker crash handling enabled: {enhancedworkercrashhandlingenabled}");

if (enhancedworkercrashhandlingenabled)
{
bool isPlanV8Plus = PlanUtil.GetFeatures(message.Plan).HasFlag(PlanFeatures.JobCompletedPlanEvent);
bool isWorkerCrash = !TaskResultUtil.IsValidReturnCode(returnCode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we rename this to worker failed to send status to server?
as in that case this can be extended to any events in future


Trace.Info($"Enhanced crash handling enabled - Normal completion crash analysis [JobId:{message.JobId}, PlanVersion:{message.Plan.Version}, IsPlanV8Plus:{isPlanV8Plus}, IsWorkerCrash:{isWorkerCrash}, ExitCode:{returnCode}, NeedsForcedCompletion:{isPlanV8Plus && isWorkerCrash}]");

if (isPlanV8Plus && isWorkerCrash)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if isPlanV8Plus = true and isWorkerCrash = false? did we test this behaviour?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we tested this behaviour, in that case worker itself has already reported to server and on the listener side it will call CompleteJobRequestAsync func where it checks, if plan v8 is enabled then it will skip reporting.

{
// Direct plan event reporting for Plan v8+ worker crashes
Trace.Warning($"Plan event reporting for Plan v8+ worker crash [JobId:{message.JobId}, PlanVersion:{message.Plan.Version}, ExitCode:{returnCode}, Result:{result}]");
await ReportJobCompletionEventAsync(message, result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we not calling CompleteJobRequestAsync inside?
Are we completely changing the agent’s behavior to not call CompleteJobRequestAsync for V8?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot answer this question

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not call CompleteJobRequestAsync?
Because in Plan v8+ the server-side lifecycle is event-driven. The correct, supported path is RaisePlanEventAsync(JobCompletedPlanEvent). Using CompleteJobRequestAsync (legacy PATCH) in v8 bypasses the orchestration engine

Are we completely changing behavior for v8?
No. Normal v8 flow remains unchanged (Worker sends the event). We only add a crash-aware fallback in the Listener. The preferred fallback is plan-event (RaisePlanEventAsync).

Trace.Info("Plan event reporting executed successfully for worker crash");
}
else
{
// Standard completion for Plan v7 or normal Plan v8+ scenarios
Trace.Info($"Standard completion for normal scenario [JobId:{message.JobId}, PlanVersion:{message.Plan.Version}, ExitCode:{returnCode}, Result:{result}]");
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
Trace.Info("Standard completion executed successfully");
}
}
else
{
// Original simple completion logic
Trace.Info($"Using previous completion logic [JobId:{message.JobId}, EnhancedHandling:Disabled]");
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
}

Trace.Info("Job request completion completed");

// print out unhandled exception happened in worker after we complete job request.
Expand Down Expand Up @@ -971,6 +1007,81 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest
throw new AggregateException(exceptions);
}

// Reports job completion to server via plan event (similar to how worker reports)
// Used for Plan v8+ scenarios where listener needs to notify server of job completion
private async Task ReportJobCompletionEventAsync(Pipelines.AgentJobRequestMessage message, TaskResult result)
{
Trace.Info($"Plan event reporting initiated - Sending job completion event to server [JobId:{message.JobId}, Result:{result}]");

try
{
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
ArgUtil.NotNull(systemConnection, nameof(systemConnection));

var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;

// Make sure SystemConnection Url match Config Url base for OnPremises server
if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic is similar to the logic in method LogWorkerProcessUnhandledException, could we please check if we can refactor this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for suggestion, refactored the part.

string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase))
{
try
{
Uri urlResult = null;
Uri configUri = new Uri(_agentSetting.ServerUrl);
if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out urlResult))
{
//replace the schema and host portion of messageUri with the host from the
//server URI (which was set at config time)
Trace.Info($"URL replacement for OnPremises server - Original: {jobServerUrl}, New: {urlResult}");
jobServerUrl = urlResult;
}
}
catch (InvalidOperationException ex)
{
Trace.Error(ex);
}
catch (UriFormatException ex)
{
Trace.Error(ex);
}
}

using (var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation: false))
{
await jobServer.ConnectAsync(jobConnection);
// Create job completed event (similar to worker)
var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result, false);
try
{
await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a retry here? I do see we have a retry in Jobrunner for RaisePlanEventAsync

Trace.Info($"Plan event reporting completed successfully [JobId:{message.JobId}, Result:{result}]");
}
catch (TaskOrchestrationPlanNotFoundException ex)
{
Trace.Error($"TaskOrchestrationPlanNotFoundException during plan event reporting for job {message.JobId}");
Trace.Error(ex);
}
catch (TaskOrchestrationPlanSecurityException ex)
{
Trace.Error($"TaskOrchestrationPlanSecurityException during plan event reporting for job {message.JobId}");
Trace.Error(ex);
}
catch (Exception ex)
{
Trace.Error($"Exception during plan event reporting for job {message.JobId}: {ex.Message}");
Trace.Error(ex);
}
}
}
catch (Exception ex)
{
Trace.Error($"Critical error during plan event reporting setup for job {message.JobId}: {ex.Message}");
Trace.Error(ex);
}
}

// log an error issue to job level timeline record
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")]
private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false)
Expand Down
50 changes: 50 additions & 0 deletions src/Agent.Listener/Telemetry/WorkerCrashTelemetryPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.VisualStudio.Services.Agent.Util;
using Microsoft.TeamFoundation.DistributedTask.WebApi;
using Newtonsoft.Json;

namespace Microsoft.VisualStudio.Services.Agent.Listener.Telemetry
{
[ServiceLocator(Default = typeof(WorkerCrashTelemetryPublisher))]
public interface IWorkerCrashTelemetryPublisher : IAgentService
{
Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, Guid? taskInstanceId, int exitCode);
}

public sealed class WorkerCrashTelemetryPublisher : AgentService, IWorkerCrashTelemetryPublisher
{
public async Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, Guid? taskInstanceId, int exitCode)
{
try
{
var telemetryPublisher = hostContext.GetService<IAgenetListenerTelemetryPublisher>();

var telemetryData = new Dictionary<string, object>
{
["JobId"] = jobId.ToString(),
["TaskInstanceId"] = taskInstanceId?.ToString() ?? "N/A",
["ExitCode"] = exitCode.ToString()
};

var command = new Command("telemetry", "publish")
{
Data = JsonConvert.SerializeObject(telemetryData)
};
command.Properties.Add("area", "AzurePipelinesAgent");
command.Properties.Add("feature", "WorkerCrash");

await telemetryPublisher.PublishEvent(hostContext, command);
Trace.Info($"Published worker crash telemetry for job {jobId} with exit code {exitCode}");
}
catch (Exception ex)
{
Trace.Warning($"Failed to publish worker crash telemetry: {ex.Message}");
}
}
}
}
6 changes: 6 additions & 0 deletions src/Agent.Sdk/Knob/AgentKnobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,12 @@ public class AgentKnobs
new EnvironmentKnobSource("FAIL_JOB_WHEN_AGENT_DIES"),
new BuiltInDefaultKnobSource("false"));

public static readonly Knob EnhancedWorkerCrashHandling = new Knob(
nameof(EnhancedWorkerCrashHandling),
"If true, enables enhanced worker crash handling with forced completion for Plan v8+ scenarios where worker crashes cannot send completion events",
new EnvironmentKnobSource("ENHANCED_WORKER_CRASH_HANDLING"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please convert this to runtime knob

new BuiltInDefaultKnobSource("false"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intended, not to have RuntimeKnobSource?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the Listener side, enabling RuntimeKnobSource is not possible once the listener has started. Instead, I will replace this mechanism with a server API call, removing the dependency on the Agent knob.

Copy link
Contributor Author

@raujaiswal raujaiswal Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated with runtime control in Agent.cs file


public static readonly Knob AllowWorkDirectoryRepositories = new Knob(
nameof(AllowWorkDirectoryRepositories),
"Allows repositories to be checked out below work directory level on self hosted agents.",
Expand Down
Loading