Using Azure Storage Queues in .NET Azure Functions is pretty straightforward. The standard templates help you create a basic function that reacts to queue messages in no time. All you have to do is configure a connection string, and it works immediately.
However, when you want to use the more advanced options, you can run into issues, and it helps to know what happens behind the scenes.
Azure Storage Queue
Azure Storage Queues is a service provided by Microsoft Azure that allows you to store large numbers of messages that can be accessed from anywhere in the world via authenticated calls. Here are some key points:
Message Storage:
Each queue can contain millions of messages, each up to 64 KB in size.
Asynchronous Communication:
They enable asynchronous communication between different components of a cloud application, which helps in decoupling and scaling.
Reliable Delivery:
Messages are stored until the receiving component processes them (or until their time-to-live expires, which is 7 days by default), ensuring reliable delivery.
Using Azure Storage Queues in a .NET Azure Function
Using Visual Studio or the dotnet CLI tool, you can easily create a sample function that connects to a Storage Queue. The created function project usually has four important files:
- Program.cs: Sets up Dependency Injection, if needed.
- host.json: Lets you configure certain aspects of the Functions host.
- local.settings.json: Stores settings for local debugging.
- Function1.cs: Holds the code of your function. Every public method that has the [Function()] attribute and some kind of [*Trigger] attribute in its signature is automatically bound to an endpoint by the Azure Functions host.
Deserialized object
[Function(nameof(SampleDeserializedTrigger))]
public void SampleDeserializedTrigger([QueueTrigger("sample", Connection = "QueueConnection")] MyDeserializedObject myObject)
{
    // myObject was already deserialized from the message payload by the host.
    _logger.LogInformation("C# Queue trigger function processed: {Message}", myObject);
    LongRunningTask();
}
This signature offers no control over the lifetime of the Queue message. The message payload is automatically deserialized into an object, which is handed to your method.
When the message cannot be deserialized into the specified object, an error occurs, but you won’t be able to handle that error yourself. When your code throws an exception, the message is put back on the queue immediately and the function call is retried. After five attempts in total (the default, which can be configured in host.json or your application settings), the message ends up on a poison queue named <queuename>-poison.
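As a sketch of where these retry settings live, a host.json for a queue-triggered function app could look like the fragment below. The maxDequeueCount of 5 matches the default mentioned above; the visibilityTimeout of 30 seconds is only an illustrative value (the default is 0), and it controls how long a failed message stays hidden before the next attempt.

```json
{
  "version": "2.0",
  "extensions": {
    "queues": {
      "maxDequeueCount": 5,
      "visibilityTimeout": "00:00:30"
    }
  }
}
```

Note that these settings apply to every queue trigger in the function app, so they cannot express a per-message retry delay.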
QueueMessage
[Function(nameof(SampleQueueMessageTrigger))]
public void SampleQueueMessageTrigger([QueueTrigger("sample", Connection = "QueueConnection")] QueueMessage message)
{
    _logger.LogInformation($"C# Queue trigger function processed: {message.MessageText}");
    LongRunningTask();
}
This is the default signature that is generated by the Visual Studio templates nowadays. If you don’t do anything, the same rules apply as for the ‘Deserialized object’ signature. However, the QueueMessage does not deserialize the payload by itself, and it offers some extra properties that can be used in more advanced scenarios. The most important properties are:
- Body: Contains the body of the message as a BinaryData object. Can be deserialized using JsonSerializer.Deserialize<T>(message.Body).
- DequeueCount: An Int64 that contains the number of times the message has been dequeued (that is, how often the trigger was retried). Can be used for logging.
- NextVisibleOn: This date indicates when the message will be available on the Queue again, allowing other function instances to process the same message. (Not actually true, see ‘Issue’ below.)
- MessageId: This string is the internal ID of the message, which is needed to do anything with the message other than reading it.
- PopReceipt: If you want to do anything with the message, this string is the most important one. For any change to a message (deleting it or changing NextVisibleOn), you need both the MessageId and the PopReceipt. However, the PopReceipt of the message changes after every change. This enforces ownership and prevents multiple actors from changing the message at the same time.
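One benefit of receiving the raw message is that you can handle deserialization failures yourself. The following is a minimal sketch of that idea using only System.Text.Json. The names PayloadParser and OrderPayload are hypothetical and not part of any SDK, and the payload shape is an assumption.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload type; the real shape of your message is an assumption here.
public sealed record OrderPayload(string Id, int Quantity);

public static class PayloadParser
{
    // Returns false instead of throwing when the text is empty,
    // is the JSON literal null, or is not valid JSON for T.
    public static bool TryParse<T>(string? json, out T? value) where T : class
    {
        value = null;
        if (string.IsNullOrWhiteSpace(json))
        {
            return false;
        }

        try
        {
            value = JsonSerializer.Deserialize<T>(json);
            return value is not null;
        }
        catch (JsonException)
        {
            return false;
        }
    }
}
```

Inside a QueueMessage-based function you could call PayloadParser.TryParse<OrderPayload>(message.MessageText, out var payload) and decide for yourself what to do with a message that fails to parse, instead of letting the host retry it until it reaches the poison queue.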
Use case for the QueueMessage object
One common scenario for handling the message yourself is changing NextVisibleOn (through the visibilityTimeout of the message). This allows you to delay the retry based on your own logic, making the Queue act as a kind of primitive timer:
// Requires: using Azure; using Azure.Storage.Queues; using Azure.Storage.Queues.Models;
[Function(nameof(SampleQueueMessageTrigger))]
public async Task SampleQueueMessageTrigger([QueueTrigger("myQueue", Connection = "QueueConnection")] QueueMessage message)
{
    // "connectionString" is a placeholder; pass your real storage connection string here.
    QueueClient client = new("connectionString", "myQueue");
    await client.CreateIfNotExistsAsync();
    try
    {
        _logger.LogInformation($"C# Queue trigger function processed: {message.MessageText}");
        await LongRunningTaskAsync();
        await client.DeleteMessageAsync(message.MessageId, message.PopReceipt);
    }
    catch (Exception ex)
    {
        TimeSpan? delay = MyCustomRetryLogic();
        if (delay.HasValue)
        {
            _logger.LogInformation(ex, "Something went wrong. Will retry in {delay}", delay);
            // Hide the message for the chosen delay; the update returns a new pop receipt.
            Azure.Response<UpdateReceipt> receiptResponse = await client.UpdateMessageAsync(message.MessageId, message.PopReceipt, visibilityTimeout: delay.Value);
            UpdateReceipt receipt = receiptResponse.Value;
            return;
        }
        //Todo handle your own poison queue
        await client.DeleteMessageAsync(message.MessageId, message.PopReceipt);
    }
}
Message not found!
This setup works until the LongRunningTask runs for more than 4 to 5 minutes. Then, all of a sudden, you get an exception that tells you the message doesn’t exist. To find out why, I had to look at the internals.
I investigated the code at azure-webjobs-sdk/src/Microsoft.Azure.WebJobs.Extensions.Storage/Queues/Listeners/QueueListener.cs and discovered that the Queue works a little differently from what you would expect:
Queue Message
If you read the Queue message yourself, without using a QueueTrigger, nothing stops other instances from reading and processing the same message. To prevent this, a property called visibilityTimeout should be set on the message. This hides the message from all function instances that query the queue for new messages until the visibilityTimeout has expired. You need to set this timeout to a timespan large enough to do your work, or extend it before it expires.
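A minimal sketch of reading a message yourself with the Azure.Storage.Queues SDK could look like this. The class name ManualReader and the two-minute timeout are assumptions for illustration, and the QueueClient is expected to be created elsewhere.

```csharp
using System;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;

// Hypothetical helper, not part of the SDK.
public static class ManualReader
{
    // Reads at most one message and hides it from other readers for two minutes.
    public static async Task ProcessOneAsync(QueueClient client)
    {
        Response<QueueMessage[]> response = await client.ReceiveMessagesAsync(
            maxMessages: 1,
            visibilityTimeout: TimeSpan.FromMinutes(2)); // illustrative value

        foreach (QueueMessage message in response.Value)
        {
            Console.WriteLine($"Processing {message.MessageId}: {message.MessageText}");

            // Do the work here. It has to finish within the two minutes,
            // or the timeout has to be extended with UpdateMessageAsync first.

            await client.DeleteMessageAsync(message.MessageId, message.PopReceipt);
        }
    }
}
```

If the work takes longer than the visibilityTimeout and nothing extends it, the message becomes visible again and another reader can pick it up.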
QueueTrigger
This is where the QueueTrigger comes in. Methods with this attribute are automatically bound to a storage queue, and the host handles the following steps for you:
1. Queries the queue for new messages (messages whose visibilityTimeout has not expired yet are not returned).
2. When no message is found, returns to step 1 after a delay.
3. Sets the visibilityTimeout of the message to 10 minutes.
4. Depending on the signature, deserializes the message into an object, or creates a QueueMessage with the relevant properties.
5. Creates a timer that, every 5 minutes, sets the visibilityTimeout of this message to 10 minutes again, extending the timeout period.
6. Calls your method.
7. When your method succeeds, deletes the message.
8. When your method throws an exception and dequeueCount < 5 (the default), updates the visibilityTimeout to the value of extensions:queues:visibilityTimeout from your configuration (default 0), so that other instances can pick up this message again.
9. When dequeueCount >= 5 (the default), creates a poison queue and copies the message there.
Because of this, you don’t have to worry about handling the message or its visibilityTimeout yourself; the host does that for you. As long as you are processing the message and don’t throw an exception, no other instance will touch the same message until you’re finished.
Issue
The issue with this automatic handling by the QueueTrigger is the renewal timer that gets created in step 5. This timer gets a copy of the messageId and the popReceipt and uses them to extend the visibilityTimeout after 5 minutes. However, once that has happened, some properties of the QueueMessage you received are no longer valid:
- The NextVisibleOn property can already hold an expired date, even though the message is still hidden from other instances.
- The PopReceipt property no longer holds a valid popReceipt, since the message has been changed by the timer. It can therefore no longer be used to change the message, and any attempt results in a NotFound error.
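To make the effect of a rotating popReceipt concrete, here is a toy in-memory model. It is not the real service or SDK: ToyQueue and Demo are hypothetical names, and the model only mimics the rule that every update invalidates the previous receipt.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory stand-in for a storage queue; NOT the real service or SDK.
public sealed class ToyQueue
{
    private readonly Dictionary<string, string> _currentReceipts = new();

    // Dequeue: hands out the first pop receipt for a message.
    public string Receive(string messageId)
    {
        string receipt = Guid.NewGuid().ToString("N");
        _currentReceipts[messageId] = receipt;
        return receipt;
    }

    // Update: only succeeds with the current receipt, and rotates it.
    public string Update(string messageId, string popReceipt)
    {
        EnsureCurrent(messageId, popReceipt);
        string next = Guid.NewGuid().ToString("N");
        _currentReceipts[messageId] = next;
        return next;
    }

    // Delete: only succeeds with the current receipt.
    public void Delete(string messageId, string popReceipt)
    {
        EnsureCurrent(messageId, popReceipt);
        _currentReceipts.Remove(messageId);
    }

    private void EnsureCurrent(string messageId, string popReceipt)
    {
        if (!_currentReceipts.TryGetValue(messageId, out string? current) || current != popReceipt)
        {
            throw new InvalidOperationException("Message not found (stale pop receipt).");
        }
    }
}

public static class Demo
{
    // Returns true when deleting with the receipt captured at trigger time fails.
    public static bool DeleteFailsAfterHostRenewal()
    {
        var queue = new ToyQueue();
        string functionReceipt = queue.Receive("msg-1"); // copy handed to your function
        queue.Update("msg-1", functionReceipt);          // the host timer renews the message

        try
        {
            queue.Delete("msg-1", functionReceipt);      // your code still uses the old copy
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

In this model the delete fails because the timer's update replaced the receipt that the function still holds, which mirrors the NotFound you see once the function has run past the first renewal.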
Work-around
If you want to handle the message yourself, and you think your Azure Function will run for more than 5 minutes, you can use the following work-around:
- Call await QueueClient.UpdateMessageAsync() immediately at the start of your method.
- You will get a new popReceipt. This invalidates the popReceipt that the timer uses, which makes it impossible for the timer to interfere.
- Your function method is now fully responsible for handling the message and for extending its visibilityTimeout in time. If this is not handled properly, there is a risk that another function instance will process the same message, which also changes the popReceipt of the message.
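The steps above can be sketched as follows. This is a rough outline under several assumptions, not a definitive implementation: the class name TakeOverFunction, the QueueClient injected through the constructor (which you would have to register in Program.cs yourself), the 10-minute timeouts and the two Do...Async placeholders are all hypothetical. How the host reacts when its own renewal or delete fails on the stale receipt is not shown here.

```csharp
using System;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.Functions.Worker;

public class TakeOverFunction
{
    // Assumption: a QueueClient for "myQueue" is registered in Program.cs.
    private readonly QueueClient _client;

    public TakeOverFunction(QueueClient client) => _client = client;

    [Function(nameof(TakeOverTrigger))]
    public async Task TakeOverTrigger(
        [QueueTrigger("myQueue", Connection = "QueueConnection")] QueueMessage message)
    {
        // Step 1: update right away. This rotates the popReceipt, so the copy
        // held by the host's renewal timer no longer matches.
        Response<UpdateReceipt> taken = await _client.UpdateMessageAsync(
            message.MessageId, message.PopReceipt, visibilityTimeout: TimeSpan.FromMinutes(10));
        string popReceipt = taken.Value.PopReceipt;

        // Step 2: from here on, this method has to renew the message itself
        // before the timeout expires, keeping track of the newest popReceipt.
        await DoFirstHalfAsync();
        Response<UpdateReceipt> renewed = await _client.UpdateMessageAsync(
            message.MessageId, popReceipt, visibilityTimeout: TimeSpan.FromMinutes(10));
        popReceipt = renewed.Value.PopReceipt;
        await DoSecondHalfAsync();

        // Step 3: delete with the latest popReceipt, not the one from the trigger.
        await _client.DeleteMessageAsync(message.MessageId, popReceipt);
    }

    // Placeholders for your own long-running work.
    private static Task DoFirstHalfAsync() => Task.Delay(TimeSpan.FromMinutes(4));
    private static Task DoSecondHalfAsync() => Task.Delay(TimeSpan.FromMinutes(4));
}
```

The important detail is that every UpdateMessageAsync call returns a new popReceipt, so the value has to be carried forward after each call.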
So, there is a work-around, but it involves some risks, and you should carefully weigh your options.
You can also choose to fail as early as possible (within the 5-minute limit). If the message can no longer be found (after 5 minutes), just throw an exception, so that the host can still handle the message for you. That won’t be your custom retry, but it will be a retry nonetheless.