Sin categoría

Azure Durable Functions

Durable functions are an extension of Azure Functions that allows Azure Functions to be stateful which mean it allows us to keep track of the context of the application even after the function finished the execution or stopped for some reason allowing you to create workflows easily.

No alt text provided for this image


Durable function uses the Durable Task framework which

is a library that allows users to write long running persistent workflows (referred to as orchestrations) in C# using simple async/await coding constructs.

Durable Functions uses a concept called “checkpointing” to keep track of the execution state of a long-running workflow or activity function. Checkpointing allows Durable Functions to know where it left off in the code in case of a failure or interruption, so that it can resume execution from the correct point.

Here’s a simplified overview of how checkpointing works in Durable Functions:

  1. When an orchestration function or activity function is called, Durable Functions creates an “execution context” for that function, which includes information about the current execution state.
  2. As the function executes, Durable Functions periodically takes a “checkpoint” of the execution state, which includes information such as the function input, output, and any local variables or objects that were created during execution.
  3. Checkpoints are stored in durable storage (such as Azure Storage or Azure Cosmos DB) as a “history” of the execution state.
  4. If the function is interrupted or fails, Durable Functions can use the stored checkpoints to determine where the function left off, and can resume execution from that point.
  5. If necessary, Durable Functions can also create a new instance of the function (using the stored input data) and resume execution from an earlier checkpoint.

Overall, checkpointing is a key mechanism that allows Durable Functions to provide reliable, fault-tolerant execution of long-running workflows and activities, by ensuring that execution can be resumed from the correct point after a failure or interruption

The frequency of checkpoints in Durable Functions can be configured using the DurableTask settings. By default, Durable Functions takes a checkpoint every 5 seconds or 100 “orchestrator actions” (whichever comes first).

An orchestrator action is any operation that modifies the state of the orchestrator function. This can include starting a new activity function, waiting for a timer or external event, or calling other orchestrator functions. The exact definition of an orchestrator action can depend on the specific version of the Durable Functions runtime being used.

The checkpoint interval can be adjusted by setting the DurableTaskHubSettings properties when initializing the IDurableOrchestrationClient. For example, to increase the checkpoint interval to 10 seconds, you can set the DurableTaskHubSettings.TrackingStorePartitionSize property to 10000 and DurableTaskHubSettings.TrackingStoreScanInterval property to 10000ms (10 seconds).

Durable Functions Main parts.

The main parts of Durable Functions are:

  1. Durable Orchestration Functions: These are the main building blocks of a Durable Function application. They are responsible for coordinating and managing the state of workflows that span multiple function executions, as well as handling input and output parameters, error handling, and timeouts.
  2. Durable Entities: Durable Entities are a type of stateful object that can be used within a Durable Function application to manage shared state between different instances of a function. They can be used to implement scenarios such as counters, caches, and database connections.
  3. Durable Client Functions: These functions are used to interact with the orchestration and entity functions from outside of the Durable Function application. They provide a simple API for starting new workflows, querying the status of existing workflows, and sending messages to entities.
  4. Durable Timer Functions: These functions provide a way to trigger a workflow at a specified time or interval. They can be used to implement periodic tasks, reminders, and other time-based scenarios.
  5. Durable Activities: Activities are a crucial part of the Durable Functions framework and are responsible for performing the actual work or tasks that are part of the workflow. They are executed by the Durable Function runtime and can be implemented as standard Azure Functions or other types of functions that can be executed within the context of the Durable Function application. They are invoked by the Durable Orchestration Function and can receive input and return output as needed.

In summary, the main parts of Durable Functions are Durable Orchestration Functions, Durable Entities, Durable Client Functions, Durable Timer Functions, and Durable Functions Activities. These components work together to provide a powerful framework for building reliable and scalable serverless applications on Microsoft Azure.

Application patterns

source: https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=csharp-inproc

The primary use case for Durable Functions is simplifying complex, stateful coordination requirements in serverless applications. The following sections describe typical application patterns that can benefit from Durable Functions

Pattern #1: Function chaining

In the function chaining pattern, a sequence of functions executes in a specific order. In this pattern, the output of one function is applied to the input of another function.

No alt text provided for this image

Pattern #2: Fan out/fan in

In the fan out/fan in pattern, you execute multiple functions in parallel and then wait for all functions to finish. Often, some aggregation work is done on the results that are returned from the functions.

No alt text provided for this image

Pattern #3: Async HTTP APIs

The async HTTP API pattern addresses the problem of coordinating the state of long-running operations with external clients. A common way to implement this pattern is by having an HTTP endpoint trigger the long-running action. Then, redirect the client to a status endpoint that the client polls to learn when the operation is finished.

Durable Functions provides built-in support for this pattern, simplifying or even removing the code you need to write to interact with long-running function executions.

Because the Durable Functions runtime manages state for you, you don’t need to implement your own status-tracking mechanism.

The Durable Functions extension exposes built-in HTTP APIs that manage long-running orchestrations.

No alt text provided for this image

Pattern #4: Monitor

The monitor pattern refers to a flexible, recurring process in a workflow. An example is polling until specific conditions are met. You can use a regular timer trigger to address a basic scenario, such as a periodic cleanup job, but its interval is static and managing instance lifetimes becomes complex. You can use Durable Functions to create flexible recurrence intervals, manage task lifetimes, and create multiple monitor processes from a single orchestration.

No alt text provided for this image

Pattern #5: Human interaction

Many automated processes involve some kind of human interaction. Involving humans in an automated process is tricky because people aren’t as highly available and as responsive as cloud services. An automated process might allow for this interaction by using timeouts and compensation logic.

No alt text provided for this image

Pattern #6: Aggregator (stateful entities)

The sixth pattern is about aggregating event data over a period of time into a single, addressable entity. In this pattern, the data being aggregated may come from multiple sources, may be delivered in batches, or may be scattered over long-periods of time. The aggregator might need to take action on event data as it arrives, and external clients may need to query the aggregated data.

The tricky thing about trying to implement this pattern with normal, stateless functions is that concurrency control becomes a huge challenge. Not only do you need to worry about multiple threads modifying the same data at the same time, you also need to worry about ensuring that the aggregator only runs on a single VM at a time.

You can use Durable entities to easily implement this pattern as a single function.

Case escenario following the Human Interaction pattern.

Suppose there’s an application that detects cars that exceed the speed limit and captures their license plate. The captured license plate is then sent to a function called the Speed Violation Function.

  1. The Speed Violation Function checks the accuracy percentage of the captured license plate. If the accuracy percentage is high enough, the function saves the license plate information to a database.
  2. If the accuracy percentage is low, the function sends the image to a system (e.g., Slack API) and waits for a human to recognize the license plate in the image. Once the human recognizes the license plate, they send the response back to the function.
  3. If the human doesn’t respond within a certain amount of time, the function saves the information in the database as “not able to recognize”.

In summary, the application checks the accuracy of the captured license plate and saves it to a database if the accuracy is high enough. If the accuracy is low, a human is asked to recognize the license plate and the information is saved once it’s recognized.

Example:

Tools:

  • Azure Account
  • Visual Studio
  • Postman
  • Microsoft Azure StorageExplorer
  • Docker to install Azurita

1.- Create directory to store database

/Users/dmata/Documents/azuriteStorage

2.-donwload azureita image

docker run -p 10000:10000 -p 10001:10001 -p 10002:10002 

-v /Users/dmata/Documents/azuriteStorage

mcr.microsoft.com/azure-storage/azurite\

3.- Run the image

docker run -p 10000:10000 -p 10001:10001 -p 10002:10002 -v '/Users/dmata/Documents/azuriteStorage' mcr.microsoft.com/azure-storage/azurite

Create the Azure Function application

Add the Microsoft.Azure.WebJobs.Extensions.DurableTask library to the project.

Add local.settings.json to main directory in Azure Function App.

This configuration will ensure that the application uses the local storage to run the application.

{
"IsEncrypted": false,

"Values": {

"AzureWebJobsStorage": "UseDevelopmentStorage=true",

"FUNCTIONS_WORKER_RUNTIME": "dotnet"

}

}

Optional if you need to add Dependency Injection

Install this 2 packages

using Microsoft.Azure.Functions.Extensions.DependencyInjection

using Microsoft.Extensions.DependencyInjection;

using Microsoft.Azure.Functions.Extensions;

Add Startup class

[assembly: FunctionsStartup(typeof(DurableFunctionTacit.Startup))]


namespace DurableFunctionTacit

{

public class Startup : FunctionsStartup

{

public override void Configure(IFunctionsHostBuilder builder)

{


builder.Services.AddScoped<IMessageSender,SlackMessageSenderStrategy>();


}

}

};

1.-DurableFunctionTacit

The “DurableFunctionTacit” function is triggered by an HTTP request and takes in three parameters: an HTTP request, an instance of IDurableOrchestrationClient (which is used to interact with the Durable Functions runtime), and an instance of ILogger (which is used for logging).

public static class DurableFunctionTaci

{

[FunctionName("DurableFunctionTacit")]

public static async Task<IActionResult> Run(

[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req,

[DurableClient]IDurableOrchestrationClient durableClient,

ILogger log)

{

log.LogError("DurableFunctionTacit triggered by http");




//#1 Deserialize request to SpeedVilation model

string requestBody = await new StreamReader(req.Body).ReadToEndAsync();

var speedViolation = JsonConvert.DeserializeObject<SpeedViolation>(requestBody);




//#2 Check if Recognition Acurracy is below the stablished threhold

if(speedViolation.AccuracyRecognition < 0.5)

{

log.LogError("Speed Violation AcurracyRecognition below threhold, sending to Manual recognition");

var instaceId = await durableClient.StartNewAsync(nameof(ManualOrchastrationFunction.ManualApproval), speedViolation);

return durableClient.CreateCheckStatusResponse(req, instaceId);

}


log.LogError("Speed Violation AcurracyRecognition above threhold, saving record in database");

//#3 Save the speed violation in the database

await SaveSpeedViolationRecord(speedViolation);

return new OkObjectResult(speedViolation);

}


private static Task SaveSpeedViolationRecord(SpeedViolation speedViolation)

{

return Task.CompletedTask;

}

}t

The function performs the following actions:

  1. Deserializes the HTTP request body into a SpeedViolation model object.
  2. Checks if the AccuracyRecognition property of the SpeedViolation object is below a certain threshold (0.5 in this case). If it is, the function starts a new instance of the “ManualOrchastrationFunction.ManualApproval” orchestration function using the IDurableOrchestrationClient object and passes in the SpeedViolation object as a parameter. The function then returns a response that includes a status check URL for the new instance of the orchestration function.
  3. If the AccuracyRecognition property is above the threshold, the function saves the SpeedViolation object to a database. The function then returns an HTTP response with the saved SpeedViolation object as the response body.

2.- ManualOrchastrationFunction

The function contains a single function called ManualApproval that is an orchestration function triggered by an external event in the IDurableOrchestrationContext object.

namespace DurableFunctionTacit.Functions

{

public class ManualOrchastrationFunction

{

[FunctionName(nameof(ManualApproval))]

public async Task<bool> ManualApproval(

[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)

{

logger.LogError("Entering Manual Approval");

var speedViolation = context.GetInput<SpeedViolation>();




logger.LogError("Creating cancelation token");

var cancelationToken = new CancellationTokenSource();

var expirationTime = context.CurrentUtcDateTime.AddSeconds(10);

var timerTask = context.CreateTimer(expirationTime, cancelationToken.Token);




//### Here we make the api call to the system that will manage the approval by the user.




var messageSenderRequest = new SlackMessageRequest

{

DurableInstanceId = context.InstanceId

};




await context.CallActivityAsync(nameof(SendApprovalActivity.SendApprovalMessage), messageSenderRequest);




//creating task that will listen for the event to be triggered

var messageResponseTask = context.WaitForExternalEvent<SlackMessageSenderResponse>("externalEvent");




//wait for any task to finish first

logger.LogError("Wait for user to respond or timer to finish");

var winer = await Task.WhenAny(messageResponseTask, timerTask);




if (winer == timerTask && timerTask.IsCompleted)

{

logger.LogError("User didn't repond, save in database as unable to recognize");

return false;

}




logger.LogError("User completed the process");

var userRespons = messageResponseTask.Result;

if (userRespons.Approved)

{

logger.LogError("User approved the license recognition");

}

else if (!string.IsNullOrEmpty(userRespons.NewLicensePlate))

{

logger.LogError("User provided a new license plate");

}

//### ends approval user

return true;

}

}

};

The function first gets the input SpeedViolation object from the IDurableOrchestrationContext. Then, it creates a cancellation token and a timer to wait for a maximum of 10 seconds for the user to respond. It then calls an activity function named SendApprovalMessage to send a message to an external system requesting approval from a user. It listens for an external event named externalEvent, which is triggered when a response is received from the user. It then waits for either the external event to occur or the timer to expire.

If the timer expires before receiving a response from the user, the function saves the record as “unable to recognize” and returns false. If the external event occurs before the timer expires, the function checks whether the user approved the license recognition or provided a new license plate and logs the result accordingly. Finally, the function returns true.

3.- SendApprovalActivity

This activity is triggered by the ActivityTrigger attribute on the SendApprovalMessage method. The method accepts a SlackMessageRequest object and an ILogger object as parameters. The IMessageSender interface is injected into the class constructor.

namespace DurableFunctionTacit.Activities

{

public class SendApprovalActivity

{

private readonly IMessageSender _messageSender;




public SendApprovalActivity(IMessageSender messageSender)

{

_messageSender = messageSender;

}




[FunctionName(nameof(SendApprovalActivity.SendApprovalMessage))]

public async Task<MessageSenderResponseBase> SendApprovalMessage([ActivityTrigger] SlackMessageRequest messageSenderRequestBase, ILogger logger)

{

var response = await _messageSender.SendMessage(messageSenderRequestBase);




return response;

}

}

};

The purpose of this activity is to send an approval message to a Slack channel. The approval message contains information about a speed violation detected by the system and asks the user to approve or reject the detected license plate. The SendApprovalMessage method calls the SendMessage method of the injected IMessageSender object, passing in the SlackMessageRequest object as a parameter. The SendMessage method returns a MessageSenderResponseBase object, which is then returned by the SendApprovalMessage method.

Note that the implementation details of the IMessageSender interface are not shown in this code snippet.

4.- SlackResponseActivity

The request body is expected to be a JSON object representing a Slack message sender response, which is deserialized into a SlackMessageSenderResponse object.

The function then uses the IDurableClient interface to raise an external event with the response object. The event name is “externalEvent” and is used in the ManualApproval function to listen for the user’s response.

namespace DurableFunctionTacit.Functions

{

public static class SlackResponseActivity

{

[FunctionName("SlackResponseActivity")]

public static async Task<IActionResult> Run(

[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req,

[DurableClient]IDurableClient durableClient,

ILogger log)

{

log.LogInformation("User Send Response ");




string requestBody = await new StreamReader(req.Body).ReadToEndAsync();

var userResponse = JsonConvert.DeserializeObject<SlackMessageSenderResponse>(requestBody);




if(userResponse is null)

{

return new BadRequestObjectResult("Error on User response");

}




await durableClient.RaiseEventAsync(userResponse.DurableInstanceId, "externalEvent", userResponse);




return new OkObjectResult(userResponse);

}

}

};

In Durable Functions, you can raise an event to the running instance using its instance ID. To do so, you first need to obtain an instance of the IDurableOrchestrationContext in your code. Once you have this instance, you can call the RaiseEventAsync method, passing the instance ID and the event name as parameters. Any additional data you want to pass along with the event can be included in an optional parameter.

In the code snippet provided, the SlackResponseActivity function is raising an external event with the name “externalEvent” using the instance ID obtained from the DurableMessageSenderResponse. This event is raised when the user responds to the approval message sent in the ManualApproval orchestration function.

If the timer hasn’t finished yet, the durable function instance will continue to wait for the external event to be triggered, even if the orchestration function has already completed execution. Once the external event is raised, the ManualApproval function can resume execution, and the result of the user’s response can be processed accordingly.

In the ManualApproval function, the code uses the Task.WhenAny method to wait for either the external event or the timer to complete, whichever comes first. If the timer expires before the user responds, the function returns false and saves the result in the database as unable to recognize. If the user responds before the timer expires, the function resumes execution and processes the user’s response accordingly.

To download the complete code visit my github repository:

https://github.com/matvi/azure-DurableFunctionsWithStrategiesPattern.git

Leave a comment