Using the filesystem
HBase depends on the Hadoop Distributed File System (HDFS).
Fundamentally, HDFS is a distributed file system that relies on the following core principles:
Getting ready
The following are the benefits of using HDFS:
- It's designed to work as a fault-tolerant system and is rack aware.
- It works on low-cost commodity hardware.
- HDFS relaxes some core POSIX requirements to enable streaming access to file system data.
- It's designed for write-once, read-many access. It supports parallel reading and processing of data (read, write, and append), but it doesn't support random writes.
- It's designed to scale to very large data sets, with file sizes on the order of petabytes.
- It works with minimal data motion. MapReduce processes data on the node where the data is actually present, which avoids or minimizes network I/O and keeps the expensive I/O operations localized (to the local disk or within the same rack).
- HDFS checksums the file system at the block level, and if an inconsistency between the checksum and the block contents is observed, it is reported to the HDFS master, which schedules the creation of a new replica of the affected block as well as the immediate removal of the corrupted block.
A lot of work continues to happen on the core implementation of HDFS; some notable improvements are as follows:
- More granular file-level permissions and authentication.
- Rack awareness was added so that a node's physical location is taken into account when scheduling tasks and allocating storage.
- For administrative purposes, a feature known as Safemode was added.
- In addition, a diagnostic utility, fsck, was added for administrators; it enables analysis of the file system, for example to find missing blocks.
- The Rebalancer tool is an internal distribution mechanism that redistributes the load across DataNodes when they become unbalanced due to continuous data movement between DataNodes.
- An upgrade and rollback step was added for administrators, which now allows reverting to the old version of HDFS in case of any unforeseen situation caused by an upgrade; this allows a safe and painless recovery.
- The concept of checkpoints performed by the secondary NameNode was introduced to make sure the file that holds the log of HDFS changes (the edit log) stays within the specified limits at the NameNode.
More information can be obtained at http://hadoop.apache.org/.
Tip
We are not considering a local setup of HBase as we are more focused on HA and a larger-scale, fully distributed setup.
Data in HDFS is not placed homogeneously across the distributed DataNodes. The most obvious reason is the addition of new DataNodes to a preexisting cluster. Internally, the system (NameNode) considers several factors before it starts sending data/new blocks to the DataNodes, which are listed below:
- One replica of a block is kept on the node that is writing the block.
To honor the fault-tolerant design, the replicas are spread across different racks within the cluster.
- To reduce cross-network chattiness, one replica is placed on the same rack as the node writing the file. This also helps keep HDFS data evenly distributed in a very large DataNode cluster.
- In some scenarios these considerations compete, and this may leave data distributed non-uniformly across DataNodes.
To overcome this, the HDFS framework provides administrators with tools that can be used to rebalance and check the data across different DataNodes, as illustrated in the example below.
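For instance, a minimal, hedged sketch of running the HDFS balancer (the 10 percent threshold is an assumed value to tune for your cluster, and the hdfs script is assumed to be on your PATH):
hdfs balancer -threshold 10
The balancer moves blocks from over-utilized to under-utilized DataNodes until every node is within the threshold of the cluster's average utilization; it can be interrupted safely at any time.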
You will need to set up Hadoop 2.2.0 in fully distributed mode, as discussed in the previous section. The web interface can also be used for browsing the file system.
How to do it…
To use the file system, go through the following steps:
- Log in to the NameNode instance as follows (you can use the IP or the fully qualified machine name):
ssh hadoop@your-namenode
cd /u/HBase B/hadoop-2.2.0/bin
- Let's run some commands related to dfs; this will make sure the setup is proper and we are able to interact with it:
/u/HBase B/hadoop-2.2.0/bin/hadoop dfs -ls /
drwxr-xr-x - hadoop supergroup 0 2014-08-13 22:48 /nn01
drwxr-xr-x - hadoop supergroup 0 2014-08-17 23:28 /nn02
To put a file into HDFS:
/u/HBase B/hadoop-2.2.0/bin/hadoop dfs -put hello.txt /nn02/hello.txt
Running /u/HBase B/hadoop-2.2.0/bin/hadoop dfs -du /nn01/ /nn02 gives:
0 /nn02/hello.txt
0 /nn01/hello.txt
For the recursive version:
/u/HBase B/hadoop-2.2.0/bin/hadoop dfs -lsr /
drwxr-xr-x - hadoop supergroup 0 2014-08-13 22:48 /nn01
-rw-r--r-- 3 hadoop supergroup 0 2014-08-13 22:48 /nn01/hello.txt
drwxr-xr-x - hadoop supergroup 0 2014-08-17 23:39 /nn02
-rw-r--r-- 3 hadoop supergroup 0 2014-08-17 23:39 /nn02/hello.txt
Similarly you can use the following commands:
touchz, text, tail, stat, setrep, rmr, rm, put, mv, moveFromLocal, mkdir, lsr, ls, getmerge, get, dus, expunge, du, copyToLocal, chown, chmod, chgrp, cat.
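As a quick, hedged illustration of a few of these commands (the directory /nn01/data and the chmod mode are made-up examples, and the hadoop script is assumed to be on your PATH):
hadoop dfs -mkdir /nn01/data
hadoop dfs -put hello.txt /nn01/data/
hadoop dfs -cat /nn01/data/hello.txt
hadoop dfs -chmod 644 /nn01/data/hello.txt
hadoop dfs -rm /nn01/data/hello.txt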
- Let's take a look at the fsck command; its options are listed below, followed by a sample invocation:
hdfs fsck [GENERIC_OPTIONS] <path> [-move | -delete | -openforwrite] [-files [-blocks [-locations | -racks]]]
-move: This moves the corrupted files to /lost+found
-delete: This deletes the corrupted files
-openforwrite: This prints out the files opened for write
-files: This prints out the files being checked
-blocks: This prints the block report
-locations: This prints the location of every block
-racks: This prints the network topology for the DataNode locations
- Let's take a look at some NameNode commands:
hadoop namenode [-format] | [-upgrade] | [-rollback] | [-finalize] | [-importCheckpoint]
hadoop namenode -format: Formats the NameNode.
hadoop namenode -upgrade: Upgrades the NameNode; it should be started with this option after the new Hadoop version has been distributed.
hadoop namenode -rollback: As the name suggests, this rolls back the NameNode to the previous version. It should be used only after stopping the cluster and distributing the old Hadoop version.
hadoop namenode -finalize: Makes the recent upgrade permanent.
hadoop namenode -importCheckpoint: Loads the image from a checkpoint directory and saves it into the current one.
- Let's consider the secondarynamenode command:
hadoop secondarynamenode [-checkpoint [force]] | [-geteditsize]
hadoop secondarynamenode -geteditsize: Prints the edit log size.
hadoop secondarynamenode -checkpoint [force]: Checkpoints the secondary NameNode if the edit log size >= fs.checkpoint.size. If force is used, the checkpoint is taken irrespective of the edit log size.
- We have discussed the DataNode and its functions:
hadoop datanode [-rollback]
This rolls back the DataNode to the previous version. It should only be used after stopping all the DataNodes and distributing the old Hadoop version.
- The jobtracker command runs the MapReduce job tracker node:
hadoop jobtracker
The HBase setup
Configuring HBase in a fully distributed environment:
- Prerequisites: the Hadoop/HDFS cluster is healthy.
- The NameNode, DataNode, and SecondaryNameNode setup is done as discussed earlier.
- Passwordless SSH access is in place between the NameNode, DataNodes, and SecondaryNameNode.
- The directory structure has the appropriate access levels.
- The paths are set as described earlier.
Just to recap, you can run this command, and it must show the following details:
Tip
Please check the compatibility of Hadoop and HBase.
In this book, we used hadoop-2.2.0 and HBase 0.98.5-hadoop2.
- Let's go to the NameNode of Hadoop/HDFS and edit the HDFS configuration by typing this command:
vi /u/HBase B/hadoop-2.2.0/etc/hadoop/hdfs-site.xml
The setup should be like this:
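The exact snippet is not reproduced here; as a hedged illustration, a minimal hdfs-site.xml for a small fully distributed cluster might look like the following, where the replication factor and the local storage paths are assumptions to adjust for your environment:
<configuration>
  <property>
    <name>dfs.replication</name>
    <!-- assumed replication factor -->
    <value>3</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <!-- hypothetical local path on the NameNode -->
    <value>file:///u/hadoop/dfs/name</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <!-- hypothetical local path on each DataNode -->
    <value>file:///u/hadoop/dfs/data</value>
  </property>
</configuration>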
These are the DataNodes that we will use as region servers later on. We will use the NameNode as the HBase master node.
vi /u/HBase B/hadoop-2.2.0/etc/hadoop/slaves
It should contain the nodes that will be used as DataNodes:
your-datanode01
your-datanode02
The following steps will help you to implement the same:
- Copy the hdfs-site.xml from the Hadoop setup to the HBase configuration directory:
cd $HBASE_HOME/conf
- Also, copy it to all the region servers. On the HMaster server, edit the regionservers file:
vi $HBASE_HOME/conf/regionservers
- Add the IPs or the fully qualified names of the region servers.
vi $HBASE_HOME/conf/hbase-env.sh and change export HBASE_MANAGES_ZK=true
- This will allow HBase to manage ZooKeeper internally on port 2181.
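A fully distributed setup also needs a corresponding hbase-site.xml on the master and on every region server; although it is not reproduced in this chapter, the following is a hedged sketch in which the NameNode URI, the ZooKeeper quorum host, and the HDFS directory are assumptions to adapt to your cluster:
<configuration>
  <property>
    <!-- must match the fs.defaultFS of your HDFS; host and port are assumed -->
    <name>hbase.rootdir</name>
    <value>hdfs://your-namenode:9000/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <!-- assumed: the ZooKeeper instance managed by HBase runs on the master node -->
    <name>hbase.zookeeper.quorum</name>
    <value>your-hbase-master</value>
  </property>
</configuration>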
Starting the cluster
To start the HBase cluster, we will run:
cd $HBASE_HOME/bin
./start-hbase.sh
This will start the entire cluster and its region servers.
Tip
Please check the logs in the log folder just to make sure the cluster starts properly:
cd $HBASE_LOGS/
ls -ltr
-rw-rw-r--. 1 hadoop hadoop 0 Aug 29 19:22 SecurityAuth.audit
-rw-rw-r--. 1 hadoop hadoop 92590 Aug 30 15:04 hbase-hadoop-zookeeper-your-hbase-master.log
-rw-rw-r--. 1 hadoop hadoop 484092 Aug 30 16:31 hbase-hadoop-master-rchoudhry-your-hbase-master.log
tail -200 hbase-hadoop-zookeeper-your-hbase-master.log
You should see no binding errors or exceptions.
tail -200 hbase-hadoop-master-rchoudhry-your-hbase-master.log
There should be no errors or exceptions.
Validating the cluster
Let's validate the entire HBase setup; on the master node run jps, and it will show the following:
[hadoop@rchoudhry-linux64 logs]$ jps
960 SecondaryNameNode // the secondary NameNode is up
8467 NameNode // the NameNode is up
11892 HQuorumPeer // ZooKeeper is running in quorum mode
25318 Jps // please ignore this
12008 HMaster // the HBase master is running successfully
8699 ResourceManager // the ResourceManager is running
12171 HRegionServer // the HBase region server is running
8974 JobHistoryServer // the JobHistoryServer is running
This confirms that all the components on the master are working properly. We are also running a region server on the master node; hence, HRegionServer is listed as shown above.
On a region server (running on a different node), use the same command and you will see the following:
13026 NodeManager
12425 Jps
12778 DataNode
13567 HRegionServer
This confirms that all the region servers are working. Now let's perform some basic operations on the cluster:
On the HBase Master:
cd $HBASE_HOME/bin
[hadoop@rchoudhry-linux64 bin]$ hbase shell -d
hbase(main):001:0>
This is the command line for the HBase shell. We are using the -d option to run it in debug mode. In production, this should be avoided; instead, check the log files to make sure there are no connection errors to any of the components:
hbase(main):001:0> list
City_Table
MyClickStream
t1
3 row(s) in 1.1270 seconds
["City_Table", "MyClickStream", "t1"]
hbase(main):002:0> status
hbase(main):002:0> status 'simple'
hbase(main):002:0> status 'summary'
hbase(main):002:0> status 'detailed'
hbase(main):002:0> describe 'MyClickStream'
hbase(main):002:0> scan 'yourtablename'
hbase(main):002:0> create 'yourtablename', 'cf01', 'cf02'
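As a quick, hedged illustration of basic data operations on the MyClickStream table created later in this section (the row key, column qualifier, and value are made-up examples):
hbase> put 'MyClickStream', 'row1', 'cf01:url', 'http://example.com'
hbase> get 'MyClickStream', 'row1'
hbase> scan 'MyClickStream', {LIMIT => 10}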
There are many more such commands that we can run from the HBase shell command line, which we will discuss as we go through the different chapters.
The preceding tables are created in the following section; they are shown here just for reference.
The following is the Snapshot process:
- We will consider this first from a Hadoop and then from an HBase perspective; once a directory is marked as ready for snapshots (snapshottable), a snapshot of it can be taken.
- A snapshot can be taken on any directory within the Hadoop/HBase data ecosystem. A snapshottable directory can hold at most 65,536 simultaneous snapshots. There is no limit on the number of snapshottable directories (although file descriptor or other OS limitations can come into the picture). Administrators may mark any directory as snapshottable.
Note
If a snapshottable directory contains snapshots, it cannot be deleted or renamed until all of its snapshots are deleted.
- There is a system limitation that doesn't allow nested snapshottable directories.
Creating a snapshot of a directory:
hdfs dfs -mkdir /snapshot
Using the following command, we enable the directory for snapshots:
hdfs dfsadmin -allowSnapshot /snapshot
hdfs dfs -createSnapshot /snapshot [<snapshotName>]
Deleting a snapshot:
Delete a snapshot from a snapshottable directory.
This can only be done by the owner of the snapshottable directory:
hdfs dfs -deleteSnapshot <path> <snapshotName>
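To verify the result, you can list the snapshottable directories and the snapshots themselves; a hedged example, reusing the /snapshot directory created above:
hdfs lsSnapshottableDir
hdfs dfs -ls /snapshot/.snapshot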
Snapshots in HBase:
To reduce the impact on the region servers, HBase snapshots by design give you the flexibility to clone a table without making data copies. In addition to this, we can export the table to another cluster, which also avoids any impact on the region servers.
Configuring HBase Snapshot:
<property>
  <name>hbase.snapshot.enabled</name>
  <value>true</value>
</property>
We are assuming that the table MyClickStream has been created in HBase. We can also create the table if it's not present:
./bin/hbase shell
hbase> create 'MyClickStream', 'cf01', 'cf02'
Here, cf01 represents column family 01 and cf02 represents column family 02.
./bin/hbase shell -d
hbase> disable 'MyClickStream'
hbase> snapshot 'MyClickStream', 'MyClickStreamSnapshot-08302014'
Listing a Snapshot: List all the snapshots taken:
./bin/hbase shell
hbase> list_snapshots
- Deleting a snapshot: we can remove unwanted snapshots by running the following command:
./bin/hbase shell
hbase> delete_snapshot 'MyClickStreamSnapshot-08212014'
- Cloning a table from a snapshot: cloning allows us to create a new table with the same data the original had when the snapshot was taken. Changes to the cloned table are isolated to it, and changes in the original table do not affect the snapshot:
./bin/hbase shell
hbase> clone_snapshot 'MyClickStreamSnapshot-08212014', 'MyClickStreamSnapshot01-08212014'
- Restoring snapshots: this can only be performed when the table is disabled. The effect of this operation is that the table comes back with the same state it had when the snapshot was taken:
./bin/hbase shell
hbase> disable 'MyClickStream'    # the name of the table
This disables the table for active use, so no read/write operations are served on it at this point:
hbase> restore_snapshot 'MyClickStreamSnapshot-08212014'
Internally, there are differences in how replication and snapshots work.
Replication is performed at the log level, whereas snapshots work at the file system level. Thus it's essential to re-sync the state from the master, because once the restore operation is done the replica will differ from the master. If we perform a restore operation, it's pivotal to stop the replication process first and then perform the bootstrap operation again.
In the case of limited data loss caused by a client, it's recommended to clone the table from an existing snapshot and run a MapReduce job that copies the data from the clone back to the main table; this way we don't have to go for a full restore, which first requires disabling the table:
Specify the hbase.rootdir of the other cluster:
./bin/hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot 'MyClickStreamSnapshot-08212014' -copy-to hdfs://mynamenodeserver02:8082/hbase -mappers 8
In a heavily used production environment, it's advisable to restrict bandwidth consumption while exporting a snapshot.
This can be achieved by invoking the preceding command with the -bandwidth parameter, as shown next; the unit of measure is megabytes per second and the value is an integer:
./bin/hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot 'MyClickStreamSnapshot-08212014' -copy-to hdfs://mynamenodeserver02:8082/hbase -mappers 8 -bandwidth 200
How it works…
To better understand the concepts, let's break them down as follows:
- Web interface: This shows the details of the NameNode and DataNodes and displays basic information about the cluster. The URL is http://your-namenode-name:50070/. Alternatively, you can use the same interface to navigate the file system on the NameNode.
- Snapshots: Snapshots in HDFS are always read-only and represent the state of the files at the time the snapshot was taken. A snapshot can be restricted to a limited scope of the file system, or it can span the entire file system.
- HBase snapshots: A snapshot is a set of metadata information used by administrators to restore a table to the state it had when the snapshot was taken. Technically, it's not a copy of the table; it's a set of operations that tracks the metadata (the table and its regions) and the actual data (HFiles, MemStore, WALs).
- Offline snapshots: The standard scenario is to take the snapshot when the table is disabled. This makes sure that all the data is flushed to disk and no reads or writes are accepted on the dataset. Taking a snapshot is then just a matter of walking through the table metadata and the HFiles that reside on disk and keeping a reference to them. The master invokes this operation, and the time it requires is governed mainly by the time the HDFS NameNode takes to provide the list of files.
- Online snapshots: This type of snapshot works differently. The table remains enabled and its regions keep serving reads and writes; in other words, they keep receiving puts and gets from live traffic. When the master receives a snapshot request, it coordinates it by asking every region server to take a snapshot of the regions it hosts. This works as a simple flush and does not provide causal consistency. This type of snapshot has minimal performance overhead; a short example follows this list.
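As a hedged illustration, an online snapshot can be taken from the HBase shell without disabling the table (the snapshot name here is made up):
hbase> snapshot 'MyClickStream', 'MyClickStreamOnlineSnapshot-09012014'
hbase> list_snapshots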
DFS Administration commands:
bin/hadoop dfsadmin -help: Lists all the dfsadmin commands.
bin/hadoop dfsadmin -report: Provides statistics and basic file system information.
bin/hadoop dfsadmin -safemode enter | leave | get | wait: Controls safe mode. Safe mode immediately blocks changes to the namespace and makes it read-only; it also blocks replication and any deletion of data blocks. A short example follows this command list.
Note
An important point to note about safe mode is that during the startup process it is turned on automatically and is switched back to normal once the process detects that the minimum conditions are fulfilled. You can also trigger safe mode manually, but in that case you have to switch it off manually too.
bin/hadoop dfsadmin -saveNamespace: This command requires superuser permission; it saves the current namespace and resets the edit log.
bin/hadoop dfsadmin -rollEdits: This rolls the edit log. Note that this requires superuser permission.
bin/hadoop dfsadmin -restoreFailedStorage: This takes one of three arguments (true, false, or check) and attempts to restore failed storage replicas if they become available again.
Note
This can only be done with superuser permission.
bin/hadoop dfsadmin -refreshNodes: This command updates the NameNode with the set of DataNodes allowed to connect to it.
bin/hadoop dfsadmin -finalizeUpgrade: This concludes the upgrade of HDFS. It instructs the DataNodes to delete their previous-version working directories and then has the NameNode do the same, which finishes the upgrade process.
bin/hadoop dfsadmin -deleteBlockPool: Its arguments are datanodehost:port, a block pool ID, and an optional argument force. If force is passed, the block pool directory for the given block pool ID on the given DataNode is deleted along with its contents; otherwise, the directory is deleted only if it is empty. The command will fail if the DataNode is still serving the block pool. Refer to refreshNamenodes to shut down a block pool service on a DataNode. For the full list of options, run bin/hadoop dfsadmin -help.
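A short, hedged example of checking cluster health and toggling safe mode (run as the hadoop user from the Hadoop installation directory):
bin/hadoop dfsadmin -report
bin/hadoop dfsadmin -safemode get
bin/hadoop dfsadmin -safemode enter
bin/hadoop dfsadmin -safemode leave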
Let's discuss other important components:
SecondaryNameNode: The NameNode stores changes to the file system in a native file system file (the edits log). During the startup process, the HDFS state is read from the image file, commonly known as fsimage, and the changes from the edit log are applied to it. The latest state of HDFS is then pushed back to fsimage, and the normal process resumes with a blank edit log file. In essence, the NameNode combines these two files (fsimage and edits) during startup; the SecondaryNameNode performs this merge periodically as a checkpoint, which makes the next restart faster.
Rebalancer: The HDFS cluster can easily become imbalanced for the following reasons: when a new DataNode joins the cluster it holds no data, so any map task assigned to that machine most likely does not access local data, thus consuming more network bandwidth; and when some DataNodes become full, new data blocks are placed only on the emptier DataNodes, thus reducing read parallelism.
Rack awareness: As the NameNode is designed for HA/fault tolerance, the system attempts to spread the replicas of a block across multiple racks. Using the dfs.network.script variable, the administrator can govern these settings.
Safemode: Makes HDFS read-only.
fsck: It's designed to report problems such as missing or under-replicated blocks; fsck ignores open files. Depending on the need, it can be run on a subsection of files or on the entire file system managed by the NameNode.
Snapshotting: We consider this from a Hadoop and then from an HBase perspective. The snapshot process is very flexible and robust; it allows snapshots at the directory level, including cascaded directory levels. A total of 65,536 simultaneous snapshots per directory can be accommodated, and in essence there is no limit on the number of snapshottable directories.
Tip
Nested snapshottable directories are currently not possible.
The export-to-another-cluster tool helps us duplicate data between clusters. The data copied consists of HFiles, logs, and snapshot metadata. Since it works at the file system (HDFS) level, the HBase cluster does not have to be fully online, which is why it does not impact the RegionServer workload.
In the preceding section, we discussed the core file system, which is the foundation of HBase. We discussed HDFS, how it relates to the Hadoop ecosystem, and how HBase relies on this foundation to work. In doing so, we covered the internal structure of HDFS and the HBase integration points. In steps 1 to 9, we discussed the HDFS/Hadoop commands in a fully distributed mode; this is needed to make sure that HBase runs in a fully distributed environment. We cannot run HBase without a Hadoop setup; however, for development purposes we can run HBase in standalone mode, or alternatively in pseudo-distributed mode.
There's more…
The entire process helps us set up the Hadoop/HDFS file system, on top of which HBase can sit and get the benefits of the HDFS distributed architecture.
See also
Refer to the following chapter:
- Working with Large Distributed Systems.