Synchronizing two streams with Kusto

We discussed Azure Data Explorer (ADX) and its query language Kusto in a past article.

In this article I want to talk about a typical problem in real time analytics: synchronizing two streams of data.

This happens all the time when sensor data is produced by different devices. Different devices may record measurements at different times and at different frequencies. In order to reason about measurements from different devices, we need to synchronize those events.

We’ll first explain the problem. We’ll then give a naïve solution and show that it doesn’t scale. Finally, we’ll give a solution that can scale to millions of records.

As usual, the code is on GitHub.

2 streams problem

Let’s look at two streams of events from two devices:

(Figure: the two streams of events)

The events from stream 1 are letters while the events from stream 2 are numbers.

We can make a few observations right away: the two streams do not produce events at the same timestamps, nor at the same frequency, so an event in one stream rarely has an exact counterpart in the other.

There are multiple things we might want to do with those streams. For this article, we’ll focus on one problem:

We want to look at measurements from both streams together, so we want measurements in stream 2 to be correlated with measurements in stream 1.

Also, we’ll define the correlation further:

The correlation should work in a way that for a measurement (event) in stream 1, we’re going to take the value of the measurement in stream 2 that happened “just before” (i.e. as close as possible AND before).

We could frame the problem differently, but we found that solving this problem provides a lot of techniques that can be reused to solve similar problems.

For the example we gave we should have the following result:

| Event in Stream 1 | Event in Stream 2 (correlated) |
| --- | --- |
| A | NULL / missing (no event in stream 2 occurred before event A) |
| B | 1 |
| C | 3 |
| D | 3 |
| E | 5 |

Also, for the different solutions we are going to present, we are going to assume there are multiple assets grouping devices together. For instance, they could represent different vehicles each having multiple sensors, different elevators, different machines in a factory, etc. The conceptual model is the same: it is like having multiple layers of the diagram we showed.

Naïve solution

We are going to test a solution using Kusto query language. We suggest trying that solution on a new database.

The naïve solution is basically to formulate the problem and let Kusto take care of it. We want, for each asset, to map an event in one stream with the latest event in the other stream that happened before or at the same time.

Let’s create some sensor data. First, we measure the “colour” of an asset every even second:

.set-or-replace colours <| datatable(assetId:long, timestamp:datetime, colour:string)
    [
    12, datetime(2020-1-1 20:00:04), "blue",
    12, datetime(2020-1-1 20:00:06), "blue",
    12, datetime(2020-1-1 20:00:08), "red",
    13, datetime(2020-1-1 20:00:04), "yellow",
    13, datetime(2020-1-1 20:00:06), "yellow",
    13, datetime(2020-1-1 20:00:08), "green",
    ];

We have two assets (12 & 13) with “colour” measurements taken every two seconds.

Then we measure the temperature of an asset every odd second:

.set-or-replace temperatures <| datatable(assetId:long, timestamp:datetime, temperature:int)
    [
    12, datetime(2020-1-1 20:00:05), 20,
    12, datetime(2020-1-1 20:00:07), 22,
    12, datetime(2020-1-1 20:00:09), 25,
    13, datetime(2020-1-1 20:00:05), 15,
    13, datetime(2020-1-1 20:00:07), 13,
    13, datetime(2020-1-1 20:00:09), 10,
    ];

If we join the two measures by asset-id, we get the usual cross-product, i.e. this returns 18 records:

colours
| join kind=inner temperatures on assetId

We want to find the timestamp in temperatures that is the closest to the one in colours; specifically, we want one that is earlier or at the same time. This is how we’re going to eliminate the cross-product rows. Let’s start by requiring the colour timestamp to be greater than or equal to the temperature timestamp:

colours
| join kind=inner temperatures on assetId
| where timestamp >= timestamp1

Instead of 18 records, that returns 6. Notice we lost the colour measure at 20:00:04 since there is no earlier measure in temperature.
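As an aside, if we want to list exactly which colour readings were dropped by that filter, a quick sanity-check sketch is a leftanti join (colour rows that have no surviving counterpart in the filtered join):

colours
| join kind=leftanti (
    colours
    | join kind=inner temperatures on assetId
    | where timestamp >= timestamp1
    | project assetId, timestamp
    ) on assetId, timestamp

This returns the two 20:00:04 readings.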

Now, let’s take the largest temperature timestamp for each colour timestamp. This gives us a mapping, by asset, between the timestamps of the two sensors:

colours
| join kind=inner temperatures on assetId
| where timestamp >= timestamp1
| summarize temperaturetimestamp=max(timestamp1) by assetId, colourtimestamp=timestamp

Now, let’s use that mapping to match the sensor values. We lost the two colour readings at 20:00:04 again since there was no temperature reading earlier or at the same time.

let mapping=colours
| join kind=inner temperatures on assetId
| where timestamp >= timestamp1
| summarize temperaturetimestamp=max(timestamp1) by assetId, colourtimestamp=timestamp;
colours
| join kind=inner mapping on assetId
| where timestamp == colourtimestamp
| join kind=inner temperatures on assetId
| where timestamp1 == temperaturetimestamp
| project assetId, colourtimestamp, temperaturetimestamp, colour, temperature

This gives us the mapping we are looking for:

| assetId | colourtimestamp | temperaturetimestamp | colour | temperature |
| --- | --- | --- | --- | --- |
| 12 | 2020-01-01T20:00:06Z | 2020-01-01T20:00:05Z | blue | 20 |
| 13 | 2020-01-01T20:00:06Z | 2020-01-01T20:00:05Z | yellow | 15 |
| 12 | 2020-01-01T20:00:08Z | 2020-01-01T20:00:07Z | red | 22 |
| 13 | 2020-01-01T20:00:08Z | 2020-01-01T20:00:07Z | green | 13 |

Scaling the naïve solution

The solution works. Let’s see if it can scale.

Let’s create a colour table with 10 million records (spread over 5,000 assets):

.set-or-replace fullColours <|
(
    range i from 0 to 10000000 step 1
    | extend assetId = 1 + i % 5000
    | extend timeStep = i / 5000
    | extend timestamp = datetime(2010-1-1 0:00:00) + timeStep * 2s
    | extend r = rand(3)
    | extend colour = case(r==0, "green", r==1, "yellow", "red")
    | project assetId, timestamp, colour
)

Similarly, let’s create a temperature table with 20 million records (the same 5,000 assets). It covers the same time range but with twice the measurement frequency.

.set-or-replace fullTemperatures <|
(
    range i from 0 to 20000000 step 1
    | extend assetId = 1 + i % 5000
    | extend timeStep = i / 5000
    | extend timestamp = datetime(2010-1-1 0:00:00) + timeStep * 1s
    | extend temperature = 10 + rand(25)
    | project assetId, timestamp, temperature
)

Now, let’s try the same solution on the bigger tables:

let mapping=fullColours
| join kind=inner fullTemperatures on assetId
| where timestamp >= timestamp1
| summarize temperaturetimestamp=max(timestamp1) by assetId, colourtimestamp=timestamp;
mapping
| limit 10

This query fails on a dev cluster:

(Screenshot: the query failure)

The reason this fails is that the query doesn’t scale: it performs an aggregation for each of the 10 million colour records over all the temperature records of the same asset. The inner join on assetId alone produces 2,000 × 4,000 = 8 million rows per asset, i.e. 40 billion rows across the 5,000 assets.

Time in a bucket

We’ll develop a more scalable solution in this section. This is largely inspired by the join-timewindow article in the online documentation, but provides a more general solution.

What we want to do is drastically reduce the cardinality of the set on which we perform an aggregation, i.e. the max(timestamp1) in the last section. The issue is that we join on assetId only, so for each colour measurement we take all the temperature measurements of that asset. What we would like is to only take measurements around the timestamp of the colour measurement.

We can’t join on a range of values. The trick is to quantize the time variable into buckets; once we do that, we can join on a given time bucket.
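As a quick illustration of that quantization, Kusto’s bin() function maps a timestamp to the start of its bucket:

//  Returns 2020-01-01 20:00:05, the start of the 5-second bucket containing 20:00:07
print bucket = bin(datetime(2020-1-1 20:00:07), 5s)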

(Figure: the two streams with time quantized into buckets)

If we look at the example above, we can immediately see that we can’t simply go within one bucket. This works for event A, B, C & E but not for D. Although D is in bucket Delta, the correlated event of stream 2, i.e. event 3, is in bucket Gamma.

How far back do we need to go?

In order not to force ourselves to go back to the beginning in the general case, we need to impose a constraint on the problem. We need to cap the distance between an event in stream 1 and its correlated event in stream 2. We’ll call that “distance” maxDelta, i.e. the maximum delta between two events in two different streams.

Given that, we can have an elegant solution: quantize time into buckets of width maxDelta and, for each event in stream 1, only look at stream 2 events that fall either in the same bucket or in the bucket immediately before. Those two buckets always cover the whole window [timestamp1 - maxDelta, timestamp1].

This is easy to see. The extreme cases are as follows: if the stream 1 event sits at the very beginning of its bucket, an event maxDelta earlier sits at the very beginning of the previous bucket; if it sits at the very end of its bucket, an event maxDelta earlier sits at the very end of the previous bucket. In both cases the correlated event can’t be outside those two buckets.

We can easily see that cases in between fall in between.

This allows us to drastically reduce the cardinality of the set as we wanted, provided maxDelta is small enough.
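To make this concrete, here is a small sketch (with an arbitrary event at 20:00:45 and maxDelta = 60s) of the two time keys we’ll compute for each stream 1 event; together, the two buckets they point to cover the whole window [timestamp1 - maxDelta, timestamp1]:

//  The event at 20:00:45 gets two time keys:
//  19:59:00 (previous bucket) and 20:00:00 (its own bucket)
print timestamp1 = datetime(2020-1-1 20:00:45)
| extend timeKey = pack_array(
    bin(timestamp1 - 60s, 60s),
    bin(timestamp1, 60s))
| mv-expand timeKey to typeof(datetime)

Any stream 2 event within the preceding 60 seconds necessarily falls in one of those two buckets, which is exactly what the stored function below exploits.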

Solution with bucketed time

Let’s use a stored function for the core logic:

.create function extractTimeMapping(
    T1:(assetId:long, timestamp:datetime),
    T2:(assetId:long, timestamp:datetime),
    maxDelta:timespan) {
    let prepT1 = T1
        | project assetId, timestamp1=timestamp
        //  Create an array of 2 values for the time keys
        | extend timeKey=pack_array(
            bin(timestamp1-maxDelta, maxDelta),
            bin(timestamp1, maxDelta))
        //  Expand that array into 2 rows
        | mv-expand timeKey to typeof(datetime);
    let prepT2 = T2
        | project assetId, timestamp2=timestamp
        | extend timeKey=bin(timestamp2, maxDelta);
    let mapping = prepT1
    //  We use a left outer join to get a NULL value if we can't map
        | join kind=leftouter prepT2 on assetId, timeKey
        | where isnull(timestamp2) or timestamp1 >= timestamp2
        | summarize timestamp2=max(timestamp2) by assetId, timestamp1;
    mapping
}

Let’s try this function with maxDelta = 60s. We’ll first try it on the small tables:

let maxDelta = 60s;
extractTimeMapping(colours, temperatures, maxDelta)

We obtain the same result as before. The only difference is that we explicitly return timestamp2=NULL for records where no mapping exists.

The result set is so small that it isn’t possible to measure how much more efficient this approach is, though.

Let’s try it on the bigger tables and store the result in a new table:

.set-or-replace fullColoursWithTemperatures <|
let maxDelta = 60s;
extractTimeMapping(fullColours, fullTemperatures, maxDelta)

This query runs in about 2 minutes on a dev-SKU cluster (i.e. the smallest / cheapest cluster).

It is still demanding, but it does execute and terminate.

We can check that the cardinality of that table is 10 million:

fullColoursWithTemperatures
| count

Knowing our data, 60 seconds is a very large bucket. Let’s try with a smaller delta:

.set-or-replace fullColoursWithTemperatures2 <|
let maxDelta = 5s;
extractTimeMapping(fullColours, fullTemperatures, maxDelta)

This executes in about 21 seconds on a dev cluster. The bucket size obviously makes a difference: the smaller it is, the fewer records the engine has to join. If we go too small, though, we could miss points.
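For instance, on the small tables the gap between a colour reading and the previous temperature reading is always one second, so running the function with a maxDelta smaller than that (a deliberately broken sketch) returns NULL for every colour reading:

//  500 ms buckets: no temperature reading ever lands in the same or the
//  previous bucket of a colour reading, so every mapping comes back NULL
extractTimeMapping(colours, temperatures, 500ms)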

We can validate that the two results are equivalent, since the following query:

fullColoursWithTemperatures
| join kind=leftanti fullColoursWithTemperatures2 on assetId, timestamp1, timestamp2
| count

returns zero records.

Missing first records

As pointed out earlier, there is no correlated event for the first record of each asset when no event in stream 2 happens before the first event in stream 1: the naïve solution drops those records altogether, while the bucketed solution returns a NULL value for them.

In order to fix that, we could simply detect those records and union in the result of another heuristic (e.g. fetching the record “just after”), as sketched below.
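Here is a minimal sketch of that fix-up, assuming the fullColoursWithTemperatures table built above: we isolate the unmatched rows and run a forward-looking variant of the mapping (closest stream 2 event at or after timestamp1), whose result could then be unioned back with the matched rows.

//  Hypothetical fix-up for the unmatched rows
let maxDelta = 60s;
let missing = fullColoursWithTemperatures
    | where isnull(timestamp2)
    | project assetId, timestamp1;
let prepT2 = fullTemperatures
    | project assetId, timestamp2=timestamp
    | extend timeKey=bin(timestamp2, maxDelta);
missing
//  This time the window is [timestamp1, timestamp1 + maxDelta]
| extend timeKey=pack_array(
    bin(timestamp1, maxDelta),
    bin(timestamp1 + maxDelta, maxDelta))
| mv-expand timeKey to typeof(datetime)
| join kind=leftouter prepT2 on assetId, timeKey
| where isnull(timestamp2) or timestamp2 >= timestamp1
| summarize timestamp2=min(timestamp2) by assetId, timestamp1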

Taking previous event

We assumed that taking the previous event was a good idea.

A more general solution would be to interpolate (e.g. linearly) between measurement values. This would be especially useful if measurements in one of the streams are far apart in time and the measured quantity isn’t slow-moving.
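As a sketch of what that could look like, assuming we already have, for each stream 1 event, both the previous and the next stream 2 readings (the prev*/next* columns below are hypothetical), the linear interpolation itself is a one-liner:

//  Hypothetical input: one stream 1 timestamp with its surrounding readings
datatable(timestamp1:datetime,
    prevTimestamp:datetime, prevTemperature:real,
    nextTimestamp:datetime, nextTemperature:real)
[
    datetime(2020-1-1 20:00:06),
    datetime(2020-1-1 20:00:05), 20,
    datetime(2020-1-1 20:00:07), 22
]
//  Dividing two timespans yields the fraction of the way between the readings
| extend fraction = (timestamp1 - prevTimestamp) / (nextTimestamp - prevTimestamp)
| extend interpolatedTemperature = prevTemperature + fraction * (nextTemperature - prevTemperature)

Here the interpolated temperature is 21, halfway between the two readings.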

Relative time

The exercise we did was to correlate events in stream 1 with events in stream 2. It is important to notice that this process isn’t symmetric.

For instance, just looking at our earlier diagram we would see that event ‘B’ correlates with event ‘1’, but taken the other way around, event ‘1’ would correlate with event ‘A’, not event ‘B’.

For this reason, it is important not to mix the two.
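We can see it directly with our stored function: swapping the two arguments correlates each temperature reading with the latest colour reading at or before it, which is not the inverse of the mapping we computed earlier.

let maxDelta = 60s;
extractTimeMapping(temperatures, colours, maxDelta)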

Summary

We showed how to synchronize two measurement streams. There is more to this apparently simple operation than meets the eye, and it is quite demanding in terms of compute.

The time bucket technique can be reused in a multitude of contexts.

As this technique is quite costly in terms of compute, we recommend doing it at ingestion time, or shortly after, so the results can be reused in subsequent queries (see the sketch below).
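One simple way to do that (a sketch only, assuming a target table named syncedReadings and some periodic orchestration; a Kusto update policy would be another option) is to periodically append the mapping for recent data:

//  Hypothetical periodic materialization of the last hour of readings
.set-or-append syncedReadings <|
let maxDelta = 5s;
let recentColours = fullColours | where timestamp > ago(1h);
let recentTemperatures = fullTemperatures | where timestamp > ago(1h) - maxDelta;
extractTimeMapping(recentColours, recentTemperatures, maxDelta)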

