Ingesting historical data at scale with Kusto

There are many ways to ingest data in Kusto.

There is batching vs. streaming, queued vs. command-based ingestion, plugins, SDKs, etc. There is also a plethora of tools and techniques to achieve this, e.g. Azure Data Factory, LightIngest, .ingest into, etc.

In this article I want to look at the specific scenario of ingesting a large amount of historical data. This can be done in conjunction with real-time streaming, or on its own.

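This is an architecture discussion where I want to focus on the following aspects: long-running operations and resiliency, caching of old data, alignment of real-time and historical ingestion, and transformation.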

Update 26-06-2020: You can see a great example of ingestion in My ultimate ingestion process for historical data at scale with Kusto.

Long running & Resiliency

Given a large body of data, say multiple GBs, we can expect any ingestion-from-query command (i.e. .set, .append, .set-or-append, .set-or-replace) or .ingest into command to time out. That is, a command such as

.set MyTable <| <query returning Gbs of data>

(especially if the query runs over standard blobs) would time out.

Those commands can all be made async, but async here is a client concept: it simply means the command returns immediately. The operation itself is still bound by the request execution timeout.

Also, a long-running command can fail because of intermittent failures (e.g. a VM failing) and won’t be retried automatically.

Therefore, this type of ingestion is inherently ill-suited to ingesting large amounts of data.

Queued ingestion is a reliable means of ingestion. It relies on an internal Azure Queue and retries in case of failures. Different tools leverage queued ingestion (e.g. LightIngest).

Alternatively, we can leverage the "unreliable" ingestion commands to ingest small batches and orchestrate those batches with a reliable tool such as Azure Data Factory, Azure Logic Apps, etc.
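As a minimal sketch of that pattern, the Python below stands in for the orchestrator (an assumption: in practice this role would be played by Azure Data Factory, Logic Apps, etc.). It splits a time range into small half-open windows, builds one .set-or-append command per window, and retries each batch a few times. The table name, the source query and the Timestamp column are hypothetical, and `execute` is a placeholder for whatever actually sends the command to the cluster.

```python
from datetime import datetime, timedelta
from typing import Callable, Iterator, Tuple


def time_batches(start: datetime, end: datetime, step: timedelta) -> Iterator[Tuple[datetime, datetime]]:
    """Split [start, end) into consecutive half-open windows no longer than `step`."""
    current = start
    while current < end:
        batch_end = min(current + step, end)
        yield current, batch_end
        current = batch_end


def batch_command(table: str, source_query: str, lo: datetime, hi: datetime) -> str:
    """One small ingest-from-query command covering a single window [lo, hi)."""
    return (
        f".set-or-append {table} <| {source_query} "
        f"| where Timestamp >= datetime({lo.isoformat()}) "
        f"and Timestamp < datetime({hi.isoformat()})"
    )


def run_with_retries(command: str, execute: Callable[[str], None], max_attempts: int = 3) -> int:
    """Run one batch, retrying on failure; returns the attempt number that succeeded."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            execute(command)
            return attempt
        except Exception as error:  # a real orchestrator would also back off and log
            last_error = error
    raise RuntimeError(f"batch failed after {max_attempts} attempts") from last_error


# Usage (send_to_cluster is a hypothetical function that runs a control command):
# for lo, hi in time_batches(datetime(2018, 1, 1), datetime(2020, 1, 1), timedelta(days=1)):
#     run_with_retries(batch_command("MyTable", "SourceTable", lo, hi), execute=send_to_cluster)
```

Keeping each window small is what keeps each command well within the timeout; the reliability comes from the retry loop around it, not from the command itself.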

Caching of old data

Kusto is designed on the assumption that we ingest data in a temporal fashion and that recent data is more interesting than old data.

The concepts of retention and hot cache come from that assumption: old data is eventually deleted (retention), while recent data is kept in the hot cache and accessed with better performance.

This is configurable through the retention policy and the cache policy. But beyond the retention and cache thresholds themselves, both mechanisms rely on the time the data was ingested. This makes sense when we ingest data continuously, either through Event Hub, IoT Hub, Event Grid or some other periodic process.

But when we ingest a lot of historical data in one go, that data appears as if it was just ingested and looks "fresher" than data streamed in real time the day before.
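To make the effect concrete, here is a deliberately simplified model (an assumption, not the engine's actual implementation) in which an extent counts as hot while its creation time falls within the hot-cache window. The dates and the 31-day window are hypothetical.

```python
from datetime import datetime, timedelta


def is_hot(extent_creation_time: datetime, now: datetime, hot_window: timedelta) -> bool:
    """Simplified model: an extent is hot while its creation time is within the hot window."""
    return now - extent_creation_time <= hot_window


now = datetime(2020, 6, 1)
hot_window = timedelta(days=31)  # hypothetical cache policy: hot = 31d

# Historical data from March 2018, ingested today without overriding the creation time:
# the extent's creation time defaults to "now", so it competes for the hot cache.
print(is_hot(now, now, hot_window))  # True

# Same data, stamped with its original date: it falls outside the hot window as expected.
print(is_hot(datetime(2018, 3, 1), now, hot_window))  # False
```

The same reasoning applies to retention: an extent stamped with "now" would only be deleted after the full retention period counted from the day of the bulk load.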

To avoid that, we need to stamp the data with a fake ingestion time. Kusto supports this at the extent (data shard) level, through the creationTime ingestion property.
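As an illustration, the sketch below builds two commands that carry the creationTime property: an .ingest into from a blob and a .set-or-append from a query. It assumes, hypothetically, that the historical blobs are laid out in yyyy/MM/dd folders so the date can be read from the path, and that dates are UTC. The table name and blob URI are made up; a real URI would also need credentials (e.g. a SAS token).

```python
import re
from datetime import datetime

# Hypothetical blob layout: .../<container>/yyyy/MM/dd/<file>
_DATE_IN_PATH = re.compile(r"/(\d{4})/(\d{2})/(\d{2})/")


def date_from_path(blob_uri: str) -> datetime:
    """Extract the data date from a yyyy/MM/dd folder in the blob path."""
    match = _DATE_IN_PATH.search(blob_uri)
    if match is None:
        raise ValueError(f"no yyyy/MM/dd folder in {blob_uri}")
    year, month, day = (int(part) for part in match.groups())
    return datetime(year, month, day)


def _iso(moment: datetime) -> str:
    # creationTime is passed as an ISO 8601 string; naive datetimes are assumed to be UTC.
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


def ingest_with_creation_time(table: str, blob_uri: str, data_date: datetime, fmt: str = "parquet") -> str:
    """.ingest into command whose extents are stamped with data_date rather than now()."""
    return (
        f".ingest into table {table} (h'{blob_uri}') "
        f"with (format='{fmt}', creationTime='{_iso(data_date)}')"
    )


def append_with_creation_time(table: str, query: str, data_date: datetime) -> str:
    """Same idea for ingestion from a query."""
    return f".set-or-append {table} with (creationTime='{_iso(data_date)}') <| {query}"
```

Stamping each batch with the date of the data it contains (rather than one date for the whole load) lets retention and caching age the historical extents gradually, as if they had been ingested at the time.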

Real time & historical ingestion alignment

The typical project we see starts by exploring some sample data, exported to blob storage and ingested manually. Once the proof of value is done and the team wants to scale up, they often move to ingesting data in real time, i.e. streaming. Once this has stabilized, the team wants to migrate the entire historical data set.

Sometimes teams do it the other way around: they ingest historical data, then set up real-time ingestion, then ingest the historical data they missed in between.

In either case, the point of this section is to avoid overlapping ingestions that end up with the same data twice.

To make this concrete, let's consider the following timeline:

[Figure: timeline showing t0, ts and ti on the time axis]

The times on the axis are as follows:

Time | Description
t0 | Time at which recording of historical data started
ts | Time at which we started streaming data into Kusto
ti | Time at which we started the ingestion of historical data

The point here is that we don't want to re-ingest the data between ts and ti, since streaming has already ingested it.

This can be hard to do if we do not control the historical data export. For instance, if that data is exported by a legacy system into Parquet files, the export is unlikely to cut exactly at ts. Such processes typically cut files at fixed time intervals and/or file sizes.

This is a minor point and could be addressed by purging the duplicated data afterwards. We find it more efficient (and elegant) to ingest only the data up to ts.

The best way we found to address that is to use an external table as a source instead of the blobs directly. This allows us to filter the external table with a where clause and avoid overlapping data.
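A minimal sketch of that filtering, assuming a hypothetical external table HistoricalExt (created beforehand with .create external table over the exported files) with a Timestamp column. The windows stop at ts no matter where the export files happen to be cut; each window is also stamped with its start date as creationTime, which is one possible choice, not the only one.

```python
from datetime import datetime, timedelta
from typing import List


def historical_commands(table: str, external_table: str, t0: datetime, ts: datetime, step: timedelta) -> List[str]:
    """Commands covering [t0, ts) only: everything from ts onward was already streamed."""
    commands = []
    lo = t0
    while lo < ts:
        hi = min(lo + step, ts)  # never read past ts, even if the export files do
        commands.append(
            f".set-or-append {table} with (creationTime='{lo:%Y-%m-%dT%H:%M:%SZ}') <| "
            f"external_table('{external_table}') "
            f"| where Timestamp >= datetime({lo.isoformat()}) "
            f"and Timestamp < datetime({hi.isoformat()})"
        )
        lo = hi
    return commands


# Usage: daily batches from a hypothetical t0 up to a hypothetical ts.
for command in historical_commands("MyTable", "HistoricalExt", datetime(2020, 1, 1), datetime(2020, 1, 3, 6), timedelta(days=1)):
    print(command)
```

Because the upper bound is exclusive (Timestamp < ts), the first streamed record at ts is never ingested twice.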

Using an external table rules out tools such as LightIngest, which ingest directly from blobs. This is a shame, as LightIngest leverages queued ingestion (coming back to the reliability point at the beginning).

Transformation

So far, we assumed we were importing historical data as is. That isn’t always the case.

For real-time ingestion, we typically use a chain of tables and update policies, which emulates an ELT pipeline in real time.

We could do the same for historical data: land the historical data in a table, then use an update policy to transform that data and land it into another table, optionally over several such stages.

To preserve the creationTime discussed earlier, we would need to set PropagateIngestionProperties=true in the update policy object.
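For illustration, this sketch builds the update policy as JSON and wraps it in an .alter table ... policy update command. The table names and the TransformHistorical() stored function are hypothetical, and IsTransactional=true is a design choice (a failed transformation then fails the source ingestion too), not a requirement. For simplicity the sketch rejects queries containing single quotes, since the policy is embedded in a single-quoted literal.

```python
import json


def update_policy_command(target_table: str, source_table: str, transform_query: str) -> str:
    """Build an .alter table ... policy update command that propagates ingestion properties."""
    if "'" in transform_query:
        # Keep the sketch simple: the policy is embedded in a single-quoted literal.
        raise ValueError("transform_query must not contain single quotes")
    policy = [
        {
            "IsEnabled": True,
            "Source": source_table,
            "Query": transform_query,
            "IsTransactional": True,
            # Carries ingestion properties such as creationTime from the ingestion
            # into the source table over to the extents created in the target table.
            "PropagateIngestionProperties": True,
        }
    ]
    return f".alter table {target_table} policy update @'{json.dumps(policy)}'"


print(update_policy_command("Telemetry", "HistoricalLanding", "TransformHistorical()"))
```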

In some cases, the historical data doesn't have the same shape as the data we ingest in real time and hence needs different transformations. That would mean a different landing table and different update policies. In that case, it can be simpler to transform the data before ingestion instead of using update policies, and ingesting from a query over an external table is again a good fit.

In other cases, the historical data might require more complex pre-processing that might need to be done outside of Kusto (e.g. Azure Function, Azure Batch, Spark, etc.).

Summary

We have looked at different aspects of large-scale historical data ingestion in Kusto.

It is a balancing act and, depending on the scenario, we might favor different approaches and tools.

In a future article, we'll detail an approach we like to use that balances those different factors.

