Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions monitoring/metrics/definitions.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,17 @@ There are metrics that measure various spects of the [transactional session oper
- Dispatch druation - how long it took to dispatch the control message
- Attempts - how many times the mechanism attempted to process the control message for a given transaction
- Transit time - how long it took between dispatching the control message and starting to process it again

### Envelope handling metrics

Various metrics track performance of the envelope handling when interoperating with external systems. The generic metrics include:

- Envelope unwrapping errors - how many times a single envelope handler failed to unwrap the incoming message

#### CloudEvents specific metrics

Metrics related to [CloudEvents](/transports/cloudevents.md) track the following:

- Unwrapping attempt - number of attempts to unwrap a message
- Invalid messages received - number of messages that don't meet the specification requirements (e.g., they lack mandatory fields)
- Unexpected version received - number of messages that use unexpected specification version
38 changes: 38 additions & 0 deletions nservicebus/operations/opentelemetry_metrics_core_[10,).partial.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
## Meters

NServiceBus endpoints can be configured to expose metrics related to message processing. To capture meter information, add the appropriate meter source (e.g., `NServiceBus.Core.Pipeline.Incoming`) to the OpenTelemetry configuration:

> [!NOTE]
> The metric definitions published by NServiceBus are not yet finalized and could change in a minor release.

snippet: opentelemetry-enablemeters

### Emitted meters

Meter source `NServiceBus.Core.Pipeline.Incoming`:

- [`nservicebus.messaging.successes`](/monitoring/metrics/definitions.md#metrics-captured-number-of-messages-successfully-processed) - Total number of messages processed successfully by the endpoint
- [`nservicebus.messaging.fetches`](/monitoring/metrics/definitions.md#metrics-captured-number-of-messages-pulled-from-queue) - Total number of messages fetched from the queue by the endpoint
- [`nservicebus.messaging.failures`](/monitoring/metrics/definitions.md#metrics-captured-number-of-message-processing-failures) - Total number of messages processed unsuccessfully by the endpoint
- [`nservicebus.messaging.handler_time`](/monitoring/metrics/definitions.md#metrics-captured-handler-time) - The time the user handling code takes to handle a message
- [`nservicebus.messaging.processing_time`](/monitoring/metrics/definitions.md#metrics-captured-processing-time) - The time the endpoint takes to process a message
- [`nservicebus.messaging.critical_time`](/monitoring/metrics/definitions.md#metrics-captured-critical-time) - The time between when a message is sent and when it is fully processed
- [`nservicebus.recoverability.immediate`](/monitoring/metrics/definitions.md#metrics-captured-immediate-retries) - Total number of immediate retries requested
- [`nservicebus.recoverability.delayed`](/monitoring/metrics/definitions.md#metrics-captured-delayed-retries) - Total number of delayed retries requested
- [`nservicebus.recoverability.error`](/monitoring/metrics/definitions.md#metrics-captured-moved-to-error-queue) - Total number of messages sent to the error queue
- [`nservicebus.envelope.unwrapping_error`](/monitoring/metrics/definitions.md#envelope-handling-metrics) - Total number of times when an envelope handler failed to unwrap an incoming message

Meter source `NServiceBus.TransactionalSession`:

- [`nservicebus.transactional_session.commit.duration`](/monitoring/metrics/definitions.md#metrics-captured-transactional-session-metrics) - The time the endpoint takes to commit the session in the Transactional Session
- [`nservicebus.transactional_session.dispatch.duration`](/monitoring/metrics/definitions.md#metrics-captured-transactional-session-metrics) - The time the endpoint takes to dispatch the control message in the Transactional Session
- [`nservicebus.transactional_session.control_message.attempts`](/monitoring/metrics/definitions.md#metrics-captured-transactional-session-metrics) - Total number of attempts to process the control message in the Transactional Session
- [`nservicebus.transactional_session.control_message.transit_time`](/monitoring/metrics/definitions.md#metrics-captured-transactional-session-metrics) - The time between dispatching the control message and starting to process it in the Transactional Session

Meter source `NServiceBus.Envelope.CloudEvents`:

- [`nservicebus.envelope.cloud_events.received.unwrapping_attempt`](/monitoring/metrics/definitions.md#cloudevents-specific-metrics) - Total number of unwrapping attempts
- [`nservicebus.envelope.cloud_events.received.invalid_message`](/monitoring/metrics/definitions.md#cloudevents-specific-metrics) - Total number of received messages not conforming to the specification
- [`nservicebus.envelope.cloud_events.received.unexpected_version`](/monitoring/metrics/definitions.md#cloudevents-specific-metrics) - Total number of received messages with unexpected version field value

See the [OpenTelemetry samples](/samples/open-telemetry/) for instructions on how to send metric information to different tools.
23 changes: 23 additions & 0 deletions samples/aws/cloud-events/Sqs_9/CloudEvents.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 18
VisualStudioVersion = 18.3.11222.16 d18.3
MinimumVisualStudioVersion = 15.0.26730.12
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Endpoint", "Endpoint\Endpoint.csproj", "{7081E12A-888F-4506-A435-970E50BA4C0B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{7081E12A-888F-4506-A435-970E50BA4C0B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7081E12A-888F-4506-A435-970E50BA4C0B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{22B1E205-B713-45C3-A041-62CCDDA9646B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{22B1E205-B713-45C3-A041-62CCDDA9646B}.Debug|Any CPU.Build.0 = Debug|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6E692BB9-1D3D-4DA0-B777-DB9C754E7342}
EndGlobalSection
EndGlobal
10 changes: 10 additions & 0 deletions samples/aws/cloud-events/Sqs_9/Endpoint/AwsBlobNotification.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#region sqs-cloudevents-message-definition
public class AwsBlobNotification :
IMessage
{
public string Key { get; set; }
public int Size { get; set; }
public string ETag { get; set; }
public string Sequencer { get; set; }
}
#endregion
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Microsoft.Extensions.Logging;

public class AwsBlobNotificationHandler(ILogger<AwsBlobNotification> logger) :
IHandleMessages<AwsBlobNotification>
{
public Task Handle(AwsBlobNotification message, IMessageHandlerContext context)
{
logger.LogInformation("Blob {Key} created!", message.Key);
return Task.CompletedTask;
}
}
16 changes: 16 additions & 0 deletions samples/aws/cloud-events/Sqs_9/Endpoint/Endpoint.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<OutputType>Exe</OutputType>
<LangVersion>14.0</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NServiceBus.Envelope.CloudEvents" Version="1.0.0-alpha.1" />
<PackageReference Include="NServiceBus.AmazonSQS" Version="9.0.0-alpha.3" />
<PackageReference Include="NServiceBus.Extensions.Hosting" Version="4.0.0-alpha.3" />
</ItemGroup>

</Project>
40 changes: 40 additions & 0 deletions samples/aws/cloud-events/Sqs_9/Endpoint/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Text.Json;
using Microsoft.Extensions.Hosting;

Console.Title = "CloudEvents";

var builder = Host.CreateApplicationBuilder(args);

var endpointConfiguration = new EndpointConfiguration("Samples.Sqs.CloudEvents");
endpointConfiguration.EnableInstallers();

#region sqs-cloudevents-serialization
endpointConfiguration.UseSerialization<SystemJsonSerializer>().Options(new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
IncludeFields = true
});
#endregion

#region sqs-cloudevents-configuration
var cloudEventsConfiguration = endpointConfiguration.EnableCloudEvents();
#endregion

#region sqs-cloudevents-typemapping
cloudEventsConfiguration.TypeMappings["ObjectCreated:Put"] = [typeof(AwsBlobNotification)];

Check failure on line 24 in samples/aws/cloud-events/Sqs_9/Endpoint/Program.cs

View workflow job for this annotation

GitHub Actions / Build samples & snippets

'CloudEventsConfiguration' does not contain a definition for 'TypeMappings' and no accessible extension method 'TypeMappings' accepting a first argument of type 'CloudEventsConfiguration' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 24 in samples/aws/cloud-events/Sqs_9/Endpoint/Program.cs

View workflow job for this annotation

GitHub Actions / Build samples & snippets

'CloudEventsConfiguration' does not contain a definition for 'TypeMappings' and no accessible extension method 'TypeMappings' accepting a first argument of type 'CloudEventsConfiguration' could be found (are you missing a using directive or an assembly reference?)
#endregion

#region sqs-cloudevents-json-permissive
cloudEvents.EnvelopeUnwrappers.Find<CloudEventJsonStructuredEnvelopeUnwrapper>().EnvelopeHandlingMode = JsonStructureEnvelopeHandlingMode.Permissive;

Check failure on line 28 in samples/aws/cloud-events/Sqs_9/Endpoint/Program.cs

View workflow job for this annotation

GitHub Actions / Build samples & snippets

The name 'JsonStructureEnvelopeHandlingMode' does not exist in the current context

Check failure on line 28 in samples/aws/cloud-events/Sqs_9/Endpoint/Program.cs

View workflow job for this annotation

GitHub Actions / Build samples & snippets

The type or namespace name 'CloudEventJsonStructuredEnvelopeUnwrapper' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 28 in samples/aws/cloud-events/Sqs_9/Endpoint/Program.cs

View workflow job for this annotation

GitHub Actions / Build samples & snippets

The name 'cloudEvents' does not exist in the current context

Check failure on line 28 in samples/aws/cloud-events/Sqs_9/Endpoint/Program.cs

View workflow job for this annotation

GitHub Actions / Build samples & snippets

The name 'JsonStructureEnvelopeHandlingMode' does not exist in the current context

Check failure on line 28 in samples/aws/cloud-events/Sqs_9/Endpoint/Program.cs

View workflow job for this annotation

GitHub Actions / Build samples & snippets

The type or namespace name 'CloudEventJsonStructuredEnvelopeUnwrapper' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 28 in samples/aws/cloud-events/Sqs_9/Endpoint/Program.cs

View workflow job for this annotation

GitHub Actions / Build samples & snippets

The name 'cloudEvents' does not exist in the current context
#endregion

var transport = new SqsTransport();
endpointConfiguration.UseTransport(transport);


Console.WriteLine("Press any key, the application is starting");
Console.ReadKey();
Console.WriteLine("Starting...");

builder.UseNServiceBus(endpointConfiguration);
await builder.Build().RunAsync();
Empty file.
73 changes: 73 additions & 0 deletions samples/aws/cloud-events/sample.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
title: AmazonSQS CloudEvents Sample
summary: Demonstrates how to consume CloudEvents via the Amazon SQS
reviewed: 2025-12-10
component: Sqs
related:
- transports/sqs
---

## Prerequisites

### Security and access configuration

Add the [AWS Access Key ID and AWS Secret Access Key](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) to the following environment variables:

* Access Key ID in `AWS_ACCESS_KEY_ID`
* Secret Access Key in `AWS_SECRET_ACCESS_KEY`
* Default Region in `AWS_REGION`

See also [AWS Account Identifiers](https://docs.aws.amazon.com/general/latest/gr/acct-identifiers.html), [Managing Access Keys for an AWS Account](https://docs.aws.amazon.com/general/latest/gr/managing-aws-access-keys.html), and [IAM Security Credentials](https://console.aws.amazon.com/iam/home#/security_credential).

See also [AWS Regions](https://docs.aws.amazon.com/general/latest/gr/rande.html) for a list of available regions.

### S3 configuration

The S3 bucket should be configured to generate notifications that are ultimately delivered to the SQS queue matching the endpoint. There are many ways to configure the setup. One example configuration includes:

- Configuring S3 bucket to [send notifications to SQS](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html#supported-notification-event-types)
- Creating [Amazon EventBridge Pipe](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-target.html) that picks up the messages from the SQS queue
- Configuring the Pipe to [transform](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-input-transformation.html) the messages to the CloudEvents schema as specified in the [CloudEvents specification](https://github.com/cloudevents/spec/blob/main/cloudevents/adapters/aws-s3.md).
- Configuring the Pipe to call AWS Lambda that would enrich the message with the proper `content-type` property and send the message to the SQS queue matching the endpoint.

## Code walk-through

This sample shows an endpoint receiving a CloudEvents message from the Amazon Simple Queue Service (Amazon SQS)

* The `Endpoint` defines the schema for the CloudEvents message.
* The `Endpoint` enables CloudEvents support and configures the type mapping.
* The `Endpoint` configures the serializer to support fields and properties with different casing.
* The `Endpoint` receives the message and calls the proper handler.

### CloudEvents message schema

The message schema is defined as follows:

snippet: sqs-cloudevents-message-definition

This schema must match the schema of the [notification generated by S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html#supported-notification-event-types).

### CloudEvents support configuration

CloudEvents support must be explicitly enabled:

snippet: sqs-cloudevents-configuration

The configuration includes the type mapping to match the messages with the classes:

snippet: sqs-cloudevents-typemapping

To handle the JSON structured messages that do not have the `Content-Type` header set properly, enable the Permissive mode:

snippet: sqs-cloudevents-json-permissive

To support the differences between uppercase letters and lowercase letters in the schema definition and content, the serializer is configured to use case insensitive mapping:

snippet: sqs-cloudevents-serialization


### Running the sample

1. Run the sample.
2. Generate the `ObjectCreated:Put` event by creating a new file in the S3 bucket.
3. Observe that the sample prints out the URL of the newly created file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 18
VisualStudioVersion = 18.3.11222.16 d18.3
MinimumVisualStudioVersion = 15.0.26730.12
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Endpoint", "Endpoint\Endpoint.csproj", "{1874F4C3-9250-8342-8F1E-4E2692AD6BE3}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{1874F4C3-9250-8342-8F1E-4E2692AD6BE3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1874F4C3-9250-8342-8F1E-4E2692AD6BE3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1874F4C3-9250-8342-8F1E-4E2692AD6BE3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1874F4C3-9250-8342-8F1E-4E2692AD6BE3}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using NServiceBus;

#region asb-cloudevents-message-definition
public class BlobCreated :
IMessage
{
public string Api { get; set; }
public string ClientRequestId { get; set; }
public string RequestId { get; set; }
public string ETag { get; set; }
public string ContentType { get; set; }
public int ContentLength { get; set; }
public string BlobType { get; set; }
public string Url { get; set; }
public string Sequencer { get; set; }
public StorageDiagnostics StorageDiagnostics { get; set; }
}

public class StorageDiagnostics
{
public string BatchId { get; set; }
}
#endregion
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NServiceBus;

public class BlobCreatedHandler(ILogger<BlobCreated> logger) :
IHandleMessages<BlobCreated>
{
public Task Handle(BlobCreated message, IMessageHandlerContext context)
{
logger.LogInformation("Blob {Url} created!", message.Url);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<OutputType>Exe</OutputType>
<LangVersion>14.0</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NServiceBus.Extensions.Hosting" Version="4.0.0-alpha.3" />
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="6.0.0-alpha.4" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="NServiceBus.Envelope.CloudEvents" Version="1.0.0-alpha.1" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using NServiceBus;

Console.Title = "Endpoint";

var builder = Host.CreateApplicationBuilder(args);

var endpointConfiguration = new EndpointConfiguration("Samples.ASBS.CloudEvents.Endpoint");
endpointConfiguration.EnableInstallers();

var connectionString = builder.Configuration.GetConnectionString("AzureServiceBusConnectionString");
if (string.IsNullOrWhiteSpace(connectionString))
{
throw new Exception("Could not read the 'AzureServiceBusConnectionString' value. Check the sample prerequisites.");
}

var transport = new AzureServiceBusTransport(connectionString, TopicTopology.Default);
endpointConfiguration.UseTransport(transport);

#region asb-cloudevents-serialization
endpointConfiguration.UseSerialization<SystemJsonSerializer>().Options(new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
IncludeFields = true
});
#endregion

#region asb-cloudevents-configuration
var cloudEventsConfiguration = endpointConfiguration.EnableCloudEvents();
#endregion

#region asb-cloudevents-typemapping
cloudEventsConfiguration.TypeMappings["Microsoft.Storage.BlobCreated"] = [typeof(BlobCreated)];

Check failure on line 37 in samples/azure-service-bus-netstandard/cloud-events/ASBS_6/Endpoint/Program.cs

View workflow job for this annotation

GitHub Actions / Build samples & snippets

'CloudEventsConfiguration' does not contain a definition for 'TypeMappings' and no accessible extension method 'TypeMappings' accepting a first argument of type 'CloudEventsConfiguration' could be found (are you missing a using directive or an assembly reference?)
#endregion


Console.WriteLine("Press any key, the application is starting");
Console.TreatControlCAsInput = true;
var input = Console.ReadKey();
if (input.Key == ConsoleKey.C && (input.Modifiers & ConsoleModifiers.Control) != 0)
{
Environment.Exit(0);
}
Console.WriteLine("Starting...");

builder.UseNServiceBus(endpointConfiguration);
var host = builder.Build();

await host.StartAsync();

Console.WriteLine("Press any key to exit");
var key = Console.ReadKey();

await host.StopAsync();
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"System": "Information",
"Microsoft": "Information"
}
},
"ConnectionStrings": {
"AzureServiceBusConnectionString": "<PLACEHOLDER>"
}
}
Empty file.
Loading
Loading