DEV Community

Eelco Los

QueueTriggers & Logic Apps with FastEndpoints

In this blog, I'd like to cover the final steps of migrating to Minimal API
using FastEndpoints: Azure Storage Queue and Logic Apps. Logic Apps, together
with the Event Grid setup from the previous blog, will most likely cover the
majority of scenarios you might have used Azure Functions for.


Azure Storage Queue

We already had a storage queue in place and wanted to keep the same
functionality. This meant implementing an endpoint that reacts to an Azure
Storage Queue trigger.

[FunctionName(PollFunctionName)]
public async Task Poll(
    [QueueTrigger("your-queue-name")]
    CustomQueueMessage message)

Azure Functions has a built-in trigger for this, but FastEndpoints offers no
equivalent, so we had to implement it ourselves.

Binding Challenges

The QueueMessage type of Azure Queue Storage does not have a parameterless
constructor, which JSON deserialization requires. It does, however, provide a
QueuesModelFactory to instantiate a QueueMessage:
https://github.com/Azure/azure-sdk-for-net/blob/a969ac5f3f57219e420ecdf8ad32fc59d789edd4/sdk/storage/Azure.Storage.Queues/src/Models/QueuesModelFactory.cs#L21-L40:

public static QueueMessage QueueMessage(
            string messageId,
            string popReceipt,
            string messageText,
            long dequeueCount,
            System.DateTimeOffset? nextVisibleOn = default,
            System.DateTimeOffset? insertedOn = default,
            System.DateTimeOffset? expiresOn = default)
        {
            return new QueueMessage()
            {
                MessageId = messageId,
                PopReceipt = popReceipt,
                Body = new BinaryData(messageText),
                DequeueCount = dequeueCount,
                NextVisibleOn = nextVisibleOn,
                InsertedOn = insertedOn,
                ExpiresOn = expiresOn,
            };
        }

I first tried registering a custom JsonSerializer so the incoming objects
could be consumed directly, but the incoming message would not deserialize
that way. Hence, I wrote a bridging QueueMessage class:

internal class BridgingQueueMessage
{
    public string? MessageId { get; set; }
    public string? PopReceipt { get; set; }
    public string? TimeNextVisible { get; set; }
    public string? InsertionTime { get; set; }
    public string? ExpirationTime { get; set; }
    public string? DequeueCount { get; set; }
    public string? MessageText { get; set; }

    // Parameterless constructor for deserialization
    public BridgingQueueMessage() { }
}

with an internal binder:

internal class InternalObjectQueueBinder() : 
  RequestBinder<AzureStorageQueueRequest>
{
    readonly JsonSerializerOptions serializerOptions = new() { 
        UnmappedMemberHandling = JsonUnmappedMemberHandling.Skip 
    };
    const string logMessage = "Failed to deserialize message: {0}";
    public async override ValueTask<AzureStorageQueueRequest> BindAsync(
        BinderContext ctx, CancellationToken ct)
    {
        using var reader = new StreamReader(ctx.HttpContext.Request.Body);
        var json = await reader.ReadToEndAsync(ct);

        var bridgingMessage = JsonSerializer.Deserialize<BridgingQueueMessage>(
            json, serializerOptions) ?? throw new InvalidOperationException(
                "Deserialization returned null");

        var queueMessage = QueuesModelFactory.QueueMessage(
            bridgingMessage.MessageId ?? string.Empty,
            bridgingMessage.PopReceipt ?? string.Empty,
            bridgingMessage.MessageText ?? string.Empty,
            long.Parse(bridgingMessage.DequeueCount!),
            DateTime.Parse(bridgingMessage.TimeNextVisible!),
            DateTime.Parse(bridgingMessage.InsertionTime!),
            DateTime.Parse(bridgingMessage.ExpirationTime!));

        InternalObject? message = null;
        try
        {
            message = JsonSerializer.Deserialize<InternalObject>(
                bridgingMessage.MessageText!, serializerOptions);
        }
        catch (JsonException)
        {
            var logger = 
              ctx.HttpContext.Resolve<ILogger<InternalObjectQueueBinder>>();

            logger.LogWarning(logMessage, queueMessage.MessageText);
            return new AzureStorageQueueRequest(null!, null!);
        }

        return new AzureStorageQueueRequest(message!, queueMessage);
    }
}

This made it possible to bind the incoming message to the InternalObject
class. If the JSON deserialization fails, the failure is logged and the
request is returned with a null message.

This way, your validator can check for the null message and return a proper
error instead of an internal server error. To deal with the failed message
itself, we can use the Logic App to move it to a poison queue.
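One caveat: the `long.Parse` and `DateTime.Parse` calls in the binder still throw on a missing or malformed field, which would surface as an internal server error before the null-message fallback is ever reached. As a sketch (helper names are my own, not part of the binder above), the parsing can be hardened with `TryParse`:

```csharp
using System;
using System.Globalization;

internal static class QueueMessageParsing
{
    // Falls back to 0 when the dequeue count is missing or malformed,
    // instead of throwing like long.Parse(value!) would.
    public static long ParseDequeueCount(string? value) =>
        long.TryParse(value, out var count) ? count : 0;

    // Returns null for a missing/malformed timestamp so the optional
    // DateTimeOffset? parameters of QueuesModelFactory.QueueMessage
    // keep their defaults.
    public static DateTimeOffset? ParseTimestamp(string? value) =>
        DateTimeOffset.TryParse(value, CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal, out var ts) ? ts : null;
}
```

In the binder, `long.Parse(bridgingMessage.DequeueCount!)` would then become `QueueMessageParsing.ParseDequeueCount(bridgingMessage.DequeueCount)`, and likewise for the three timestamp fields.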


Logic Apps

To have queue storage trigger the endpoint, we bring one final piece into the
loop: Logic Apps.

Separating Logic App Designer Code from Bicep

When working with Logic Apps and Bicep, it's beneficial to separate the workflow
definition from the Bicep file. This improves maintainability and
parameterization.

We achieve this by using json(loadTextContent()) to reference a separate JSON
file that contains the Logic App designer code. This allows for:

  • Easier updates to workflow logic without modifying the Bicep file
  • Improved parameterization by passing values dynamically
  • Keeping the Bicep file focused on deployment while maintaining readability

To allow the Logic App to act on the Azure Storage Queue, we assign it the
Storage Queue Data Contributor role (the workflow also deletes messages, so
read-only access would not suffice).

This ensures that the Logic App has the necessary permissions on the queue
without granting broader access to the storage account.

param endpointurl string
param environment string
param name string
param storageaccountname string
param queueName string
param B2CAudience string
param location string

var logic_raw_name = 'logic-${name}-${environment}'
var connections_azurequeues_1_name = 'conn-${name}-queue-${environment}'

var workflowDefinition = json(loadTextContent('schemas/logicapp-workflow.json'))

resource storage 'Microsoft.Storage/storageAccounts@2023-05-01' existing = {
  name: storageaccountname
}

resource apiConnection 'Microsoft.Web/connections@2016-06-01' = {
  name: connections_azurequeues_1_name
  location: location
  properties: {
    displayName: connections_azurequeues_1_name
    api: {
      id: subscriptionResourceId('Microsoft.Web/locations/managedApis', location, 'azurequeues')
    }
    parameterValues: {
      storageaccount: storageaccountname
      sharedkey: storage.listKeys().keys[0].value
    }
  }
}

resource workflows_logic_test_name_resource 'Microsoft.Logic/workflows@2017-07-01' = {
  name: logic_raw_name
  location: location
  identity: {
    type: 'SystemAssigned'
  }
  properties: {
    state: 'Enabled'
    definition: workflowDefinition
    parameters: {
      '$connections': {
        value: {
          azurequeues: {
            connectionId: apiConnection.id
            connectionName: connections_azurequeues_1_name
            id: subscriptionResourceId('Microsoft.Web/locations/managedApis', location, 'azurequeues')
          }
        }
      }
      storageAccountName: {
        value: storageaccountname
      }
      queueName: {
        value: queueName
      }
      poisonQueueName: {
        value: '${queueName}-poison'
      }
      endpointUrl: {
        value: endpointurl
      }
      audience: {
        value: B2CAudience
      }
    }
  }
}

resource queueReaderRoleAssignment 'Microsoft.Authorization/roleAssignments@2020-04-01-preview' = {
  name: guid(storage.id, 'Queue Reader Role Assignment')
  scope: storage
  properties: {
    roleDefinitionId: subscriptionResourceId(
      'Microsoft.Authorization/roleDefinitions',
      '974c5e8b-45b9-4653-ba55-5f855dd0fb88'
    ) // Storage Queue Data Contributor role
    principalId: workflows_logic_test_name_resource.identity.principalId
  }
}

The logicapp-workflow.json file contains the Logic App designer code:

{
  "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
  "contentVersion": "1.0.0.0",
  "parameters": {
    "$connections": {
      "type": "Object",
      "defaultValue": {}
    },
    "storageAccountName": {
      "type": "string"
    },
    "queueName": {
      "type": "string"
    },
    "poisonQueueName": {
      "type": "string",
      "defaultValue": "[concat(parameters('queueName'), '-poison')]"
    },
    "endpointUrl": {
      "type": "string"
    },
    "audience": {
      "type": "string"
    }
  },
  "triggers": {
    "When_there_are_messages_in_a_queue_(V2)": {
      "recurrence": {
        "interval": 3,
        "frequency": "Minute"
      },
      "evaluatedRecurrence": {
        "interval": 3,
        "frequency": "Minute"
      },
      "splitOn": "@triggerBody()?['QueueMessagesList']?['QueueMessage']",
      "type": "ApiConnection",
      "inputs": {
        "host": {
          "connection": {
            "name": "@parameters('$connections')['azurequeues']['connectionId']"
          }
        },
        "method": "get",
        "path": "/v2/storageAccounts/@{encodeURIComponent(encodeURIComponent(parameters('storageAccountName')))}/queues/@{encodeURIComponent(parameters('queueName'))}/message_trigger"
      }
    }
  },
  "actions": {
    "HTTP": {
      "runAfter": {},
      "type": "Http",
      "inputs": {
        "uri": "@parameters('endpointUrl')",
        "method": "POST",
        "body": "@triggerBody()",
        "authentication": {
          "type": "ManagedServiceIdentity",
          "audience": "@parameters('audience')"
        }
      },
      "runtimeConfiguration": {
        "contentTransfer": {
          "transferMode": "Chunked"
        }
      }
    },
    "Condition": {
      "actions": {
        "Delete_message_(V2)": {
          "type": "ApiConnection",
          "inputs": {
            "host": {
              "connection": {
                "name": "@parameters('$connections')['azurequeues']['connectionId']"
              }
            },
            "method": "delete",
            "path": "/v2/storageAccounts/@{encodeURIComponent(encodeURIComponent(parameters('storageAccountName')))}/queues/@{encodeURIComponent(parameters('queueName'))}/messages/@{encodeURIComponent(triggerBody()?['MessageId'])}",
            "queries": {
              "popreceipt": "@triggerBody()?['PopReceipt']"
            }
          }
        }
      },
      "runAfter": {
        "HTTP": ["Succeeded", "Failed"]
      },
      "else": {
        "actions": {
          "Condition_1": {
            "actions": {
              "Put_a_message_on_a_queue_(V2)": {
                "type": "ApiConnection",
                "inputs": {
                  "host": {
                    "connection": {
                      "name": "@parameters('$connections')['azurequeues']['connectionId']"
                    }
                  },
                  "method": "post",
                  "body": "MessageId: @{triggerBody()?['MessageId']}, MessageText: @{triggerBody()?['MessageText']}, resultcode: @{outputs('HTTP')?['statusCode']}, resultbody:@{body('HTTP')}",
                  "path": "/v2/storageAccounts/@{encodeURIComponent(encodeURIComponent(parameters('storageAccountName')))}/queues/@{encodeURIComponent(parameters('poisonQueueName'))}/messages"
                }
              },
              "Delete_message_(V2)_1": {
                "runAfter": {
                  "Put_a_message_on_a_queue_(V2)": ["Succeeded"]
                },
                "type": "ApiConnection",
                "inputs": {
                  "host": {
                    "connection": {
                      "name": "@parameters('$connections')['azurequeues']['connectionId']"
                    }
                  },
                  "method": "delete",
                  "path": "/v2/storageAccounts/@{encodeURIComponent(encodeURIComponent(parameters('storageAccountName')))}/queues/@{encodeURIComponent(parameters('queueName'))}/messages/@{encodeURIComponent(triggerBody()?['MessageId'])}",
                  "queries": {
                    "popreceipt": "@triggerBody()?['PopReceipt']"
                  }
                }
              }
            },
            "else": {
              "actions": {}
            },
            "expression": {
              "and": [
                {
                  "not": {
                    "equals": ["@outputs('HTTP')?['statusCode']", 401]
                  }
                }
              ]
            },
            "type": "If"
          }
        }
      },
      "expression": {
        "or": [
          {
            "equals": ["@outputs('HTTP')?['statusCode']", 200]
          },
          {
            "equals": ["@outputs('HTTP')?['statusCode']", 204]
          }
        ]
      },
      "type": "If"
    }
  },
  "outputs": {}
}
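The branching in the workflow above boils down to a small decision table: a 200 or 204 response means the endpoint accepted the message, so it is deleted; a 401 is treated as transient, so the message stays on the queue and reappears after the visibility timeout; any other failure moves the message to the poison queue and deletes the original. Modeled as a plain function for clarity (the enum and names are mine, not part of the workflow):

```csharp
using System;

public enum QueueOutcome { Delete, MoveToPoisonThenDelete, LeaveForRetry }

public static class WorkflowDecision
{
    // Mirrors the two nested If actions in the Logic App:
    // success -> delete, 401 -> retry later, anything else -> poison.
    public static QueueOutcome Decide(int httpStatusCode) => httpStatusCode switch
    {
        200 or 204 => QueueOutcome.Delete,
        401 => QueueOutcome.LeaveForRetry,
        _ => QueueOutcome.MoveToPoisonThenDelete,
    };
}
```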

Authentication Handler

To have the storage queue trigger the endpoint, we needed to implement a B2C
OpenID JWT authentication handler. The OpenIdJwtAuthenticationHandler
authenticates requests with JWT bearer tokens, validating them against an
OpenID metadata endpoint.

public class OpenIdJwtAuthenticationHandler(
    IOptionsMonitor<JwtBearerOptions> options,
    ILoggerFactory logger,
    UrlEncoder encoder) : AuthenticationHandler<JwtBearerOptions>(options, 
                                                                  logger,
                                                                  encoder)
{
    public const string SchemeName = "OpenIdJwtAuthenticationHandler";
    protected override Task<AuthenticateResult> HandleAuthenticateAsync()
    {
        var token = Request.Headers.Authorization
          .FirstOrDefault()?.Split(" ").Last();
        if (string.IsNullOrEmpty(token))
        {
            return Task.FromResult(
              AuthenticateResult.Fail("Token is not present."));
        }

        // Validate the token using JwtBearerOptions
        var tokenHandler = new JwtSecurityTokenHandler();
        try
        {
            Context.User = tokenHandler.ValidateToken(token, 
              Options.TokenValidationParameters, out var validatedToken);
        }
        catch (Exception ex)
        {
          return Task.FromResult(
            AuthenticateResult
              .Fail($"Token validation failed: {ex.Message}"));
        }

        var ticket = new AuthenticationTicket(Context.User, Scheme.Name);
        return Task.FromResult(AuthenticateResult.Success(ticket));
    }
}

public class ConfigureCustomAuthOptions
  (IConfiguration configuration,
  ConfigurationManager<OpenIdConnectConfiguration> configurationManager) 
    : IConfigureNamedOptions<JwtBearerOptions>
{
    public void Configure(JwtBearerOptions options)
    {
        var validIssuers = 
          configuration.GetSection("CustomDomain:ValidIssuers").Get<string[]>()
          ?? throw new InvalidOperationException(
            "ValidIssuers configuration is missing");

        options.TokenValidationParameters = new TokenValidationParameters
        {
            ValidateIssuer = true,
            ValidIssuers = validIssuers,
            ValidateAudience = true,
            ValidAudience = configuration["AzureAd:Audience"],
            ValidateLifetime = true,
            ClockSkew = TimeSpan.Zero,
            IssuerSigningKeys = configurationManager
              .GetConfigurationAsync().GetAwaiter().GetResult().SigningKeys
        };
    }

    public void Configure(string? name, JwtBearerOptions options) => 
      Configure(options);
}

And then register the authentication handler in Program.cs:

builder.Services.AddAuthentication(options =>
    {
        options.DefaultScheme = defaultSchemeName;
        options.DefaultChallengeScheme = defaultSchemeName;
    })
    .AddScheme<JwtBearerOptions, OpenIdJwtAuthenticationHandler>
      (OpenIdJwtAuthenticationHandler.SchemeName, options =>
    {
        var serviceProvider = builder.Services.BuildServiceProvider();
        var configManager = new ConfigurationManager<OpenIdConnectConfiguration>(
    $"https://login.microsoftonline.com/{builder.Configuration["AzureAd:TenantId"]}/v2.0/.well-known/openid-configuration",
    new OpenIdConnectConfigurationRetriever());
        options.ConfigurationManager = configManager;
        var customauthoptions = new ConfigureCustomAuthOptions(
          serviceProvider.GetRequiredService<IConfiguration>(), configManager);
        customauthoptions.Configure(options);
    });

This might lead to a situation where you have multiple bearer tokens in use.

Multiple Bearer Usage

At this point, you might have multiple sources requiring Bearer
authentication. These could include:

  • Logic Apps using OpenID JWT authentication
  • Other services using custom tokens
  • Default Bearer token authentication

To handle this, we use a Policy Scheme that dynamically selects the
authentication scheme based on the request path:

  • Requests to /custom → Use CustomToken (an arbitrary example of a custom scheme)
  • Requests to /storagequeue → Use OpenIdJwtAuthenticationHandler
  • All others → Use standard Bearer authentication

This allows for multiple authentication mechanisms within the same application
while keeping things flexible and maintainable.

var defaultSchemeName = 
  $"{Constants.Bearer} or {CustomToken.TokenId} or {OpenIdJwtAuthenticationHandler.SchemeName}";
builder.Services.AddAuthentication(options =>
    {
        options.DefaultScheme = defaultSchemeName;
        options.DefaultChallengeScheme = defaultSchemeName;
    })
    .AddPolicyScheme(defaultSchemeName, defaultSchemeName, options =>
    {
        options.ForwardDefaultSelector = context => context.Request.Path.Value switch
        {
            var p when p?.StartsWith("/custom") == true => CustomToken.TokenId,
            var p when p?.StartsWith("/storagequeue") == true =>  
              OpenIdJwtAuthenticationHandler.SchemeName,
            _ => Constants.Bearer
        };
    })
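Since the selector is just a path-to-scheme mapping, it can also be pulled out into a plain function and unit-tested in isolation. A standalone sketch (the constant strings are placeholders standing in for `CustomToken.TokenId`, `OpenIdJwtAuthenticationHandler.SchemeName`, and `Constants.Bearer`):

```csharp
public static class SchemeSelector
{
    // Placeholder scheme names; in the real app these come from
    // CustomToken.TokenId, OpenIdJwtAuthenticationHandler.SchemeName
    // and the default bearer constant.
    public const string Custom = "CustomToken";
    public const string OpenIdJwt = "OpenIdJwtAuthenticationHandler";
    public const string Bearer = "Bearer";

    // Same switch as in ForwardDefaultSelector, minus the HttpContext.
    public static string Select(string? path) => path switch
    {
        var p when p?.StartsWith("/custom") == true => Custom,
        var p when p?.StartsWith("/storagequeue") == true => OpenIdJwt,
        _ => Bearer
    };
}
```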

Conclusion

In this series, I set out to migrate away from Azure Functions.
To summarize, we have covered the following topics:

  • Migrating Azure HTTP calls
    Here we discussed how Azure Functions handle HTTP calls, how to migrate them, and why having control over the various request stages keeps your code clean and maintainable.

  • Migrating Azure Event Grid
    We discussed the use of webhooks and model binding to transform any incoming data into valid objects.

  • This article: migrating Azure Storage Queue and Logic Apps
    Finally, we discussed using Azure Storage Queue and Logic Apps to handle queue triggers, and how to authenticate the incoming requests.

But some might argue: if all these blogs cover migrating away from Azure
Functions, is there any use left for Azure Functions?

Is there use for Azure Functions?

Yes, Azure Functions still have their place in the cloud architecture ecosystem, particularly in these situations:

  • Early-stage development and prototyping: When you're just starting a project and aren't sure about long-term requirements, Azure Functions provide a quick way to get functionality up and running without extensive infrastructure setup.
  • Non-critical business processes: For auxiliary processes that aren't part of your core business critical path, Functions offer simplicity and cost-effectiveness.
  • Lightweight event processing: For simple event-driven tasks that don't require complex business logic or need to scale independently.

And although there are other cases that Azure Functions could handle, I would say that FastEndpoints with Minimal API is the better choice for most scenarios.

While our series focused on migration paths away from Azure Functions for more complex scenarios, they remain a valuable tool in the right contexts. The key is understanding when the simplicity and managed nature of Functions is advantageous versus when your application would benefit from the greater control, testing capabilities, and flexibility of a solution like Minimal APIs with FastEndpoints.
