In this article, Sourav Gulati and Sumit Kumar, authors of the book Apache Spark 2.x for Java Developers, explain that Hadoop, in the classical sense, comprises two components: a storage layer called HDFS and a processing layer called MapReduce. Prior to Hadoop 2.x, resource management was handled by the MapReduce framework of Hadoop itself; that changed with the introduction of YARN. In Hadoop 2.0, YARN was introduced as the third component of Hadoop to manage the resources of the Hadoop cluster and make the cluster more MapReduce agnostic.
Hadoop Distributed File System (HDFS), as the name suggests, is a distributed file system written in Java and modeled on the Google File System. In practice, HDFS closely resembles any other UNIX file system, with support for common file operations such as ls, cp, rm, du, cat, and so on. What makes HDFS stand out, despite its simplicity, is its mechanism for handling node failures in a Hadoop cluster without noticeably changing the seek time for accessing stored files. An HDFS cluster consists of two major components: Data Nodes and the Name Node.
HDFS has a distinctive way of storing data on HDFS clusters (networks of cheap commodity computers). It splits a regular file into smaller chunks called blocks and then makes as many copies of each block as the replication factor for that file specifies. It then distributes these copies across different Data Nodes of the cluster.
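The splitting and replication described above can be illustrated with a small toy model. This is only a sketch in plain Python, not HDFS code: the function names, the tiny block size, and the round-robin placement are assumptions made for illustration, and real HDFS placement also takes racks and node load into account.

```python
def split_into_blocks(data, block_size):
    """Cut a file's bytes into fixed-size blocks; the last block may be shorter."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_replicas(num_blocks, data_nodes, replication_factor):
    """Assign every block to `replication_factor` distinct nodes (toy round-robin)."""
    if replication_factor > len(data_nodes):
        raise ValueError("not enough Data Nodes for the requested replication factor")
    placement = {}
    for block_id in range(num_blocks):
        # Start at a different node for each block and take the next nodes in order.
        placement[block_id] = [
            data_nodes[(block_id + r) % len(data_nodes)]
            for r in range(replication_factor)
        ]
    return placement


# A 10-byte "file" with a 4-byte block size yields blocks of 4, 4 and 2 bytes.
blocks = split_into_blocks(b"x" * 10, block_size=4)
placement = place_replicas(len(blocks), ["dn1", "dn2", "dn3", "dn4"], replication_factor=3)
for block_id, nodes in placement.items():
    print(block_id, nodes)
```

Because every block lives on three different nodes in this model, losing any single node still leaves two copies of each block available.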
The Name Node is responsible for managing the metadata of the HDFS cluster, such as the list of files and folders that exist in the cluster, the number of blocks each file is divided into, and the replication and storage of those blocks on different Data Nodes. It also maintains and manages the namespace and file permissions of all the files available in the HDFS cluster. Apart from bookkeeping, the Name Node also has a supervisory role: it keeps watch on the replication factor of all the files, and if some block goes missing it issues commands to replicate the missing block of data. It also generates reports to ascertain cluster health. It is important to note that all communication for supervisory tasks is initiated by the Data Node: the Data Node sends reports (also known as block reports) to the Name Node, and only then does the Name Node respond by issuing commands or instructions as needed.
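The supervisory check can be pictured as a comparison between the blocks the metadata expects and the blocks the Data Nodes actually report. The following is a minimal sketch under that assumption; the function name, the block IDs, and the shape of the report dictionary are hypothetical and do not mirror the Name Node's internal data structures.

```python
from collections import Counter


def find_under_replicated(expected_blocks, block_reports, replication_factor):
    """Return {block_id: number_of_missing_replicas} for under-replicated blocks.

    block_reports maps each Data Node to the set of block IDs it reported.
    """
    # Count how many Data Nodes reported each block; unreported blocks count as 0.
    live = Counter(b for reported in block_reports.values() for b in reported)
    return {
        b: replication_factor - live[b]
        for b in expected_blocks
        if live[b] < replication_factor
    }


reports = {
    "dn1": {"blk_1", "blk_2"},
    "dn2": {"blk_1", "blk_3"},
    "dn3": {"blk_1", "blk_2"},
}
missing = find_under_replicated(["blk_1", "blk_2", "blk_3"], reports, replication_factor=3)
print(missing)
```

In this toy run, blk_1 is fully replicated, blk_2 needs one more copy, and blk_3 needs two, which is the kind of finding that would lead to replication commands being sent in response to a block report.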
An HDFS read operation from a client involves:
An HDFS write operation from a client involves:
The simplest way to understand Yet Another Resource Negotiator (YARN) is to think of it as an operating system for a cluster: it provisions resources, schedules jobs, and handles node maintenance. With Hadoop 2.x, the MapReduce model of both processing the data and managing the cluster (job tracker/task tracker) was split in two. While data processing was still left to MapReduce, the cluster's resource allocation (or rather, scheduling) task was assigned to a new component called YARN. Another objective that YARN met was to make MapReduce one of several techniques for processing data on HDFS rather than the only one, as was the case in Hadoop 1.x systems. This paradigm shift opened the floodgates for the development of interesting applications around Hadoop, and a new ecosystem evolved that went beyond the classical MapReduce processing system. It did not take long after that for Apache Spark to break the hegemony of classical MapReduce and become arguably the most popular processing framework for parallel computing, as far as active development and adoption are concerned.
To support multi-tenancy, fault tolerance, and resource isolation, YARN introduced the following components to manage the cluster seamlessly.
Before delving deep into the MapReduce implementation in Hadoop, let's first understand MapReduce as a concept in parallel computing and why it is a preferred way of computing. MapReduce comprises two mutually exclusive but dependent phases, each capable of running on a different machine or node:
Map: In the Map phase, the transformation of data takes place. It converts the data into key-value pairs by splitting it on a keyword.
Suppose we have a text file and we want to analyze it, for example to count the total number of words or the frequency with which each word occurs in the file. This is the classical Word Count problem of MapReduce. To address this problem, we first have to identify the splitting keyword so that the data can be split and converted into key-value pairs.
Let’s begin with John Lennon's song Imagine.
Sample Text:
Imagine there's no heaven
It's easy if you try
No hell below us
Above us only sky
Imagine all the people living for today
After running the Map phase on the sample text and splitting it on <space>, it is converted into key-value pairs as follows:
[<imagine, 1> <there's, 1> <no, 1> <heaven, 1> <it's, 1> <easy, 1> <if, 1> <you, 1> <try, 1> <no, 1> <hell, 1> <below, 1> <us, 1> <above, 1> <us, 1> <only, 1> <sky, 1> <imagine, 1> <all, 1> <the, 1> <people, 1> <living, 1> <for, 1> <today, 1>]
The key here represents the word and the value represents the count. Note that we have converted all the keys to lowercase to avoid any further complexity arising from matching case-sensitive keys.
Reduce: The Reduce phase deals with the aggregation of the Map phase results, so all the key-value pairs are aggregated by key.
So the Map output of the text would get aggregated as follows:
[<imagine, 2> <there's, 1> <no, 2> <heaven, 1> <it's, 1> <easy, 1> <if, 1> <you, 1> <try, 1> <hell, 1> <below, 1> <us, 2> <above, 1> <only, 1> <sky, 1> <all, 1> <the, 1> <people, 1> <living, 1> <for, 1> <today, 1>]
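The word count walk-through above can be reproduced with a few lines of plain Python. This is a conceptual sketch only, not Hadoop's API: the function names map_phase and reduce_phase are made up for illustration, and both phases run in a single process here rather than on separate nodes.

```python
from collections import defaultdict

SAMPLE = """Imagine there's no heaven
It's easy if you try
No hell below us
Above us only sky
Imagine all the people living for today"""


def map_phase(text):
    """Split on whitespace and emit a (lowercased word, 1) pair per word."""
    return [(word.lower(), 1) for word in text.split()]


def reduce_phase(pairs):
    """Aggregate the values of all pairs that share the same key."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


pairs = map_phase(SAMPLE)
counts = reduce_phase(pairs)
print(len(pairs), "pairs from Map,", len(counts), "keys after Reduce")
print(counts["imagine"], counts["no"], counts["us"])
```

The Map step emits one pair per word, and the Reduce step collapses repeated keys such as imagine, no, and us into a single pair with a count of 2.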
As we can see, the Map and Reduce phases can each be run on their own and hence can use independent nodes in the cluster to process the data. This approach of separating tasks into smaller units called Map and Reduce has revolutionized general-purpose distributed/parallel computing, and is what we now know as MapReduce.
Apache Hadoop's MapReduce is implemented in much the same way as discussed, with extra features governing how the data from the Map phase on each node is transferred to its designated Reduce phase node.
Hadoop's implementation of MapReduce enriches the Map and Reduce phases by adding a few more concrete steps in between to make the process fault tolerant and truly distributed. We can describe MR jobs on YARN in five stages.
Job Submission Stage: When a client submits an MR job, the following things happen:
If all three conditions are met, the MR job JAR, along with its configuration and the details of the input splits, is copied to HDFS into a directory named after the application ID provided by the RM. The job is then submitted to the RM to launch a job-specific Application Master, MRAppMaster.
Map Stage: Once the RM receives the client's request to launch MRAppMaster, a call is made to the YARN scheduler to assign a container. Depending on resource availability, the container is granted and MRAppMaster is launched on the designated node with the provisioned resources. MRAppMaster then fetches the input split information from the HDFS path submitted by the client and computes the number of Mapper tasks to launch based on the splits. It also determines the required number of Reducers as per the configuration. If MRAppMaster finds the number of Mappers, the number of Reducers, and the size of the input files small enough to run in the same JVM, it does so; such tasks are called Uber tasks. Otherwise, MRAppMaster negotiates container resources from the RM for running these tasks, with Mapper tasks requested first and at a higher priority, because Mapper tasks must finish before the sorting phase can start.
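The two decisions described in this stage, how many Mappers to launch and whether the job is small enough to run as an Uber task, can be sketched as simple arithmetic. The function names and the threshold values passed in below are assumptions chosen for illustration; in Hadoop these limits come from job configuration, and the real split calculation has additional rules that this sketch ignores.

```python
MB = 1024 * 1024


def num_map_tasks(file_sizes, split_size):
    """One Mapper per input split; each non-empty file is split on its own."""
    # -(-a // b) is integer ceiling division, so a partial last split still counts.
    return sum(-(-size // split_size) for size in file_sizes)


def is_uber_candidate(maps, reduces, input_bytes, max_maps, max_reduces, max_bytes):
    """A job qualifies only if it is small on all three measures."""
    return maps <= max_maps and reduces <= max_reduces and input_bytes <= max_bytes


# Illustrative limits (assumed values, not read from any configuration).
LIMITS = dict(max_maps=9, max_reduces=1, max_bytes=128 * MB)

big_job_maps = num_map_tasks([300 * MB, 50 * MB], split_size=128 * MB)
small_job_maps = num_map_tasks([10 * MB], split_size=128 * MB)

big_is_uber = is_uber_candidate(big_job_maps, 1, 350 * MB, **LIMITS)
small_is_uber = is_uber_candidate(small_job_maps, 1, 10 * MB, **LIMITS)
print(big_job_maps, big_is_uber, small_job_maps, small_is_uber)
```

In this example the larger job needs four Mappers and exceeds the assumed input size limit, so it would go through normal container negotiation, while the single-split job would qualify to run inside the MRAppMaster's JVM.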
Data locality is another concern for containers hosting Mappers: data-local nodes are preferred over rack-local nodes, with the least preference given to data hosted on remote nodes. For the Reduce phase, however, no such data locality preference exists for containers. Containers hosting a Mapper first copy the MapReduce JAR and configuration files locally and then launch a class called YarnChild in the JVM. The Mapper then starts reading the input files, processes them into key-value pairs, and writes them to a circular buffer.
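The locality preference for Mapper containers amounts to ranking candidate nodes by how close they are to a replica of the input split. The sketch below assumes a hypothetical rack map and node names; it illustrates the ordering only and is not the YARN scheduler's actual logic.

```python
def locality_level(candidate, replica_nodes, rack_of):
    """0 = data-local, 1 = rack-local, 2 = remote (lower is better)."""
    if candidate in replica_nodes:
        return 0
    replica_racks = {rack_of[node] for node in replica_nodes}
    if rack_of[candidate] in replica_racks:
        return 1
    return 2


def pick_node(free_nodes, replica_nodes, rack_of):
    """Choose the free node with the best locality; ties keep the first listed."""
    return min(free_nodes, key=lambda n: locality_level(n, replica_nodes, rack_of))


# Hypothetical topology: n1 and n2 share rack r1, n3 is on r2, n4 is on r3.
rack_of = {"n1": "r1", "n2": "r1", "n3": "r2", "n4": "r3"}
replicas = {"n1"}  # the split's block is stored on n1 only

print(pick_node(["n3", "n1"], replicas, rack_of))  # n1 holds the data
print(pick_node(["n4", "n2"], replicas, rack_of))  # n2 shares a rack with n1
print(pick_node(["n4", "n3"], replicas, rack_of))  # both remote, first one wins
```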
Shuffle and Sort Phase: Because the circular buffer has a size constraint, once it fills to a certain percentage (80 percent by default), a thread is spawned that spills the data from the buffer. Before the spilled data is copied to disk, it is first partitioned by its target Reducer; the background thread then sorts the partitioned data by key and, if a combiner is specified, combines the data too. This optimizes the data before it is copied to its respective partition folder. The process continues until all the data from the circular buffer has been written to disk. A background thread then checks whether the number of spilled files in each partition is within the limit set by a configurable parameter; if not, the files are merged and the combiner is run over them until the number falls within that limit.
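The spill step can be modeled as three operations applied in order: partition by Reducer, sort each partition by key, and optionally combine. The following is a simplified in-memory sketch; the function names are hypothetical, and the CRC32-based partitioner is a stand-in chosen for this example because it is stable across Python runs, not Hadoop's own partitioner.

```python
import zlib
from itertools import groupby
from operator import itemgetter


def should_spill(used_bytes, capacity_bytes, percent=80):
    """True once the buffer is filled to the given percentage (integer math)."""
    return used_bytes * 100 >= capacity_bytes * percent


def partition_for(key, num_reducers):
    """Map a key to a Reducer index with a hash that is stable across runs."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def spill(buffer, num_reducers, combiner=None):
    """Partition the buffered pairs, sort each partition by key, then combine."""
    partitions = {p: [] for p in range(num_reducers)}
    for key, value in buffer:
        partitions[partition_for(key, num_reducers)].append((key, value))
    result = {}
    for p, records in partitions.items():
        records = sorted(records, key=itemgetter(0))
        if combiner is not None:
            # Sorted input guarantees equal keys are adjacent for groupby.
            records = [
                (key, combiner(v for _, v in group))
                for key, group in groupby(records, key=itemgetter(0))
            ]
        result[p] = records
    return result


buffer = [("no", 1), ("hell", 1), ("below", 1), ("us", 1),
          ("above", 1), ("us", 1), ("only", 1), ("sky", 1)]
spilled = spill(buffer, num_reducers=2, combiner=sum)
for p, records in spilled.items():
    print(p, records)
```

Because the same key always hashes to the same partition, both occurrences of us land together and the combiner can merge them into a single pair before anything is sent to a Reducer.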
The Map task keeps reporting its status to the ApplicationMaster throughout its life cycle. Reduce tasks start only once 5 percent of the Map tasks have completed (the default threshold). An auxiliary service in the NodeManager runs a Netty web server that serves the Mappers' partitioned output files, and the Reduce task asks MRAppMaster which Mapper hosts hold the partitions meant for it. All the partitioned files that pertain to a Reducer are copied to its node in this fashion. Because multiple files arrive from various nodes, a background thread merges the sorted map files, sorts them again, and, if a Combiner is configured, combines the result too.
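Merging several already-sorted map outputs into one sorted stream is a classic k-way merge. The sketch below uses Python's standard heapq.merge to illustrate the idea on in-memory lists; the function name and the sample runs are made up for this example, and Hadoop's own merge works on files on disk rather than lists.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def merge_map_outputs(sorted_runs, combiner=None):
    """Merge key-sorted runs into one key-sorted list, optionally combining."""
    merged = heapq.merge(*sorted_runs, key=itemgetter(0))
    if combiner is None:
        return list(merged)
    # The merged stream is sorted, so equal keys are adjacent for groupby.
    return [
        (key, combiner(v for _, v in group))
        for key, group in groupby(merged, key=itemgetter(0))
    ]


# Two hypothetical map outputs for the same Reducer, each already sorted by key.
run_a = [("imagine", 1), ("no", 1), ("us", 2)]
run_b = [("imagine", 1), ("no", 1), ("sky", 1)]

merged_plain = merge_map_outputs([run_a, run_b])
merged_combined = merge_map_outputs([run_a, run_b], combiner=sum)
print(merged_plain)
print(merged_combined)
```

The merge never needs to re-sort everything from scratch: it only compares the current head of each run, which is why keeping every spill file sorted pays off at this stage.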
Reduce Stage: It is important to note that, at this stage, every input file of each Reducer should already have been sorted by key. This is the assumption under which the Reducer starts processing the records and converts the key-value pairs into an aggregated list. Once the Reducer has processed the data, it writes the results to the output folder specified during job submission.
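Why the Reducer relies on sorted input becomes clear when grouping is done in a single pass over adjacent records. This is a small illustrative sketch with hypothetical function names; it groups values per key and then applies a reduce function, and it also shows what goes wrong when the input is not sorted.

```python
from itertools import groupby
from operator import itemgetter


def group_values(records):
    """Collect the values of adjacent records that share a key."""
    return [
        (key, [v for _, v in group])
        for key, group in groupby(records, key=itemgetter(0))
    ]


def run_reducer(sorted_records, reduce_fn):
    """Apply reduce_fn to each key's list of values."""
    return {key: reduce_fn(values) for key, values in group_values(sorted_records)}


sorted_input = [("imagine", 1), ("imagine", 1), ("no", 1), ("no", 1), ("sky", 1)]
grouped = group_values(sorted_input)
result = run_reducer(sorted_input, sum)
print(grouped)
print(result)

# With unsorted input the same key shows up in more than one group.
unsorted_input = [("no", 1), ("imagine", 1), ("no", 1)]
broken = group_values(unsorted_input)
print(broken)
```

With sorted input each key produces exactly one group, so the Reducer can emit a key's result as soon as the next key appears, without holding the whole input in memory.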
Clean-up Stage: Each Reducer sends periodic updates to MRAppMaster about task completion. Once the Reduce tasks are over, the Application Master starts the clean-up activity. The status of the submitted job is changed from running to successful, and all the temporary and intermediate files and folders are deleted. The application statistics are archived to the job history server.
In this article, we saw what HDFS and YARN are, along with MapReduce, and we learned about the different functions of MapReduce and HDFS I/O.