Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Arrow left icon
Explore Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Newsletter Hub
Free Learning
Arrow right icon
timer SALE ENDS IN
0 Days
:
00 Hours
:
00 Minutes
:
00 Seconds

MapReduce on Amazon EMR with Node.js

Save for later
  • 8 min read
  • 14 Dec 2016

article-image

In this post, you will learn how to write a Node.js MapReduce application and how to run it on Amazon EMR. You don’t need to be familiar with Hadoop or EMR API's. In order to run the examples, you will need a Github account, an Amazon AWS, some money to spend at AWS, and Bash or an equivalent installed on your computer.

EMR, BigData, and MapReduce

We define BigData as those data sets too large or too complex to be processed by traditional processing applications. BigData is also a relative term: A data set can be too big for your Raspberry PI, while being a piece of cake for your desktop.

What is MapReduce? MapReduce is a programming model that allows data sets to be processed in a parallel and distributed fashion.

How does it work? You create a cluster and feed it with the data set. Then, you define a mapper and a reducer. MapReduce involves the following three steps:

  • Mapping step: This breaks down the input data into KeyValue pairs
  • Shuffling step: KeyValue pairs are grouped by Key
  • Reducing step: KeyValue pairs are processed by Key in parallel

It’s guaranteed that all data belonging to a single key will be processed by a single reducer instance.

Our processing job project directory setup

Today, we will implement a very simple processing job: counting unique words from a set of text files. The code for this article is hosted at Here.
Let's set up a new directory for our project:

$ mkdir -p emr-node/bin
$ cd emr-node
$ npm init --yes
$ git init

We also need some input data. In our case, we will download some books from project Gutenberg as follows:

$ mkdir data
$ curl -Lo data/tmohah.txt http://www.gutenberg.org/ebooks/45315.txt.utf-8
$ curl -Lo data/mad.txt http://www.gutenberg.org/ebooks/5616.txt.utf-8

Mapper and Reducer

As we stated before, the mapper will break down its input into KeyValue pairs. Since we use the streaming API, we will read the input form stdin. We will then split each line into words, and for each word, we are going to print "word1" to stdout. TAB character is the expected field separator. We will see later the reason for setting "1" as the value.

In plain Javascript, our ./bin/mapper can be expressed as:

#!/usr/bin/env node

const readline = require('readline');
const rl = readline.createInterface({
    input : process.stdin
});

rl.on('line', function(line){
    line.trim().split(' ').forEach(function(word){
        console.log(`${word}t1`);
    });
});

As you can see, we have used the readline module (a Node built-in module) to parse stdin. Each line is broken down into words, and each word is printed to stdout as we stated before.

Time to implement our reducer. The reducer expects a set of KeyValue pairs, sorted by key, as input, such as the following:

First<TAB>1
First<TAB>1
Second<TAB>1
Second<TAB>1
Second<TAB>1

We then expect the reducer to output the following:

First<TAB>2
Second<TAB>3

Reducer logic is very simple and can be expressed in pseudocode as:

IF !previous_key 
    previous_key = current_key
    counter = value
IF previous_key equals current_key
    counter = counter + value
ELSE
    print previous_key<TAB>counter
    previous_key = current_key; counter = value;

The first statement is necessary to initialize the previous_key and counter variables. Let's see the real JavaScript implementation of ./bin/reducer:

#!/usr/bin/env node

var previousKey, counter;

const readline = require('readline');
const rl = readline.createInterface({
    input : process.stdin
});

function print(){
    console.log(`${previousKey}t${counter}`);
}

function countWord(line) {
    let [currentKey, value] = line.split('t');
    value = +value;
    if(typeof previousKey === 'undefined'){
        previousKey = currentKey;
        counter = value;
        return;
    }
    if(previousKey === currentKey){
        counter = counter + value;
        return;
    }
    print();
    previousKey = currentKey;
    counter = value;
}

process.stdin.on('end',function(){
    print();
});

rl.on('line', countWord);

Again, we use readline module to parse stdin line by line. The countWord function implements our reducer logic described before.

The last thing we need to do is to set execution permissions to those files:

chmod +x ./bin/mapper
chmod +x ./bin/reducer

How do I test it locally?

You have two ways to test your code:

Unlock access to the largest independent learning library in Tech for FREE!
Get unlimited access to 7500+ expert-authored eBooks and video courses covering every tech area you can think of.
Renews at €18.99/month. Cancel anytime
  1. Install Hadoop and run a job
  2. With a simple shell script

The second one is my preferred one for its simplicity:

./bin/mapper <<EOF | sort | ./bin/reducer
first second
first first second
first
EOF

It should print the following:

first<TAB>4
second<TAB>2

We are now ready to run our job in EMR!

Amazon environment setup

Before we run any processing job, we need to perform some setup on the AWS side.
If you do not have an S3 bucket, you should create one now. Under that bucket, create the following directory structure:

<your bucket>
├── EMR
│   └── logs
├── bootstrap
├── input
└── output

Upload our previously downloaded books from project Gutenberg to the input folder.
We also need AWS cli installed on the computer. You can install it with the python package manager.

If you do not have AWS cli installed on your computer, then run:

$ sudo pip install awscli

awscli requires some configuration, so run the following and provide the requested data:

$ aws configure 

You can find this data in your Amazon AWS web console. Be aware that usability is not Amazon’s strongest point. If you do not have your IAM EMR roles yet, it is time to create them:

aws emr create-default-roles

Good. You are now ready to deploy your first cluster. Check out this (run-cluster.sh) script:

#!/bin/bash
MACHINE_TYPE='c1.medium'
BUCKET='pngr-emr-demo'
REGION='eu-west-1'
KEY_NAME='pedro@triffid'

aws emr create-cluster --release-label 'emr-4.0.0' --enable-debugging --visible-to-all-users --name PNGRDemo 
  --instance-groups  InstanceCount=1,InstanceGroupType=CORE,InstanceType=$MACHINE_TYPE InstanceCount=1,InstanceGroupType=MASTER,InstanceType=$MACHINE_TYPE 
  --no-auto-terminate --enable-debugging --log-uri s3://$BUCKET/EMR/logs 
  --bootstrap-actions Path=s3://$BUCKET/bootstrap/bootstrap.sh,Name=Install 
  --ec2-attributes KeyName=$KEY_NAME,InstanceProfile=EMR_EC2_DefaultRole 
  --service-role EMR_DefaultRole --region $REGION

The previous script will create a 1 master, 1 core cluster, which is big enough for now. You will need to update this script with your bucket, region, and key name. Remember that your keys are listed at "AWS EC2 console/Key pairs". Running this script will print something like the following:

{
    "ClusterId": "j-1HHM1B0U5DGUM"
}

That is your cluster ID and you will need it later. Please visit your Amazon AWS EMR console and switch to your region. Your cluster should be listed there. It is possible to add the processing steps with either the UI or aws cli. Let's use a shell script (add-step.sh):

#!/bin/bash
CLUSTER_ID=$1
BUCKET='pngr-emr-demo'
OUTPUT='output/1'

aws emr add-steps 
 --cluster-id $CLUSTER_ID 
 --steps Name=CountWords,Type=Streaming,Args=[-input,s3://$BUCKET/input,-output,s3://$BUCKET/$OUTPUT,-mapper,mapper,-reducer,reducer] 

It is important to point out that our "OUTPUT" directory does not exist at S3 yet. Otherwise, the job will fail. Call ./add-step.sh plus the cluster ID to add our CountWords step:

./add-step j-1HHM1B0U5DGUM

Done! So go back to the Amazon UI, reload the cluster page, and check the steps. "CountWords" step should be listed there. You can track job progress from the UI (reload the page) or from the command line interface. Once the job is done, terminate the cluster. You will probably want to configure the cluster to terminate as soon as it finishes or when any step fails. Termination behavior can be specified with the "aws emr create-cluster".
Sometimes the bootstrap process can be difficult. You can SSH into the machines, but before that, you will need to modify their security groups, which are listed at "EC2 web console/security groups".

Where to go from here?

You can (and should) break down your processing jobs into smaller steps because it will simplify your code and add more composability to your steps. You can compose more complex processing jobs by using the output of a step as the input for the next step.
Imagine that you have run the "CountWords" processing job several times and now you want to sum the outputs. Well, for that particular case, you just add a new step with an "identity mapper" and your already built reducer, and feed it with all of the previous outputs. Now you can see why we output "WORD1" from the mapper.

About the author

Pedro Narciso García Revington is a Senior Full Stack Developer with 10+ years of experience in high scalability and availability, microservices, automated deployments, data processing, CI, (T,B,D)DD, and polyglot persistence.