The Map Phase of Hadoop’s MapReduce Application Flow

By Dirk deRoos

A MapReduce application processes the data in input splits on a record-by-record basis, and MapReduce understands each record to be a key/value pair. After the input splits have been calculated, the mapper tasks can start processing them as soon as the Resource Manager’s scheduling facility assigns them their processing resources. (In Hadoop 1, the JobTracker assigns mapper tasks to specific processing slots.)

The mapper task itself processes its input split one record at a time; in the figure, this lone record is represented by a single key/value pair. In the case of our flight data, when the input splits are calculated (using the default file processing method for text files), the assumption is that each row in the text file is a single record.

image0.jpg

For each record, the text of the row itself represents the value, and the byte offset of the row from the beginning of the file is considered to be the key.

You might be wondering why the row number isn’t used instead of the byte offset. When you consider that a very large text file is broken down into many individual data blocks, and is processed as many splits, the row number is a risky concept.

The number of lines in each split varies, so a mapper task can’t compute how many rows precede the one it’s processing without reading all the earlier splits. With the byte offset, however, you can be precise, because every split begins at a known byte position and every block has a fixed number of bytes.
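To make the byte-offset idea concrete, here is a minimal sketch in plain Python (not Hadoop code) of how one split of a text file could be turned into key/value records. The function name read_records and the sample rows are invented for illustration, and the boundary handling (skip a partial first line, read past the end of the split to finish the last line) is a simplified imitation of line-oriented record reading rather than Hadoop's actual implementation. In this sketch, offsets are measured from the start of the file.

```python
def read_records(data: bytes, split_start: int, split_length: int):
    """Yield (byte_offset, line_text) records for one split of a text file.

    Simplified assumption: a split that does not start at offset 0 skips
    ahead to the line after the first newline it finds, because the previous
    split is responsible for finishing the line it started.
    """
    end = split_start + split_length
    pos = split_start
    if split_start != 0:
        newline = data.find(b"\n", split_start)
        if newline == -1:
            return  # no complete line begins in this split
        pos = newline + 1
    # A line that begins at or before the split's end belongs to this split,
    # even when its text runs past the boundary.
    while pos < len(data) and pos <= end:
        newline = data.find(b"\n", pos)
        line_end = len(data) if newline == -1 else newline
        yield pos, data[pos:line_end].decode("utf-8")
        pos = line_end + 1


# Hypothetical flight rows: carrier code, delay in minutes.
sample = b"AA,1\nDL,22\nUA,333\nAA,4\n"

print(list(read_records(sample, 0, 12)))   # [(0, 'AA,1'), (5, 'DL,22'), (11, 'UA,333')]
print(list(read_records(sample, 12, 11)))  # [(18, 'AA,4')]
```

Notice that the second split produces the key 18 without knowing how many rows came before it, which is exactly what a row number would require.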

As a mapper task processes each record, it generates new key/value pairs (possibly none, possibly several), and the key and the value here can be completely different from the input pair. The output of the mapper task is the full collection of all these key/value pairs.
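The following sketch shows the shape of that transformation in plain Python. It assumes a made-up flight record layout (carrier code in the first column, delay in the second, plus a header row); the function names map_record and run_mapper are hypothetical and aren't part of any Hadoop API.

```python
def map_record(offset, line):
    """Turn one input record (byte offset, row text) into output pairs.

    The input key (the offset) is ignored; the output key is the carrier
    code and the output value is the count 1.
    """
    fields = line.split(",")
    if len(fields) < 2 or fields[0] == "Carrier":
        return  # header or malformed row: emit nothing for this record
    yield fields[0], 1


def run_mapper(records):
    """Collect every pair emitted for every record in the split."""
    output = []
    for offset, line in records:
        output.extend(map_record(offset, line))
    return output


records = [(0, "Carrier,Delay"), (14, "AA,5"), (19, "DL,12"), (25, "AA,0")]
print(run_mapper(records))  # [('AA', 1), ('DL', 1), ('AA', 1)]
```

The input pairs are keyed by byte offset, but the output pairs are keyed by carrier, which is the sense in which the output can be completely different from the input.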

Before the final output file for each mapper task is written, the output is partitioned based on the key and sorted. This partitioning means that all of the values for each key are grouped together.
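As a rough illustration of this step, the sketch below partitions a mapper's output by a hash of the key and then sorts each partition by key. Hadoop's default partitioner also hashes the key, but the CRC32 hash used here is only a stand-in chosen because it gives the same result on every run, and the function names are invented for this example.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_reducers):
    # Stand-in hash partitioner: stable hash of the key, modulo reducer count.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def partition_and_sort(pairs, num_reducers):
    """Group pairs by partition, then sort each partition by key."""
    partitions = defaultdict(list)
    for key, value in pairs:
        partitions[partition_for(key, num_reducers)].append((key, value))
    return {
        number: sorted(records, key=lambda record: record[0])
        for number, records in partitions.items()
    }


mapper_output = [("DL", 1), ("AA", 1), ("UA", 1), ("AA", 1)]

# With a single reducer, everything lands in partition 0, sorted by key.
print(partition_and_sort(mapper_output, 1))
# {0: [('AA', 1), ('AA', 1), ('DL', 1), ('UA', 1)]}
```

Because the partition number depends only on the key, every value for a given key ends up in the same partition, whichever mapper task produced it.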

In the case of the fairly basic sample application, there is only a single reducer, so all the output of the mapper task is written to a single file. But in cases with multiple reducers, every mapper task may generate multiple output files as well.

The breakdown of these output files is based on the partitioning key. For example, if the mapper tasks output only three distinct partitioning keys and you have configured three reducers for the job, each mapper task writes at most three output files. In this example, if a particular mapper task processes an input split and generates output with only two of the three keys, it writes only two output files.
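The three-key example can be sketched in a few lines of Python. The key-to-reducer assignment below is spelled out by hand so that each key gets its own reducer, as the example assumes; a hash-based partitioner wouldn't necessarily spread three keys this evenly. All names here are hypothetical.

```python
def output_partitions(mapper_output, key_to_reducer):
    """Return the set of reducer partitions that receive at least one record."""
    return {key_to_reducer[key] for key, _ in mapper_output}


# Assumed assignment: one reducer per carrier key.
key_to_reducer = {"AA": 0, "DL": 1, "UA": 2}

mapper_a = [("AA", 1), ("DL", 1), ("UA", 1), ("AA", 1)]  # all three keys appear
mapper_b = [("AA", 1), ("UA", 1), ("UA", 1)]             # only two keys appear

print(len(output_partitions(mapper_a, key_to_reducer)))  # 3
print(len(output_partitions(mapper_b, key_to_reducer)))  # 2
```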

Always compress your mapper tasks’ output files. The biggest benefit here is in performance gains, because writing smaller output files minimizes the inevitable cost of transferring the mapper output to the nodes where the reducers are running.
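To get a feel for why compression pays off, the sketch below serializes some repetitive mapper output and compresses it with Python's zlib module. This only illustrates the size effect: the tab-separated format is an assumption made for this example, not Hadoop's intermediate file format. In Hadoop 2 configurations, map output compression is typically switched on with the mapreduce.map.output.compress property, with the codec chosen through mapreduce.map.output.compress.codec.

```python
import zlib


def serialize(pairs):
    """Write pairs as tab-separated lines (an assumed, simplified format)."""
    return "".join(f"{key}\t{value}\n" for key, value in pairs).encode("utf-8")


# Mapper output tends to repeat the same keys over and over.
pairs = [("AA", 1), ("DL", 1), ("UA", 1)] * 10000

raw = serialize(pairs)
compressed = zlib.compress(raw)

print(len(raw))         # 150000
print(len(compressed))  # far smaller; the exact size depends on the zlib build
```

The fewer bytes a mapper task writes, the fewer bytes have to cross the network to reach the reducers.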

The default partitioner is more than adequate in most situations, but sometimes you may want to customize how the data is partitioned before it’s processed by the reducers. For example, you may want the data in your result sets to be sorted by key and then by value, which is known as a secondary sort.

To do this, you can override the default partitioner and implement your own. This process requires some care, however, because you’ll want to ensure that the number of records in each partition is roughly uniform. (If one reducer has to process much more data than the other reducers, you’ll wait for your MapReduce job to finish while the single overworked reducer is slogging through its disproportionately large data set.)

Using uniformly sized intermediate files, you can better take advantage of the parallelism available in MapReduce processing.
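One common way to approach a secondary sort is to emit a composite key, partition on only part of it, and sort on all of it. The sketch below imitates that idea in plain Python, assuming a composite key of (carrier, delay). The names are invented for illustration, the CRC32 hash is again a stand-in, and a real Hadoop job would also need matching sort and grouping comparators, which aren't shown here.

```python
import zlib


def natural_key_partition(composite_key, num_reducers):
    """Partition on the carrier only, so one carrier never spans reducers."""
    carrier, _delay = composite_key
    return zlib.crc32(carrier.encode("utf-8")) % num_reducers


def secondary_sort(pairs, num_reducers):
    """Partition by carrier, then sort each partition by (carrier, delay)."""
    partitions = {number: [] for number in range(num_reducers)}
    for composite_key, value in pairs:
        number = natural_key_partition(composite_key, num_reducers)
        partitions[number].append((composite_key, value))
    for records in partitions.values():
        records.sort(key=lambda record: record[0])
    return partitions


def partition_sizes(partitions):
    """Record count per partition: a quick check for skew."""
    return {number: len(records) for number, records in partitions.items()}


# Hypothetical records: key is (carrier, delay), value is a flight label.
sample_pairs = [
    (("AA", 30), "flight-1"),
    (("DL", 5), "flight-2"),
    (("AA", 10), "flight-3"),
    (("AA", 20), "flight-4"),
]

result = secondary_sort(sample_pairs, 2)
print(result)
print(partition_sizes(result))
```

If the sizes reported by partition_sizes are badly lopsided, that's the overworked-reducer situation described above, and the partitioning scheme is worth revisiting.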