Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Agent.Listener/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,21 @@ private async Task InitializeRuntimeFeatures()
var enhancedLoggingFlag = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.UseEnhancedLogging", Trace);
bool enhancedLoggingEnabled = string.Equals(enhancedLoggingFlag?.EffectiveState, "On", StringComparison.OrdinalIgnoreCase);

// Check enhanced worker crash handling feature flag
var enhancedWorkerCrashHandlingFlag = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.EnhancedWorkerCrashHandling", Trace);
bool enhancedWorkerCrashHandlingEnabled = string.Equals(enhancedWorkerCrashHandlingFlag?.EffectiveState, "On", StringComparison.OrdinalIgnoreCase);

Trace.Info($"Enhanced logging feature flag is {(enhancedLoggingEnabled ? "enabled" : "disabled")}");
// Set the result on TraceManager - this automatically switches all trace sources
traceManager.SetEnhancedLoggingEnabled(enhancedLoggingEnabled);

// Ensure child processes (worker/plugin) pick up enhanced logging via knob
Environment.SetEnvironmentVariable("AZP_USE_ENHANCED_LOGGING", enhancedLoggingEnabled ? "true" : null);

Trace.Info($"Enhanced worker crash handling feature flag is {(enhancedWorkerCrashHandlingEnabled ? "enabled" : "disabled")}");
// Ensure child processes (worker/plugin) pick up enhanced crash handling via knob
Environment.SetEnvironmentVariable("AZP_ENHANCED_WORKER_CRASH_HANDLING", enhancedWorkerCrashHandlingEnabled ? "true" : null);

Trace.Info("Runtime features initialization completed successfully");
}
catch (Exception ex)
Expand Down
152 changes: 117 additions & 35 deletions src/Agent.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ await processChannel.SendAsync(
detailInfo = string.Join(Environment.NewLine, workerOutput);
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
await LogWorkerProcessUnhandledException(message, detailInfo, agentCertManager.SkipServerCertificateValidation);

// Publish worker crash telemetry for Kusto analysis
var telemetryPublisher = HostContext.GetService<IWorkerCrashTelemetryPublisher>();
await telemetryPublisher.PublishWorkerCrashTelemetryAsync(HostContext, message.JobId, returnCode);
}

TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
Expand All @@ -641,8 +645,20 @@ await processChannel.SendAsync(
await renewJobRequest;

Trace.Info($"Job request completion initiated - Completing job request for job: {message.JobId}");
// complete job request
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);

if (ShouldUseEnhancedCrashHandling(message, returnCode))
{
// Direct plan event reporting for Plan v8+ worker crashes
await ReportJobCompletionEventAsync(message, result, agentCertManager.SkipServerCertificateValidation);
Trace.Info("Plan event reporting executed successfully for worker crash");
}
else
{
// Standard completion for Plan v7 or normal Plan v8+ scenarios, or when enhanced handling is disabled
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
Trace.Info("Standard completion executed successfully");
}

Trace.Info("Job request completion completed");

// print out unhandled exception happened in worker after we complete job request.
Expand Down Expand Up @@ -971,55 +987,121 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest
throw new AggregateException(exceptions);
}

// log an error issue to job level timeline record
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")]
private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false)
// Determines if enhanced crash handling should be used for Plan v8+ worker crashes.
// Returns true only when all of the following hold:
//   1. the EnhancedWorkerCrashHandling knob evaluates to true,
//   2. the plan's features include PlanFeatures.JobCompletedPlanEvent, and
//   3. TaskResultUtil.IsValidReturnCode rejects the worker's return code.
// In every other case the caller falls back to the standard job-request completion path.
private bool ShouldUseEnhancedCrashHandling(Pipelines.AgentJobRequestMessage message, int returnCode)
{
// NOTE(review): the bare `try` below has no matching block in this method; it looks like
// residue from the removed side of the diff rendering - confirm against the real file.
try
// Feature gate: when the knob is off, never take the enhanced path.
if (!AgentKnobs.EnhancedWorkerCrashHandling.GetValue(UtilKnobValueContext.Instance()).AsBoolean())
return false;

// "Plan v8+" is detected through the JobCompletedPlanEvent feature flag rather than a version number.
bool isPlanV8Plus = PlanUtil.GetFeatures(message.Plan).HasFlag(PlanFeatures.JobCompletedPlanEvent);
// A return code that TaskResultUtil does not recognize is treated as a worker crash.
bool isWorkerCrash = !TaskResultUtil.IsValidReturnCode(returnCode);

return isPlanV8Plus && isWorkerCrash;
}

// Creates a job server connection with proper URL normalization for OnPremises servers.
// Resolves the SystemVssConnection endpoint from the job message, builds credentials from it,
// optionally rewrites the endpoint URL onto the scheme/host the agent was configured with,
// then connects the IJobServer service and returns the connection.
// The returned connection is handed to the caller, which wraps it in a `using` block.
// NOTE(review): if ConnectAsync throws, the connection created here is not disposed - confirm
// whether that is acceptable.
private async Task<VssConnection> CreateJobServerConnectionAsync(Pipelines.AgentJobRequestMessage message, bool skipServerCertificateValidation = false)
{
Trace.Info("Creating job server connection");

// The job message must carry exactly one SystemVssConnection endpoint; ArgUtil.NotNull is applied to the lookup result.
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
ArgUtil.NotNull(systemConnection, nameof(systemConnection));

var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;

Trace.Verbose($"Initial connection details [JobId:{message.JobId}, OriginalUrl:{jobServerUrl}]");

// Make sure the SystemConnection URL matches the configured server URL base for OnPremises servers.
// The rewrite runs when the ServerType variable is absent or equals "OnPremises" (case-insensitive).
if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) ||
string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase))
{
// NOTE(review): the next two lines repeat the endpoint lookup already done above; they look like
// residue from the removed side of the diff rendering - confirm against the real file.
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
ArgUtil.NotNull(systemConnection, nameof(systemConnection));
try
{
Uri urlResult = null;
Uri configUri = new Uri(_agentSetting.ServerUrl);
// Combine the configured scheme+server with the path and query of the URL from the job message.
if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out urlResult))
{
// Replace the scheme and host portion of the message URL with the host from the
// server URI (which was set at config time).
Trace.Info($"URL replacement for OnPremises server - Original: {jobServerUrl}, New: {urlResult}");
jobServerUrl = urlResult;
}
}
// URL rewrite failures are not fatal: the error is traced and the original endpoint URL is kept.
catch (InvalidOperationException ex)
{
Trace.Error(ex);
}
catch (UriFormatException ex)
{
Trace.Error(ex);
}
}

var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation);
await jobServer.ConnectAsync(jobConnection);
Trace.Info($"Job server connection established successfully");

return jobConnection;
}

var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;
// Reports job completion to server via plan event (similar to how worker reports)
// Used for Plan v8+ scenarios where listener needs to notify server of job completion
private async Task ReportJobCompletionEventAsync(Pipelines.AgentJobRequestMessage message, TaskResult result, bool skipServerCertificateValidation = false)
{
Trace.Info($"Plan event reporting initiated - Sending job completion event to server");

// Make sure SystemConnection Url match Config Url base for OnPremises server
if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) ||
string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase))
try
{
using (var jobConnection = await CreateJobServerConnectionAsync(message, skipServerCertificateValidation))
{
var jobServer = HostContext.GetService<IJobServer>();
// Create job completed event (similar to worker)
var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result, false);
try
{
Uri result = null;
Uri configUri = new Uri(_agentSetting.ServerUrl);
if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out result))
{
//replace the schema and host portion of messageUri with the host from the
//server URI (which was set at config time)
jobServerUrl = result;
}
await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a retry here? I see that JobRunner already retries its RaisePlanEventAsync call.

Trace.Info("Plan event reporting completed successfully");
}
catch (TaskOrchestrationPlanNotFoundException ex)
{
Trace.Error(ex);
}
catch (InvalidOperationException ex)
catch (TaskOrchestrationPlanSecurityException ex)
{
//cannot parse the Uri - not a fatal error
Trace.Error(ex);
}
catch (UriFormatException ex)
catch (Exception ex)
{
//cannot parse the Uri - not a fatal error
Trace.Error(ex);
}
}
}
catch (Exception ex)
{
Trace.Error("Critical error during plan event reporting setup");
Trace.Error(ex);
}
}

var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation);
await jobServer.ConnectAsync(jobConnection);
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
ArgUtil.NotNull(timeline, nameof(timeline));
TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
ArgUtil.NotNull(jobRecord, nameof(jobRecord));
jobRecord.ErrorCount++;
jobRecord.Issues.Add(new Issue() { Type = IssueType.Error, Message = errorMessage });
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
// log an error issue to job level timeline record
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")]
private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false)
{
try
{
using (var jobConnection = await CreateJobServerConnectionAsync(message, skipServerCertificateValidation))
{
var jobServer = HostContext.GetService<IJobServer>();
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
ArgUtil.NotNull(timeline, nameof(timeline));
TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
ArgUtil.NotNull(jobRecord, nameof(jobRecord));
jobRecord.ErrorCount++;
jobRecord.Issues.Add(new Issue() { Type = IssueType.Error, Message = errorMessage });
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
}
}
catch (SocketException ex)
{
Expand Down
49 changes: 49 additions & 0 deletions src/Agent.Listener/Telemetry/WorkerCrashTelemetryPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.VisualStudio.Services.Agent.Util;
using Microsoft.TeamFoundation.DistributedTask.WebApi;
using Newtonsoft.Json;

namespace Microsoft.VisualStudio.Services.Agent.Listener.Telemetry
{
/// <summary>
/// Agent service that publishes telemetry when a worker process crashes.
/// The default implementation is <see cref="WorkerCrashTelemetryPublisher"/>.
/// </summary>
[ServiceLocator(Default = typeof(WorkerCrashTelemetryPublisher))]
public interface IWorkerCrashTelemetryPublisher : IAgentService
{
/// <summary>
/// Publishes a worker-crash telemetry event for the given job.
/// </summary>
/// <param name="hostContext">Host context passed through to the telemetry pipeline.</param>
/// <param name="jobId">Identifier of the job whose worker crashed.</param>
/// <param name="exitCode">Exit code returned by the worker process.</param>
/// <returns>A task that completes once the publish attempt has finished.</returns>
Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, int exitCode);
}

/// <summary>
/// Default <see cref="IWorkerCrashTelemetryPublisher"/> implementation. Wraps the job id and
/// worker exit code in a "telemetry"/"publish" command and forwards it to the listener
/// telemetry publisher.
/// </summary>
public sealed class WorkerCrashTelemetryPublisher : AgentService, IWorkerCrashTelemetryPublisher
{
    /// <summary>
    /// Publishes a worker-crash telemetry event (area "AzurePipelinesAgent", feature "WorkerCrash").
    /// This is best-effort: any exception raised while building or publishing the event is
    /// logged as a warning and swallowed so that crash handling in the caller is never interrupted.
    /// </summary>
    /// <param name="hostContext">Host context used to resolve the listener telemetry publisher.</param>
    /// <param name="jobId">Identifier of the job whose worker crashed.</param>
    /// <param name="exitCode">Exit code returned by the worker process.</param>
    /// <returns>A task that completes once the publish attempt has finished.</returns>
    public async Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, int exitCode)
    {
        try
        {
            var telemetryPublisher = hostContext.GetService<IAgenetListenerTelemetryPublisher>();

            var telemetryData = new Dictionary<string, object>
            {
                ["JobId"] = jobId.ToString(),
                // Crash exit codes are frequently negative. Format with the invariant culture so the
                // minus sign and digits in the payload do not vary with the agent machine's locale.
                ["ExitCode"] = exitCode.ToString(System.Globalization.CultureInfo.InvariantCulture)
            };

            var command = new Command("telemetry", "publish")
            {
                Data = JsonConvert.SerializeObject(telemetryData)
            };
            command.Properties.Add("area", "AzurePipelinesAgent");
            command.Properties.Add("feature", "WorkerCrash");

            await telemetryPublisher.PublishEvent(hostContext, command);
            Trace.Info($"Published worker crash telemetry for job {jobId} with exit code {exitCode}");
        }
        catch (Exception ex)
        {
            // Telemetry must never fail the job-completion path; log and continue.
            Trace.Warning($"Failed to publish worker crash telemetry: {ex}");
        }
    }
}
}
6 changes: 6 additions & 0 deletions src/Agent.Sdk/Knob/AgentKnobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,12 @@ public class AgentKnobs
new EnvironmentKnobSource("FAIL_JOB_WHEN_AGENT_DIES"),
new BuiltInDefaultKnobSource("false"));

public static readonly Knob EnhancedWorkerCrashHandling = new Knob(
nameof(EnhancedWorkerCrashHandling),
"If true, enables enhanced worker crash handling with forced completion for Plan v8+ scenarios where worker crashes cannot send completion events",
new EnvironmentKnobSource("AZP_ENHANCED_WORKER_CRASH_HANDLING"),
new BuiltInDefaultKnobSource("false"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional that this knob has no RuntimeKnobSource?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the Listener side, enabling RuntimeKnobSource is not possible once the listener has started. Instead, I will replace this mechanism with a server API call, removing the dependency on the Agent knob.

Copy link
Contributor Author

@raujaiswal raujaiswal Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated with runtime control in Agent.cs file


public static readonly Knob AllowWorkDirectoryRepositories = new Knob(
nameof(AllowWorkDirectoryRepositories),
"Allows repositories to be checked out below work directory level on self hosted agents.",
Expand Down
Loading