In this post, you will learn how to write a Node.js MapReduce application and how to run it on Amazon EMR. You don’t need to be familiar with Hadoop or the EMR APIs. In order to run the examples, you will need a GitHub account, an Amazon AWS account, some money to spend at AWS, and Bash or an equivalent shell installed on your computer.
We define BigData as those data sets that are too large or too complex to be processed by traditional data-processing applications. BigData is also a relative term: a data set can be too big for your Raspberry Pi while being a piece of cake for your desktop.
What is MapReduce? MapReduce is a programming model that allows data sets to be processed in a parallel and distributed fashion.
How does it work? You create a cluster, feed it with the data set, and define a mapper and a reducer. MapReduce involves the following three steps:
1. Map: every mapper receives a chunk of the input and emits a set of key/value pairs.
2. Shuffle and sort: the framework groups the mappers' output by key and sorts it before handing it to the reducers.
3. Reduce: every reducer receives all the values emitted for a given key and outputs the aggregated result.
It’s guaranteed that all data belonging to a single key will be processed by a single reducer instance.
Today, we will implement a very simple processing job: counting how many times each word appears in a set of text files. The code for this article is hosted on GitHub.
Let's set up a new directory for our project:
$ mkdir -p emr-node/bin
$ cd emr-node
$ npm init --yes
$ git init
We also need some input data. In our case, we will download a couple of books from Project Gutenberg as follows:
$ mkdir data
$ curl -Lo data/tmohah.txt http://www.gutenberg.org/ebooks/45315.txt.utf-8
$ curl -Lo data/mad.txt http://www.gutenberg.org/ebooks/5616.txt.utf-8
As we stated before, the mapper will break down its input into key/value pairs. Since we use the Hadoop streaming API, we will read the input from stdin. We will then split each line into words and, for each word, print word<TAB>1 to stdout; the TAB character is the expected field separator. We will see later the reason for setting "1" as the value.
In plain JavaScript, our ./bin/mapper can be expressed as:
#!/usr/bin/env node

// Read stdin line by line using the built-in readline module.
const readline = require('readline');

const rl = readline.createInterface({
  input: process.stdin
});

rl.on('line', function(line){
  // Emit "word<TAB>1" for every word on the line.
  line.trim().split(' ').forEach(function(word){
    console.log(`${word}\t1`);
  });
});
As you can see, we have used the readline module (a Node.js built-in) to parse stdin. Each line is broken down into words, and each word is printed to stdout, as we stated before.
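To get a quick feel for the output format, you can pipe a sample line into the mapper (invoking it through node, since we have not set execution permissions yet); something like this should do:
$ echo "first second first" | node ./bin/mapper
first<TAB>1
second<TAB>1
first<TAB>1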
Time to implement our reducer. The reducer expects a set of key/value pairs, sorted by key, as input, such as the following:
First<TAB>1
First<TAB>1
Second<TAB>1
Second<TAB>1
Second<TAB>1
We then expect the reducer to output the following:
First<TAB>2
Second<TAB>3
The reducer logic is very simple and can be expressed in pseudocode as:
IF previous_key is not set
    previous_key = current_key
    counter = value
ELSE IF previous_key equals current_key
    counter = counter + value
ELSE
    print previous_key<TAB>counter
    previous_key = current_key
    counter = value
The first branch is only there to initialize the previous_key and counter variables. Let's see the real JavaScript implementation of ./bin/reducer:
#!/usr/bin/env node

let previousKey, counter;

const readline = require('readline');

const rl = readline.createInterface({
  input: process.stdin
});

// Print the accumulated count for the current key.
function print(){
  console.log(`${previousKey}\t${counter}`);
}

function countWord(line) {
  let [currentKey, value] = line.split('\t');
  value = +value;
  if(typeof previousKey === 'undefined'){
    previousKey = currentKey;
    counter = value;
    return;
  }
  if(previousKey === currentKey){
    counter = counter + value;
    return;
  }
  // The key changed: flush the previous key and start counting the new one.
  print();
  previousKey = currentKey;
  counter = value;
}

// Flush the last key once stdin is exhausted.
process.stdin.on('end', function(){
  print();
});

rl.on('line', countWord);
Again, we use the readline module to parse stdin line by line. The countWord function implements the reducer logic described above.
The last thing we need to do is to set execution permissions on those files:
$ chmod +x ./bin/mapper
$ chmod +x ./bin/reducer
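Before testing the whole pipeline, you can sanity-check the reducer on its own with a few hand-written, already sorted key/value pairs; it should print the aggregated counts:
$ printf 'first\t1\nfirst\t1\nsecond\t1\n' | ./bin/reducer
first<TAB>2
second<TAB>1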
You have two ways to test your code: install Hadoop locally and run the job against it, or emulate the Hadoop streaming pipeline with plain shell pipes (mapper | sort | reducer). The second one is my preferred one for its simplicity:
./bin/mapper <<EOF | sort | ./bin/reducer
first second
first first second
first
EOF
It should print the following:
first<TAB>4
second<TAB>2
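You can also run the same pipeline over the books we downloaded earlier; the extra sort and head at the end are only there to show the most frequent words:
$ cat data/*.txt | ./bin/mapper | sort | ./bin/reducer | sort -t$'\t' -k2 -rn | head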
We are now ready to run our job in EMR!
Before we run any processing job, we need to perform some setup on the AWS side.
If you do not have an S3 bucket, you should create one now. Under that bucket, create the following directory structure:
<your bucket>
├── EMR
│   └── logs
├── bootstrap
├── input
└── output
Upload the books we previously downloaded from Project Gutenberg to the input folder.
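If you prefer the command line to the S3 web console, the same upload can be done with the AWS CLI (installed and configured in the next step); the bucket name below is the demo one used in the scripts later on, so replace it with yours:
$ aws s3 cp data/tmohah.txt s3://pngr-emr-demo/input/tmohah.txt
$ aws s3 cp data/mad.txt s3://pngr-emr-demo/input/mad.txt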
We also need the AWS CLI installed on the computer. If you do not have it yet, you can install it with pip, the Python package manager:
$ sudo pip install awscli
awscli requires some configuration, so run the following command and provide the requested data (access key ID, secret access key, default region, and output format):
$ aws configure
You can find this data in your Amazon AWS web console. Be aware that usability is not Amazon’s strongest point. If you do not have your IAM EMR roles yet, it is time to create them:
$ aws emr create-default-roles
Good. You are now ready to deploy your first cluster. Check out the following run-cluster.sh script:
#!/bin/bash

MACHINE_TYPE='c1.medium'
BUCKET='pngr-emr-demo'
REGION='eu-west-1'
KEY_NAME='pedro@triffid'

aws emr create-cluster \
  --release-label 'emr-4.0.0' \
  --enable-debugging \
  --visible-to-all-users \
  --name PNGRDemo \
  --instance-groups InstanceCount=1,InstanceGroupType=CORE,InstanceType=$MACHINE_TYPE InstanceCount=1,InstanceGroupType=MASTER,InstanceType=$MACHINE_TYPE \
  --no-auto-terminate \
  --log-uri s3://$BUCKET/EMR/logs \
  --bootstrap-actions Path=s3://$BUCKET/bootstrap/bootstrap.sh,Name=Install \
  --ec2-attributes KeyName=$KEY_NAME,InstanceProfile=EMR_EC2_DefaultRole \
  --service-role EMR_DefaultRole \
  --region $REGION
The previous script will create a cluster with 1 master and 1 core node, which is big enough for now. You will need to update this script with your own bucket, region, and key name. Remember that your key pairs are listed at "AWS EC2 console/Key pairs". Running this script will print something like the following:
{
    "ClusterId": "j-1HHM1B0U5DGUM"
}
That is your cluster ID; you will need it later. Please visit your Amazon AWS EMR console and switch to your region. Your cluster should be listed there. Processing steps can be added with either the UI or the AWS CLI. Let's use a shell script (add-step.sh):
#!/bin/bash

CLUSTER_ID=$1
BUCKET='pngr-emr-demo'
OUTPUT='output/1'

aws emr add-steps \
  --cluster-id $CLUSTER_ID \
  --steps Name=CountWords,Type=Streaming,Args=[-input,s3://$BUCKET/input,-output,s3://$BUCKET/$OUTPUT,-mapper,mapper,-reducer,reducer]
It is important that the "OUTPUT" directory does not already exist on S3; otherwise, the job will fail. Call ./add-step.sh with the cluster ID to add our CountWords step:
$ ./add-step.sh j-1HHM1B0U5DGUM
Done! Now go back to the Amazon UI, reload the cluster page, and check the steps; the "CountWords" step should be listed there. You can track job progress from the UI (reload the page) or from the command line. Once the job is done, terminate the cluster. You will probably want to configure the cluster to terminate as soon as it finishes or when any step fails; termination behavior can be specified with the "aws emr create-cluster" command.
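For reference, the rough CLI equivalents of those last actions look like this (the cluster ID and output prefix are the example values used above):
$ aws emr list-steps --cluster-id j-1HHM1B0U5DGUM        # track step status
$ aws s3 ls s3://pngr-emr-demo/output/1/                 # one part-* file per reducer
$ aws emr terminate-clusters --cluster-ids j-1HHM1B0U5DGUM
If you would rather have the cluster shut itself down when all steps are done, "aws emr create-cluster" also accepts --auto-terminate in place of --no-auto-terminate.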
Sometimes the bootstrap process can be difficult to debug. You can SSH into the machines, but before that, you will need to modify their security groups, which are listed at "EC2 web console/Security groups".
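For completeness, here is a minimal sketch of what the bootstrap.sh referenced in run-cluster.sh could look like. The Node.js version, the install paths, and the assumption that you also uploaded mapper and reducer to the bucket's bootstrap folder are all choices of this example, not requirements:
#!/bin/bash
# Hypothetical bootstrap.sh: runs on every node when the cluster starts.
set -e

BUCKET='pngr-emr-demo'
NODE_VERSION='4.2.1'   # example version

# Install Node.js from the official binary tarball.
curl -sO https://nodejs.org/dist/v$NODE_VERSION/node-v$NODE_VERSION-linux-x64.tar.gz
tar xzf node-v$NODE_VERSION-linux-x64.tar.gz
sudo cp node-v$NODE_VERSION-linux-x64/bin/node /usr/local/bin/node

# Fetch the mapper and reducer from S3 and put them on the PATH,
# so the streaming step can refer to them simply as "mapper" and "reducer".
aws s3 cp s3://$BUCKET/bootstrap/mapper /tmp/mapper
aws s3 cp s3://$BUCKET/bootstrap/reducer /tmp/reducer
chmod +x /tmp/mapper /tmp/reducer
sudo mv /tmp/mapper /tmp/reducer /usr/local/bin/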
You can (and should) break down your processing jobs into smaller steps, because doing so will simplify your code and make your steps more composable. You can compose more complex processing jobs by using the output of one step as the input of the next step.
Imagine that you have run the "CountWords" processing job several times and now you want to sum the outputs. For that particular case, you just add a new step with an "identity mapper" and your already-built reducer, and feed it with all of the previous outputs. Now you can see why we output "word<TAB>1" from the mapper.
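As a hypothetical example, assuming two earlier runs wrote their results to output/1 and output/2, the follow-up step could look like this, with /bin/cat playing the role of the identity mapper:
aws emr add-steps \
  --cluster-id $CLUSTER_ID \
  --steps Name=SumCounts,Type=Streaming,Args=[-input,s3://$BUCKET/output/1,-input,s3://$BUCKET/output/2,-output,s3://$BUCKET/output/sum,-mapper,/bin/cat,-reducer,reducer]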
Pedro Narciso García Revington is a Senior Full Stack Developer with 10+ years of experience in high scalability and availability, microservices, automated deployments, data processing, CI, (T,B,D)DD, and polyglot persistence.