Sunday, February 2, 2014

Understanding Map Reduce

I'm in the process of rolling out a feature which will let developers see some basic usage statistics on their games. The first phase was to make it possible to collect the data; the next phase is to build the reporting UI. Of course, there's a phase in the middle: transforming our data from raw statistics into actual reportable information. Since we are using MongoDB, we are doing that transformation via map reduce. Though our reports are basic, I thought I'd share some of the code, since I know the topic is still exotic to a lot of developers.
As far as the actual transformation goes, it doesn't really matter whether you do it in real time (when the user asks for the report) or as part of a job. We run a job which caches the transformed data permanently. The first report we generate, which is really the Hello World of map reduce, is the number of times a game is started per day. Data comes into a single collection called hits (think of a collection as a table if you aren't familiar with MongoDB) which basically has 2 fields: game_id and date. We want to transform this information into a daily_hits collection with the following fields: game_id, year, month, day and count.
Essentially, given the following raw data:
game_id | date
1       | Jan 20 2010 4:30
1       | Jan 20 2010 5:30
2       | Jan 20 2010 6:00
1       | Jan 20 2010 7:00
2       | Jan 21 2010 8:00
2       | Jan 21 2010 8:30
1       | Jan 21 2010 8:30
2       | Jan 21 2010 9:00
1       | Jan 21 2010 9:30
1       | Jan 22 2010 5:00
We'd expect the following output:
game_id | year | month | day | count
1       | 2010 | 1     | 20  | 3
2       | 2010 | 1     | 20  | 1
2       | 2010 | 1     | 21  | 3
1       | 2010 | 1     | 21  | 2
1       | 2010 | 1     | 22  | 1
I like this approach because it's blazing fast, clear, and data growth is controlled (each game adds at most 1 row of data per day).
In the relational world you'd solve this problem using a GROUP BY. We're going to use map reduce instead. Map reduce is a two-step process. The map step transforms each object in hits and emits a key=>value pair (the key and/or value can be complex). The reduce step gets a key along with the array of values emitted for that key, and produces the final result. Stepping through the process should help clarify.
It's possible for map to emit 0 or more times per object. We'll always make it emit exactly once (which is common). Imagine map as looping through each object in hits (we can apply a filter to map-reduce over only some objects, but that's beside the point). For each object we want to emit a key made up of game_id, year, month and day, and a simple value of 1:
	function() {
		var key = {game_id: this.game_id, year: this.date.getFullYear(), month: this.date.getMonth(), day: this.date.getDate()};
		emit(key, {count: 1});
	}
Map reduce in MongoDB is written in JavaScript, and this refers to the current object being inspected. What should help make this all clear is seeing the output of the map step. Using our data above, the complete output would be (JavaScript's getMonth() is zero-based, which is why January shows up as month 0):
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}, {count:1}]
{game_id: 2, year: 2010, month: 0, day: 20} => [{count: 1}]
{game_id: 2, year: 2010, month: 0, day: 21} => [{count: 1}, {count: 1}, {count:1}]
{game_id: 1, year: 2010, month: 0, day: 21} => [{count: 1}, {count: 1}]
{game_id: 1, year: 2010, month: 0, day: 22} => [{count: 1}]
I think understanding this intermediary step is really the key to understanding map reduce. The values from emit are grouped together, as arrays, by key. If it helps, .NET developers could think of the data structure as being of type IDictionary<object, IList<object>>.
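To make the grouping concrete, here is a minimal in-memory sketch in plain JavaScript (runnable with Node.js) of what happens between map and reduce. It is not how MongoDB implements map reduce: runMap and the module-level emit are hypothetical helpers that imitate the global emit MongoDB provides, and keys are grouped by their JSON serialisation, which assumes every key lists its fields in the same order.

```javascript
// Sample rows from the table above. JavaScript months are zero-based (0 = January).
const hits = [
  { game_id: 1, date: new Date(2010, 0, 20, 4, 30) },
  { game_id: 1, date: new Date(2010, 0, 20, 5, 30) },
  { game_id: 2, date: new Date(2010, 0, 20, 6, 0) },
  { game_id: 1, date: new Date(2010, 0, 20, 7, 0) },
  { game_id: 2, date: new Date(2010, 0, 21, 8, 0) },
  { game_id: 2, date: new Date(2010, 0, 21, 8, 30) },
  { game_id: 1, date: new Date(2010, 0, 21, 8, 30) },
  { game_id: 2, date: new Date(2010, 0, 21, 9, 0) },
  { game_id: 1, date: new Date(2010, 0, 21, 9, 30) },
  { game_id: 1, date: new Date(2010, 0, 22, 5, 0) },
];

// Hypothetical stand-in for MongoDB's global emit; runMap assigns it before mapping.
let emit;

// The map function from the post: `this` is the document being mapped.
function map() {
  const key = {
    game_id: this.game_id,
    year: this.date.getFullYear(),
    month: this.date.getMonth(),
    day: this.date.getDate(),
  };
  emit(key, { count: 1 });
}

// Calls mapFn once per document and groups every emitted value under its key.
function runMap(docs, mapFn) {
  const groups = new Map(); // JSON form of the key -> { key, values }
  emit = (key, value) => {
    const id = JSON.stringify(key);
    if (!groups.has(id)) groups.set(id, { key, values: [] });
    groups.get(id).values.push(value);
  };
  docs.forEach((doc) => mapFn.call(doc));
  return groups;
}

const intermediate = runMap(hits, map);
for (const { key, values } of intermediate.values()) {
  console.log(JSON.stringify(key), "=>", JSON.stringify(values));
}
```

Running it prints one line per key, in the order each key was first emitted, with the same five groups listed above.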
Let's change our map function in some contrived way:
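	function() {
		var key = {game_id: this.game_id, year: this.date.getFullYear(), month: this.date.getMonth(), day: this.date.getDate()};
		if (this.game_id == 1 && this.date.getHours() == 4) {
			emit(key, {count: 5});
		} else {
			emit(key, {count: 1});
		}
	}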

Then the first intermediary output would change to:
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 5}, {count: 1}, {count:1}]
Now, the reduce function takes each of these intermediary results and outputs a final result. Here's what ours looks like:
	function(key, values) {
		var sum = 0;
		values.forEach(function(value) {
			sum += value['count'];
		});
		return {count: sum};
	}
Using the original (non-contrived) map function, the final output is:
{game_id: 1, year: 2010, month: 0, day: 20} => {count: 3}
{game_id: 2, year: 2010, month: 0, day: 20} => {count: 1}
{game_id: 2, year: 2010, month: 0, day: 21} => {count: 3}
{game_id: 1, year: 2010, month: 0, day: 21} => {count: 2}
{game_id: 1, year: 2010, month: 0, day: 22} => {count: 1}
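Putting map, grouping and reduce together, the whole pipeline can be sketched in a few lines of JavaScript. This is a single-process stand-in rather than MongoDB itself: mapReduce and the module-level emit are hypothetical names, results are returned in MongoDB's {_id, value} shape, and unlike MongoDB (which, according to its documentation, doesn't call reduce for a key with only one value) this runner calls reduce for every key, which gives the same answer here.

```javascript
// Sample rows from the raw data table (months are zero-based).
const hits = [
  { game_id: 1, date: new Date(2010, 0, 20, 4, 30) },
  { game_id: 1, date: new Date(2010, 0, 20, 5, 30) },
  { game_id: 2, date: new Date(2010, 0, 20, 6, 0) },
  { game_id: 1, date: new Date(2010, 0, 20, 7, 0) },
  { game_id: 2, date: new Date(2010, 0, 21, 8, 0) },
  { game_id: 2, date: new Date(2010, 0, 21, 8, 30) },
  { game_id: 1, date: new Date(2010, 0, 21, 8, 30) },
  { game_id: 2, date: new Date(2010, 0, 21, 9, 0) },
  { game_id: 1, date: new Date(2010, 0, 21, 9, 30) },
  { game_id: 1, date: new Date(2010, 0, 22, 5, 0) },
];

let emit; // hypothetical stand-in for MongoDB's global emit, set by mapReduce

function map() {
  const key = {
    game_id: this.game_id,
    year: this.date.getFullYear(),
    month: this.date.getMonth(),
    day: this.date.getDate(),
  };
  emit(key, { count: 1 });
}

// The reduce function from the post: sums the count field of every value.
function reduce(key, values) {
  let sum = 0;
  values.forEach(function (value) {
    sum += value.count;
  });
  return { count: sum };
}

// Map every document, group emitted values by key, then reduce each group.
function mapReduce(docs, mapFn, reduceFn) {
  const groups = new Map(); // JSON form of the key -> { key, values }
  emit = (key, value) => {
    const id = JSON.stringify(key);
    if (!groups.has(id)) groups.set(id, { key, values: [] });
    groups.get(id).values.push(value);
  };
  docs.forEach((doc) => mapFn.call(doc));
  return [...groups.values()].map(({ key, values }) => ({
    _id: key,
    value: reduceFn(key, values),
  }));
}

const results = mapReduce(hits, map, reduce);
results.forEach((row) => console.log(JSON.stringify(row)));
```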
Technically, MongoDB stores each result as a document with _id and value fields, so the first one is:
{_id: {game_id: 1, year: 2010, month: 0, day: 20}, value: {count: 3}}
Hopefully you've noticed that this is the final result we were after.
If you've really been paying attention, you might be asking yourself: why didn't we simply use sum = values.length? That would seem like an efficient approach when you are essentially summing an array of 1s. The catch is that reduce isn't always called with a full and perfect set of intermediate data. For example, instead of being called with:
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}, {count:1}]
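Reduce could first be called with a partial set of values, and then again with its own output ({count: 2}) plus the remaining value: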
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}]
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 2}, {count: 1}]
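The final output is the same (3); only the path taken is different. As such, reduce must always be idempotent: it has to return a value in the same shape as the values it receives, and reducing an already-reduced result has to give the same answer. With sum = values.length, the second call above would have returned 2 instead of 3.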
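The values.length pitfall is easy to demonstrate outside MongoDB. This sketch runs the post's reduce and a hypothetical reduceByLength over the same three values twice: once in a single call, and once as a partial reduce whose output is fed back in alongside the remaining value.

```javascript
// The reduce function from the post: sums the count field of every value.
function reduce(key, values) {
  let sum = 0;
  values.forEach(function (value) {
    sum += value.count;
  });
  return { count: sum };
}

// Hypothetical shortcut: counts the values instead of summing them.
function reduceByLength(key, values) {
  return { count: values.length };
}

const key = { game_id: 1, year: 2010, month: 0, day: 20 };
const ones = [{ count: 1 }, { count: 1 }, { count: 1 }];

for (const fn of [reduce, reduceByLength]) {
  const oneShot = fn(key, ones).count;                  // all values in one call
  const partial = fn(key, ones.slice(0, 2));           // reduce the first two values
  const reReduced = fn(key, [partial, ones[2]]).count; // feed that output back in
  console.log(fn.name, oneShot, reReduced);
}
```

Both functions agree on the single call; only the summing version still gives 3 after the re-reduce, which is exactly the situation described above.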
So what advantage does map reduce hold? The oft-cited benefit is that both the map and reduce operations can be distributed. So the code I've written above could be executed by multiple threads, multiple CPUs, or even thousands of servers, as-is. This is key when dealing with millions or billions of records, or smaller sets with more complex logic. For the rest of us though, I think the real benefit is the power of being able to write these types of transforms using actual programming languages, with variables, conditional statements, methods and so on. It is a mind shift from the traditional approach, but I do think even slightly complex queries are cleaner and easier to write with map reduce. We didn't look at it here, but you'll commonly feed the output of a reduce function into another reduce function, each function further transforming it towards the end result.
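The distribution claim can be sketched the same way. Assuming the map and reduce functions from the post, this hypothetical simulation splits hits into chunks, runs map and a local reduce on each chunk as if it were a separate worker, then re-reduces the partial results per key. Nothing actually runs in parallel here; the point is only that the per-key totals match a single-pass run, because reduce can consume its own output.

```javascript
const hits = [
  { game_id: 1, date: new Date(2010, 0, 20, 4, 30) },
  { game_id: 1, date: new Date(2010, 0, 20, 5, 30) },
  { game_id: 2, date: new Date(2010, 0, 20, 6, 0) },
  { game_id: 1, date: new Date(2010, 0, 20, 7, 0) },
  { game_id: 2, date: new Date(2010, 0, 21, 8, 0) },
  { game_id: 2, date: new Date(2010, 0, 21, 8, 30) },
  { game_id: 1, date: new Date(2010, 0, 21, 8, 30) },
  { game_id: 2, date: new Date(2010, 0, 21, 9, 0) },
  { game_id: 1, date: new Date(2010, 0, 21, 9, 30) },
  { game_id: 1, date: new Date(2010, 0, 22, 5, 0) },
];

let emit; // hypothetical stand-in for MongoDB's global emit, reassigned per worker

function map() {
  const key = {
    game_id: this.game_id,
    year: this.date.getFullYear(),
    month: this.date.getMonth(),
    day: this.date.getDate(),
  };
  emit(key, { count: 1 });
}

function reduce(key, values) {
  let sum = 0;
  values.forEach(function (value) {
    sum += value.count;
  });
  return { count: sum };
}

// Groups [key, value] pairs by the JSON form of the key and reduces each group.
function groupAndReduce(pairs) {
  const groups = new Map();
  for (const [key, value] of pairs) {
    const id = JSON.stringify(key);
    if (!groups.has(id)) groups.set(id, { key, values: [] });
    groups.get(id).values.push(value);
  }
  const out = new Map(); // JSON key -> { key, value }
  for (const [id, { key, values }] of groups) out.set(id, { key, value: reduce(key, values) });
  return out;
}

// One hypothetical "worker": map its slice of the data, then reduce locally.
function worker(docs) {
  const pairs = [];
  emit = (key, value) => pairs.push([key, value]);
  docs.forEach((doc) => map.call(doc));
  return groupAndReduce(pairs);
}

// Split the input into chunks, run each through a worker (sequentially here),
// then re-reduce the partial results for each key.
function distributed(docs, chunkCount) {
  const size = Math.ceil(docs.length / chunkCount);
  const partialPairs = [];
  for (let i = 0; i < docs.length; i += size) {
    for (const { key, value } of worker(docs.slice(i, i + size)).values()) {
      partialPairs.push([key, value]);
    }
  }
  return groupAndReduce(partialPairs);
}

const single = worker(hits);
const split = distributed(hits, 3);
for (const [id, { value }] of single) {
  console.log(id, value.count, split.get(id).value.count);
}
```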
I do want to point out that MongoDB doesn't currently scale map-reduce across multiple threads. Hopefully that's something that'll change in the future. Also, the current stable version can only store the output in a collection of its own (which can be permanent or temporary). We want to add the data to our existing daily_hits collection, which means pulling the results down to our code and then inserting them (again, this isn't a huge deal since it's, at most, 1 row per game per day).
To deal with this MongoDB limitation we can leverage one of its great features: upserts. The problem we have is that if we run this transform multiple times per day, the key may or may not already exist in daily_hits. I've done this in SQL Server in the past, and the end result is always executing an UPDATE for existing values and an INSERT for new ones (typically requiring a join or subselect on the destination table to see whether the key exists). Here's what the Ruby code looks like:
#map and reduce are strings containing our above methods, blah!
daily_hits = db['daily_hits']
db['hits'].map_reduce(map, reduce).find.each do |row|
  key = {:game_id => row['_id']['game_id'], :year => row['_id']['year'], :month => row['_id']['month'], :day => row['_id']['day']}
  # $inc with :upsert increments an existing row or inserts a new one
  daily_hits.update(key, {'$inc' => {:count => row['value']['count']}}, {:upsert => true})
end
If the key is found, the count field is incremented by the specified value; otherwise a new document is inserted with the key's fields and a count equal to that value (as if incremented from 0).
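For readers without a MongoDB instance handy, here is a rough in-memory imitation of that upsert-with-$inc behaviour, assuming two runs on January 20 where the first sees the 4:30 and 5:30 hits and the second sees the 6:00 and 7:00 hits. dailyHits and upsertInc are hypothetical names; in the real code the work is done by the driver's update call with :upsert => true.

```javascript
// Hypothetical in-memory stand-in for daily_hits, keyed by the JSON form of
// {game_id, year, month, day}.
const dailyHits = new Map();

// Mimics update(key, {'$inc': {count: n}}, {upsert: true}): increment count if the
// key exists, otherwise insert the key's fields with count starting from 0.
function upsertInc(collection, key, n) {
  const id = JSON.stringify(key);
  const doc = collection.get(id) || { ...key, count: 0 };
  doc.count += n;
  collection.set(id, doc);
}

// Reduce output ({_id, value} rows) from two runs of the transform on the same day.
const firstRun = [
  { _id: { game_id: 1, year: 2010, month: 0, day: 20 }, value: { count: 2 } },
];
const secondRun = [
  { _id: { game_id: 1, year: 2010, month: 0, day: 20 }, value: { count: 1 } },
  { _id: { game_id: 2, year: 2010, month: 0, day: 20 }, value: { count: 1 } },
];

for (const run of [firstRun, secondRun]) {
  for (const row of run) upsertInc(dailyHits, row._id, row.value.count);
}

console.log([...dailyHits.values()]);
```

After both runs, game 1 has a count of 3 for January 20 (incremented in place), and game 2 has a count of 1 (inserted by the second run).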
In the next release of MongoDB (1.8), or the current development release (1.7.4), map reduce can take an output collection to insert into directly, or a reduce operation to merge the new results into an existing collection. This will avoid the need to pull the data down to code.
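The reduce merge works differently from the $inc approach: rather than adding numbers client-side, the stored value and the new value are both handed back to reduce. In later MongoDB versions this is requested with an out option of the form {reduce: "collection_name"} (check the documentation for your version), and the stored documents keep MongoDB's {_id, value} shape rather than daily_hits' flat fields. Here is a hypothetical in-memory sketch of that merge logic, with output and mergeWithReduce as made-up names, assuming a first run that sees two game 1 hits on January 20 and a later run that sees the rest of that day.

```javascript
// The reduce function from the post.
function reduce(key, values) {
  let sum = 0;
  values.forEach(function (value) {
    sum += value.count;
  });
  return { count: sum };
}

// Hypothetical output collection: JSON form of _id -> { _id, value }.
const output = new Map();

function mergeWithReduce(collection, newResults, reduceFn) {
  for (const doc of newResults) {
    const id = JSON.stringify(doc._id);
    const existing = collection.get(id);
    // Existing key: reduce the stored value together with the new one.
    // New key: store the new result unchanged.
    const value = existing ? reduceFn(doc._id, [existing.value, doc.value]) : doc.value;
    collection.set(id, { _id: doc._id, value });
  }
}

mergeWithReduce(output, [
  { _id: { game_id: 1, year: 2010, month: 0, day: 20 }, value: { count: 2 } },
], reduce);
mergeWithReduce(output, [
  { _id: { game_id: 1, year: 2010, month: 0, day: 20 }, value: { count: 1 } },
  { _id: { game_id: 2, year: 2010, month: 0, day: 20 }, value: { count: 1 } },
], reduce);

for (const doc of output.values()) console.log(JSON.stringify(doc));
```

This only works because reduce is idempotent: the stored {count: 2} is treated like any other value.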
For completeness, we need to either delete the processed rows or flag them (and use the flag to filter them out) so that rerunning the transformation doesn't double our results. This can actually be a bit tricky, since new hits might come in during processing, and those we can't simply delete or mark as processed. A simple approach is to only process records older than some cutoff (five minutes in the code below). So our map reduce now looks like:
# 5.minutes.ago comes from ActiveSupport (Rails)
limit = 5.minutes.ago
daily_hits = db['daily_hits']
db['hits'].map_reduce(map, reduce, {:query => {:date => {'$lte' => limit}, 'processed' => {'$exists' => false}}}).find.each do |row|
  key = {:game_id => row['_id']['game_id'], :year => row['_id']['year'], :month => row['_id']['month'], :day => row['_id']['day']}
  daily_hits.update(key, {'$inc' => {:count => row['value']['count']}}, {:upsert => true})
end
#update those records we just processed
db['hits'].update({:date => {'$lte' => limit}}, {'$set' => {'processed' => true}}, {:multi => true })
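The cutoff-plus-flag idea can be sanity-checked in memory. In this sketch (hypothetical names throughout, with a direct per-day count standing in for the map reduce call), runJob counts only unprocessed hits at least five minutes older than the supplied time, then flags everything up to that same limit, mirroring the two Ruby statements above. Rerunning it never double counts, and a hit that was too recent on one run is picked up on the next.

```javascript
// Hypothetical in-memory hits; `processed` starts unset, like the Ruby version.
const hits = [
  { game_id: 1, date: new Date(2010, 0, 20, 4, 30) },
  { game_id: 1, date: new Date(2010, 0, 20, 5, 30) },
  { game_id: 2, date: new Date(2010, 0, 20, 6, 0) },
];
const dailyCounts = new Map(); // "game_id/year/month/day" -> count

function dayKey(hit) {
  const d = hit.date;
  return [hit.game_id, d.getFullYear(), d.getMonth(), d.getDate()].join("/");
}

const CUTOFF_MS = 5 * 60 * 1000; // five minutes, like 5.minutes.ago

function runJob(now) {
  const limit = new Date(now.getTime() - CUTOFF_MS);
  // Count old-enough, unprocessed hits (stands in for the filtered map reduce + upsert).
  for (const hit of hits) {
    if (hit.date <= limit && !hit.processed) {
      const k = dayKey(hit);
      dailyCounts.set(k, (dailyCounts.get(k) || 0) + 1);
    }
  }
  // Flag everything up to the same limit, like the multi-update.
  for (const hit of hits) {
    if (hit.date <= limit) hit.processed = true;
  }
}

runJob(new Date(2010, 0, 20, 6, 2)); // the 6:00 hit is too recent and is left alone
runJob(new Date(2010, 0, 20, 6, 2)); // rerun: nothing is counted twice
runJob(new Date(2010, 0, 20, 7, 0)); // the 6:00 hit is now old enough
console.log(dailyCounts);
```

Using the same limit for the query and the flagging update is what keeps hits that arrive mid-run out of both.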

post tag: tech

