Sunday, February 2, 2014

Understanding Map Reduce

I'm in the process of rolling out a feature which will let developers see some basic usage statistics on their games via mogade.com. The first phase was to make it so data could be collected; the next phase is to build the reporting UI. Of course, there's a phase in the middle, which is to transform our data from raw statistics into actual reportable information. Since we are using MongoDB, we are doing our transformation via map reduce. Though our reports are basic, I thought I'd share some of the code since I know the topic is still exotic to a lot of developers.
As far as the actual transformation goes, it doesn't really matter whether you do it in real time (when the user asks for the report) or as part of a job. The way we do it, though, is to run a job which caches the data (permanently). The first report that we generate, which is really the Hello World of map reduce, is the number of times the game is started per day. Data comes into a single collection (think of it as a table if you aren't familiar with MongoDB) which basically has 2 fields: game_id and date. The collection is called hits. We want to transform this information into a daily_hits collection with the following fields: game_id, year, month, day and count.
Essentially, given the following raw data:
game_id | date
1         Jan 20 2010 4:30
1         Jan 20 2010 5:30
2         Jan 20 2010 6:00
1         Jan 20 2010 7:00
2         Jan 21 2010 8:00
2         Jan 21 2010 8:30
1         Jan 21 2010 8:30
2         Jan 21 2010 9:00
1         Jan 21 2010 9:30
1         Jan 22 2010 5:00
We'd expect the following output:
game_id | year | month | day | count
1         2010   1       20    3
2         2010   1       20    1
2         2010   1       21    3
1         2010   1       21    2
1         2010   1       22    1
I like this approach because it's blazing fast and clear, and data growth is controlled (you'll add 1 row of data per game per day).
In the relational world you'd solve this problem using a GROUP BY. We're going to use a map reduce. Map reduce is a two-step process. The mapping step transforms each object in hits and emits a key=>value pair (the key and/or value can be complex). The reduce step gets a key and the array of values emitted with that key, and produces the final result. Stepping through the process should help clarify.
It's possible for map to emit 0 or more times. We'll always make it emit once (which is common). Imagine map as looping through each object in hits (we can apply a filter to only map-reduce over some objects, but that's beside the point). For each object we want to emit a key with game_id, year, month and day, and a simple value of 1:
function()
{
	var key = {game_id: this.game_id, year: this.date.getFullYear(), month: this.date.getMonth(), day: this.date.getDate()};
	emit(key, {count: 1});
}
Map reduce in MongoDB is written in JavaScript, and this refers to the current object being inspected. Keep in mind that JavaScript's getMonth() is zero-based, so January comes out as month 0 rather than 1. Hopefully what'll help make this all clear is to see the output of the mapping step. Using our above data, the complete output would be:
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}, {count:1}]
{game_id: 2, year: 2010, month: 0, day: 20} => [{count: 1}]
{game_id: 2, year: 2010, month: 0, day: 21} => [{count: 1}, {count: 1}, {count:1}]
{game_id: 1, year: 2010, month: 0, day: 21} => [{count: 1}, {count: 1}]
{game_id: 1, year: 2010, month: 0, day: 22} => [{count: 1}]
I think understanding this intermediary step is really the key to understanding map reduce. The values from emit are grouped together, as arrays, by key. I don't know if it helps, but .NET developers could think of the data structure as being of type IDictionary<TKey, IList<TValue>>.
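To make that intermediate structure concrete, here is a minimal sketch in plain JavaScript that imitates the mapping step outside of MongoDB. The emit function and the grouped map are hypothetical stand-ins for what the database does internally; only the map function mirrors the one above.

```javascript
// Sample rows from the hits collection (JavaScript months are zero-based).
const hits = [
  { game_id: 1, date: new Date(2010, 0, 20, 4, 30) },
  { game_id: 1, date: new Date(2010, 0, 20, 5, 30) },
  { game_id: 2, date: new Date(2010, 0, 20, 6, 0) },
  { game_id: 1, date: new Date(2010, 0, 20, 7, 0) },
  { game_id: 2, date: new Date(2010, 0, 21, 8, 0) },
  { game_id: 2, date: new Date(2010, 0, 21, 8, 30) },
  { game_id: 1, date: new Date(2010, 0, 21, 8, 30) },
  { game_id: 2, date: new Date(2010, 0, 21, 9, 0) },
  { game_id: 1, date: new Date(2010, 0, 21, 9, 30) },
  { game_id: 1, date: new Date(2010, 0, 22, 5, 0) },
];

// Hypothetical stand-in for MongoDB's grouping: values emitted with the
// same key are collected into the same array.
const grouped = new Map();
function emit(key, value) {
  const id = JSON.stringify(key);
  if (!grouped.has(id)) grouped.set(id, []);
  grouped.get(id).push(value);
}

// Same logic as the map function above; `this` is the current hit.
function map() {
  const key = {
    game_id: this.game_id,
    year: this.date.getFullYear(),
    month: this.date.getMonth(),
    day: this.date.getDate(),
  };
  emit(key, { count: 1 });
}

// Run map once per object, the way the database would.
hits.forEach((hit) => map.call(hit));

for (const [id, values] of grouped) {
  console.log(id, '=>', JSON.stringify(values));
}
```

Serializing the key with JSON.stringify is just a convenient way to group by a complex key in this sketch; it says nothing about how MongoDB compares keys internally.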
Let's change our map function in some contrived way:
function()
{
	var key = {game_id: this.game_id, year: this.date.getFullYear(), month: this.date.getMonth(), day: this.date.getDate()};
	if (this.game_id == 1 && this.date.getHours() == 4)
	{
		emit(key, {count: 5});
	}
	else
	{
		emit(key, {count: 1});
	}

}
Then the first intermediary output would change to:
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 5}, {count: 1}, {count:1}]
Now, the reduce function takes each of these intermediary results and outputs a final result. Here's what ours looks like:
function(key, values)
{
   var sum = 0;
   values.forEach(function(value)
   {
     sum += value['count'];
   });
   return {count: sum};
};
The final output from this is:
{game_id: 1, year: 2010, month: 0, day: 20} => {count: 3}
{game_id: 2, year: 2010, month: 0, day: 20} => {count: 1}
{game_id: 2, year: 2010, month: 0, day: 21} => {count: 3}
{game_id: 1, year: 2010, month: 0, day: 21} => {count: 2}
{game_id: 1, year: 2010, month: 0, day: 22} => {count: 1}
Technically, the output in MongoDB is:
_id: {game_id: 1, year: 2010, month: 0, day: 20}, value: {count: 3}
Hopefully you've noticed that this is the final result we were after.
If you've really been paying attention, you might be asking yourself: why didn't we simply use sum = values.length? This would seem like an efficient approach when you are essentially summing an array of 1s. The fact is that reduce isn't always called with a full and perfect set of intermediate data. For example, instead of being called with:
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}, {count:1}]
Reduce could be called twice, with the second call receiving the result of the first:
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}]
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 2}, {count: 1}]
The final output is the same (3); the path taken is simply different. As such, reduce must always be idempotent: it has to be able to take its own output as one of its input values and still arrive at the same result.
So what advantage does map reduce hold? The oft-cited benefit is that both the map and reduce operations can be distributed. So the code I've written above could be executed by multiple threads, multiple CPUs, or even thousands of servers as-is. This is key when dealing with millions or billions of records, or smaller sets with more complex logic. For the rest of us though, I think the real benefit is the power of being able to write these types of transforms using actual programming languages, with variables, conditional statements, methods and so on. It is a mind shift from the traditional approach, but I do think even slightly complex queries are cleaner and easier to write with map reduce. We didn't look at it here, but you'll commonly feed the output of a reduce function into another reduce function, each function further transforming it towards the end result.
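Here is a small sketch of that difference in plain JavaScript, run outside of MongoDB. The reduce function is the one from above; brokenReduce is a hypothetical variant using values.length, included only to show where it goes wrong.

```javascript
// The reduce function from above: sums the count of every value.
function reduce(key, values) {
  let sum = 0;
  values.forEach(function (value) {
    sum += value.count;
  });
  return { count: sum };
}

// Hypothetical variant that counts values instead of summing them.
function brokenReduce(key, values) {
  return { count: values.length };
}

const key = { game_id: 1, year: 2010, month: 0, day: 20 };
const ones = [{ count: 1 }, { count: 1 }, { count: 1 }];

// One pass over all three values.
const direct = reduce(key, ones);

// Two passes: the first two values are reduced, then that partial
// result is reduced together with the remaining value.
const partial = reduce(key, ones.slice(0, 2));
const rereduced = reduce(key, [partial, ones[2]]);

// The same two passes with the values.length variant.
const brokenPartial = brokenReduce(key, ones.slice(0, 2));
const brokenRereduced = brokenReduce(key, [brokenPartial, ones[2]]);

console.log(direct.count, rereduced.count, brokenRereduced.count); // prints 3 3 2
```

The values.length variant reports 2 on the second path because it sees two values, one of which already stands for two hits.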
I do want to point out that MongoDB doesn't currently scale map-reduce across multiple threads. Hopefully that's something that'll change in the future. Also, the current stable version can only store the output in a collection of its own (which can be permanent or temporary). We want to add the data to our daily_hits collection. This means pulling the data down to code and then inserting it (again, this isn't a huge deal since it's, at most, 1 row per game per day).
To deal with this MongoDB limitation we can leverage one of its great features: upserts. The problem we have is that if we run this transform multiple times per day, the key may or may not already exist in daily_hits. I've done this in SQL Server in the past, and the end result is always executing an UPDATE for existing values and an INSERT for new ones (typically requiring a join/subselect on the destination table to see if the key does or does not exist). Here's what the Ruby code looks like:
# map and reduce are strings containing the JavaScript functions above
daily_hits = db['daily_hits']
db['hits'].map_reduce(map, reduce).find.each do |row|
  key = {:game_id => row['_id']['game_id'], :year => row['_id']['year'], :month => row['_id']['month'], :day => row['_id']['day']}
  daily_hits.update(key, {'$inc' => {:count => row['value']['count']}}, {:upsert => true})
end
If the key is found, then the count field is incremented by the specified value; otherwise the entry is inserted and count is incremented from 0 by that value.
In the next release of MongoDB (1.8), or the current development release (1.7.4), map reduce can take an output collection to insert into directly, or a reduce operation to merge (upsert). This will avoid the need to pull the data down to code.
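If the upsert behaviour is hard to picture, this plain JavaScript sketch imitates it with an in-memory Map. Both dailyHits and upsertInc are hypothetical names made up for the illustration; they are not part of any MongoDB driver.

```javascript
// Hypothetical in-memory stand-in for the daily_hits collection.
const dailyHits = new Map();

// Imitates update(key, {$inc: {count: amount}}, {upsert: true}).
function upsertInc(collection, key, amount) {
  const id = JSON.stringify(key);
  const existing = collection.get(id);
  if (existing) {
    // Key found: increment the existing count.
    existing.count += amount;
  } else {
    // Key missing: insert a new row whose count starts at 0 + amount.
    collection.set(id, { ...key, count: amount });
  }
}

const key = { game_id: 1, year: 2010, month: 0, day: 20 };
upsertInc(dailyHits, key, 3); // first run of the job inserts the row
upsertInc(dailyHits, key, 2); // a later run on the same day increments it

console.log(dailyHits.get(JSON.stringify(key)).count); // prints 5
```

Either way, the caller never has to check whether the row exists first, which is the part that took a join or subselect in SQL.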
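As a rough sketch of what that could look like from the mongo shell, the snippet below assumes the out option with its reduce mode, as documented for mapReduce from MongoDB 1.8 onward. The guard around db is only there so the snippet can also be loaded outside the shell. Note that documents written this way keep the _id/value shape shown earlier rather than the flattened daily_hits fields.

```javascript
// The map function from above. emit is supplied by MongoDB at run time.
var map = function () {
  var key = {
    game_id: this.game_id,
    year: this.date.getFullYear(),
    month: this.date.getMonth(),
    day: this.date.getDate()
  };
  emit(key, { count: 1 });
};

// The reduce function from above.
var reduce = function (key, values) {
  var sum = 0;
  values.forEach(function (value) {
    sum += value.count;
  });
  return { count: sum };
};

// out: {reduce: ...} asks MongoDB to run reduce again over the new result
// and any document already stored under the same key in daily_hits.
var options = { out: { reduce: 'daily_hits' } };

// Only runs inside the mongo shell, where db is defined.
if (typeof db !== 'undefined') {
  db.hits.mapReduce(map, reduce, options);
}
```

Because the stored result is fed back through reduce, this is another place where the function has to accept its own output as input.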
For completeness, we need to either delete the processed rows or flag them (and use the flag to filter them) so that when we rerun the transformation we don't double our results. This can actually be a bit tricky, since new hits might have come in during our processing, and we can't simply delete those or mark them as processed. A simple approach is to run the query only against records that are at least a few minutes old. So our map reduce now looks like:
limit = 5.minutes.ago # 5.minutes.ago comes from ActiveSupport
db['hits'].map_reduce(map, reduce, {:query => {:date => {'$lte' => limit}, 'processed' => {'$exists' => false}}}).find.each do |row|
  key = {:game_id => row['_id']['game_id'], :year => row['_id']['year'], :month => row['_id']['month'], :day => row['_id']['day']}
  daily_hits.update(key, {'$inc' => {:count => row['value']['count']}}, {:upsert => true})
end
#update those records we just processed
db['hits'].update({:date => {'$lte' => limit}}, {'$set' => {'processed' => true}}, {:multi => true })

