Before delving deep into the MapReduce implementation in Hadoop, let's first understand MapReduce as a concept in parallel computing and why it is a preferred way of computing. MapReduce comprises two mutually exclusive but dependent phases, each capable of running on a different machine or node:
- Map: In the Map phase, the transformation of the data takes place. The data is split into key-value pairs by splitting it on a keyword.
- Suppose we have a text file and we want to run an analysis such as counting the total number of words, or even the frequency with which each word occurs in the file. This is the classic word count problem of MapReduce. To address this problem, we first have to identify the splitting keyword so that the data can be split and converted into key-value pairs.
Let's begin with John Lennon's song, Imagine.
Sample text:
Imagine there's no heaven
It's easy if you try
No hell below us
Above us only sky
Imagine all the people living for today
After running the Map phase on the sample text and splitting it over <space>, it gets converted into key-value pairs as shown here:
[<imagine, 1> <there's, 1> <no, 1> <heaven, 1> <it's, 1> <easy, 1>
<if, 1> <you, 1> <try, 1> <no, 1> <hell, 1> <below, 1> <us, 1>
<above, 1> <us, 1> <only, 1> <sky, 1> <imagine, 1> <all, 1> <the, 1>
<people, 1> <living, 1> <for, 1> <today, 1>]
The key here represents the word and the value represents the count. Note also that we have converted all the keys to lowercase to avoid any further complexity arising from matching case-sensitive keys.
- Reduce: The Reduce phase deals with the aggregation of the Map phase results, and hence all the key-value pairs are aggregated over the key.
- So the Map output of the text would get aggregated as follows:
[<imagine, 2> <there's, 1> <no, 2> <heaven, 1> <it's, 1> <easy, 1>
<if, 1> <you, 1> <try, 1> <hell, 1> <below, 1> <us, 2> <above, 1>
<only, 1> <sky, 1> <all, 1> <the, 1> <people, 1> <living, 1>
<for, 1> <today, 1>]
As we can see, the Map and Reduce phases can run independently of each other and hence can use separate nodes in the cluster to process the data. This approach of separating tasks into smaller units called Map and Reduce has revolutionized general-purpose distributed/parallel computing, which we now know as MapReduce.
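For concreteness, here is a minimal word-count Mapper and Reducer sketched against Hadoop's Java MapReduce API. The class names (WordCountMapper, WordCountReducer) are illustrative; the logic simply mirrors the <word, 1> emission and per-key aggregation shown above.

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map phase: split each input line on whitespace and emit <word, 1> pairs.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(line.toString());
        while (tokens.hasMoreTokens()) {
            // Lowercase the key to avoid matching case-sensitive keys.
            word.set(tokens.nextToken().toLowerCase());
            context.write(word, ONE);
        }
    }
}

// Reduce phase: aggregate all the counts emitted for the same word.
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        context.write(word, new IntWritable(sum));
    }
}
```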
Apache Hadoop's MapReduce has been implemented in pretty much the same way as discussed, except that it adds extra features to how the data from the Map phase of each node gets transferred to its designated Reduce phase node.
Hadoop's implementation of MapReduce enriches the Map and Reduce phases by adding a few more concrete steps in between to make it fault-tolerant and truly distributed. We can describe MR jobs on YARN in five stages:
- Job Submission Stage: When a client submits an MR job, the following things happen:
- An application ID is requested from the RM
- The input data location is checked and, if present, the input split sizes are computed
- The job's output location is checked; it must not already exist, otherwise the job is not submitted
If all three conditions are met, the MR job JAR, along with its configuration and input split details, is copied to HDFS in a directory named after the application ID provided by the RM. Then the job is submitted to the RM to launch a job-specific ApplicationMaster, MRAppMaster.
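As a sketch of what triggers these submission steps from the client side, here is a typical driver class using Hadoop's Job API; the driver class name, the mapper/reducer classes it wires in, and the argument order are illustrative assumptions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // The input path must exist; the output path must NOT already exist.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // waitForCompletion() drives the job submission stage: it obtains an
        // application ID from the RM, computes the input splits, copies the job
        // JAR and configuration to HDFS, and submits the job so that the RM can
        // launch the MRAppMaster.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```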
- MAP Stage: Once the RM receives the client's request to launch MRAppMaster, a call is made to the YARN scheduler to assign a container. As per resource availability, the container is granted and the MRAppMaster is launched on the designated node with the provisioned resources. After this, MRAppMaster fetches the input split information from the HDFS path submitted by the client and computes the number of mapper tasks to launch based on the splits. Depending on the number of mappers, it also calculates the required number of reducers as per the configuration. If MRAppMaster now finds the number of mappers and reducers and the size of the input files small enough to run in the same JVM, it goes ahead and does so; such tasks are called Uber tasks. In other scenarios, MRAppMaster negotiates container resources from the RM for running these tasks, although mapper tasks have a higher priority. This is why mapper tasks must finish before the sorting phase can start.
Data locality is another concern for containers hosting mappers: data-local nodes are preferred over rack-local ones, with the least preference given to nodes that hold the data remotely. For the Reduce phase, however, no such data locality preference exists for containers. Containers hosting mappers first copy the MapReduce JAR and configuration files locally and then launch a class called YarnChild in the JVM. The mapper then starts reading the input split, processes the records into key-value pairs, and writes them to a circular buffer.
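As a small illustrative sketch, the Uber task decision mentioned above can be influenced through job configuration properties; the values shown mirror the commonly documented defaults and are assumptions for illustration, not recommendations.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class UberTaskExample {
    public static Job smallJob() throws Exception {
        Configuration conf = new Configuration();
        // Allow small jobs to run as Uber tasks in the MRAppMaster's own JVM,
        // avoiding the cost of negotiating separate containers from the RM.
        conf.setBoolean("mapreduce.job.ubertask.enable", true);
        // Upper bounds on mappers and reducers for a job to qualify as "uber"
        // (illustrative values; check your cluster's defaults).
        conf.setInt("mapreduce.job.ubertask.maxmaps", 9);
        conf.setInt("mapreduce.job.ubertask.maxreduces", 1);
        return Job.getInstance(conf, "small uber-mode job");
    }
}
```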
- Shuffle and Sort Phase: Since circular buffers have a size constraint, after a certain fill percentage (the default being 80 percent) a thread is spawned that spills the data from the buffer. Before the spilled data is copied to disk, it is first partitioned with respect to its reducer, and the background thread also sorts each partition on the key and, if a combiner is specified, combines the data too. This optimizes the data before it is copied to its respective partition folder. The process continues until all the data from the circular buffer has been written to disk. A background thread then checks whether the number of spilled files in each partition is within the range of a configurable parameter; otherwise the files are merged and the combiner is run over them until the count falls within the limit of that parameter.
A Map task keeps updating its status to the ApplicationMaster throughout its life cycle. The Reduce tasks start only once 5 percent of the Map tasks have completed. An auxiliary service in the NodeManager serving the Reduce task runs a Netty web server, and the Reduce task requests from MRAppMaster the Mapper hosts holding its specific partitioned files. All the partitioned files that pertain to a Reducer are copied to its node in a similar fashion. Since multiple files get copied as data from the various nodes feeding that Reduce node is collected, a background thread merges the sorted Map files, sorts them again, and, if a combiner is configured, combines the result too.
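As a hedged illustration, the buffer, spill, merge, and slow-start behaviour described above is governed by configuration properties such as the following; the values shown mirror the commonly documented defaults and are used here only for illustration.

```java
import org.apache.hadoop.conf.Configuration;

public class ShuffleTuning {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // Size of the in-memory circular buffer used by each mapper (in MB).
        conf.setInt("mapreduce.task.io.sort.mb", 100);
        // Fill percentage at which the spill-to-disk thread kicks in.
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);
        // Maximum number of spill files merged at once.
        conf.setInt("mapreduce.task.io.sort.factor", 10);
        // Fraction of Map tasks that must complete before reducers are scheduled.
        conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.05f);
        return conf;
    }
}
```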
- Reduce Stage: It is important to note that at this stage every input file of each reducer should already have been sorted by key. This is the presumption with which the reducer starts processing the records and converting the key-value pairs into an aggregated list. Once the reducer has processed the data, it writes the output to the output folder that was specified during job submission.
- Clean-up Stage: Each reducer sends a periodic update to MRAppMaster about task completion. Once the Reduce task is over, the ApplicationMaster starts the clean-up activity. The submitted job's status is changed from running to successful, all the temporary and intermediate files and folders are deleted, and the application statistics are archived to the job history server.