Cosmos DB Stored Procedures – handling continuation


I’ve recently done some work involving Stored Procedures in Cosmos DB.

There are a few techniques to learn when our stored procedures handle large data sets.  It is all about continuation.

In this article, we’ll look at how to build a robust and scalable stored procedure.

We’ll start with a naïve approach and then get more sophisticated.

We assume that we are working with a partitioned collection.  Stored procedures execute only in the context of one logical partition, so what we explore here also applies to unpartitioned collections.

As usual, the code used here is available on GitHub.  We are using cosmos-db-target-config to deploy Cosmos DB artefacts.  We introduced that solution in a past article.

The Azure Cosmos DB team also has some great Stored Procedure samples on GitHub.

Stored Procedures in Cosmos DB

Typically we use Stored Procedures in Cosmos DB in 2 scenarios:

  1. Do bulk write / update / delete.  That’s because SQL query language in Cosmos DB is only for reading.
  2. Do fancy queries currently impossible to do with query language.

The newly introduced BulkExecutor library could be an alternative in some of those scenarios.  We won’t cover it in this article.

Stored procedures run on the same compute as the database itself.  They therefore benefit from minimal latency.

Deploying the Azure components

We can deploy the Azure component by clicking the following button:


This will deploy an Azure Cosmos DB account, a database and a collection.

image

We took the minimum throughput, 1000 RUs, for a partitioned collection.  The partition key is part.

The deployment also created 4 stored procedures.

image

Fill a partition

We need to run the fillPartition stored procedure to fill a partition with data.

We could use the Portal to invoke the Stored Procedure, but a single call is only able to insert 1500-2000 records at a time and we need 25000.  So we’re going to use the Console App, which simply calls the stored procedure repeatedly.

We will need to recover the endpoint and primary key:

image

and insert it in the constants at the beginning of the code:

image

We then need to make sure we uncomment the following line of code.

image

We can now run the code.  It will call the stored procedure several times.  It does so until it has inserted 25000 records in the partition ABC.
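The loop performed by the Console App can be sketched as follows.  This is a plain JavaScript simulation (runnable in Node.js), not the actual C# code.  It assumes that the stored procedure takes the number of records to insert and returns the number it actually inserted; the fake stored procedure, its capacity of 1800 records per call and all function names are hypothetical.

```javascript
//  Hypothetical stand-in for invoking fillPartition:  it inserts at most
//  'capacity' records per call and returns how many it actually inserted
function makeFakeFillPartition(capacity) {
    var stored = 0;

    return {
        call: function (recordCount) {
            var inserted = Math.min(recordCount, capacity);

            stored += inserted;

            return inserted;
        },
        getStored: function () { return stored; }
    };
}

//  Client-side loop:  ask for what is still missing until nothing is left
function fillUntilDone(callSproc, targetCount) {
    var remaining = targetCount;
    var callCount = 0;

    while (remaining > 0) {
        var inserted = callSproc(remaining);

        if (inserted <= 0) {
            throw new Error('No progress:  the stored procedure inserted nothing.');
        }
        remaining -= inserted;
        ++callCount;
    }

    return callCount;
}

var fake = makeFakeFillPartition(1800);   //  assumed capacity per call
var calls = fillUntilDone(fake.call, 25000);

console.log(calls, fake.getStored());   //  prints 14 25000
```

The check on inserted <= 0 is there so that a stored procedure that can’t make progress doesn’t keep the client looping forever.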

image

We can validate the result by running the following query:

SELECT VALUE COUNT(1)
FROM c
WHERE c.part='ABC'

and we should get 25000 as a result.

image

Partitioning

In a partitioned collection a stored procedure executes within the context of one logical partition.

When we call the stored procedure, we need to pass the partition key.  For instance, with the .NET SDK we do it with the RequestOptions.PartitionKey property.

For that reason, we do not need to filter / WHERE on the partition key.  A SELECT * would only return elements from the partition.

Simple Implementation:  a-query-flat.js

Let’s start with a simple implementation.

This is a naïve implementation.  It will act as our baseline.

This implementation queries the entire partition, i.e. SELECT * FROM c.  It scans the entire partition.

It then filters, in JavaScript, for c.oneThird=1.  The oneThird property has been seeded so that one third of the records have the value 1.

It then counts the number of records satisfying that criterion.  Essentially, it is implementing, in JavaScript, the following query:

SELECT VALUE COUNT(1)
FROM c
WHERE c.part='ABC'
AND c.oneThird=1

We have a very inefficient implementation on purpose here.  We want to show the effect of paging without having too big a partition for simplicity.

//  Flat query:  simply do the query in a sproc
//
//  We implement a "SELECT * FROM c WHERE c.oneThird=1" by doing a
//  "SELECT * FROM c" and then doing the filtering in code
//
//  Problem:  Although this sproc is simple, it doesn't scale.
//  It only selects one page of results and hence won't return the right result
//  if the partition has a few thousand records.
function countOnes() {
    var response = getContext().getResponse();
    var collection = getContext().getCollection();
    var oneCount = 0;

    //  Query all documents
    var isAccepted = collection.queryDocuments(
        collection.getSelfLink(),
        "SELECT * FROM c",
        {},
        function (err, feed, responseOptions) {
            if (err) {
                throw err;
            }

            if (feed) {
                for (var i = 0; i != feed.length; ++i) {
                    var doc = feed[i];

                    //  Filter document with 'oneThird' == 1
                    if (doc.oneThird == 1) {
                        ++oneCount;
                    }
                }
            }

            //  Return the count in the response
            response.setBody(oneCount);
        });

    if (!isAccepted) {
        throw new Error('The query was not accepted by the server.');
    }
}

image

So what happened here?  Why do we get 33 instead of 8333?

Cosmos DB is a highly scalable database.  It is extremely rigorous in the control of its resources.  One way to do that is to run short queries only.

So when we run the SELECT * query, it doesn’t return the entire content of the partition.  That could take a very long time and a lot of memory.  Instead, it returns a feed with a continuation token.

So effectively, what we have done is to query the first page of results.
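To make the arithmetic concrete, here is a small stand-alone simulation.  It is plain JavaScript meant to run in Node.js, not stored procedure code.  It assumes a page size of 100 documents and a hypothetical seeding rule where every third document has oneThird set to 1.  Neither assumption is confirmed by the deployment, but together they reproduce the numbers above.

```javascript
//  Hypothetical in-memory stand-in for the 'ABC' partition:  25000 documents,
//  every third one having oneThird == 1 (assumed seeding rule)
var PAGE_SIZE = 100;    //  assumed page size
var docs = [];

for (var i = 0; i != 25000; ++i) {
    docs.push({ id: String(i), part: 'ABC', oneThird: i % 3 === 1 ? 1 : 0 });
}

//  Return one page of documents plus a continuation (index of the next page)
//  or null when there are no more pages
function readPage(continuation) {
    var start = continuation || 0;
    var end = Math.min(start + PAGE_SIZE, docs.length);

    return {
        feed: docs.slice(start, end),
        continuation: end < docs.length ? end : null
    };
}

function countOnesInFeed(feed) {
    var count = 0;

    for (var i = 0; i != feed.length; ++i) {
        if (feed[i].oneThird == 1) {
            ++count;
        }
    }

    return count;
}

//  What the flat sproc does:  look at the first page only
var firstPageCount = countOnesInFeed(readPage().feed);

//  What we actually want:  follow the continuation until it runs out
var totalCount = 0;
var pageCount = 0;
var page = readPage();

while (true) {
    totalCount += countOnesInFeed(page.feed);
    ++pageCount;
    if (page.continuation === null) {
        break;
    }
    page = readPage(page.continuation);
}

console.log(firstPageCount, totalCount, pageCount);   //  prints 33 8333 250
```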

Let’s query the other pages.

Continuation token on the server-side:  b-query-continuation.js

Here we are going to call the queryDocuments method multiple times.

It isn’t going to be a straight for-loop, though, because Cosmos DB uses a continuation pattern.  We query documents and pass a function to receive those documents once they arrive.  This is the JavaScript equivalent of an async pattern in C# and other languages.

So how can we then call query documents again?  We’ll need something akin to recursion.  Cosmos DB allows us to define functions within functions:

//  Query with continuation:  do the query in a sproc and continue paging the results
//
//  We implement a "SELECT * FROM c WHERE c.oneThird=1" by doing a
//  "SELECT * FROM c" and then doing the filtering in code
//
//  Problem:  Although this sproc implements continuation on the server side and scales
//  better, it won't scale to tens of thousands of records.  Cosmos DB imposes a 5-second
//  limit on any query, which will force the sproc to stop.  When it does, the sproc throws
//  the exception at the end of the code.
function countOnes() {
    var response = getContext().getResponse();
    var collection = getContext().getCollection();
    var oneCount = 0;

    //  Start a recursion
    query();

    //  Function within the main stored procedure function
    function query(continuation) {
        var requestOptions = { continuation: continuation };
        //  Query all documents
        var isAccepted = collection.queryDocuments(
            collection.getSelfLink(),
            "SELECT * FROM c",
            requestOptions,
            function (err, feed, responseOptions) {
                if (err) {
                    throw err;
                }

                //  Scan results
                if (feed) {
                    for (var i = 0; i != feed.length; ++i) {
                        var doc = feed[i];

                        //  Filter document with 'oneThird' == 1
                        if (doc.oneThird == 1) {
                            ++oneCount;
                        }
                    }
                }

                if (responseOptions.continuation) {
                    //  Continue the query
                    query(responseOptions.continuation)
                } else {
                    //  Return the count in the response
                    response.setBody(oneCount);
                }
            });

        if (!isAccepted) {
            throw new Error('The query was not accepted by the server.');
        }
    }
}

So here we page until we get to the bottom of the feed.

That should do the trick, right?

If we try that in the portal we should get the following result:

image

We can recognize the text from the exception we throw on the last line of the stored procedure code.

What happened?

We did go through a few pages, but at some point the request for more documents got refused by the Cosmos DB engine.  Why?

As stated, Cosmos DB is quite disciplined in the way it manages its resources.  This makes sure that no process is “hogging the CPU” and that performance is predictable.  It forces stored procedures to run in under 5 seconds.  It also forbids them from consuming too many Request Units (RUs).

For that reason, at some point, the engine blocked the stored procedure.  We then threw because we didn’t know better.

What we would need is to call the stored procedure again to continue processing the results.

Continuation on the client-side:  c-query-continuation-both-sides.js

Here we have the Stored Procedure implement continuation itself.

This mechanism isn’t supported natively.  We implement it as a pattern.

The stored procedure will return a JSON object.  The object will have 2 properties:  count and continuation.  The former contains the final result when available and null otherwise.  The latter contains a custom continuation token until the final result is available.

So the client-side pattern is to call the stored procedure with no argument at first.  Upon reception of the response, it calls it again, passing the continuation token.  It does so until the final result is available.

The custom token is actually a stringified JSON object.  It contains the “count so far” and the query continuation token.  The “count so far” is the stored procedure internal state.

//  Query with continuation on both sides:  do the query in a sproc and continue paging the results
//  ; the sproc returns continuation token so it can be called multiple times and get around the
//  5 seconds limit.
//
//  We implement a "SELECT * FROM c WHERE c.oneThird=1" by doing a
//  "SELECT * FROM c" and then doing the filtering in code
function countOnes(sprocContinuationToken) {
    var response = getContext().getResponse();
    var collection = getContext().getCollection();
    var oneCount = 0;

    if (sprocContinuationToken) {   //  Parse the token
        var token = JSON.parse(sprocContinuationToken);

        //  Check the type rather than truthiness:  0 is a valid "count so far"
        if (typeof token.countSoFar !== 'number') {
            throw new Error('Bad token format:  no count');
        }
        if (!token.queryContinuationToken) {
            throw new Error('Bad token format:  no continuation');
        }
        //  Retrieve "count so far"
        oneCount = token.countSoFar;
        //  Retrieve query continuation token to continue paging
        query(token.queryContinuationToken);
    }
    else {  //  Start a recursion
        query();
    }

    //  Function within the main stored procedure function
    function query(queryContinuation) {
        var requestOptions = { continuation: queryContinuation };
        //  Query all documents
        var isAccepted = collection.queryDocuments(
            collection.getSelfLink(),
            "SELECT * FROM c",
            requestOptions,
            function (err, feed, responseOptions) {
                if (err) {
                    throw err;
                }

                //  Scan results
                if (feed) {
                    for (var i = 0; i != feed.length; ++i) {
                        var doc = feed[i];

                        //  Filter document with 'oneThird' == 1
                        if (doc.oneThird == 1) {
                            ++oneCount;
                        }
                    }
                }

                if (responseOptions.continuation) {
                    //  Continue the query
                    query(responseOptions.continuation)
                } else {
                    //  Return the count in the response
                    response.setBody({ count: oneCount, continuation: null });
                }
            });

        if (!isAccepted) {
            var sprocToken = JSON.stringify({
                countSoFar: oneCount,
                queryContinuationToken: queryContinuation
            });

            response.setBody({ count: null, continuation: sprocToken });
        }
    }
}

It is easier to use the C# code to run that stored procedure multiple times.  Let’s make sure we comment FillPartitionAsync out again and uncomment QueryAsync:

image

The console output should show us the evolution of calls:

image

We finally obtain the expected result:  8333.  This took 4 runs of the stored procedure.
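The client-side pattern described above can be sketched in a few lines.  The following is a synchronous JavaScript simulation for Node.js, not the C# client: a real SDK call would be asynchronous.  The fake stored procedure, the position field in its token, the seeding rule and the limit of 7000 documents per run are all hypothetical values chosen for the simulation.  Only the response shape (count and continuation) follows the stored procedure above.

```javascript
//  Hypothetical stand-in for the 'ABC' partition (assumed seeding rule)
var docs = [];

for (var i = 0; i != 25000; ++i) {
    docs.push({ oneThird: i % 3 === 1 ? 1 : 0 });
}

//  Fake sproc:  scans at most 'docsPerRun' documents per call and returns
//  either the final count or a stringified-JSON continuation token
function makeFakeSproc(docs, docsPerRun) {
    return function (sprocContinuationToken) {
        var state = sprocContinuationToken
            ? JSON.parse(sprocContinuationToken)
            : { countSoFar: 0, position: 0 };
        var end = Math.min(state.position + docsPerRun, docs.length);

        for (var i = state.position; i != end; ++i) {
            if (docs[i].oneThird == 1) {
                ++state.countSoFar;
            }
        }
        state.position = end;

        return end < docs.length
            ? { count: null, continuation: JSON.stringify(state) }
            : { count: state.countSoFar, continuation: null };
    };
}

//  Client-side loop:  no token on the first call, then pass back whatever
//  token the sproc returned until the final result is available
function runToCompletion(executeSproc) {
    var continuation = null;
    var runCount = 0;

    while (true) {
        var result = executeSproc(continuation);

        ++runCount;
        if (result.continuation === null) {
            return { count: result.count, runCount: runCount };
        }
        continuation = result.continuation;
    }
}

var outcome = runToCompletion(makeFakeSproc(docs, 7000));

console.log(outcome.count, outcome.runCount);   //  prints 8333 4
```

The client never looks inside the token.  It only checks whether one was returned and hands it back on the next call.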

Summary

We have seen three ways to implement stored procedures.

Although the last one is the most robust, it is also the most complex.

It isn’t always necessary.  If the result set is small enough for the collection throughput, a simpler implementation might work.  For that, we need to know in advance how many records we will scan and how many Request Units (RUs) are going to be available.  If we can’t ascertain those in advance, then the third form is the safest.

There are a lot of variations possible on the third form.  The fillPartition stored procedure we used at the beginning is an example.  It doesn’t return a continuation token.  It simply returns the number of records it has inserted.  It is up to the client to call it back after subtracting the number of records already inserted.  Another variation would be a stored procedure deleting records.  It could return the number of records it has deleted and whether some records are still available.  The client could call it back and “restart from scratch” since deleted records are no longer there.
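Here is a minimal sketch of what the deleting variation could look like.  It is not part of the deployed stored procedures.  The deleteAll function, the { deleted, hasMore } response shape and the runLocally harness are hypothetical; the harness is only an in-memory stand-in for the Cosmos DB runtime so that the sketch can run in Node.js.  It assumes a budget of 120 accepted operations per run.

```javascript
var getContext;    //  Provided by Cosmos DB in a real sproc; set by the harness here

//  Delete as many documents as the engine accepts, then report back
function deleteAll() {
    var response = getContext().getResponse();
    var collection = getContext().getCollection();
    var deletedCount = 0;

    query();

    function report(hasMore) {
        response.setBody({ deleted: deletedCount, hasMore: hasMore });
    }

    function query() {
        var isAccepted = collection.queryDocuments(
            collection.getSelfLink(),
            "SELECT * FROM c",
            {},
            function (err, feed) {
                if (err) {
                    throw err;
                }
                if (!feed || feed.length == 0) {
                    report(false);      //  Nothing left in the partition
                } else {
                    deleteNext(feed, 0);
                }
            });

        if (!isAccepted) {
            report(true);
        }
    }

    function deleteNext(feed, index) {
        if (index == feed.length) {
            query();    //  Page done:  query again "from scratch", no token needed
            return;
        }
        var isAccepted = collection.deleteDocument(
            feed[index]._self,
            {},
            function (err) {
                if (err) {
                    throw err;
                }
                ++deletedCount;
                deleteNext(feed, index + 1);
            });

        if (!isAccepted) {
            report(true);
        }
    }
}

//  Hypothetical harness:  refuses any request once 'budget' is used up
function runLocally(sproc, docs, budget) {
    var body = null;
    var collection = {
        getSelfLink: function () { return 'fake-collection'; },
        queryDocuments: function (link, query, options, callback) {
            if (budget-- <= 0) { return false; }
            callback(null, docs.slice(0, 100), {});
            return true;
        },
        deleteDocument: function (link, options, callback) {
            if (budget-- <= 0) { return false; }
            for (var i = 0; i != docs.length; ++i) {
                if (docs[i]._self == link) { docs.splice(i, 1); break; }
            }
            callback(null);
            return true;
        }
    };

    getContext = function () {
        return {
            getResponse: function () { return { setBody: function (b) { body = b; } }; },
            getCollection: function () { return collection; }
        };
    };
    sproc();

    return body;
}

//  Client side:  call back until the sproc reports nothing is left
var docs = [];
for (var i = 0; i != 250; ++i) { docs.push({ _self: 'docs/' + i }); }

var total = 0, runs = 0, result;
do {
    result = runLocally(deleteAll, docs, 120);
    total += result.deleted;
    ++runs;
} while (result.hasMore);

console.log(total, runs, docs.length);   //  prints 250 3 0
```

No state needs to travel between calls here: every run simply queries whatever is left in the partition.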

The form of the continuation token can also vary.  We chose to stringify a JSON object.  This has the advantage that the client doesn’t need to take in an arbitrarily complex object.  We could have Base64-encoded it to make it opaque.  In general the continuation token is the internal state of the Stored Procedure but can be formatted in different ways.
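As an illustration of the Base64 idea, here is a small sketch of encoding and decoding such a token.  It assumes Node.js: Buffer is a Node.js API and might not be available inside the Cosmos DB stored procedure runtime, in which case another Base64 routine would be needed there.  The encodeToken and decodeToken helpers are hypothetical names.

```javascript
//  Turn the sproc internal state into an opaque string
function encodeToken(state) {
    return Buffer.from(JSON.stringify(state), 'utf8').toString('base64');
}

//  Recover the internal state from the opaque string
function decodeToken(token) {
    var state = JSON.parse(Buffer.from(token, 'base64').toString('utf8'));

    //  Check the type rather than truthiness:  0 is a valid count
    if (typeof state.countSoFar !== 'number') {
        throw new Error('Bad token format:  no count');
    }

    return state;
}

var token = encodeToken({ countSoFar: 0, queryContinuationToken: 'abc' });
var state = decodeToken(token);

console.log(token, state.countSoFar, state.queryContinuationToken);
```

Base64 only hides the structure from casual inspection.  It isn’t a security measure, so the stored procedure should still validate whatever it decodes.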

We hope this gave you a good background to write robust stored procedures in Cosmos DB.
