Understanding the Apache Hadoop daemons
Most of the Apache Hadoop clusters in production run Apache Hadoop 1.x (MRv1—MapReduce Version 1). However, the new version of Apache Hadoop, 2.x (MRv2—MapReduce Version 2), also referred to as Yet Another Resource Negotiator (YARN), is being actively adopted by many organizations. In this section, we shall go through the daemons for both these versions.
Apache Hadoop 1.x (MRv1) consists of the following daemons:
- Namenode
- Secondary namenode
- Jobtracker
- Datanode
- Tasktracker
All the preceding daemons are Java services and run within their own JVMs.
Apache Hadoop stores and processes data in a distributed fashion. To achieve this goal, Hadoop implements a master and slave model. The namenode and jobtracker daemons are master daemons, whereas the datanode and tasktracker daemons are slave daemons.
The namenode daemon is a master daemon and is responsible for storing all the location information of the files present in HDFS. The actual data is never stored on a namenode. In other words, it holds the metadata of the files in HDFS.
The namenode maintains the entire metadata in RAM, which helps clients receive quick responses to read requests. Therefore, it is important to run the namenode daemon on a machine that has lots of RAM at its disposal. The higher the number of files in HDFS, the higher the consumption of RAM. The namenode daemon also maintains a persistent checkpoint of the metadata in a file stored on the disk called the fsimage file.
Whenever a file is placed/deleted/updated in the cluster, an entry of this action is updated in a file called the edits logfile. After updating the edits log, the metadata present in-memory is also updated accordingly. It is important to note that the fsimage file is not updated for every write operation.
In case the namenode daemon is restarted, the following sequence of events occurs at namenode boot up:
- Read the fsimage file from the disk and load it into memory (RAM).
- Read the actions that are present in the edits log and apply each action to the in-memory representation of the fsimage file.
- Write the modified in-memory representation to the fsimage file on the disk.
The preceding steps make sure that the in-memory representation is up to date.
The namenode daemon is a single point of failure in Hadoop 1.x, which means that if the node hosting the namenode daemon fails, the filesystem becomes unusable. To handle this, the administrator has to configure the namenode to write the fsimage file to the local disk as well as to a remote disk on the network. This backup on the remote disk can be used to restore the namenode on a freshly installed server. Newer versions of Apache Hadoop (2.x) now support High Availability (HA), which deploys two namenodes in an active/passive configuration, wherein if the active namenode fails, control falls over to the passive namenode, making it active. This configuration reduces the downtime in case of a namenode failure.
Since the fsimage file is not updated for every operation, the edits logfile can grow into a very large file. A restart of the namenode service would then become very slow, because all the actions in the large edits logfile have to be applied to the fsimage file. This slow boot up time can be avoided by using the secondary namenode daemon.
The secondary namenode daemon is responsible for performing periodic housekeeping functions for the namenode. It only creates checkpoints of the filesystem metadata (fsimage) present in the namenode by merging the edits logfile and the fsimage file from the namenode daemon. In case the namenode daemon fails, this checkpoint could be used to rebuild the filesystem metadata. However, it is important to note that checkpoints are done at intervals, and it is possible that the checkpoint data is slightly outdated. Rebuilding the fsimage file using such a checkpoint could lead to data loss. The secondary namenode is not a failover node for the namenode daemon.
It is recommended that the secondary namenode daemon be hosted on a separate machine for large clusters.
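How often the secondary namenode creates these checkpoints is controlled by configuration. The following sketch uses the Hadoop 1.x checkpoint properties fs.checkpoint.period and fs.checkpoint.size; the values shown match the usual shipped defaults, and placing them in core-site.xml is one common convention:

```xml
<!-- core-site.xml (Hadoop 1.x): checkpoint tuning for the secondary namenode -->
<configuration>
  <property>
    <name>fs.checkpoint.period</name>
    <!-- Maximum delay, in seconds, between two consecutive checkpoints -->
    <value>3600</value>
  </property>
  <property>
    <name>fs.checkpoint.size</name>
    <!-- Size of the edits log, in bytes, that forces a checkpoint even if
         fs.checkpoint.period has not yet expired -->
    <value>67108864</value>
  </property>
</configuration>
```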
The following are the steps carried out by the secondary namenode daemon:
- Get the edits logfile from the primary namenode daemon.
- Get the fsimage file from the primary namenode daemon.
- Apply all the actions present in the edits log to the fsimage file.
- Push the fsimage file back to the primary namenode.
This is done periodically, so whenever the namenode daemon is restarted, it would have a relatively up-to-date version of the fsimage file and the boot up time would be significantly faster. The following diagram shows the communication between namenode and secondary namenode:
The datanode daemon acts as a slave node and is responsible for storing the actual files in HDFS. Files are split into data blocks that are distributed across the cluster; the blocks are typically 64 MB or 128 MB in size, and the block size is a configurable parameter. The file blocks are also replicated to other datanodes for redundancy so that no data is lost if a datanode daemon fails. The datanode daemon sends information to the namenode daemon about the files and blocks stored on that node and responds to the namenode daemon for all filesystem operations. The following diagram shows how files are stored in the cluster:
File blocks of files A, B, and C are replicated across multiple nodes of the cluster for redundancy. This ensures availability of data even if one of the nodes fails. You can also see that blocks of file A are present on nodes 2, 4, and 6; blocks of file B are present on nodes 3, 5, and 7; and blocks of file C are present on nodes 4, 6, and 7. The replication factor configured for this cluster is 3, which signifies that each file block is replicated three times across the cluster. It is the responsibility of the namenode daemon to maintain a list of the files and their corresponding locations on the cluster. Whenever a client needs to access a file, the namenode daemon provides the location of the file to the client, and the client then accesses the file directly from the datanode daemon.
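Both the block size and the replication factor mentioned above are set in hdfs-site.xml. A minimal sketch for Hadoop 1.x follows; the 128 MB block size is chosen only as an example, while 3 is the usual default replication factor:

```xml
<!-- hdfs-site.xml (Hadoop 1.x) -->
<configuration>
  <property>
    <name>dfs.block.size</name>
    <!-- HDFS block size in bytes; 134217728 = 128 MB (example value) -->
    <value>134217728</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <!-- Number of copies kept for each block -->
    <value>3</value>
  </property>
</configuration>
```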
The jobtracker daemon is responsible for accepting job requests from a client and scheduling/assigning tasks to the tasktracker daemons. The jobtracker daemon tries to assign tasks to the tasktracker daemon on the datanode daemon where the data to be processed is stored. This feature is called data locality. If that is not possible, it will at least try to assign tasks to tasktrackers within the same physical server rack. If for some reason the node hosting the datanode and tasktracker daemons fails, the jobtracker daemon assigns the task to another tasktracker daemon where a replica of the data exists. This is possible because of the replication factor configuration for HDFS, where the data blocks are replicated across multiple datanodes. This ensures that the job does not fail even if a node fails within the cluster.
The tasktracker daemon accepts tasks (map, reduce, and shuffle operations) from the jobtracker daemon and performs the actual work during a MapReduce operation. The tasktracker daemon periodically sends a heartbeat message to the jobtracker daemon to notify it that it is alive. Along with the heartbeat, it also reports the number of free slots it has available to process tasks. The tasktracker daemon starts and monitors the map and reduce tasks and sends progress/status information back to the jobtracker daemon.
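The number of slots a tasktracker advertises is itself configurable. The sketch below uses the Hadoop 1.x mapred-site.xml properties for the maximum number of concurrent map and reduce tasks per tasktracker; the values of 4 and 2 are illustrative choices, not defaults:

```xml
<!-- mapred-site.xml (Hadoop 1.x): slots advertised by each tasktracker -->
<configuration>
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <!-- Maximum number of map tasks run simultaneously by a tasktracker
         (illustrative value) -->
    <value>4</value>
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <!-- Maximum number of reduce tasks run simultaneously by a tasktracker
         (illustrative value) -->
    <value>2</value>
  </property>
</configuration>
```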
In small clusters, the namenode and jobtracker daemons reside on the same node. However, in larger clusters, there are dedicated nodes for the namenode and jobtracker daemons. This can be easily understood from the following diagram:
In a Hadoop cluster, these daemons can be monitored via specific URLs using a browser. The URLs are of the http://<serveraddress>:port_number type.
By default, the ports for the Hadoop daemons are:
The preceding ports can be configured in the hdfs-site.xml and mapred-site.xml files.
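As an illustration, the following Hadoop 1.x properties control the web UI addresses of the namenode and jobtracker daemons; the bind address 0.0.0.0 and the ports shown (50070 and 50030) are the usual defaults, so this sketch simply makes them explicit:

```xml
<!-- hdfs-site.xml (Hadoop 1.x) -->
<property>
  <name>dfs.http.address</name>
  <!-- Web UI address of the namenode daemon -->
  <value>0.0.0.0:50070</value>
</property>

<!-- mapred-site.xml (Hadoop 1.x) -->
<property>
  <name>mapred.job.tracker.http.address</name>
  <!-- Web UI address of the jobtracker daemon -->
  <value>0.0.0.0:50030</value>
</property>
```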
YARN is a general-purpose, distributed, application management framework for processing data in Hadoop clusters.
YARN was built to solve the following two important problems:
- Support for large clusters (4000 nodes or more)
- The ability to run other applications apart from MapReduce to make use of data already stored in HDFS, for example, MPI and Apache Giraph
In Hadoop Version 1.x, MapReduce can be divided into the following two parts:
- The MapReduce user framework: This consists of the user's interaction with MapReduce such as the application programming interface for MapReduce
- The MapReduce system: This consists of system level tasks such as monitoring, scheduling, and restarting of failed tasks
The jobtracker daemon had these two parts tightly coupled within itself and was responsible for managing the tasks and all their related operations by interacting with the tasktracker daemon. This responsibility turned out to be overwhelming for the jobtracker daemon when the number of nodes in the cluster increased and reached the 4,000-node mark. This was a scalability issue that needed to be fixed. Also, the investment in Hadoop could not be justified, as MapReduce was the only way to process data on HDFS; other tools were unable to process this data. YARN was built to address these issues and is part of Hadoop Version 2.x. With the introduction of YARN, MapReduce is now just one of the clients that run on the YARN framework.
YARN addresses the preceding issues by splitting the following two responsibilities of the jobtracker:
- Resource management
- Job scheduling/monitoring
The jobtracker daemon has been removed and the following two new daemons have been introduced in YARN:
- ResourceManager
- NodeManager
The ResourceManager daemon is a global master daemon that is responsible for managing the resources for the applications in the cluster. The ResourceManager daemon consists of the following two components:
- ApplicationsManager
- Scheduler
The ApplicationsManager performs the following operations:
- Accepts jobs from a client.
- Creates the first container on one of the worker nodes to host the ApplicationMaster. A container, in simple terms, is the memory resource on a single worker node in the cluster.
- Restarts the container hosting the ApplicationMaster on failure.
The scheduler is responsible for allocating system resources to the various applications in the cluster. It is a pure scheduler in that it does not monitor or track the status of each application.
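The size of the containers the scheduler can hand out is bounded by configuration in yarn-site.xml. The following sketch shows the standard minimum/maximum allocation properties; the values (1024 MB and 8192 MB) match the usual defaults and are shown only for illustration:

```xml
<!-- yarn-site.xml: bounds for container memory allocations by the scheduler -->
<configuration>
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <!-- Smallest container, in MB, the ResourceManager will allocate -->
    <value>1024</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <!-- Largest container, in MB, the ResourceManager will allocate -->
    <value>8192</value>
  </property>
</configuration>
```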
Each application in YARN has an ApplicationMaster, which is responsible for communicating with the scheduler and for setting up and monitoring its resource containers.
The NodeManager daemon runs on the worker nodes and is responsible for monitoring the containers within the node and its system resources such as CPU, memory, and disk. It sends this monitoring information back to the ResourceManager daemon. Each worker node will have exactly one NodeManager daemon running.
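The resources a NodeManager offers to the cluster are declared in yarn-site.xml. The sketch below uses the standard memory and vcore properties; the values of 8192 MB and 8 vcores match the shipped defaults and should be adjusted to the actual hardware:

```xml
<!-- yarn-site.xml: resources a NodeManager makes available for containers -->
<configuration>
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <!-- Physical memory, in MB, that can be allocated to containers on this node -->
    <value>8192</value>
  </property>
  <property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <!-- Number of CPU cores that can be allocated to containers on this node -->
    <value>8</value>
  </property>
</configuration>
```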
The following is the sequence of steps involved when a job is submitted to a YARN cluster:
- When a job is submitted to the cluster, the client first receives an application ID from the ResourceManager.
- Next, the client copies the job resources to a location in HDFS.
- The ResourceManager then starts the first container under the NodeManager's management to bring up the ApplicationMaster. For example, if a MapReduce job is submitted, the ResourceManager will bring up the MapReduce ApplicationMaster.
- The ApplicationMaster, based on the job to be executed, requests resources from the ResourceManager.
- Once the ResourceManager schedules a container with the requested resources, the ApplicationMaster contacts the NodeManager to start the container and execute the task. In the case of a MapReduce job, that task would be a map or reduce task.
- The client checks with the ApplicationMaster for status updates on the submitted job.
The following diagram shows the interactions of the client and the different daemons in a YARN environment:
In a Hadoop cluster, the ResourceManager and NodeManager daemons can be monitored via specific URLs using a browser. The URLs are of the http://<serveraddress>:port_number type.
By default, the ports for these Hadoop daemons are:
The preceding ports can be configured in the yarn-site.xml file.
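As an example, the web UI addresses of the two YARN daemons can be set with the following yarn-site.xml properties; the ports shown (8088 for the ResourceManager and 8042 for the NodeManager) are the usual defaults, and the hostname is an illustrative placeholder:

```xml
<!-- yarn-site.xml: web UI addresses for the YARN daemons -->
<configuration>
  <property>
    <name>yarn.resourcemanager.webapp.address</name>
    <!-- ResourceManager web UI; the hostname is an illustrative placeholder -->
    <value>resourcemanager.example.com:8088</value>
  </property>
  <property>
    <name>yarn.nodemanager.webapp.address</name>
    <!-- NodeManager web UI on each worker node -->
    <value>0.0.0.0:8042</value>
  </property>
</configuration>
```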
This was a short introduction to YARN, but it is important for a Hadoop administrator to know about YARN, as this is soon going to be the way all Hadoop clusters function.