The Aggregation Framework
And here are the Week 6 course notes:
The aggregation framework is a set of analytics tools within MongoDB that allows us to run various types of reports or analyses on documents in one or more collections. It is based on the idea of a pipeline. We take input from a MongoDB collection and pass the documents from that collection through one or more stages, each of which performs a different operation on its inputs. Each stage takes as input whatever the stage before it produced as output, and the inputs and outputs of all stages are streams of documents. Each stage has a specific job: it expects documents of a specific form and produces a specific output, which is itself a stream of documents. At the end of the pipeline, we get access to the output.
An individual stage is a data processing unit. It takes a stream of documents as input, processes the documents one at a time, and produces an output stream of documents, again one at a time. Each stage provides a set of knobs, or tunables, that we can control to parameterize it. A stage performs a generic, general-purpose task, and we parameterize it for the particular set of documents we’re working with and for exactly what we’d like the stage to do with those documents. These tunables typically take the form of operators that we supply to modify fields, perform arithmetic operations, reshape documents, or do some sort of accumulation task, as well as a variety of other things. Often, we’ll want to include the same type of stage multiple times within a single pipeline.
For example, we may wish to perform an initial filter so that we don’t have to pass the entire collection into our pipeline, and then later on, after some additional processing, filter once again using a different set of criteria. So, to recap: pipelines work with a MongoDB collection. They’re composed of stages, each of which performs a different data processing task on its input and produces documents as output to be passed to the next stage. At the end of the pipeline, output is produced that we can then do something with in our application. In many cases, it’s necessary to include the same type of stage multiple times within an individual pipeline.
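To make the idea of stages feeding one another concrete, here is a minimal plain-JavaScript sketch. It is not how MongoDB implements pipelines; it only models each stage as a generator that consumes one stream of documents and yields another. The stage builders (matchStage, limitStage), runPipeline and the sample data are all hypothetical.

```javascript
// Toy model of an aggregation pipeline (NOT MongoDB's implementation):
// every stage is a generator function that consumes a stream of documents
// and yields a new stream, one document at a time.
function runPipeline(collection, stages) {
  // The output stream of each stage becomes the input stream of the next.
  const output = stages.reduce((stream, stage) => stage(stream), collection);
  return [...output];
}

// Hypothetical stage builders, parameterized like MongoDB's "tunables".
const matchStage = (predicate) =>
  function* (docs) {
    for (const doc of docs) {
      if (predicate(doc)) yield doc; // pass through only matching documents
    }
  };

const limitStage = (n) =>
  function* (docs) {
    let seen = 0;
    for (const doc of docs) {
      if (seen >= n) return; // stop pulling documents once n have been emitted
      seen += 1;
      yield doc;
    }
  };

const companies = [
  { name: "A", founded_year: 2004 },
  { name: "B", founded_year: 2005 },
  { name: "C", founded_year: 2004 },
  { name: "D", founded_year: 2004 },
];

// Initial filter, some processing, then a second filter with different criteria.
const result = runPipeline(companies, [
  matchStage((d) => d.founded_year === 2004),
  limitStage(2),
  matchStage((d) => d.name !== "A"),
]);
console.log(result); // [ { name: 'C', founded_year: 2004 } ]
```

Using the same stage type twice, as in the last step, mirrors the "initial filter, then filter again later" pattern described above.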
Familiar Aggregation Operations
As our first step in developing aggregation pipelines, what we’d like to do is take a look at building some pipelines that involve operations that are already familiar to us. So, we’re going to look at the following stages:
$match - a filtering stage, similar to find()
$project
$sort
$skip
$limit
We might ask ourselves why these stages are necessary, given that this functionality is already provided by the MongoDB query language. The reason is that we need these stages to support the more complex, analytics-oriented functionality included with the aggregation framework. The query below is simply equivalent to a find() with the same filter:
db.companies.aggregate([{
$match: {
founded_year: 2004
}
}, ])
Let’s introduce a project stage in this aggregation pipeline:
db.companies.aggregate([{
$match: {
founded_year: 2004
}
}, {
$project: {
_id: 0,
name: 1,
founded_year: 1
}
}])
We use the aggregate() method to run an aggregation pipeline. The pipeline itself is simply an array of documents, each of which specifies a particular stage operator. So, in the above case, we have an aggregation pipeline with two stages: the $match stage passes documents, one at a time, to the $project stage.
db.companies.aggregate([{
$match: {
"funding_rounds.investments.financial_org.permalink": "greylock"
}
}, {
$project: {
_id: 0,
name: 1,
ipo: "$ipo.pub_year",
valuation: "$ipo.valuation_amount",
funders: "$funding_rounds.investments.financial_org.permalink"
}
}, ])
In the above example, we’re promoting deeply nested fields to the top level of the output documents this pipeline produces. When a string value in $project begins with $ (for example, "$ipo.pub_year"), MongoDB interprets it as a field path, meaning "give me the value identified by this key". Referencing a field this way copies its value unchanged; it does not change the value’s data type.
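The "$" field-path idea can be modeled in a few lines of JavaScript. This is a simplified sketch that assumes there are no arrays along the path; resolveFieldPath and the sample document are hypothetical, and the real $project handles many more cases.

```javascript
// Minimal model of a "$a.b.c" field path in $project (a sketch, not MongoDB code).
// Simplification: assumes no arrays along the path.
function resolveFieldPath(doc, fieldPath) {
  // Drop the leading "$", then walk one key at a time.
  return fieldPath
    .slice(1)
    .split(".")
    .reduce((value, key) => (value == null ? undefined : value[key]), doc);
}

// Hypothetical company document shaped like the ones in the course examples.
const company = {
  _id: 1,
  name: "Acme",
  ipo: { pub_year: 2010, valuation_amount: 5000000 },
};

// Roughly: { _id: 0, name: 1, ipo: "$ipo.pub_year", valuation: "$ipo.valuation_amount" }
const projected = {
  name: company.name,
  ipo: resolveFieldPath(company, "$ipo.pub_year"),
  valuation: resolveFieldPath(company, "$ipo.valuation_amount"),
};
console.log(projected); // { name: 'Acme', ipo: 2010, valuation: 5000000 }
```

The promoted values keep their original types: pub_year is still a number after it moves to the top level.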
db.companies.aggregate([{
$match: {
"funding_rounds.investments.financial_org.permalink": "greylock"
}
}, {
$project: {
_id: 0,
name: 1,
founded: {
year: "$founded_year",
month: "$founded_month",
day: "$founded_day"
}
}
}, ])
In this case, we’re taking the top-level founded_year, founded_month and founded_day fields and showing them as part of the nested document founded. Now, let’s add a $limit stage:
db.companies.aggregate([{
$match: {
founded_year: 2004
}
}, {
$limit: 5
}, {
$project: {
_id: 0,
name: 1
}
}])
This gets the matching documents and limits them to five before projecting out the fields, so the projection works on only 5 documents. Suppose instead we were to do something like this:
db.companies.aggregate([{
$match: {
founded_year: 2004
}
}, {
$project: {
_id: 0,
name: 1
}
}, {
$limit: 5
}])
This gets the matching documents, projects all of them, and only then limits the result to five. The output is the same, but the projection now works on a large number of documents. The lesson is that we should limit the documents passed to the next stage to those that are absolutely necessary. Now, let’s look at the $sort stage:
db.companies.aggregate([{
$match: {
founded_year: 2004
}
}, {
$sort: {
name: 1
}
}, {
$limit: 5
}, {
$project: {
_id: 0,
name: 1
}
}])
This sorts all the matching documents by name and returns only the first 5 of them. Suppose instead we were to do something like this:
db.companies.aggregate([{
$match: {
founded_year: 2004
}
}, {
$limit: 5
}, {
$sort: {
name: 1
}
}, {
$project: {
_id: 0,
name: 1
}
}])
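This takes the first 5 documents that come out of $match and sorts only those, so the result is generally not the same as the first 5 names in alphabetical order.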
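Both ordering lessons can be checked on a small in-memory array. This plain-JavaScript sketch only imitates the stages with slice, map and sort; the sample names are made up, and the projections counter exists just to show how much work each ordering does.

```javascript
// Plain-JavaScript illustration (not MongoDB) of why stage order matters.
const docs = [
  { name: "Zeta", founded_year: 2004 },
  { name: "Alpha", founded_year: 2004 },
  { name: "Mango", founded_year: 2004 },
  { name: "Beta", founded_year: 2004 },
];

let projections = 0; // how many documents the "project" step has touched
const projectName = (d) => {
  projections += 1;
  return { name: d.name };
};
const byName = (a, b) => (a.name < b.name ? -1 : a.name > b.name ? 1 : 0);

// $limit then $project: only 2 documents are projected.
projections = 0;
const limitThenProject = docs.slice(0, 2).map(projectName);
const projectedEarly = projections;

// $project then $limit: every document is projected, most are then discarded.
projections = 0;
const projectThenLimit = docs.map(projectName).slice(0, 2);
const projectedLate = projections;

// $sort then $limit: the two alphabetically first names overall.
const sortThenLimit = [...docs].sort(byName).slice(0, 2).map((d) => d.name);
// $limit then $sort: whichever two documents arrived first, then sorted.
const limitThenSort = docs.slice(0, 2).sort(byName).map((d) => d.name);

console.log(projectedEarly, projectedLate); // 2 4
console.log(sortThenLimit); // [ 'Alpha', 'Beta' ]
console.log(limitThenSort); // [ 'Alpha', 'Zeta' ]
```

Moving $limit ahead of $project changes only the amount of work, while moving it ahead of $sort changes the answer itself.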
Let’s add the $skip stage:
db.companies.aggregate([{
$match: {
founded_year: 2004
}
}, {
$sort: {
name: 1
}
}, {
$skip: 10
}, {
$limit: 5
}, {
$project: {
_id: 0,
name: 1
}
}, ])
This sorts all the matching documents by name, skips the first 10, and returns the next 5. We should try to include $match stages as early as possible in the pipeline. To filter documents in a $match stage, we use the same syntax for constructing query documents (filters) as we do for find().
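$sort, $skip and $limit together are commonly used to page through results. Here is a minimal sketch, assuming a hypothetical pageOf helper with zero-based page numbers and made-up company names; it uses JavaScript's default string sort rather than MongoDB's comparison rules.

```javascript
// Hypothetical helper: pages are numbered from 0.
function pageOf(names, pageSize, page) {
  const sorted = [...names].sort();           // $sort: { name: 1 } (plain string order)
  const skip = page * pageSize;               // $skip: documents discarded first
  return sorted.slice(skip, skip + pageSize); // $limit: documents kept after skipping
}

// 23 made-up names: company00 ... company22 (zero-padded so string order = numeric order).
const names = Array.from({ length: 23 }, (_, i) => `company${String(i).padStart(2, "0")}`);

console.log(pageOf(names, 5, 2)); // $skip: 10, $limit: 5 -> company10 through company14
console.log(pageOf(names, 5, 4).length); // 3 (only company20..company22 remain)
```

The $skip: 10, $limit: 5 pipeline above corresponds to the third page of five results.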
The $unwind stage
The $unwind stage takes documents that have an array-valued field as input and produces output documents such that there’s one output document for each element in the array.
So, let’s go back to our companies example and take a look at the use of $unwind stages. This query:
db.companies.aggregate([
{ $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
{ $project: {
_id: 0,
name: 1,
amount: "$funding_rounds.raised_amount",
year: "$funding_rounds.funded_year"
} }
])
produces documents that have arrays for both amount and year, because we’re accessing the raised amount and the funded year for every element within the funding_rounds array. To fix this, we can include an $unwind stage before the $project stage in this aggregation pipeline and parameterize it by saying that we want to unwind the funding_rounds array:
db.companies.aggregate([
{ $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
{ $unwind: "$funding_rounds" },
{ $project: {
_id: 0,
name: 1,
amount: "$funding_rounds.raised_amount",
year: "$funding_rounds.funded_year"
} }
])
If we look at the funding_rounds array, we know that each funding round has a raised_amount and a funded_year field. $unwind produces an output document for each of the documents that are elements of the funding_rounds array. In this example, the elements are documents, but regardless of the type of the elements in an array, $unwind produces an output document for each one of them, such that the field in question holds just that element. In the case of funding_rounds, that element is one of the funding round documents, and it becomes the value of funding_rounds in every document passed on to the $project stage. The result of running this is that we now get an amount and a year, one for each funding round of every company in the output. In other words, our $match produced many company documents, and each of those company documents results in many documents, one for each funding round within it. $unwind performs this operation on the documents handed to it by the $match stage, and all of these documents for every company are then passed to the $project stage.
So, every company for which Greylock was a funder (as in the query example) is split into as many documents as it has funding rounds, for every company that matches the filter $match: {"funding_rounds.investments.financial_org.permalink": "greylock" }. Each of those resulting documents is then passed along to $project. $unwind produces an exact copy of every document it receives as input, with one exception: the funding_rounds field, rather than being an array of funding round documents, instead holds a single document, which is an individual funding round. So, a company that has 4 funding rounds results in $unwind creating 4 documents, where every field is an exact copy except funding_rounds, which in each copy is an individual element of the funding_rounds array from the company document $unwind is currently processing. $unwind therefore outputs more documents to the next stage than it receives as input. This means our $project stage now gets a funding_rounds field that is not an array, but a nested document with a raised_amount and a funded_year field. So, $project receives multiple documents for each company matching the filter, and can process each of them individually to identify an individual amount and year for each funding round of each company.
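The copying behavior described above can be modeled with a short JavaScript function. This is a simplified sketch of $unwind on a top-level field only (dotted paths are not handled); unwind and the sample companies are hypothetical. Like the default $unwind, it drops documents whose field is missing, null or an empty array, and treats a non-array value as a one-element array.

```javascript
// Simplified model of $unwind on a top-level field (a sketch, not MongoDB's code).
function unwind(docs, field) {
  const out = [];
  for (const doc of docs) {
    const value = doc[field];
    if (value === undefined || value === null) continue; // missing or null: no output
    const elements = Array.isArray(value) ? value : [value]; // non-array: one element
    for (const element of elements) {
      // Shallow copy of the whole document, with the array replaced by one element.
      out.push({ ...doc, [field]: element });
    }
  }
  return out;
}

const companies = [
  {
    name: "Acme",
    funding_rounds: [
      { raised_amount: 1000000, funded_year: 2005 },
      { raised_amount: 5000000, funded_year: 2007 },
    ],
  },
  { name: "NoRounds", funding_rounds: [] }, // an empty array yields no documents
];

const unwound = unwind(companies, "funding_rounds");

// The $project step from the query above, applied to each unwound document.
const projected = unwound.map((d) => ({
  name: d.name,
  amount: d.funding_rounds.raised_amount,
  year: d.funding_rounds.funded_year,
}));
console.log(projected); // two documents for Acme (2005 and 2007), none for NoRounds
```

One input company with two rounds becomes two output documents, which is why $unwind can emit more documents than it receives.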
To understand this better, we’ll add an additional field, and in doing so, we’ll identify a problem with this aggregation query as currently written. We’ll add a funder field that accesses the permalink of the financial_org in the investments field of the funding_rounds embedded document we get from $unwind (compare it with the $match filter in the snippet below).
// Add funder to output documents.
db.companies.aggregate([
{ $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
{ $unwind: "$funding_rounds" },
{ $project: {
_id: 0,
name: 1,
funder: "$funding_rounds.investments.financial_org.permalink",
amount: "$funding_rounds.raised_amount",
year: "$funding_rounds.funded_year"
} }
])
Notice that the path for funder is the same path we use in $match. We want to be sure that the funder is the one we stipulated, Greylock. In general, when building aggregation pipelines, it’s a good idea to put in checks as we construct them to make sure they’re doing what we think they’re doing.
If we look at a document’s investments field, we find that it’s an array, because multiple funders can participate in a single funding round, and investments lists every one of them. So, just as we originally saw with raised_amount and funded_year, we’re now seeing an array for funder. That’s because investments is an array-valued field, and the semantics of $project on a path through an array-valued field is to produce all of the values for whatever field we’ve stipulated.
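The array behavior of field paths can be sketched as a small recursive function. This models MongoDB's behavior only loosely: getPath and resolveThroughArrays are hypothetical names, and the sample document is invented.

```javascript
// Simplified model (not MongoDB's code) of a field path that crosses arrays:
// at an array, the rest of the path is applied to each document element and the
// results are collected, so arrays nested in arrays give arrays of arrays.
function resolveThroughArrays(value, keys) {
  if (keys.length === 0) return value; // end of the path: return the value as-is
  if (Array.isArray(value)) {
    return value
      .filter((el) => el !== null && typeof el === "object" && !Array.isArray(el))
      .map((el) => resolveThroughArrays(el, keys))
      .filter((v) => v !== undefined); // elements without the field are left out
  }
  if (value === null || typeof value !== "object") return undefined; // path dead-ends
  return resolveThroughArrays(value[keys[0]], keys.slice(1));
}

// Hypothetical helper taking a "$a.b.c" string.
const getPath = (doc, path) => resolveThroughArrays(doc, path.slice(1).split("."));

const company = {
  name: "Acme",
  funding_rounds: [
    {
      raised_amount: 1000000,
      investments: [
        { financial_org: { permalink: "greylock" } },
        { person: { permalink: "jane-doe" } },
      ],
    },
    { raised_amount: 5000000, investments: [{ financial_org: { permalink: "accel" } }] },
  ],
};

const funderPath = "$funding_rounds.investments.financial_org.permalink";
console.log(getPath(company, "$funding_rounds.raised_amount")); // [ 1000000, 5000000 ]
console.log(getPath(company, funderPath)); // [ [ 'greylock' ], [ 'accel' ] ]

// After $unwind, funding_rounds holds one round, but investments is still an array:
const oneRound = { ...company, funding_rounds: company.funding_rounds[0] };
console.log(getPath(oneRound, funderPath)); // [ 'greylock' ]
```

The last call shows why funder is still an array after unwinding only funding_rounds.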
The above query returns companies for which Greylock participated in at least one of the funding rounds, along with all of their rounds. What we’d like is to constrain the results so that we only see the funding rounds Greylock participated in, not every round of every company Greylock invested in at least once. To do that, we have to filter further. One possibility is to reverse the order of our $unwind and $match stages:
// Reverse the order: unwind first, then match.
db.companies.aggregate([
{ $unwind: "$funding_rounds" },
{ $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
{ $project: {
_id: 0,
name: 1,
funder: "$funding_rounds.investments.financial_org.permalink",
amount: "$funding_rounds.raised_amount",
year: "$funding_rounds.funded_year"
} },
])
This guarantees that we only match documents coming out of $unwind that represent funding rounds Greylock actually participated in. If we run this, we see a slight delay, but then we see that Greylock is one of the funders in each round. However, funder is still an array of every investor in the round. To make it clearer, we can include a second $unwind on the investments array:
// Add second unwind stage.
db.companies.aggregate([
{ $unwind: "$funding_rounds" },
{ $unwind: "$funding_rounds.investments" },
{ $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
{ $project: {
_id: 0,
name: 1,
funder: "$funding_rounds.investments.financial_org.permalink",
amount: "$funding_rounds.raised_amount",
year: "$funding_rounds.funded_year"
} },
])
So, what are these two $unwind stages doing? Because they now come first, both of them run over every document in the entire collection. However, a $match should occur as early as possible, so that each stage has the smallest number of documents to work with.
So, what we can do is leave the $match filter as the first stage of our pipeline and simply include a second $match after the unwinds:
// Instead, use a second match stage.
db.companies.aggregate([
{ $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
{ $unwind: "$funding_rounds" },
{ $unwind: "$funding_rounds.investments" },
{ $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
{ $project: {
_id: 0,
name: 1,
individualFunder: "$funding_rounds.investments.person.permalink",
fundingOrganization: "$funding_rounds.investments.financial_org.permalink",
amount: "$funding_rounds.raised_amount",
year: "$funding_rounds.funded_year"
} },
])
The first $match returns company documents for which Greylock participated in at least one of the funding rounds. We then unwind the funding_rounds array and the nested investments array. Finally, we filter again, so that any documents representing funding rounds Greylock did not participate in are removed before reaching $project. The $project stage now also reports the person permalink (individualFunder) separately from the organization permalink (fundingOrganization). Sometimes, we need to include multiple stages of the same type. This query is a bit faster because the $unwind stages process fewer documents: only the companies that passed the first $match.
Array Expressions
These expressions are used to work with arrays and can be used in $project stages. There are a few different array expressions, such as:
$filter, which selects a subset of the elements in an array based on a set of filter criteria, so that only those elements are included in the documents passed to the next stage of the aggregation pipeline:
db.companies.aggregate([{
$match: {
"funding_rounds.investments.financial_org.permalink": "greylock"
}
}, {
$project: {
_id: 0,
name: 1,
founded_year: 1,
rounds: {
$filter: {
input: "$funding_rounds",
as: "round",
cond: {
$gte: ["$$round.raised_amount", 100000000]
}
}
}
}
}, {
$match: {
"rounds.investments.financial_org.permalink": "greylock"
}
}, ]).pretty()
Here is where things get interesting. We’re using a $filter expression. It is designed to work with array fields and takes a document with 3 fields that we must supply as its parameters. The first is input, which is the array to filter. as specifies the alias we use for each element of the input array throughout the rest of the filter expression. The cond parameter provides the condition used to select a subset of the input array. In the above case, we’re selecting elements whose raised_amount is greater than or equal to 100 million. In "$$round.raised_amount", round is the variable named by as, and the $$ prefix says that we want to dereference a variable defined in this expression. This disambiguates a reference to a variable from a reference to, say, a field in the input document, which uses a single $. $filter returns an array with only those elements that match the condition, in their original order. The final $match then keeps only the companies where Greylock participated in at least one of these large rounds.
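To illustrate the difference between $$round and a plain $ field path, here is a hypothetical mini-evaluator in JavaScript. It only understands a single $gte condition and simple (array-free) paths, so treat it as a sketch of the idea rather than of MongoDB's expression engine. The round_code values are made-up labels.

```javascript
// Walk a dotted path on a plain object (simplified: no arrays along the path).
const walk = (start, keys) => keys.reduce((v, k) => (v == null ? undefined : v[k]), start);

// "$$name.path" reads a variable, "$path" reads the current document,
// anything else is treated as a literal.
function operand(value, doc, vars) {
  if (typeof value === "string" && value.startsWith("$$")) {
    const [name, ...keys] = value.slice(2).split(".");
    return walk(vars[name], keys);
  }
  if (typeof value === "string" && value.startsWith("$")) {
    return walk(doc, value.slice(1).split("."));
  }
  return value;
}

// Hypothetical mini-$filter that understands only a { $gte: [a, b] } condition.
function filterExpr(doc, { input, as, cond }) {
  const array = operand(input, doc, {});
  return array.filter((element) => {
    // Bind the current element to the variable named by "as".
    const [a, b] = cond.$gte.map((x) => operand(x, doc, { [as]: element }));
    return a >= b;
  });
}

const company = {
  name: "Acme",
  funding_rounds: [
    { round_code: "a", raised_amount: 5000000 },
    { round_code: "b", raised_amount: 120000000 },
    { round_code: "c", raised_amount: 100000000 },
  ],
};

const rounds = filterExpr(company, {
  input: "$funding_rounds",
  as: "round",
  cond: { $gte: ["$$round.raised_amount", 100000000] },
});
console.log(rounds.map((r) => r.round_code)); // [ 'b', 'c' ]
```

Because the kept elements come from Array.prototype.filter, they stay in their original order, matching the behavior described above.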
Let’s look at $arrayElemAt, which returns the element at a specified array index. Let’s pull out the first round and the last round (maybe for comparison).
db.companies.aggregate([{
$match: {
"founded_year": 2010
}
}, {
$project: {
_id: 0,
name: 1,
founded_year: 1,
first_round: {
$arrayElemAt: ["$funding_rounds", 0]
},
last_round: {
$arrayElemAt: ["$funding_rounds", -1]
}
}
}]).pretty()
$arrayElemAt takes the array and the index of the element to return. 0 refers to the first element and -1 to the last one; -2 would give the second-to-last, or penultimate, element in the array.
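The index rules can be summarized in a small JavaScript function. arrayElemAt here is a hypothetical model, and the strings stand in for funding round documents. When the index is out of bounds, $arrayElemAt returns no result; the sketch returns undefined in that case.

```javascript
// Plain-JavaScript model of $arrayElemAt's index rules (a sketch, not MongoDB code).
function arrayElemAt(array, idx) {
  const position = idx < 0 ? array.length + idx : idx; // negative: count from the end
  return position >= 0 && position < array.length ? array[position] : undefined;
}

const rounds = ["round A", "round B", "round C"]; // placeholders for round documents
console.log(arrayElemAt(rounds, 0));  // round A  (first)
console.log(arrayElemAt(rounds, -1)); // round C  (last)
console.log(arrayElemAt(rounds, -2)); // round B  (penultimate)
console.log(arrayElemAt(rounds, 5));  // undefined (out of bounds)
```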
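Something similar can be done with $slice, except that $slice returns an array (here, containing a single element) rather than the element itself: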
db.companies.aggregate([{
$match: {
"founded_year": 2010
}
}, {
$project: {
_id: 0,
name: 1,
founded_year: 1,
first_round: {
$slice: ["$funding_rounds", 1]
},
last_round: {
$slice: ["$funding_rounds", -1]
}
}
}]).pretty()
Now, related to $arrayElemAt is the $slice expression, which allows us to return multiple items from an array in sequence, beginning with a particular index.
db.companies.aggregate([{
$match: {
"founded_year": 2010
}
}, {
$project: {
_id: 0,
name: 1,
founded_year: 1,
early_rounds: {
$slice: ["$funding_rounds", 1, 3]
}
}
}]).pretty()
Notice that the starting position is 1, which skips the first funding round (index 0); up to 3 rounds are returned from there.
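Both $slice forms used above can be modeled like this. sliceExpr is a hypothetical helper and the round names are placeholders; the rules encoded here are our reading of the $slice documentation (a positive n counts from the start, a negative one from the end, and the three-argument form starts at a position).

```javascript
// Model of the two $slice forms (a sketch, not MongoDB code):
//   [array, n]           -> first n elements if n >= 0, last |n| elements if n < 0
//   [array, position, n] -> up to n elements starting at position
//                           (a negative position counts back from the end)
function sliceExpr(array, a, b) {
  if (b === undefined) {
    return a >= 0 ? array.slice(0, a) : array.slice(Math.max(array.length + a, 0));
  }
  const start = a >= 0 ? a : Math.max(array.length + a, 0);
  return array.slice(start, start + b);
}

const rounds = ["r0", "r1", "r2", "r3", "r4"]; // placeholders for round documents
console.log(sliceExpr(rounds, 1));     // [ 'r0' ]  like first_round above
console.log(sliceExpr(rounds, -1));    // [ 'r4' ]  like last_round above
console.log(sliceExpr(rounds, 1, 3));  // [ 'r1', 'r2', 'r3' ]  like early_rounds
console.log(sliceExpr(rounds, -2, 3)); // [ 'r3', 'r4' ]
```

Every result is an array, which is the key difference from $arrayElemAt.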
The last one we need to see is $size - which simply returns the size of the array:
db.companies.aggregate([{
$match: {
"founded_year": 2004
}
}, {
$project: {
_id: 0,
name: 1,
founded_year: 1,
total_rounds: {
$size: "$funding_rounds"
}
}
}]).pretty()
Accumulators
Accumulators are another type of expression. They involve calculating values from fields in multiple documents. Prior to MongoDB 3.2, accumulators were available only in the $group stage. Since 3.2, we also have access to a subset of accumulators within the $project stage. The primary difference between accumulators in the $group stage and in the $project stage is that in $project, accumulators such as $sum and $avg must operate on arrays within a single document, whereas accumulators in $group let us perform calculations on values across multiple documents.
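The difference can be sketched with plain JavaScript on two made-up companies: the first computation mirrors a $project accumulator, the second a $group accumulator. Neither is MongoDB code, and unlike MongoDB's $sum these sketches do not skip non-numeric values.

```javascript
const companies = [
  { name: "Acme", founded_year: 2004, number_of_employees: 50,
    funding_rounds: [{ raised_amount: 10 }, { raised_amount: 30 }] },
  { name: "Beta", founded_year: 2004, number_of_employees: 30,
    funding_rounds: [{ raised_amount: 5 }] },
];

// Like $project: { name: 1, total_funding: { $sum: "$funding_rounds.raised_amount" } }
// One output per input document; the sum only sees that document's own array.
const perDocument = companies.map((c) => ({
  name: c.name,
  total_funding: c.funding_rounds.reduce((sum, r) => sum + r.raised_amount, 0),
}));

// Like $group: { _id: "$founded_year", employees: { $sum: "$number_of_employees" } }
// One output per distinct _id; the sum combines values from several documents.
const perGroup = new Map();
for (const c of companies) {
  perGroup.set(c.founded_year, (perGroup.get(c.founded_year) ?? 0) + c.number_of_employees);
}

console.log(perDocument); // Acme: 40, Beta: 5 (one result per company)
console.log(perGroup);    // Map(1) { 2004 => 80 } (one result for both companies)
```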
Using Accumulators in $project Stages
There are multiple accumulators. The $max accumulator returns the largest value in the array passed to it. Here, the $match stage keeps only companies that have a non-empty funding_rounds array:
db.companies.aggregate([{
$match: {
"funding_rounds": {
$exists: true,
$ne: []
}
}
}, {
$project: {
_id: 0,
name: 1,
largest_round: {
$max: "$funding_rounds.raised_amount"
}
}
}]).pretty()
The example below shows the use of $sum in a $project stage, totaling the raised amounts across each company’s funding rounds. $avg works the same way on an array in $project:
db.companies.aggregate([{
$match: {
"funding_rounds": {
$exists: true,
$ne: []
}
}
}, {
$project: {
_id: 0,
name: 1,
total_funding: {
$sum: "$funding_rounds.raised_amount"
}
}
}])
Introduction to $group
$group is similar to the SQL GROUP BY clause. In the example below, we group companies by the year in which they were founded and calculate the average number of employees for each founding year, then sort the years from the highest average to the lowest.
db.companies.aggregate([{
$group: {
_id: {
founded_year: "$founded_year"
},
average_number_of_employees: {
$avg: "$number_of_employees"
}
}
}, {
$sort: {
average_number_of_employees: -1
}
}
])
This aggregation pipeline has 2 stages:
$group
$sort
Now, fundamental to the $group stage is the _id field that we specify as part of its document, that is, the value of the $group operator itself, using a very strict interpretation of the aggregation framework syntax. _id is how we define, control, and tune what the $group stage uses to organize the documents it sees.
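A rough model of how $group uses _id to organize documents, with $avg as the accumulator. groupAvg is a hypothetical helper and the documents are made up; comparing _id values via JSON.stringify is a simplification. As with $avg, non-numeric values are ignored, and a group with no numeric values gets null.

```javascript
// Sketch of $group with an _id document and $avg (plain JS, not MongoDB's code).
function groupAvg(docs, keyFn, valueFn) {
  const groups = new Map();
  for (const doc of docs) {
    const id = keyFn(doc);
    const mapKey = JSON.stringify(id); // simplification: compare _id values by JSON text
    if (!groups.has(mapKey)) groups.set(mapKey, { _id: id, sum: 0, count: 0 });
    const value = valueFn(doc);
    if (typeof value === "number") { // like $avg, ignore non-numeric values
      const g = groups.get(mapKey);
      g.sum += value;
      g.count += 1;
    }
  }
  return [...groups.values()].map(({ _id, sum, count }) => ({
    _id,
    average_number_of_employees: count === 0 ? null : sum / count,
  }));
}

const docs = [
  { founded_year: 2004, number_of_employees: 10 },
  { founded_year: 2004, number_of_employees: 30 },
  { founded_year: 2005, number_of_employees: 7 },
  { founded_year: 2005, number_of_employees: null },
];

const averages = groupAvg(
  docs,
  (d) => ({ founded_year: d.founded_year }), // _id: { founded_year: "$founded_year" }
  (d) => d.number_of_employees               // $avg: "$number_of_employees"
).sort((a, b) => b.average_number_of_employees - a.average_number_of_employees); // $sort: -1

console.log(averages); // 2004 averages 20, 2005 averages 7 (the null is ignored)
```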
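The query below counts, for each person, the number of company relationships they have, using the $sum operator. After relationships is unwound, $sum: 1 adds one for every relationship document grouped under a given person: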
db.companies.aggregate([{
$match: {
"relationships.person": {
$ne: null
}
}
}, {
$project: {
relationships: 1,
_id: 0
}
}, {
$unwind: "$relationships"
}, {
$group: {
_id: "$relationships.person",
count: {
$sum: 1
}
}
}, {
$sort: {
count: -1
}
}])
_id in $group Stages
We’re going to look at the _id field within the $group stage and at some best practices for constructing _ids in $group stages. Let’s look at this query:
db.companies.aggregate([{
$match: {
founded_year: {
$gte: 2010
}
}
}, {
$group: {
_id: {
founded_year: "$founded_year"
},
companies: {
$push: "$name"
}
}
}, {
$sort: {
"_id.founded_year": 1
}
}]).pretty()
One thing that might not be clear is why the _id field is constructed this “document” way. We could have done it this way as well:
db.companies.aggregate([{
$match: {
founded_year: {
$gte: 2010
}
}
}, {
$group: {
_id: "$founded_year",
companies: {
$push: "$name"
}
}
}, {
$sort: {
"_id": 1
}
}]).pretty()
We don’t do it this way because in these output documents it’s not explicit what exactly this number means, and in some cases that may lead to confusion when interpreting the documents. Another case is grouping on multiple fields using an _id document:
db.companies.aggregate([{
$match: {
founded_year: {
$gte: 2010
}
}
}, {
$group: {
_id: {
founded_year: "$founded_year",
category_code: "$category_code"
},
companies: {
$push: "$name"
}
}
}, {
$sort: {
"_id.founded_year": 1
}
}]).pretty()
$push simply appends each value to the array being generated for each group. Often, we may need to group on a nested field, promoting it to the top level of the _id document:
db.companies.aggregate([{
$group: {
_id: {
ipo_year: "$ipo.pub_year"
},
companies: {
$push: "$name"
}
}
}, {
$sort: {
"_id.ipo_year": 1
}
}]).pretty()
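It’s also perfectly valid for _id to be an expression that resolves to a document. In the relationships query from earlier, "$relationships.person" resolves to an embedded person document: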
db.companies.aggregate([{
$match: {
"relationships.person": {
$ne: null
}
}
}, {
$project: {
relationships: 1,
_id: 0
}
}, {
$unwind: "$relationships"
}, {
$group: {
_id: "$relationships.person",
count: {
$sum: 1
}
}
}, {
$sort: {
count: -1
}
}])
$group vs. $project
Not all accumulators are available in the $project stage. We need to consider what we can do with accumulators in $project and what we can do in $group. Let’s take a look at this:
db.companies.aggregate([{
$match: {
funding_rounds: {
$ne: []
}
}
}, {
$unwind: "$funding_rounds"
}, {
$sort: {
"funding_rounds.funded_year": 1,
"funding_rounds.funded_month": 1,
"funding_rounds.funded_day": 1
}
}, {
$group: {
_id: {
company: "$name"
},
funding: {
$push: {
amount: "$funding_rounds.raised_amount",
year: "$funding_rounds.funded_year"
}
}
}
}, ]).pretty()
Here, we keep only companies whose funding_rounds array is not empty. The array is then unwound, so that we see one document for each element of the funding_rounds array for every company, and those documents are passed to $sort and the later stages. The first thing we do with them is $sort based on:
funding_rounds.funded_year
funding_rounds.funded_month
funding_rounds.funded_day
In the $group stage, grouping by company name, the array is built using $push. $push is specified as the value of a field we name in a $group stage, and we can push any valid expression. In this case, we’re pushing documents built from raised_amount and funded_year, and every document we push is added to the end of the array we’re accumulating. So, the output of the $group stage is a stream of documents, each with an _id specifying the company name and a funding array whose entries follow the order established by the preceding $sort.
Notice that $push is available in $group stages but not in the $project stage. This is because $group stages are designed to take a sequence of documents and accumulate values based on that stream of documents.
$project, on the other hand, works with one document at a time. So, we can calculate an average on an array within an individual document inside a $project stage. But an operation where we see documents one at a time and, for every document passing through, push a new value onto an array is something the $project stage is simply not designed to do. For that type of operation, we want to use $group.
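The interplay between $sort and $push can be sketched in plain JavaScript: because values are appended in the order documents arrive, sorting first yields chronologically ordered arrays. The unwound documents below are made up, and only funded_year is used for sorting. The order of the groups themselves should not be relied on, so the check looks a group up by name rather than by position.

```javascript
// Plain-JS sketch (not MongoDB): documents as they might look after $unwind.
const unwoundRounds = [
  { name: "Acme", funding_rounds: { raised_amount: 20, funded_year: 2009 } },
  { name: "Beta", funding_rounds: { raised_amount: 5, funded_year: 2007 } },
  { name: "Acme", funding_rounds: { raised_amount: 10, funded_year: 2006 } },
];

// $sort: { "funding_rounds.funded_year": 1 } (month and day omitted for brevity)
const sorted = [...unwoundRounds].sort(
  (a, b) => a.funding_rounds.funded_year - b.funding_rounds.funded_year
);

// $group: { _id: { company: "$name" }, funding: { $push: { amount, year } } }
const groups = new Map();
for (const d of sorted) {
  if (!groups.has(d.name)) groups.set(d.name, { _id: { company: d.name }, funding: [] });
  // $push appends to the end of the array, in the order documents arrive.
  groups.get(d.name).funding.push({
    amount: d.funding_rounds.raised_amount,
    year: d.funding_rounds.funded_year,
  });
}
const result = [...groups.values()];

const acme = result.find((g) => g._id.company === "Acme");
console.log(acme.funding); // Acme's rounds in year order: 2006, then 2009
```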
Let’s take a look at another example:
db.companies.aggregate([{
$match: {
funding_rounds: {
$exists: true,
$ne: []
}
}
}, {
$unwind: "$funding_rounds"
}, {
$sort: {
"funding_rounds.funded_year": 1,
"funding_rounds.funded_month": 1,
"funding_rounds.funded_day": 1
}
}, {
$group: {
_id: {
company: "$name"
},
first_round: {
$first: "$funding_rounds"
},
last_round: {
$last: "$funding_rounds"
},
num_rounds: {
$sum: 1
},
total_raised: {
$sum: "$funding_rounds.raised_amount"
}
}
}, {
$project: {
_id: 0,
company: "$_id.company",
first_round: {
amount: "$first_round.raised_amount",
article: "$first_round.source_url",
year: "$first_round.funded_year"
},
last_round: {
amount: "$last_round.raised_amount",
article: "$last_round.source_url",
year: "$last_round.funded_year"
},
num_rounds: 1,
total_raised: 1,
}
}, {
$sort: {
total_raised: -1
}
}]).pretty()
In the $group stage, we’re using the $first and $last accumulators. As with $push, we can’t use $first and $last in $project stages, because $project stages are not designed to accumulate values based on multiple documents; rather, they reshape documents one at a time. Because the documents were sorted by funding date beforehand, $first and $last give each company’s earliest and latest funding round. The total number of rounds is calculated using the $sum operator: the value 1 adds one for each document grouped under a given _id value, so it counts those documents, while total_raised uses $sum on raised_amount instead. The $project stage may seem complex, but it’s just making the output pretty: it pulls the company name out of _id, reshapes the first and last rounds, and includes num_rounds and total_raised from the previous stage.