Overview

NuGet.Services.ServiceBus is a shared infrastructure library that wraps the Azure.Messaging.ServiceBus SDK with NuGet-specific abstractions. It provides a consistent pattern for sending messages to Service Bus topics and processing messages from subscriptions, with built-in support for typed serialization, schema versioning, graceful shutdown, and telemetry.

The library centers on three concerns: message serialization via BrokeredMessageSerializer<TMessage>, message receipt via SubscriptionProcessor<TMessage>, and message sending via TopicClientWrapper. All message types are decorated with a [Schema] attribute that enforces a named, versioned contract. The serializer validates both the schema name and the schema version on every deserialization attempt and throws FormatException if either does not match.

A key design decision concerns the wire format. The typed message is first serialized to JSON via Newtonsoft.Json, and the resulting JSON string is then written as the message body using XML DataContract serialization, which keeps the wire format compatible with the legacy WindowsAzure.ServiceBus SDK. Application properties on the message carry the SchemaName and SchemaVersion metadata used for validation.

Authentication supports shared access keys, system-assigned managed identities, user-assigned managed identities, and explicit TokenCredential instances, with a DEBUG-only fallback to DefaultAzureCredential.

Role in System

Producer Service                         Consumer Service
─────────────────                        ─────────────────────────────────────
TopicClientWrapper                       SubscriptionClientWrapper
  └─ IBrokeredMessageSerializer              └─ ISubscriptionProcessor<TMessage>
       └─ [Schema] TMessage                       └─ IBrokeredMessageSerializer
            │                                          └─ IMessageHandler<TMessage>
            ▼                                               └─ ScopedMessageHandler
     Azure Service Bus                                            └─ DI scope per message
       Topic ──► Subscription

The library is consumed by several backend services and validation pipelines:
  • NuGetGallery (the main web application) — sends messages to Service Bus topics
  • NuGet.Services.Validation — dispatches and receives validation pipeline messages
  • NuGet.Services.Messaging — sends email notification messages
  • Validation.Common.Job — base job infrastructure for validation workers
  • Validation.ScanAndSign.Core — antivirus and signing validation
  • Validation.ContentScan.Core — content scanning validation

Typed Message Contracts

Every message type is annotated with [Schema(Name, Version)]. The serializer enforces both on deserialization, preventing mismatched or stale message formats from being silently processed.
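
The contract check described above can be sketched as follows. This is a minimal illustration, not the library's code: the SchemaAttribute here is a stand-in whose shape is assumed, PackagePublishedMessage and SchemaValidation.AssertSchema are hypothetical names, and the version is assumed to be an int. Only the property keys SchemaName and SchemaVersion and the use of FormatException come from the description.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the library's SchemaAttribute (assumed shape; the real one may differ).
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class SchemaAttribute : Attribute
{
    public string Name { get; set; } = string.Empty;
    public int Version { get; set; }
}

// Hypothetical message type, used only for illustration.
[Schema(Name = "PackagePublished", Version = 1)]
public sealed class PackagePublishedMessage
{
    public string PackageId { get; set; } = string.Empty;
    public string PackageVersion { get; set; } = string.Empty;
}

public static class SchemaValidation
{
    // Throws FormatException when the metadata carried by a received message
    // does not match the contract the consumer expects.
    public static void AssertSchema(
        IReadOnlyDictionary<string, object> applicationProperties,
        string expectedName,
        int expectedVersion)
    {
        if (!applicationProperties.TryGetValue("SchemaName", out var name)
            || !(name is string actualName))
        {
            throw new FormatException("The message has no string SchemaName property.");
        }

        if (actualName != expectedName)
        {
            throw new FormatException(
                $"Expected schema '{expectedName}' but received '{actualName}'.");
        }

        if (!applicationProperties.TryGetValue("SchemaVersion", out var version)
            || !(version is int actualVersion))
        {
            throw new FormatException("The message has no integer SchemaVersion property.");
        }

        if (actualVersion != expectedVersion)
        {
            throw new FormatException(
                $"Expected schema version {expectedVersion} but received {actualVersion}.");
        }
    }
}
```

With this sketch, a message stamped with version 1 fails validation in a consumer that expects version 2, rather than being deserialized into a shape it was never written for.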

Graceful Shutdown

SubscriptionProcessor tracks in-flight messages with an atomic counter and exposes ShutdownAsync(timeout), which drains in-flight work before closing the underlying processor.

Scoped DI per Message

ScopedMessageHandler<TMessage> creates a new dependency injection scope for each received message, enabling per-message unit-of-work patterns (e.g., scoped DbContext lifetimes).
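
A rough sketch of the scope-per-message decorator, assuming the handler contract is a single HandleAsync method that returns Task<bool>. The interface below is a local stand-in and ScopedHandlerSketch is a hypothetical name; only IServiceScopeFactory, IServiceScope, and GetRequiredService are real Microsoft.Extensions.DependencyInjection APIs.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

// Assumed shape of the handler contract; the library's actual interface may differ.
public interface IMessageHandler<TMessage>
{
    Task<bool> HandleAsync(TMessage message);
}

// Hypothetical decorator: resolves the real handler from a fresh scope for every message.
public sealed class ScopedHandlerSketch<TMessage> : IMessageHandler<TMessage>
{
    private readonly IServiceScopeFactory _scopeFactory;

    public ScopedHandlerSketch(IServiceScopeFactory scopeFactory)
    {
        _scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
    }

    public async Task<bool> HandleAsync(TMessage message)
    {
        // Scoped services (for example a DbContext) are created inside this scope
        // and disposed when the using block ends, after the handler has finished.
        using (IServiceScope scope = _scopeFactory.CreateScope())
        {
            var inner = scope.ServiceProvider.GetRequiredService<IMessageHandler<TMessage>>();
            return await inner.HandleAsync(message);
        }
    }
}
```

In this sketch the real handler is the one registered as IMessageHandler<TMessage> in the container, so the decorator itself should be constructed explicitly or registered under its concrete type; registering it as IMessageHandler<TMessage> would make it resolve itself.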

Dual Auth Strategies

Connection strings containing SharedAccessKey= use SAS authentication. All other strings are treated as endpoint URLs and use managed identity (or DefaultAzureCredential in debug builds).

Key Files and Classes

| File | Class / Type | Purpose |
|---|---|---|
| SchemaAttribute.cs | SchemaAttribute | Attribute applied to message POCOs to declare their schema name and version. Name must never change; version is bumped on any schema modification. |
| IBrokeredMessageSerializer.cs | IBrokeredMessageSerializer<TMessage> | Interface for serializing a typed message to IBrokeredMessage and deserializing IReceivedBrokeredMessage back to the typed message. |
| BrokeredMessageSerializer.cs | BrokeredMessageSerializer<TMessage> | Concrete serializer: writes SchemaName and SchemaVersion as application properties, serializes payload to JSON, and validates both on deserialization. Also contains static extension methods (AssertTypeAndSchemaVersion, GetSchemaName, GetSchemaVersion). |
| IMessageHandler.cs | IMessageHandler<TMessage> | Interface that consumers implement. HandleAsync returns true if the message was handled (triggering CompleteAsync), or false to requeue without abandoning. |
| ScopedMessageHandler.cs | ScopedMessageHandler<TMessage> | Decorator over IMessageHandler<TMessage> that creates a fresh DI scope per message and resolves the real handler from that scope. |
| ISubscriptionProcessor.cs | ISubscriptionProcessor<TMessage> | Interface for starting and stopping message processing, and exposing NumberOfMessagesInProgress. |
| SubscriptionProcessor.cs | SubscriptionProcessor<TMessage> | Orchestrates the full receive loop: deserializes messages, calls the handler, completes or requeues, tracks telemetry, handles MessageLockLost, and supports graceful shutdown. |
| SubscriptionClientWrapper.cs | SubscriptionClientWrapper | Wraps ServiceBusProcessor from the Azure SDK. Supports SAS keys, managed identity (system or user-assigned), and explicit TokenCredential. Translates SDK callbacks to IReceivedBrokeredMessage. |
| TopicClientWrapper.cs | TopicClientWrapper | Wraps ServiceBusSender. Supports SAS keys, managed identity, and client-secret credential. Sends ServiceBusMessageWrapper instances. |
| ServiceBusMessageWrapper.cs | ServiceBusMessageWrapper | Wraps an outbound ServiceBusMessage. Serializes the body using XML DataContract format for legacy SDK interoperability. Generates a random MessageId by default (required for partitioned topics). |
| ServiceBusReceivedMessageWrapper.cs | ServiceBusReceivedMessageWrapper | Wraps ProcessMessageEventArgs for inbound messages. Exposes CompleteAsync, AbandonAsync, and DeadLetterAsync by delegating to the event args. |
| OnMessageOptionsWrapper.cs | OnMessageOptionsWrapper | Configures ServiceBusProcessorOptions (auto-complete and max concurrent calls). AutoComplete defaults to true; SubscriptionProcessor always sets it to false so it can control completion manually. |
| ServiceBusClientHelper.cs | ServiceBusClientHelper (internal) | Factory for ServiceBusClient instances (SAS vs. managed identity detection). Also provides SerializeXmlDataContract and DeserializeXmlDataContract for legacy wire-format compatibility. |
| ISubscriptionProcessorTelemetryService.cs | ISubscriptionProcessorTelemetryService | Telemetry contract emitted by SubscriptionProcessor: delivery lag, enqueue lag, handler duration, and lock-lost events. |
| SubscriptionProcessorNoTelemetryService.cs | SubscriptionProcessorNoTelemetryService | No-op implementation of ISubscriptionProcessorTelemetryService for use in tests or contexts where telemetry is not needed. |
| Event.cs | Event (static) | Defines the structured log EventId (601) for subscription message handler exceptions. |

Dependencies

NuGet Package References

| Package | Purpose |
|---|---|
| Azure.Messaging.ServiceBus | Core Azure Service Bus SDK: ServiceBusClient, ServiceBusProcessor, ServiceBusSender. |
| Azure.Identity | Provides ManagedIdentityCredential, DefaultAzureCredential, and ClientSecretCredential for authentication. |
| Azure.Core | TokenCredential abstraction used in SubscriptionClientWrapper constructor overload. |
| Microsoft.Extensions.DependencyInjection.Abstractions | IServiceScopeFactory and IServiceScope used in ScopedMessageHandler. |
| Microsoft.Extensions.Logging | ILogger<T> used throughout for structured diagnostic logging. |
| Newtonsoft.Json | JSON serialization of the typed message payload in BrokeredMessageSerializer. |
| System.Text.Json | Referenced in the project file; not directly used in production code paths visible in source. |

Internal Project References

| Project | Purpose |
|---|---|
| NuGet.Services.Contracts | Provides the IBrokeredMessage, IReceivedBrokeredMessage, ISubscriptionClient, ITopicClient, and IOnMessageOptions interfaces that this library implements. |

Notable Patterns and Implementation Details

The message body is serialized using DataContractSerializer with a binary XML writer (XmlDictionaryWriter.CreateBinaryWriter). This preserves wire-format compatibility with the legacy WindowsAzure.ServiceBus SDK, which used the same format. The JSON payload of the typed message is stored as the string value inside this XML envelope — it is not raw JSON on the wire.
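
To make the envelope concrete, here is a small round-trip sketch using only the .NET base class library. XmlDataContractSketch and its method names are hypothetical; the library's own helpers are SerializeXmlDataContract and DeserializeXmlDataContract, whose exact signatures are not shown here, so treat this as an approximation of the technique rather than the actual implementation.

```csharp
using System.IO;
using System.Runtime.Serialization;
using System.Xml;

public static class XmlDataContractSketch
{
    // The envelope holds a single string: the JSON text of the typed message.
    private static readonly DataContractSerializer StringSerializer =
        new DataContractSerializer(typeof(string));

    // Wraps a JSON string in the binary XML envelope.
    public static byte[] Serialize(string json)
    {
        using (var stream = new MemoryStream())
        {
            using (var writer = XmlDictionaryWriter.CreateBinaryWriter(stream))
            {
                StringSerializer.WriteObject(writer, json);
                writer.Flush();
            }

            // MemoryStream.ToArray still works after the writer has closed the stream.
            return stream.ToArray();
        }
    }

    // Reads the JSON string back out of the binary XML envelope.
    public static string Deserialize(byte[] body)
    {
        using (var stream = new MemoryStream(body))
        using (var reader = XmlDictionaryReader.CreateBinaryReader(stream, XmlDictionaryReaderQuotas.Max))
        {
            return (string)StringSerializer.ReadObject(reader);
        }
    }
}
```

A consumer that reads the body as UTF-8 text and parses it as JSON directly would fail, because the bytes begin with binary XML framing rather than the JSON document itself.
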
SubscriptionProcessor deliberately does not propagate exceptions from IMessageHandler<TMessage>.HandleAsync back to the Azure SDK’s processing loop. If an exception is thrown, the processor logs it, tracks telemetry, and decrements the in-progress counter — but the message is not explicitly abandoned. This allows the Service Bus lock to expire naturally and triggers a retry after the lock timeout, which gives transient errors (e.g., network issues) time to resolve before the next delivery attempt.
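
The control flow described above might look roughly like the following. ProcessingSketch and its delegate parameters are hypothetical stand-ins for the processor, the handler, the message's CompleteAsync call, and the logger; the real class also records telemetry and handles lock-lost errors, which are omitted here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class ProcessingSketch<TMessage>
{
    private int _inProgress;

    // Number of messages currently inside OnMessageAsync.
    public int NumberOfMessagesInProgress => Volatile.Read(ref _inProgress);

    public async Task OnMessageAsync(
        TMessage message,
        Func<TMessage, Task<bool>> handleAsync,
        Func<Task> completeAsync,
        Action<Exception> logError)
    {
        Interlocked.Increment(ref _inProgress);
        try
        {
            if (await handleAsync(message))
            {
                // Handled: remove the message from the subscription.
                await completeAsync();
            }
            // Not handled: neither complete nor abandon, so the lock simply expires.
        }
        catch (Exception e)
        {
            // Swallow the exception so it never reaches the SDK's processing loop.
            // The message is not abandoned; it is redelivered after the lock times out.
            logError(e);
        }
        finally
        {
            Interlocked.Decrement(ref _inProgress);
        }
    }
}
```

The trade-off in this design is latency for stability: a failed message is retried only after the lock duration elapses, which acts as a crude back-off without any extra retry bookkeeping.
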
SubscriptionProcessor.ShutdownAsync(timeout) returns false if the timeout expires while messages are still in progress. The method’s documentation remarks note that messages may still be in progress after the returned Task completes, so callers that need a hard guarantee should poll NumberOfMessagesInProgress until it reaches zero.
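
A caller-side polling loop for that hard guarantee could be sketched as below. ShutdownSketch.WaitForDrainAsync is a hypothetical helper, not part of the library; it takes a delegate so it can be pointed at NumberOfMessagesInProgress without assuming anything else about the processor.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class ShutdownSketch
{
    // Polls the in-progress count until it reaches zero or the timeout elapses.
    // Returns true if processing drained, false if the timeout expired first.
    public static async Task<bool> WaitForDrainAsync(
        Func<int> getInProgress,
        TimeSpan timeout,
        TimeSpan pollInterval)
    {
        var stopwatch = Stopwatch.StartNew();
        while (getInProgress() > 0)
        {
            if (stopwatch.Elapsed >= timeout)
            {
                return false;
            }

            await Task.Delay(pollInterval);
        }

        return true;
    }
}
```

A job host might call this after ShutdownAsync returns false, passing `() => processor.NumberOfMessagesInProgress` and a second, longer timeout before it allows the process to exit.
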
The authentication path in ServiceBusClientHelper.GetServiceBusClient detects managed identity mode by checking whether the connection string contains the literal text SharedAccessKey=. Any connection string without that token is treated as an endpoint URL and authenticated via managed identity. In DEBUG builds, DefaultAzureCredential is used instead of ManagedIdentityCredential, enabling local developer login via the Azure CLI or Visual Studio credential chain.
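
The detection rule can be illustrated without touching the Azure SDK. The sketch below only classifies a connection string; AuthMode and AuthModeSketch are hypothetical names, and the ordinal, case-sensitive comparison is an assumption, since the description says only that the literal text is checked.

```csharp
using System;

public enum AuthMode
{
    SharedAccessKey,
    ManagedIdentity,
    DefaultAzureCredential,
}

public static class AuthModeSketch
{
    public static AuthMode Detect(string connectionString)
    {
        if (connectionString == null)
        {
            throw new ArgumentNullException(nameof(connectionString));
        }

        // Assumption: an ordinal substring check, as string.Contains(string) performs.
        if (connectionString.Contains("SharedAccessKey="))
        {
            return AuthMode.SharedAccessKey;
        }

#if DEBUG
        // Debug builds: fall back to the developer's local credential chain.
        return AuthMode.DefaultAzureCredential;
#else
        // Release builds: the string is treated as an endpoint and managed identity is used.
        return AuthMode.ManagedIdentity;
#endif
    }
}
```

One consequence of a plain substring check is that a string containing only SharedAccessKeyName= and no SharedAccessKey= pair is not classified as SAS, because the literal token never appears in it.
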
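The [Schema] attribute’s Name property is documented as something that must never change for a given message type, while Version must be incremented whenever any property is added, removed, or modified. BrokeredMessageSerializer<TMessage> reads these values once, in a static constructor, and throws InvalidOperationException if the attribute is missing or duplicated. Because a static constructor runs the first time the type is used, a misdeclared message type fails fast when its serializer is first touched (typically during service startup) rather than at first message receipt. In .NET, an exception that escapes a static constructor reaches callers wrapped in TypeInitializationException, so the InvalidOperationException would be observed as its InnerException.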
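
The read-once pattern can be sketched as follows. SchemaInfo<TMessage>, AnnotatedMessage, and UnannotatedMessage are hypothetical names, and the SchemaAttribute is again a local stand-in (declared with AllowMultiple = true here only so that the duplicate case is expressible); the library's serializer does more than this.

```csharp
using System;

// Stand-in attribute (assumed shape).
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class SchemaAttribute : Attribute
{
    public string Name { get; set; } = string.Empty;
    public int Version { get; set; }
}

[Schema(Name = "Example", Version = 2)]
public sealed class AnnotatedMessage { }

public sealed class UnannotatedMessage { }

public static class SchemaInfo<TMessage>
{
    public static readonly string Name;
    public static readonly int Version;

    // Runs once per closed generic type, the first time that type is used.
    static SchemaInfo()
    {
        object[] attributes = typeof(TMessage)
            .GetCustomAttributes(typeof(SchemaAttribute), inherit: false);

        if (attributes.Length != 1)
        {
            throw new InvalidOperationException(
                $"{typeof(TMessage).Name} must have exactly one [Schema] attribute, " +
                $"but {attributes.Length} were found.");
        }

        var schema = (SchemaAttribute)attributes[0];
        Name = schema.Name;
        Version = schema.Version;
    }
}
```

Reading SchemaInfo<AnnotatedMessage>.Version yields 2, while the first access to SchemaInfo<UnannotatedMessage> throws before any message is processed.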