Thursday, 20 July 2017

Azure ServiceBus: PeekBatch and ReceiveBatch don't reliably get all messages

I had a requirement for Microsoft's Azure ServiceBus: "resubmit all the messages in the Dead Letter Queue back to the original topic." Say the messages were dead-lettered because of some transient failure and you just want to resubmit them. You would think you could get a reference to the dead letter queue, loop through all the messages (or grab them asynchronously in a batch) and process them all.

Turns out it's not so simple if you are using partitioning. You might have the impulse to use "PeekBatch" or "ReceiveBatch" to grab all the messages and then process them, as in this Stack Overflow post, which describes how that does not reliably return all the messages. In our case the cause was a partitioned queue: a message might sit in a different partition, which is why a single call doesn't return all of them. See the "Browse/Peek messages" section of this MS post.
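The usual workaround described for this behaviour is to keep calling PeekBatch until you have as many messages as you wanted or a call comes back empty. Here is a minimal sketch of that loop. The "PeekUpToAsync" helper and its "peekBatch" delegate are hypothetical names, not part of the SDK; the delegate stands in for something like a PeekBatchAsync call so the sketch runs without ServiceBus. It also assumes each peek continues from where the previous one stopped.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BatchPeeker
{
    // Hypothetical helper: keeps asking for batches until 'wanted' items
    // have been collected or a batch comes back empty.
    // 'peekBatch' is an assumed stand-in for a real peek call; it takes the
    // maximum number of items to return and may return fewer.
    public static async Task<List<T>> PeekUpToAsync<T>(
        Func<int, Task<IReadOnlyList<T>>> peekBatch, int wanted)
    {
        var collected = new List<T>();

        while (collected.Count < wanted)
        {
            var batch = await peekBatch(wanted - collected.Count);

            if (batch == null || batch.Count == 0)
            {
                break; // nothing left to peek
            }

            collected.AddRange(batch);
        }

        return collected;
    }
}
```

The point is only that a short batch is not treated as "done"; the loop ends on an empty batch or when the requested count is reached.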

I tried calling PeekBatch(int.MaxValue) to get the dead letter count, then looping that many times in a "for" loop to receive, clone and resubmit the messages one by one. That seemed to work in my tests, but not once it was deployed: it was only getting one message at a time.

It turns out the "SubscriptionDescription" (which you get from the "NamespaceManager") has a "MessageCountDetails" property, which already exposes a "DeadLetterMessageCount".

This code has some custom classes but you get the idea.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

// _settings and HousekeepingDeadLetterResubmissionResponse are custom types.
public async Task<HousekeepingDeadLetterResubmissionResponse> ResubmitAllDeadLettersAsync(string targetTopic, string targetSubscription, CancellationToken cancellationToken)
{
    if (string.IsNullOrEmpty(targetTopic))
    {
        throw new ArgumentException("Invalid topic", nameof(targetTopic));
    }

    if (string.IsNullOrEmpty(targetSubscription))
    {
        throw new ArgumentException("Invalid subscriber", nameof(targetSubscription));
    }

    var topic = _settings.SupportedTopics.FirstOrDefault(x => x.FullName == targetTopic);
    if (topic == null)
    {
        throw new ArgumentException($"Specified topic {targetTopic} not found in list of supported topics", nameof(targetTopic));
    }

    var response = new HousekeepingDeadLetterResubmissionResponse();

    foreach (var connectionString in _settings.ConnectionStrings)
    {
        var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
        var subscriptionDescription = (await namespaceManager.GetSubscriptionsAsync(topic.FullName))
            .SingleOrDefault(x => x.Name.StartsWith(targetSubscription, StringComparison.OrdinalIgnoreCase));

        if (subscriptionDescription == null)
        {
            continue;
        }

        // The dead letter queue is addressed as a sub-path of the subscription.
        var deadLetterQueueSubscriptionClient = SubscriptionClient
            .CreateFromConnectionString(connectionString, topic.FullName, $"{subscriptionDescription.Name}/$DeadLetterQueue");

        var originalTopicClient = TopicClient.CreateFromConnectionString(connectionString, topic.FullName);

        // Take the count from the subscription description instead of PeekBatch,
        // and add it up across namespaces rather than overwriting it.
        var deadLetterCount = subscriptionDescription.MessageCountDetails.DeadLetterMessageCount;
        response.TotalDeadLetterCount += deadLetterCount;

        for (long i = 0; i < deadLetterCount; i++)
        {
            // Checked outside the try block so cancellation is not counted as a failure.
            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                var message = await deadLetterQueueSubscriptionClient.ReceiveAsync();
                if (message == null)
                {
                    // ReceiveAsync returns null when no message arrives in time.
                    response.ResubmissionsFailed++;
                    continue;
                }

                var newMessage = message.Clone();
                await originalTopicClient.SendAsync(newMessage);
                await message.CompleteAsync();
                response.ResubmissionsSucceeded++;
            }
            catch (Exception)
            {
                response.ResubmissionsFailed++;
            }
        }
    }

    if (_settings.ShouldSimulateFailedDeadLetterResubmissionForTesting)
    {
        response.ResubmissionsFailed++;
    }

    return response;
}
