Implementing & Automating Azure Stream Analytics Pipeline


In our last article, we set out to build a resilient architecture around streaming events.

In this article, we build the solution.  We use an ARM template to automate deployment, and we go through the configuration of the different services.

The goal of the architecture was to allow a fast-paced stream of events with a shape akin to


{
  "widgetId" : 42
}

to be simply aggregated at constant intervals (say once a minute).  We wanted the solution to be resilient to different service failures.

We ended up with the following architecture:

image

We have two Azure Event Hubs as ingestion mechanisms:  one in the primary region of the workload, one in a secondary region.  The fallback mechanism is the responsibility of the source, i.e. the agent pushing the events to Event Hubs.

Events are captured in blob storage.

Events are then processed by Azure Stream Analytics.  It computes aggregations and pushes aggregation events into a third event hub.

Finally, an Azure Logic App consumes the summary events and updates a SQL database.

We recommend reading the previous article to understand each aspect of the architecture.

ARM Template

The ARM Template to deploy the solution is available on GitHub.  We can also deploy it from a “Deploy to Azure” button.

The template takes a few minutes to deploy.  It is fully functional after deployment, i.e. we aren’t required to run additional scripts.

The template has the following input parameters:

  • Namespace prefix:  Prefix for the Event Hub namespaces.  There are a primary and a secondary namespace; they must have globally unique names.  This parameter controls their names.
  • Storage Account Name:  Name of the storage account where the events are stored.
  • SQL Admin Password:  Password for the SQL Server.
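
Assuming the template has been downloaded locally as azuredeploy.json (the file name and the parameter names below are illustrative and may differ from the actual template), a deployment from the Azure CLI would look roughly like this:


az group create --name streaming-demo --location eastus

az group deployment create \
  --resource-group streaming-demo \
  --template-file azuredeploy.json \
  --parameters namespacePrefix=<unique-prefix> \
               storageAccountName=<unique-storage-name> \
               sqlAdminPassword=<strong-password>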

The template also outputs three fields:

  • Primary Telemetry Send Connection String:  Connection string to the primary telemetry event hub.  It uses the “send authorization rule”, which only allows sending events to the hub.
  • Secondary Telemetry Send Connection String:  Similar to the previous one, but for the secondary telemetry hub.
  • SQL Summary Connection String:  ADO.NET connection string to the SQL database.

Outputs are meant to be used to interact with the solution.  In this article, we’ll only use the SQL Database to monitor table updates.

Micro Service

The first thing we notice is that there are 13 resources deployed.

This might seem like a lot for something that is clearly only a part of a solution.

image

Four of those resources (those framed in red) are only there for demo purposes and wouldn’t be deployed in a production environment.  Also, the SQL server and database (in orange) would presumably pre-exist the streaming solution.

This means the streaming solution really contains eight (8) services.  We would recommend deploying those services in a separate resource group and treating the streaming solution as a Micro Service:

image

The micro service boundaries would have two inputs, the primary and secondary telemetry event hub connections, and one output, the summary event hub connection.

The rest of the solution shouldn’t interact with the details of the micro service.  It hence makes sense to package and treat the sub-solution as a micro service.

Event Hubs

Let’s look at the event hubs.

There are three hubs:

  • Primary telemetry
  • Secondary telemetry
  • Primary summary

The first two are part of the same namespace.  Let’s start with that one.  This is the namespace suffixed by “-primary”:

image

Let’s first look at the Shared access policies of the namespace.  Only the default root admin one is available.

image

We define access at the hub level as opposed to namespace level.  We do not share the root admin key.  Access is granular, i.e. send or listen, at the event hub level.  This follows the principle of least privilege.

Now, let’s look at the event hubs:

image

Let’s look at the telemetry hub.

image

We can see that capture has been configured.

image

We see that we defined different policies for the send and listen actions.  This way, if a key is compromised, the damage that can be done is limited.

It is also interesting to note that the ARM template doesn’t explicitly define the keys.  It simply defines the policy:


{
  "type": "authorizationRules",
  "apiVersion": "2017-04-01",
  "name": "listenTelemetryRule",
  "dependsOn": [
    "[variables('Primary Telemetry Hub Id')]"
  ],
  "properties": {
    "rights": [
      "Listen"
    ]
  }
}

and when the key is needed, the following syntax is used:


listKeys(variables('Primary Telemetry Listen Rule Id'), '2017-04-01').primaryKey

This has a definite security advantage.  ARM templates are typically stored in widely available internal source control.  Here the keys aren’t available in the template, so we avoid storing secrets.
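
For instance, the template output exposing the primary send connection string can be assembled entirely with listKeys.  The snippet below is only a sketch:  the output name, the variable names, the rule name (sendTelemetryRule) and the hub name (telemetry) are assumptions, not the template’s exact definitions:


"outputs": {
  "primaryTelemetrySendConnectionString": {
    "type": "string",
    "value": "[concat('Endpoint=sb://', variables('Primary Namespace Name'), '.servicebus.windows.net/;SharedAccessKeyName=sendTelemetryRule;SharedAccessKey=', listKeys(variables('Primary Telemetry Send Rule Id'), '2017-04-01').primaryKey, ';EntityPath=telemetry')]"
  }
}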

Similar observations can be made for the secondary namespace.

The secondary namespace is deployed in the same region as the primary namespace.  This is done to limit the number of parameters of the template.  It is trivial to modify the template to deploy it to a secondary region.
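
A minimal sketch of such a change, assuming a hypothetical Secondary Region parameter is added to the template (the Namespace Prefix parameter name and the namespace properties are also approximated here), would be to bind the secondary namespace’s location to that parameter:


{
  "type": "Microsoft.EventHub/namespaces",
  "apiVersion": "2017-04-01",
  "name": "[concat(parameters('Namespace Prefix'), '-secondary')]",
  "location": "[parameters('Secondary Region')]",
  "sku": {
    "name": "Standard"
  }
}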

Stream Analytics

Let’s look at the stream analytics job:

image

The meat of the configuration is in the middle of the overview pane:

image

We see that we have 2 inputs and 1 output.  The query on the right transforms the 2 inputs and feeds the output.

Let’s look at the primary-telemetry configuration:

image

The input is bound to the primary namespace / telemetry event hub.

The other input and the output are similarly configured.  Something specific to the output is that we specify the format:

image

Here we choose array, as opposed to line separated.  Basically, array formats the output as a normal JSON array, while line separated simply outputs events as JSON documents with a line separator between them.  We found it easier to process a legal JSON array.
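
For instance, a window producing two aggregates would be serialized roughly as follows with the array format (the exact field casing depends on the job’s configuration; the stored procedure shown later reads lowercase names):


[
  { "count": 12, "widgetid": 42 },
  { "count": 7, "widgetid": 43 }
]

while line separated would emit one JSON document per line:


{ "count": 12, "widgetid": 42 }
{ "count": 7, "widgetid": 43 }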

The query is interesting:


SELECT
    COUNT(*) AS Count,
    widgetId
FROM [primary-telemetry]
TIMESTAMP BY createdAt
GROUP BY widgetId, TumblingWindow(second, 5)
UNION
SELECT
    COUNT(*) AS Count,
    widgetId
FROM [secondary-telemetry]
TIMESTAMP BY createdAt
GROUP BY widgetId, TumblingWindow(second, 5)

We use the input / output aliases in the query.

Here we do a union of the primary and secondary telemetry hubs using the UNION operation.  Similarly to what UNION does in T-SQL, UNION here simply appends events from one source to the other.

Specific to Stream Analytics are windowing functions.  In our case, we use the Tumbling Window, which groups events in a non-overlapping fashion.  The length of the window here is 5 seconds.  This is actually configured by the Tumbling Window Length in Seconds variable.
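
Since the query is defined in the ARM template, the window length can be injected into the query text with a simple concat.  The transformation snippet below is only a sketch:  the query is abbreviated to the primary branch and the resource name is illustrative:


{
  "type": "transformations",
  "apiVersion": "2016-03-01",
  "name": "main",
  "properties": {
    "streamingUnits": 1,
    "query": "[concat('SELECT COUNT(*) AS Count, widgetId FROM [primary-telemetry] TIMESTAMP BY createdAt GROUP BY widgetId, TumblingWindow(second, ', string(variables('Tumbling Window Length in Seconds')), ')')]"
  }
}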

Azure Stream Analytics tools for Visual Studio can assist in stream analytics job authoring.

Update Summary Logic App

Let’s look at the logic app that consumes summary events and updates the database.

image

The Logic App is quite straightforward.

image

The app triggers when the summary event hub has events available.  It then calls a stored procedure.

The trigger’s full configuration can be seen here:

image

We take in a maximum of 50 events at a time.  This is done to avoid overloading the database while still batching.

We probe every 2 seconds.  This is actually configured by the Update Summary Probe in Seconds variable.  We recommend setting it to about half the Tumbling Window Length in Seconds variable.  Basically, we want to probe often enough to catch summary events early.
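
In the underlying workflow definition, those two settings map to the trigger’s recurrence and batch size.  The following is only an approximation of the Event Hubs connector trigger; the exact path and parameter names may differ from the deployed app:


"triggers": {
  "When_events_are_available_in_Event_Hub": {
    "type": "ApiConnection",
    "recurrence": {
      "frequency": "Second",
      "interval": 2
    },
    "inputs": {
      "host": {
        "connection": {
          "name": "@parameters('$connections')['eventhubs']['connectionId']"
        }
      },
      "method": "get",
      "path": "/@{encodeURIComponent('summary')}/events/batch/head",
      "queries": {
        "contentType": "application/json",
        "maximumEventsCount": 50
      }
    }
  }
}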

image

The stored procedure action is configured to call [dbo].[updateSummaries].  It passes the content retrieved in the trigger in the jsonPayload parameter.

Both the trigger and the action rely on connections.  For instance, the SQL connection:


{
  "type": "microsoft.web/connections",
  "apiVersion": "2016-06-01",
  "name": "sqlConnection",
  "location": "[resourceGroup().location]",
  "dependsOn": [
    "[resourceId('Microsoft.Sql/servers/databases', variables('SQL Server Name'), variables('SQL DB Name'))]"
  ],
  "properties": {
    "api": {
      "id": "[concat(subscription().id, '/providers/Microsoft.Web/locations/', resourceGroup().location, '/managedApis/', 'sql')]"
    },
    "displayName": "SQL Connection",
    "parameterValues": {
      "server": "[variables('SQL Server FQDN')]",
      "database": "[variables('SQL DB Name')]",
      "username": "[variables('SQL Admin Name')]",
      "password": "[parameters('SQL Admin Password')]"
    }
  }
}

Connections are Azure resources.  They externalize connection configuration.  We have covered in another article how to create them in an ARM Template.

Database

The database consists of two items:  a summary table and a stored procedure.  The script to create them is executed by an Azure Container Instance; a sketch of that container instance follows the script below.


--DROP PROC dbo.updateSummaries
--DROP TABLE [dbo].WidgetSummary

CREATE TABLE [dbo].WidgetSummary
(
    [WidgetId] INT NOT NULL PRIMARY KEY,
    [WidgetCount] INT NOT NULL
)
GO

CREATE PROC dbo.updateSummaries @jsonPayload AS VARCHAR(MAX)
AS
BEGIN
    MERGE dbo.WidgetSummary AS target
    USING
    (   -- We can't merge with repeating ids
        -- This can happen in case of failure / restart
        SELECT WidgetId, SUM(WidgetCount) AS WidgetCount
        FROM
        (
            SELECT *
            FROM OPENJSON(@jsonPayload)
            WITH (
                WidgetId INT '$.widgetid',
                WidgetCount INT '$.count'
            )
        ) AS t
        GROUP BY WidgetId
    ) AS source
    ON (target.WidgetId = source.WidgetId)
    WHEN MATCHED THEN
        UPDATE SET WidgetCount = source.WidgetCount + target.WidgetCount
    WHEN NOT MATCHED THEN
        INSERT (WidgetId, WidgetCount)
        VALUES (source.WidgetId, source.WidgetCount);
END
GO
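
The container instance executing this script could be declared roughly as follows.  This is a sketch rather than the template’s exact definition:  the resource name, the image and the abbreviated -Q argument are illustrative (in practice the full script would be passed or mounted as a file), while the SQL variables are the same ones used by the SQL connection above:


{
  "type": "Microsoft.ContainerInstance/containerGroups",
  "apiVersion": "2018-10-01",
  "name": "deploy-sql-script",
  "location": "[resourceGroup().location]",
  "dependsOn": [
    "[resourceId('Microsoft.Sql/servers/databases', variables('SQL Server Name'), variables('SQL DB Name'))]"
  ],
  "properties": {
    "osType": "Linux",
    "restartPolicy": "Never",
    "containers": [
      {
        "name": "sqlcmd",
        "properties": {
          "image": "mcr.microsoft.com/mssql-tools",
          "command": [
            "/opt/mssql-tools/bin/sqlcmd",
            "-S", "[variables('SQL Server FQDN')]",
            "-d", "[variables('SQL DB Name')]",
            "-U", "[variables('SQL Admin Name')]",
            "-P", "[parameters('SQL Admin Password')]",
            "-Q", "CREATE TABLE [dbo].WidgetSummary (...) ..."
          ],
          "resources": {
            "requests": {
              "cpu": 1,
              "memoryInGB": 1.5
            }
          }
        }
      }
    ]
  }
}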

The table has two fields:  a widget ID and the count.  The count will reflect how many telemetry events with the given widget ID have been observed.
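
To exercise the stored procedure in isolation, we can pass it a hand-crafted payload similar to what the Logic App sends (the values here are arbitrary):


EXEC dbo.updateSummaries
    @jsonPayload = '[{"widgetid": 42, "count": 3}, {"widgetid": 43, "count": 1}]'

SELECT * FROM [dbo].WidgetSummary

Running it a second time with the same payload doubles the counts, since the MERGE adds incoming counts to the existing rows.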

We can connect to the database using our favorite tool.  We are going to use Visual Studio and run the following queries:


SELECT * FROM [dbo].WidgetSummary
SELECT SUM(widgetCount) FROM [dbo].WidgetSummary

Since the database is initially empty, we should have the following output:

image

Simulate a source

The ARM template we deploy contains a second logic app.

image

That app bombards both the primary and secondary telemetry hubs with telemetry events.  It sends 10000 events by default.  That number is controlled by the Simulation Burst Count variable.

Events contain a randomized widget ID.  The IDs range from 1 to 500.  That upper bound is controlled by the Simulation Widget Range variable.
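
Since the Stream Analytics query timestamps events by createdAt, the simulated events presumably carry that field as well.  An individual event would look roughly like this (the values are illustrative):


{
  "widgetId": 137,
  "createdAt": "2018-06-01T14:23:07.0000000Z"
}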

We can start the logic app by clicking Run Trigger then manual.  “Manual” is the name of the trigger inside this Logic App.

image

Let’s run the SQL queries we defined in the previous section.  We should quickly start seeing events coming in.  We’ll notice that about every 5 seconds a batch of events is processed.

image

The 10000 events should take about 5 minutes to complete.

image

If there are no failures in the pipeline, we should end up with exactly 10000 in the count query.

Simulate a Stream Analytics failure

Let’s run the simulation logic app again.  Let’s have some events coming in for a minute or so.

Let’s stop the Stream Analytics job.  This will simulate a failure.

image

It takes a little while for the job to stop.

After a short while we should see no movement in the database.

Let’s restart the job.

image

Let’s specify that we want to start back from when we last stopped:

image

It takes a little while to restart.  We should then see database movement again.

We would expect to land at 20000.  Typically, we land a little above that.

image

This is due to the job reprocessing some events it had already processed before it stopped.  Avoiding this isn’t trivial and we won’t cover it in this article.

Summary

We’ve looked at how to implement an Azure Stream Analytics pipeline.

We’ve done a tour of how to configure different services.

Each service is quite simple to configure.  By assembling multiple services in a chain, we create a powerful pipeline.

We’ve also provided the ARM template deploying the entire solution.  This allows us to achieve consistency between environments and to deploy on demand.
