Overview
NuGet.Services.ServiceBus is a shared infrastructure library that wraps the Azure.Messaging.ServiceBus SDK with NuGet-specific abstractions. It provides a consistent pattern for sending messages to Service Bus topics and processing messages from subscriptions, with built-in support for typed serialization, schema versioning, graceful shutdown, and telemetry.
The library centers on three concerns: message serialization via BrokeredMessageSerializer<TMessage>, message receipt via SubscriptionProcessor<TMessage>, and message sending via TopicClientWrapper. All message types are decorated with a [Schema] attribute that enforces a named, versioned contract — the serializer validates both the schema name and version on every deserialization attempt, throwing FormatException if either does not match.
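The name-and-version check described above can be illustrated with a minimal sketch. Everything here is a hypothetical stand-in rather than the library's code: `SchemaValidator`, `PackageValidationMessage`, and the named-property attribute syntax are assumptions, and only the `SchemaName` / `SchemaVersion` property names and the `FormatException` behavior come from the description above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the library's [Schema] attribute.
[AttributeUsage(AttributeTargets.Class)]
public sealed class SchemaAttribute : Attribute
{
    public string Name { get; set; }
    public int Version { get; set; }
}

// Hypothetical message type used only for this illustration.
[Schema(Name = "PackageValidation", Version = 1)]
public sealed class PackageValidationMessage
{
    public string PackageId { get; set; }
}

public static class SchemaValidator
{
    // Compares the SchemaName and SchemaVersion application properties with the
    // [Schema] attribute declared on TMessage. Throws FormatException on any mismatch.
    public static void AssertSchema<TMessage>(IReadOnlyDictionary<string, object> properties)
    {
        var schema = (SchemaAttribute)Attribute.GetCustomAttribute(
            typeof(TMessage), typeof(SchemaAttribute));
        if (schema == null)
        {
            throw new InvalidOperationException(
                $"{typeof(TMessage).Name} has no [Schema] attribute.");
        }

        // A missing property leaves the raw value null, which fails the type test below.
        properties.TryGetValue("SchemaName", out var rawName);
        if (!(rawName is string name) || name != schema.Name)
        {
            throw new FormatException($"Expected schema name '{schema.Name}'.");
        }

        properties.TryGetValue("SchemaVersion", out var rawVersion);
        if (!(rawVersion is int version) || version != schema.Version)
        {
            throw new FormatException($"Expected schema version {schema.Version}.");
        }
    }
}
```

In this sketch a message whose properties carry version 2 is rejected before its body is ever parsed, which is the behavior that keeps stale formats from being processed silently.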
A key design decision is that the library uses an XML DataContract serialization format for the message body (for wire-format compatibility with the legacy WindowsAzure.ServiceBus SDK), while message payloads themselves are JSON-serialized via Newtonsoft.Json and stored as the string body. Application properties on the message carry SchemaName and SchemaVersion metadata used for validation. Authentication supports shared access keys, system-assigned managed identities, user-assigned managed identities, and explicit TokenCredential instances, with a DEBUG-only fallback to DefaultAzureCredential.
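To make the two-layer format concrete, here is a rough sketch of wrapping a JSON string in a binary XML DataContract envelope and reading it back. It uses only .NET base class library calls. `LegacyEnvelope`, `Wrap`, and `Unwrap` are hypothetical names, and serializing the body as a plain `string` is an assumption based on the description above, not a copy of the library's helper.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Xml;

// Hypothetical helper; illustrates the envelope idea only.
public static class LegacyEnvelope
{
    private static readonly DataContractSerializer StringSerializer =
        new DataContractSerializer(typeof(string));

    // Wraps a JSON string in a binary XML DataContract envelope.
    public static byte[] Wrap(string json)
    {
        using (var stream = new MemoryStream())
        using (var writer = XmlDictionaryWriter.CreateBinaryWriter(stream))
        {
            StringSerializer.WriteObject(writer, json);
            writer.Flush(); // push buffered bytes into the stream before copying
            return stream.ToArray();
        }
    }

    // Reads the JSON string back out of the binary XML envelope.
    public static string Unwrap(byte[] body)
    {
        using (var stream = new MemoryStream(body))
        using (var reader = XmlDictionaryReader.CreateBinaryReader(
            stream, XmlDictionaryReaderQuotas.Max))
        {
            return (string)StringSerializer.ReadObject(reader);
        }
    }
}
```

A consumer that reads the raw bytes and expects JSON will fail in this scheme, because the bytes are the XML envelope rather than the JSON text; the payload has to be unwrapped first.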
Role in System
- NuGetGallery (the main web application): sends messages to Service Bus topics
- NuGet.Services.Validation: dispatches and receives validation pipeline messages
- NuGet.Services.Messaging: sends email notification messages
- Validation.Common.Job: base job infrastructure for validation workers
- Validation.ScanAndSign.Core: antivirus and signing validation
- Validation.ContentScan.Core: content scanning validation
Typed Message Contracts
Every message type is annotated with [Schema(Name, Version)]. The serializer enforces both on deserialization, preventing mismatched or stale message formats from being silently processed.
Graceful Shutdown
SubscriptionProcessor tracks in-flight messages with an atomic counter and exposes ShutdownAsync(timeout), which drains in-flight work before closing the underlying processor.
Scoped DI per Message
ScopedMessageHandler<TMessage> creates a new dependency injection scope for each received message, enabling per-message unit-of-work patterns (e.g., scoped DbContext lifetimes).
Dual Auth Strategies
Connection strings containing SharedAccessKey= use SAS authentication. All other strings are treated as endpoint URLs and use managed identity (or DefaultAzureCredential in debug builds).
Key Files and Classes
| File | Class / Type | Purpose |
|---|---|---|
| SchemaAttribute.cs | SchemaAttribute | Attribute applied to message POCOs to declare their schema name and version. Name must never change; version is bumped on any schema modification. |
| IBrokeredMessageSerializer.cs | IBrokeredMessageSerializer<TMessage> | Interface for serializing a typed message to IBrokeredMessage and deserializing IReceivedBrokeredMessage back to the typed message. |
| BrokeredMessageSerializer.cs | BrokeredMessageSerializer<TMessage> | Concrete serializer: writes SchemaName and SchemaVersion as application properties, serializes payload to JSON, and validates both on deserialization. Also contains static extension methods (AssertTypeAndSchemaVersion, GetSchemaName, GetSchemaVersion). |
| IMessageHandler.cs | IMessageHandler<TMessage> | Interface that consumers implement. HandleAsync returns true if the message was handled (triggering CompleteAsync), or false to requeue without abandoning. |
| ScopedMessageHandler.cs | ScopedMessageHandler<TMessage> | Decorator over IMessageHandler<TMessage> that creates a fresh DI scope per message and resolves the real handler from that scope. |
| ISubscriptionProcessor.cs | ISubscriptionProcessor<TMessage> | Interface for starting and stopping message processing, and exposing NumberOfMessagesInProgress. |
| SubscriptionProcessor.cs | SubscriptionProcessor<TMessage> | Orchestrates the full receive loop: deserializes messages, calls the handler, completes or requeues, tracks telemetry, handles MessageLockLost, and supports graceful shutdown. |
| SubscriptionClientWrapper.cs | SubscriptionClientWrapper | Wraps ServiceBusProcessor from the Azure SDK. Supports SAS keys, managed identity (system or user-assigned), and explicit TokenCredential. Translates SDK callbacks to IReceivedBrokeredMessage. |
| TopicClientWrapper.cs | TopicClientWrapper | Wraps ServiceBusSender. Supports SAS keys, managed identity, and client-secret credential. Sends ServiceBusMessageWrapper instances. |
| ServiceBusMessageWrapper.cs | ServiceBusMessageWrapper | Wraps an outbound ServiceBusMessage. Serializes the body using XML DataContract format for legacy SDK interoperability. Generates a random MessageId by default (required for partitioned topics). |
| ServiceBusReceivedMessageWrapper.cs | ServiceBusReceivedMessageWrapper | Wraps ProcessMessageEventArgs for inbound messages. Exposes CompleteAsync, AbandonAsync, and DeadLetterAsync by delegating to the event args. |
| OnMessageOptionsWrapper.cs | OnMessageOptionsWrapper | Configures ServiceBusProcessorOptions (auto-complete and max concurrent calls). AutoComplete defaults to true; SubscriptionProcessor always sets it to false so it can control completion manually. |
| ServiceBusClientHelper.cs | ServiceBusClientHelper (internal) | Factory for ServiceBusClient instances (SAS vs. managed identity detection). Also provides SerializeXmlDataContract and DeserializeXmlDataContract for legacy wire-format compatibility. |
| ISubscriptionProcessorTelemetryService.cs | ISubscriptionProcessorTelemetryService | Telemetry contract emitted by SubscriptionProcessor: delivery lag, enqueue lag, handler duration, and lock-lost events. |
| SubscriptionProcessorNoTelemetryService.cs | SubscriptionProcessorNoTelemetryService | No-op implementation of ISubscriptionProcessorTelemetryService for use in tests or contexts where telemetry is not needed. |
| Event.cs | Event (static) | Defines the structured log EventId (601) for subscription message handler exceptions. |
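The completion, in-flight tracking, and shutdown behavior attributed to SubscriptionProcessor in the table can be sketched without the Azure SDK. This is a simplified illustration under stated assumptions, not the library's implementation: `InFlightTracker`, the delegate-based `ProcessAsync` signature, the 50 ms polling interval, and the `bool` return value of `ShutdownAsync` are all hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for the receive loop's bookkeeping.
public sealed class InFlightTracker
{
    private int _inProgress;

    public int NumberOfMessagesInProgress => Volatile.Read(ref _inProgress);

    // handler returns true when the message was handled; complete runs only then.
    public async Task ProcessAsync(
        Func<Task<bool>> handler, Func<Task> complete, Action<Exception> log)
    {
        Interlocked.Increment(ref _inProgress);
        try
        {
            if (await handler())
            {
                await complete();
            }
        }
        catch (Exception e)
        {
            // Log and swallow: nothing is abandoned here, so the broker would
            // redeliver the message only after its lock expires.
            log(e);
        }
        finally
        {
            Interlocked.Decrement(ref _inProgress);
        }
    }

    // Polls until in-flight work reaches zero or the timeout elapses.
    // Returns true when everything drained in time.
    public async Task<bool> ShutdownAsync(TimeSpan timeout)
    {
        var stopwatch = Stopwatch.StartNew();
        while (NumberOfMessagesInProgress > 0)
        {
            if (stopwatch.Elapsed >= timeout)
            {
                return false;
            }

            await Task.Delay(TimeSpan.FromMilliseconds(50));
        }

        return true;
    }
}
```

The counter is decremented in a finally block so that a throwing handler cannot leave the count stuck above zero and block shutdown until the timeout.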
Dependencies
NuGet Package References
| Package | Purpose |
|---|---|
| Azure.Messaging.ServiceBus | Core Azure Service Bus SDK: ServiceBusClient, ServiceBusProcessor, ServiceBusSender. |
| Azure.Identity | Provides ManagedIdentityCredential, DefaultAzureCredential, and ClientSecretCredential for authentication. |
| Azure.Core | TokenCredential abstraction used in SubscriptionClientWrapper constructor overload. |
| Microsoft.Extensions.DependencyInjection.Abstractions | IServiceScopeFactory and IServiceScope used in ScopedMessageHandler. |
| Microsoft.Extensions.Logging | ILogger<T> used throughout for structured diagnostic logging. |
| Newtonsoft.Json | JSON serialization of the typed message payload in BrokeredMessageSerializer. |
| System.Text.Json | Referenced in the project file; not directly used in production code paths visible in source. |
Internal Project References
| Project | Purpose |
|---|---|
| NuGet.Services.Contracts | Provides the IBrokeredMessage, IReceivedBrokeredMessage, ISubscriptionClient, ITopicClient, and IOnMessageOptions interfaces that this library implements. |
Notable Patterns and Implementation Details
The message body is serialized using DataContractSerializer with a binary XML writer (XmlDictionaryWriter.CreateBinaryWriter). This preserves wire-format compatibility with the legacy WindowsAzure.ServiceBus SDK, which used the same format. The JSON payload of the typed message is stored as the string value inside this XML envelope; it is not raw JSON on the wire.
SubscriptionProcessor deliberately does not propagate exceptions from IMessageHandler<TMessage>.HandleAsync back to the Azure SDK’s processing loop. If an exception is thrown, the processor logs it, tracks telemetry, and decrements the in-progress counter, but the message is not explicitly abandoned. This allows the Service Bus lock to expire naturally and triggers a retry after the lock timeout, which gives transient errors (e.g., network issues) time to resolve before the next delivery attempt.
The [Schema] attribute’s Name property is documented as something that must never change for a given message type, while Version must be incremented whenever any property is added, removed, or modified. The BrokeredMessageSerializer<TMessage> reads these values once at class initialization via a static constructor, throwing InvalidOperationException at startup if the attribute is missing or duplicated. This provides fast-fail validation at service startup rather than at first message receipt.
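The static-constructor check in the last paragraph can be sketched as follows. `SchemaInfo<TMessage>`, `GoodMessage`, and `UndecoratedMessage` are hypothetical names, and `AllowMultiple = true` is an assumption made only so that the "duplicated" case can occur in this illustration. One .NET detail worth noting: an exception that escapes a static constructor reaches the caller wrapped in a TypeInitializationException, with the original exception as its InnerException.

```csharp
using System;

// Hypothetical stand-in for the library's [Schema] attribute.
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class SchemaAttribute : Attribute
{
    public string Name { get; set; }
    public int Version { get; set; }
}

[Schema(Name = "Example", Version = 2)]
public sealed class GoodMessage { }

// Deliberately missing the attribute to show the failure path.
public sealed class UndecoratedMessage { }

public static class SchemaInfo<TMessage>
{
    public static readonly string Name;
    public static readonly int Version;

    // Runs once per closed generic type, on first access to that type.
    static SchemaInfo()
    {
        Attribute[] attributes = Attribute.GetCustomAttributes(
            typeof(TMessage), typeof(SchemaAttribute));
        if (attributes.Length != 1)
        {
            throw new InvalidOperationException(
                $"{typeof(TMessage).Name} must have exactly one [Schema] attribute.");
        }

        var schema = (SchemaAttribute)attributes[0];
        Name = schema.Name;
        Version = schema.Version;
    }
}
```

Because the check runs when the type is first touched, a misconfigured message type surfaces as soon as its serializer is first used, which in a typical service is during startup wiring rather than when the first message arrives.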