-
Notifications
You must be signed in to change notification settings - Fork 908
Enhanced worker crash handling with integrated crash telemetry #5412
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
40302fe
7a5bb52
c05c057
f2f4138
95aedec
f0a3af4
e18c663
425e29b
af10a40
1d3c339
1944d29
eedd2e4
ea914c3
50fe7d7
8e617cf
f2699ce
e67f638
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -628,6 +628,10 @@ await processChannel.SendAsync( | |
| detailInfo = string.Join(Environment.NewLine, workerOutput); | ||
| Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result."); | ||
| await LogWorkerProcessUnhandledException(message, detailInfo, agentCertManager.SkipServerCertificateValidation); | ||
|
|
||
| // Publish worker crash telemetry for Kusto analysis | ||
| var telemetryPublisher = HostContext.GetService<IWorkerCrashTelemetryPublisher>(); | ||
| await telemetryPublisher.PublishWorkerCrashTelemetryAsync(HostContext, message.JobId, returnCode); | ||
| } | ||
|
|
||
| TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode); | ||
|
|
@@ -641,8 +645,20 @@ await processChannel.SendAsync( | |
| await renewJobRequest; | ||
|
|
||
| Trace.Info($"Job request completion initiated - Completing job request for job: {message.JobId}"); | ||
| // complete job request | ||
| await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo); | ||
|
|
||
| if (ShouldUseEnhancedCrashHandling(message, returnCode)) | ||
| { | ||
| // Direct plan event reporting for Plan v8+ worker crashes | ||
| await ReportJobCompletionEventAsync(message, result, agentCertManager.SkipServerCertificateValidation); | ||
| Trace.Info("Plan event reporting executed successfully for worker crash"); | ||
| } | ||
| else | ||
| { | ||
| // Standard completion for Plan v7 or normal Plan v8+ scenarios, or when enhanced handling is disabled | ||
| await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo); | ||
| Trace.Info("Standard completion executed successfully"); | ||
| } | ||
|
|
||
| Trace.Info("Job request completion completed"); | ||
|
|
||
| // print out unhandled exception happened in worker after we complete job request. | ||
|
|
@@ -971,55 +987,121 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest | |
| throw new AggregateException(exceptions); | ||
| } | ||
|
|
||
| // log an error issue to job level timeline record | ||
| [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")] | ||
| private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false) | ||
// Decides whether job completion should go through the enhanced worker-crash path.
// Returns true only when all of the following hold:
//   - the EnhancedWorkerCrashHandling knob evaluates to true,
//   - the plan's features include PlanFeatures.JobCompletedPlanEvent,
//   - TaskResultUtil does not accept returnCode as a valid worker return code.
private bool ShouldUseEnhancedCrashHandling(Pipelines.AgentJobRequestMessage message, int returnCode)
{
    bool knobEnabled = AgentKnobs.EnhancedWorkerCrashHandling.GetValue(UtilKnobValueContext.Instance()).AsBoolean();
    if (knobEnabled)
    {
        // Both facts are computed before combining them, exactly as the plain '&&' of two locals.
        bool planRaisesCompletionEvent = PlanUtil.GetFeatures(message.Plan).HasFlag(PlanFeatures.JobCompletedPlanEvent);
        bool workerCrashed = !TaskResultUtil.IsValidReturnCode(returnCode);

        return planRaisesCompletionEvent && workerCrashed;
    }

    // Knob is off: always use the standard completion path.
    return false;
}
|
|
||
// Creates a job server connection with proper URL normalization for OnPremises servers.
//
// message: job request; its SystemVssConnection endpoint supplies the credentials and URL.
// skipServerCertificateValidation: forwarded to VssUtil.CreateConnection.
//
// Returns the connected VssConnection. The caller owns it and must dispose it.
// If connecting fails, the connection is disposed here before the exception is rethrown,
// so a failed connect does not leak the connection.
private async Task<VssConnection> CreateJobServerConnectionAsync(Pipelines.AgentJobRequestMessage message, bool skipServerCertificateValidation = false)
{
    Trace.Info("Creating job server connection");

    var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
    ArgUtil.NotNull(systemConnection, nameof(systemConnection));

    var jobServer = HostContext.GetService<IJobServer>();
    VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
    Uri jobServerUrl = systemConnection.Url;

    Trace.Verbose($"Initial connection details [JobId:{message.JobId}, OriginalUrl:{jobServerUrl}]");

    // Make sure SystemConnection Url match Config Url base for OnPremises server.
    // A missing ServerType variable is treated the same as "OnPremises".
    if (!message.Variables.TryGetValue(Constants.Variables.System.ServerType, out var serverType) ||
        string.Equals(serverType?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase))
    {
        try
        {
            Uri configUri = new Uri(_agentSetting.ServerUrl);
            if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out Uri urlResult))
            {
                // Replace the scheme and host portion of the message URL with the host from the
                // server URL (which was set at config time).
                Trace.Info($"URL replacement for OnPremises server - Original: {jobServerUrl}, New: {urlResult}");
                jobServerUrl = urlResult;
            }
        }
        catch (InvalidOperationException ex)
        {
            // Cannot rebuild the URL - not fatal, keep the original job server URL.
            Trace.Error(ex);
        }
        catch (UriFormatException ex)
        {
            // Cannot parse the configured URL - not fatal, keep the original job server URL.
            Trace.Error(ex);
        }
    }

    var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation);
    try
    {
        await jobServer.ConnectAsync(jobConnection);
    }
    catch
    {
        // The caller never receives the connection on failure, so release it here.
        jobConnection.Dispose();
        throw;
    }

    Trace.Info("Job server connection established successfully");

    return jobConnection;
}
|
|
||
| var jobServer = HostContext.GetService<IJobServer>(); | ||
| VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); | ||
| Uri jobServerUrl = systemConnection.Url; | ||
| // Reports job completion to server via plan event (similar to how worker reports) | ||
| // Used for Plan v8+ scenarios where listener needs to notify server of job completion | ||
| private async Task ReportJobCompletionEventAsync(Pipelines.AgentJobRequestMessage message, TaskResult result, bool skipServerCertificateValidation = false) | ||
| { | ||
| Trace.Info($"Plan event reporting initiated - Sending job completion event to server"); | ||
|
|
||
| // Make sure SystemConnection Url match Config Url base for OnPremises server | ||
| if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) || | ||
| string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase)) | ||
| try | ||
| { | ||
| using (var jobConnection = await CreateJobServerConnectionAsync(message, skipServerCertificateValidation)) | ||
| { | ||
| var jobServer = HostContext.GetService<IJobServer>(); | ||
| // Create job completed event (similar to worker) | ||
| var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result, false); | ||
| try | ||
| { | ||
| Uri result = null; | ||
| Uri configUri = new Uri(_agentSetting.ServerUrl); | ||
| if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out result)) | ||
| { | ||
| //replace the schema and host portion of messageUri with the host from the | ||
| //server URI (which was set at config time) | ||
| jobServerUrl = result; | ||
| } | ||
| await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we add a retry here? I see that JobRunner already has a retry around RaisePlanEventAsync. |
||
| Trace.Info("Plan event reporting completed successfully"); | ||
| } | ||
| catch (TaskOrchestrationPlanNotFoundException ex) | ||
| { | ||
| Trace.Error(ex); | ||
| } | ||
| catch (InvalidOperationException ex) | ||
| catch (TaskOrchestrationPlanSecurityException ex) | ||
| { | ||
| //cannot parse the Uri - not a fatal error | ||
| Trace.Error(ex); | ||
| } | ||
| catch (UriFormatException ex) | ||
| catch (Exception ex) | ||
| { | ||
| //cannot parse the Uri - not a fatal error | ||
| Trace.Error(ex); | ||
| } | ||
| } | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| Trace.Error("Critical error during plan event reporting setup"); | ||
| Trace.Error(ex); | ||
| } | ||
| } | ||
|
|
||
| var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation); | ||
| await jobServer.ConnectAsync(jobConnection); | ||
| var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); | ||
| ArgUtil.NotNull(timeline, nameof(timeline)); | ||
| TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job"); | ||
| ArgUtil.NotNull(jobRecord, nameof(jobRecord)); | ||
| jobRecord.ErrorCount++; | ||
| jobRecord.Issues.Add(new Issue() { Type = IssueType.Error, Message = errorMessage }); | ||
| await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None); | ||
| // log an error issue to job level timeline record | ||
| [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")] | ||
| private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false) | ||
| { | ||
| try | ||
| { | ||
| using (var jobConnection = await CreateJobServerConnectionAsync(message, skipServerCertificateValidation)) | ||
| { | ||
| var jobServer = HostContext.GetService<IJobServer>(); | ||
| var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); | ||
| ArgUtil.NotNull(timeline, nameof(timeline)); | ||
| TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job"); | ||
| ArgUtil.NotNull(jobRecord, nameof(jobRecord)); | ||
| jobRecord.ErrorCount++; | ||
| jobRecord.Issues.Add(new Issue() { Type = IssueType.Error, Message = errorMessage }); | ||
| await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None); | ||
| } | ||
| } | ||
| catch (SocketException ex) | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Threading.Tasks; | ||
| using Microsoft.VisualStudio.Services.Agent.Util; | ||
| using Microsoft.TeamFoundation.DistributedTask.WebApi; | ||
| using Newtonsoft.Json; | ||
|
|
||
| namespace Microsoft.VisualStudio.Services.Agent.Listener.Telemetry | ||
| { | ||
/// <summary>
/// Agent service contract for publishing telemetry about a crashed worker process.
/// The default implementation is <see cref="WorkerCrashTelemetryPublisher"/>.
/// </summary>
[ServiceLocator(Default = typeof(WorkerCrashTelemetryPublisher))]
public interface IWorkerCrashTelemetryPublisher : IAgentService
{
    /// <summary>
    /// Publishes a worker crash telemetry event for a job.
    /// </summary>
    /// <param name="hostContext">Host context used to resolve services and publish the event.</param>
    /// <param name="jobId">Identifier of the job whose worker crashed.</param>
    /// <param name="exitCode">Exit code returned by the worker process.</param>
    Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, int exitCode);
}
|
|
||
/// <summary>
/// Publishes a "telemetry publish" command (area "AzurePipelinesAgent", feature "WorkerCrash")
/// carrying the job id and worker exit code through the listener telemetry publisher.
/// </summary>
public sealed class WorkerCrashTelemetryPublisher : AgentService, IWorkerCrashTelemetryPublisher
{
    /// <summary>
    /// Publishes a worker crash telemetry event. Publishing is best-effort: any exception is
    /// logged as a warning and not propagated to the caller.
    /// </summary>
    /// <param name="hostContext">Host context used to resolve the telemetry publisher and send the event.</param>
    /// <param name="jobId">Identifier of the job whose worker crashed.</param>
    /// <param name="exitCode">Exit code returned by the worker process.</param>
    public async Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, int exitCode)
    {
        try
        {
            var telemetryPublisher = hostContext.GetService<IAgenetListenerTelemetryPublisher>();

            var telemetryData = new Dictionary<string, object>
            {
                ["JobId"] = jobId.ToString(),
                // Format with the invariant culture so negative exit codes always serialize
                // with an ASCII '-' regardless of the agent machine's culture settings.
                ["ExitCode"] = exitCode.ToString(System.Globalization.CultureInfo.InvariantCulture)
            };

            var command = new Command("telemetry", "publish")
            {
                Data = JsonConvert.SerializeObject(telemetryData)
            };
            command.Properties.Add("area", "AzurePipelinesAgent");
            command.Properties.Add("feature", "WorkerCrash");

            await telemetryPublisher.PublishEvent(hostContext, command);
            Trace.Info($"Published worker crash telemetry for job {jobId} with exit code {exitCode}");
        }
        catch (Exception ex)
        {
            // Telemetry must never interfere with job completion handling.
            Trace.Warning($"Failed to publish worker crash telemetry: {ex}");
        }
    }
}
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -715,6 +715,12 @@ public class AgentKnobs | |
| new EnvironmentKnobSource("FAIL_JOB_WHEN_AGENT_DIES"), | ||
| new BuiltInDefaultKnobSource("false")); | ||
|
|
||
| public static readonly Knob EnhancedWorkerCrashHandling = new Knob( | ||
| nameof(EnhancedWorkerCrashHandling), | ||
| "If true, enables enhanced worker crash handling with forced completion for Plan v8+ scenarios where worker crashes cannot send completion events", | ||
| new EnvironmentKnobSource("AZP_ENHANCED_WORKER_CRASH_HANDLING"), | ||
| new BuiltInDefaultKnobSource("false")); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it intentional that this knob has no RuntimeKnobSource?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. On the Listener side, a RuntimeKnobSource cannot be enabled once the listener has started. Instead, I will replace this mechanism with a server API call, which removes the dependency on the Agent knob.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Updated: runtime control is now implemented in the Agent.cs file. |
||
|
|
||
| public static readonly Knob AllowWorkDirectoryRepositories = new Knob( | ||
| nameof(AllowWorkDirectoryRepositories), | ||
| "Allows repositories to be checked out below work directory level on self hosted agents.", | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.