Calculating Bucket and Moving Averages with MongoDB

5-Sep-2018 Like this? Dislike this? Let me know

I have come across several unanswered questions on stackoverflow regarding how to do moving averages in MongoDB. In the process of answering, I discovered some irritating consequences of using very large arrays in the $bucket function. I decided to capture the alternative, successful approach in more detail here.

A Brief Overview of $bucket

The $bucket function, introduced in version 3.4, is a very handy way to group documents based on consecutive "sections" of the domain of field values found in the document; basically, grouping by range. It is an initially attractive approach to making time-bounded averages although not necessarily a vector of moving averages but we'll come back to that later. The $bucket documentation explains it in detail but it boils down to this: Given an input set of documents like this:
{"n":1, "val": 4.23, "date": new ISODate("2017-12-31") }
{"n":2, "val": 4.23, "date": new ISODate("2018-01-01") }
{"n":3, "val": 8.91, "date": new ISODate("2018-01-02") }
{"n":4, "val": 0.03, "date": new ISODate("2018-01-03") }
... assume n and date keep increasing by 1 until March 1:
{"n":61, "val": 6.44, "date": new ISODate("2018-03-01") }
Then if we want to count and average val by date, we could do this:
// Make a bucket array of dates:
var arr = [];
["2018-01-01","2018-01-15","2018-02-15","2018-02-28"].forEach(function(d) {
	arr.push(new ISODate(d));
    });

db.collection.aggregate([
{$bucket: {
	groupBy: "$date",
	boundaries: arr,
	default: "Other",
	    output: {
		"count": { $sum: 1 },
		"avg": {$avg:"$val"}
	    }
	}
    }
		    ]);
which might yield:
{ "_id" : "Other", "count" : 3, "avg" : 63.435 }
{ "_id" : ISODate("2018-01-01T00:00:00Z"), "count" : 14, "avg" : 11.435 }
{ "_id" : ISODate("2018-01-15T00:00:00Z"), "count" : 31, "avg" : 33.95322580645161 }
{ "_id" : ISODate("2018-02-15T00:00:00Z"), "count" : 13, "avg" : 55.89153846153847 }
Things to note: We set up the data and the query like this on purpose to expose this important behavior. To well-capture the "heads and tails", you can add a low date and a high date to the array:
["1970-01-01", "2018-01-01","2018-01-15","2018-02-15","2018-02-28","3000-01-01"].forEach(function(d) {
	arr.push(new ISODate(d));
    });
which might yield:
{ "_id" : ISODate("1970-01-01T00:00:00Z"), "count" : 1, "avg" : 5.5 }
{ "_id" : ISODate("2018-01-01T00:00:00Z"), "count" : 14, "avg" : 12.435 }
{ "_id" : ISODate("2018-01-15T00:00:00Z"), "count" : 31, "avg" : 34.91677419354839 }
{ "_id" : ISODate("2018-02-15T00:00:00Z"), "count" : 13, "avg" : 56.97846153846154 }
{ "_id" : ISODate("2018-02-28T00:00:00Z"), "count" : 2, "avg" : 64.435 }
Note how the Other bucket is gone, replaced by head bucket from 1970-01-01 to 2018-12-31 and a tail bucket starting at 2018-02-28 and going "to infinity."

The same approach can be used for numbers:

var arr = [10, 20, 30, 40];
db.foo.aggregate([
{$bucket: {
	groupBy: "$n",  // grouping by n now, not date!
	boundaries: arr,
	default: "Other",
	    output: {
		"count": { $sum: 1 },
		"avg": {$avg: "$val"}

	    }
	}
    }
	    ]);

{ "_id" : 10, "count" : 10, "avg" : 18.435 }
{ "_id" : 20, "count" : 10, "avg" : 28.435000000000002 }
{ "_id" : 30, "count" : 10, "avg" : 38.434999999999995 }
{ "_id" : "Other", "count" : 31, "avg" : 41.24354838709677 }
Low and high values are easily created using Infinity:
var arr = [-Infinity, 0, 20, 30, 40, Infinity];

{ "_id" : -Infinity, "count" : 9, "avg" : 8.997777777777777 }
{ "_id" : 10, "count" : 10, "avg" : 18.435 }
{ "_id" : 20, "count" : 10, "avg" : 28.435000000000002 }
{ "_id" : 30, "count" : 10, "avg" : 38.434999999999995 }
{ "_id" : 40, "count" : 22, "avg" : 54.434999999999995 }
The array of bucket values must be either all numbers (any mix of int, double, or decimal) or all dates and must be increasing order but other than, you are free to construct any boundaries you wish.

The Problem With $bucket

Bucket array lengths up to, say, a couple dozen seem to perform fine. But suppose we have a different kind of data and query setup. Consider a collection with a timeseries of sensor data captured at 1 second intervals:
{ "n" : 0, "val" : 107.05756543841737, "date" : ISODate("2018-01-01T00:00:00Z") }
{ "n" : 1, "val" : 112.15474746119355, "date" : ISODate("2018-01-01T00:00:01Z") }
{ "n" : 2, "val" : 114.9389559965861, "date" : ISODate("2018-01-01T00:00:02Z") }
{ "n" : 3, "val" : 100.96869281305015, "date" : ISODate("2018-01-01T00:00:03Z") }
{ "n" : 4, "val" : 119.4575361398694, "date" : ISODate("2018-01-01T00:00:04Z") }
{ "n" : 5, "val" : 117.81224531915713, "date" : ISODate("2018-01-01T00:00:05Z") }
{ "n" : 6, "val" : 101.8992107912693, "date" : ISODate("2018-01-01T00:00:06Z") }
{ "n" : 7, "val" : 118.00930576370166, "date" : ISODate("2018-01-01T00:00:07Z") }
{ "n" : 8, "val" : 111.10144964537338, "date" : ISODate("2018-01-01T00:00:08Z") }
{ "n" : 9, "val" : 109.83909498986932, "date" : ISODate("2018-01-01T00:00:09Z") }
{ "n" : 10, "val" : 108.16311721339869, "date" : ISODate("2018-01-01T00:00:10Z") }
{ "n" : 11, "val" : 109.8721409530211, "date" : ISODate("2018-01-01T00:00:11Z") }
...
Let's say we wish to find the 5 second average of values, e.g. 2018-01-01T00:00:00Z to 2018-01-01T00:00:04Z inclusive, 2018-01-01T00:00:05Z to 2018-01-01T00:00:09Z inclusive, etc. The temptation here is to create a date array as above:
var arr = [];

var sv   = new ISODate("2018-01-01").getTime();
var endv = new ISODate("2018-01-02").getTime();

var incr = (1000 * 5); // 5000 ms incremenet

while(endv > sv) {
    arr.push(new Date(sv));
    sv += incr;
}
The problem is you have now created an array of length 17280! Passing this array to $bucket as before results in dismal performance because $bucket clearly is not using an efficient algorithm to find the right slot. Basic tests yield agg times of more than 120 seconds. This can be proven by NOT aggregating in the database and instead pulling all 86400 records for the day to the client-side and doing a regular vs. bisectional search:
var arr = make array of 17280 dates;
c=db.collection.aggregate([ no $bucket! ]);
c.forEach(function(r) {
	var x = findSlot(arr, r['date']);
	if(buckets[x] == undefined) {
	    buckets[x] = {lb: arr[x], ub: arr[x+1], n: 0, v:0, a:0};
	}
	var zz = buckets[x];
	zz['n']++;
	zz['v'] += r['val'];
	zz['a'] = zz['v']/zz['n'];
    });
If findSlot is a simple linear search, e.g.
function findSlotLinear(arr, val) {
    var max = arr.length - 1;
    for(var k = 0; k < max; k++) {
	if(val < arr[k+1]) {
	    return k;
	}
    }
}
then the performance even worse but on a similar scale, around 170 seconds. But if findSlot is implemented as a bisectional search, then performance improves dramatically to about 700 millseconds, approx 150x faster than using $bucket.

But clearly, we'd rather aggregate in the database than write client-side code.

Solution: Use Math

Instead of creating buckets, we can group the materials ourselves based on math. The time offset of each sensor reading minus the start time, as represented in milliseconds, produces zero based offset, which we can divide by 1000
doc n in ms      start in ms     diff   diff/1000
1514764800000    1514764800000   0      0
1514764801000    1514764800000   1000   1
1514764802000    1514764800000   2000   2
1514764803000    1514764800000   3000   3
1514764804000    1514764800000   4000   4
1514764805000    1514764800000   5000   5
1514764806000    1514764800000   6000   6
...
So this gives us some slots (0-6 above) -- but we want to group by 5 seconds. This is where division and a floor function come to the rescue. We divide by the desired time range and floor the result to eliminate the fractional component. Using the 5 second range as described above:
doc n in ms      start in ms     diff   /1000  div/5  floor
1514764800000    1514764800000   0      0      0      0
1514764801000    1514764800000   1000   1      0.2    0
1514764802000    1514764800000   2000   2      0.4    0
1514764803000    1514764800000   3000   3      0.6    0
1514764804000    1514764800000   4000   4      0.8    0
1514764805000    1514764800000   5000   5      1      1
1514764806000    1514764800000   6000   6      1.2    1
...
Now we have something we can use -- but how to do this in MongoDB? It's pretty easy:
secondsBucket = 5;

db.collection.aggregate([
// Perform any or no $matching here to constrain the bucketing, e.g.
// {$match:{"sym":sym, "date":{$gte:now, $lt:new Date(endv) }}}

// The Juice!  Get the floor of the diff / seconds to yield a "slot":
,{$addFields: {"ff": {$floor: {$divide: [ {$divide: [ {$subtract: [ "$date", now ]}, 1000.0 ]}, secondsBucket ] }} }}

// Now just group on the slot.  We throw in n for the count but
// it is not really necessary...
,{$group: {_id: "$ff", n: {$sum:1}, avg: {$avg: "$val"}} }

// Get it in 0-n order.  Not vital but certainly useful:
,{$sort: {_id: 1}}
		    ]);

{ "_id" : 0, "n" : 5, "avg" : 110.91549956982333 }
{ "_id" : 1, "n" : 5, "avg" : 111.72435237482561 }
{ "_id" : 2, "n" : 5, "avg" : 109.16745278376398 }
...
This approach runs in about 400 milliseconds on the server side. Experiments with a slight bigger shape adding a string field symbol with 100 symbols for 1 year at 1 second intervals (3,153,600,000 docs) with the appropriate indexes on symbol and date still results in about a 500 millisecond response time. The indexes are vital to quickly vend the 86400 docs that are getting into the divide/floor part of the pipeline.

Calculating a Moving Avergage (finally...)

Moving averages were notoriously difficult to perform in conventional SQL until operators like PRECEDING and FOLLOWING appeared recently in implementations like Postgres 9.0+. And even now, window framing operators can be tricky to work with especially if JOINs are involved. This is because SQL fundamentally is a scalar based model, not arrays.

The array operators in MongoDB allow for simple, powerful manipulation of subsets of arrays, which is what we want from a moving average. Consider the following daily close data:

{ "val" : 10, "date" : ISODate("2018-01-01T00:00:00Z") }
{ "val" : 11.43, "date" : ISODate("2018-01-02T00:00:00Z") }
{ "val" : 12.52, "date" : ISODate("2018-01-03T00:00:00Z") }
{ "val" : 12.99, "date" : ISODate("2018-01-04T00:00:00Z") }
{ "val" : 12.72, "date" : ISODate("2018-01-05T00:00:00Z") }
{ "val" : 11.79, "date" : ISODate("2018-01-06T00:00:00Z") }
{ "val" : 10.42, "date" : ISODate("2018-01-07T00:00:00Z") }
{ "val" : 8.94, "date" : ISODate("2018-01-08T00:00:00Z") }
{ "val" : 7.72, "date" : ISODate("2018-01-09T00:00:00Z") }
{ "val" : 7.06, "date" : ISODate("2018-01-10T00:00:00Z") }
{ "val" : 7.12, "date" : ISODate("2018-01-11T00:00:00Z") }
{ "val" : 7.88, "date" : ISODate("2018-01-12T00:00:00Z") }
Suppose we want to create a 4 day moving average. This means we "begin" recording the average at 2018-01-04 for the preceding 4 days, then 2018-01-05 for the preceding 4 days, etc.

This is the solution but note: Our data is set up so that each doc is one actual day apart; therefore, each doc as an observation (or data point) is the same as a day. Our example solution is a general solution for calculating moving averages over a given number of observations in an input set, not days specifically. If your data has multiple datapoints per day, then you will first have to $group the data to get it into the "atom" that will become part of the moving average technique shown below.

// Control the size of the moving average frame:
datapts = 4;

db.collection.aggregate([

// Filter down to what you want.  This can be anything or nothing at all.
{$match: {"sym": "S1"}}

// Ensure dates are going earliest to latest:
,{$sort: {d:1}}

// Turn docs into a single doc with a big vector of observations, e.g.
//     {sym: "A", d: d1, val: 10}
//     {sym: "A", d: d2, val: 11}
//     {sym: "A", d: d3, val: 13}
// becomes
//     {_id: "A", prx: [ {v:10,d:d1}, {v:11,d:d2},  {v:13,d:d3} ] }
//
// This will set us up to take advantage of array processing functions!
,{$group: {_id: "$sym", prx: {$push: {v:"$val",d:"$date"}} }}

// Nice additional info.  Note use of dot notation on array to get
// just scalar date at elem 0, not the object {v:val,d:date}:
,{$addFields: {frameSize: datapts, startDate: {$arrayElemAt: [ "$prx.d", 0 ]}} }

// The Juice!  Basically, use the map function to start at index 0 and keep
// slicing out subsets, calcing the average, and emitting that number.
// 
// Note that we only run the vector to (len(vector) - (datapts-1).
// Also, for extra info, we also add the as-of date which is the tail date
// of the segment.
//
// Again we take advantage of dot notation to turn the vector of
// object {v:val, d:date} into two vectors of simple scalars [v1,v2,...]
// and [d1,d2,...] with $prx.v and $prx.d.  Also, rather than create another
// vector next to prx in the doc, we will overwrite the existing one (which
// we don't need at the end anyway) by using $addFields with the same name (prx).
//
,{$addFields: {"prx": {$map: {
	input: {$range:[0,{$subtract:[{$size:"$prx"}, (datapts-1)]}]} ,
	as: "z",
	in: {
	   avg: {$avg: {$slice: [ "$prx.v", "$$z", datapts ] } },
	   d: {$arrayElemAt: [ "$prx.d", {$add: ["$$z", (datapts-1)] } ]}
		}
	    }}
    }}
		    ]);

This will produce output like this:
{
	"_id" : "S1",
	"prx" : [
		{
			"avg" : 11.738793632512115,
			"d" : ISODate("2018-01-04T00:00:00Z")
		},
		{
			"avg" : 12.420766702631376,
			"d" : ISODate("2018-01-05T00:00:00Z")
		},
		{
			"avg" : 12.510051656756191,
			"d" : ISODate("2018-01-06T00:00:00Z")
		},
...
		{
			"avg" : 12.534681008446219,
			"d" : ISODate("2018-01-31T00:00:00Z")
		},
		{
			"avg" : 12.08669329998585,
			"d" : ISODate("2018-02-01T00:00:00Z")
		}
	],
	"frameSize" : 4,
	"startDate" : ISODate("2018-01-01T00:00:00Z")
}

The output doc contains the startDate and an array of moving averages, each with the as-of date. Obviously, weighted averages or more complicated frame-walking calculations are easily handled by using more than one $slice to extract multiple subsets.

Like this? Dislike this? Let me know


Site copyright © 2014-2018 Buzz Moschetti. All rights reserved