Polling asynchronous APIs with Azure Durable Functions

I have been building a feature on elmah.io lately that picks up validation results from an external API. Before serverless was a thing, I would probably have done this using a scheduled task or Windows Service using Hangfire or similar. But after having migrated everything to serverless Azure Functions, I wanted a good solution running similarly. Azure Durable Functions turned out as the perfect companion and in this post, I'll show you a possible way to implement polling of an asynchronous API.

Polling asynchronous APIs with Azure Durable Functions

While some APIs are REST-based, serving a response within milliseconds, others require more work before returning a result. This can be implemented in many ways where a callback is probably the optimal one since you get the result as soon as it is ready. Another implementation is through polling where you make an initial request that initiates some backend job. The initial job returns an URL to the finished result or a status URL. The initiating client then needs to poll this endpoint until a result is ready.

For the rest of this post, I'll use a fictional API where a client needs to generate a large zip file:

GEThttps://example.com/generate-large-file

The endpoint returns a body like this:

{
  "status": "started",
  "statusUrl": "https://example.com/generate-large-file/42"
}

To get the status the client needs to call the status endpoint:

GEThttps://example.com/generate-large-file/42

That returns a body while the job is running:

{
  "status": "running"
}

And when done, changes the body to this:

{
  "status": "done",
  "path": "https://example.com/generate-large-file/42/file.zip"
}

When the status changes to done the client can download the zip file from the provided path.

To integrate with this test endpoint, we'll start by creating a new Azure Function through Visual Studio or from the console. Make sure to select Durable Functions Orchestration as part of the creation wizard:

Create new Azure Functions project

This will create a Durable Function project with a default implementation awaiting three tasks. We'll change that in a minute. For some reason, I need to include the following package reference manually to get the project to build:

<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.8.1" />

I'm not sure if my environment is lacking something or if there's a bug in the project template.

Before digging into the implementation, let's talk about Durable Functions. A Durable Function is a stateful Azure Function that can be split into multiple smaller functions. All Durable Functions consist of a starter function that initiates the function, an orchestrator function, and zero or more activity functions.

Durable Functions provide different models for implementing well-known scenarios like function chaining and fan out/fan in. Microsoft already provides great documentation there so there's no need to repeat that here. In this example, we want to implement a variant of function chaining where we start by initiating the remote task, waiting a bit, start polling for updates, and finally fetching the generated zip file.

We'll begin with the starter function which in this case will be a scheduled function. This could be based on an HTTP request, a message on Azure Service Bus, or any of the bindings available for Azure Functions. To implement the start function, replace the HttpStart method with the following code:

[FunctionName(nameof(ScheduledStart))]
public static async Task ScheduledStart(
    [TimerTrigger("0 0 */1 * * *")] TimerInfo myTimer,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    string instanceId = await starter.StartNewAsync(nameof(RunOrchestrator), null);
    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
}

This will schedule the durable function every hour. If you, like me, have trouble quickly understanding cron triggers (is 0 0 */1 * * * every hour or every day?), we've built the Cron Expression Parser to help with just that.

The code simply tells Azure Functions to run a new function named RunOrchestrator that we will include the body for next:

[FunctionName(nameof(RunOrchestrator))]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
}

The RunOrchestrator function is where the magic will happen. The first thing needed is to call the HTTP endpoint to start the zip-generating task. HTTP requests can be initiated through a special API inside orchestrator functions, but I prefer delegating this to a separate activity function:

var statusUrl = await context.CallActivityAsync<string>(nameof(InitiateRequest), null);

This will trigger a new function named InitiateRequest:

[FunctionName(nameof(InitiateRequest))]
public static async Task<string> InitiateRequest(ILogger log)
{
    using (var client = HttpClientFactory.Create())
    {
        var response = await client.GetAsync("https://example.com/generate-large-file");
        response.EnsureSuccessStatusCode();

        var body = await response.Content.ReadAsAsync<dynamic>();
        return body.resultUrl;
    }
}

The code uses a HttpClient to call the https://example.com/generate-large-file endpoint and return the resultUrl available in the body. For production code, you probably want to look at dependency injection with Azure Functions to inject the HttpClient instead.

The next step is to wait x minutes, depending on how long it as a minimum takes to zip the file. I'll wait 5 minutes in this sample by adding the following to the orchestrator function:

var nowIn5Minutes = context.CurrentUtcDateTime.AddMinutes(5);
await context.CreateTimer(nowIn5Minutes, CancellationToken.None);

The upside of using the orchestrator function to pause the execution, rather than doing a Thread.Sleep is that you won't get charged for these 5 minutes in regards to CPU time.

After five minutes, we will start polling for results. Durable Functions provides an easy way to retry a sub-function/activity that can be configured by adding the following code to the orchestrator:

var retryOptions = new RetryOptions(TimeSpan.FromMinutes(1), 10);
var resultUrl = await context.CallActivityWithRetryAsync<string>(nameof(FetchResult), retryOptions, statusUrl);

This will execute a function named FetchResult and retry it up to 10 times one time per minute. The FetchResult function could look like this:

[FunctionName(nameof(FetchResult))]
public static async Task<string> FetchResult([ActivityTrigger] string statusUrl, ILogger log)
{
    using (var client = HttpClientFactory.Create())
    {
        var response = await client.GetAsync(statusUrl);
        response.EnsureSuccessStatusCode();

        var body = await response.Content.ReadAsAsync<dynamic>();

        if (body?.status == "done")
        {
            return body.path;
        }
        else
        {
            throw new ApplicationException("not finished");
        }
    }
}

Notice how the function accepts the statusUrl provided as the result of the InitiateRequest function. The code should be straightforward. The method requests the status URL and waits for it to have a status of done. If the endpoint returns an unsuccessful status code or contains a status other than done, an exception indicates that this function should be retried in 1 minute. A small wish for the API would be to do this with another hint than throwing an exception. A status other than done is perfectly normal here and shouldn't result in an exception. I've shared more thoughts around this programming style here: C# exception handling best practices.

The final step missing is adding the function that downloads the zipped file. This is added as yet another activity to the orchestrator:

await context.CallActivityAsync(nameof(StoreResult), resultUrl);

And just for the example, the StoreResult body looks like this:

[FunctionName(nameof(StoreResult))]
public static async Task StoreResult([ActivityTrigger] string resultUrl, ILogger log)
{
    // Download the content of resultUrl and put it somewhere
}

I'll leave the implementation of the method up to you.

That's it. We have now developed an hourly Azure Durable Function initiating a request, polling for status, and reacting to a finished job. The code is implemented using an orchestrator function and a set of sub-functions/activities. The upside of this way of writing the code is that it can be executed "serverless" by Azure Functions and we don't need a long-running Scheduled Task or similar.

Here's the full code:

public static class ZipDownloader
{
    [FunctionName(nameof(RunOrchestrator))]
    public static async Task RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        var statusUrl = await context.CallActivityAsync<string>(nameof(InitiateRequest), null);

        var nowIn5Minutes = context.CurrentUtcDateTime.AddMinutes(5);
        await context.CreateTimer(nowIn5Minutes, CancellationToken.None);

        var retryOptions = new RetryOptions(TimeSpan.FromMinutes(1), 10);
        var resultUrl = await context.CallActivityWithRetryAsync<string>(nameof(FetchResult), retryOptions, statusUrl);

        await context.CallActivityAsync(nameof(StoreResult), resultUrl);
    }

    [FunctionName(nameof(StoreResult))]
    public static async Task StoreResult([ActivityTrigger] string resultUrl, ILogger log)
    {
        // Download the content of resultUrl and put it somewhere
    }

    [FunctionName(nameof(InitiateRequest))]
    public static async Task<string> InitiateRequest(ILogger log)
    {
        using (var client = HttpClientFactory.Create())
        {
            var response = await client.GetAsync("https://example.com/generate-large-file");
            response.EnsureSuccessStatusCode();

            var body = await response.Content.ReadAsAsync<dynamic>();
            return body.resultUrl;
        }
    }

    [FunctionName(nameof(FetchResult))]
    public static async Task<string> FetchResult([ActivityTrigger] string statusUrl, ILogger log)
    {
        using (var client = HttpClientFactory.Create())
        {
            var response = await client.GetAsync(statusUrl);
            response.EnsureSuccessStatusCode();

            var body = await response.Content.ReadAsAsync<dynamic>();

            if (body?.status == "done")
            {
                return body.path;
            }
            else
            {
                throw new ApplicationException("not finished");
            }
        }
    }

    [FunctionName(nameof(ScheduledStart))]
    public static async Task ScheduledStart([TimerTrigger("0 0 */1 * * *")] TimerInfo myTimer, [DurableClient] IDurableOrchestrationClient starter, ILogger log)
    {
        string instanceId = await starter.StartNewAsync(nameof(RunOrchestrator), null);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
    }
}

Pro Tips

  • The functions are added to the same file for simplicity. I would recommend creating a class per function.
  • No dependency injection is added to this post. I always declare are Startup.cs file and register all of my dependencies there.
  • When debugging Durable Functions you may wonder why the orchestrator function is hit multiple times. This is how Durable Functions are implemented where the entire body of the method is executed on each iteration. When an activity has already been executed by a previous iteration, the method returns without a new execution on subsequent requests.

elmah.io: Error logging and Uptime Monitoring for your web apps

This blog post is brought to you by elmah.io. elmah.io is error logging, uptime monitoring, deployment tracking, and service heartbeats for your .NET and JavaScript applications. Stop relying on your users to notify you when something is wrong or dig through hundreds of megabytes of log files spread across servers. With elmah.io, we store all of your log messages, notify you through popular channels like email, Slack, and Microsoft Teams, and help you fix errors fast.

See how we can help you monitor your website for crashes Monitor your website