
Storing Apache Storm data in Elasticsearch

  • 6 min read
  • 27 Dec 2017


Note: This article is an excerpt from a book by Ankit Jain titled Mastering Apache Storm. This book explores various real-time processing functionalities offered by Apache Storm such as parallelism, data partitioning, and more.

In this article, we cover how to store the data processed by Apache Storm in Elasticsearch. Elasticsearch is an open source, distributed search engine platform built on Lucene. It provides multitenant-capable, full-text search. Although Apache Storm is meant for real-time data processing, in most cases we also need to keep the processed data in a data store so that batch analysis queries can be run against it later.

We assume that Elasticsearch is running in your environment. If you don't have a running Elasticsearch cluster, refer to https://www.elastic.co/guide/en/elasticsearch/reference/2.3/_installation.html to install Elasticsearch on one of your machines. Go through the following steps to integrate Storm with Elasticsearch:

  1. Create a Maven project using com.stormadvance for the groupId and storm_elasticsearch for the artifactId.
  2. Add the following dependencies to the pom.xml file:
<dependencies>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.4.4</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

3. Create an ElasticSearchOperation class in the com.stormadvance.storm_elasticsearch package. The ElasticSearchOperation class contains the following method:

insert(Map<String, Object> data, String indexName, String indexMapping, String indexId): This method takes the record data, indexName, indexMapping, and indexId as input, and inserts the input record in Elasticsearch.

The following is the source code of the ElasticSearchOperation class:

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ElasticSearchOperation {

    private TransportClient client;

    public ElasticSearchOperation(List<String> esNodes) throws Exception {
        // "elasticsearch" is the default cluster name; change it to match your cluster.
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "elasticsearch").build();
        client = TransportClient.builder().settings(settings).build();
        // Register every node on the transport port (9300).
        for (String esNode : esNodes) {
            client.addTransportAddress(new InetSocketTransportAddress(
                    InetAddress.getByName(esNode), 9300));
        }
    }

    // indexMapping is passed to Elasticsearch as the document type.
    public void insert(Map<String, Object> data, String indexName,
            String indexMapping, String indexId) {
        client.prepareIndex(indexName, indexMapping, indexId)
                .setSource(data).get();
    }

    public static void main(String[] s) {
        try {
            List<String> esNodes = new ArrayList<String>();
            esNodes.add("127.0.0.1");

            ElasticSearchOperation elasticSearchOperation =
                    new ElasticSearchOperation(esNodes);

            Map<String, Object> data = new HashMap<String, Object>();
            data.put("name", "name");
            data.put("add", "add");

            // Elasticsearch index names must be lowercase.
            elasticSearchOperation.insert(data, "indexname", "indexMapping",
                    UUID.randomUUID().toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
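The insert() method leaves the choice of indexId to the caller, and the listings in this article pass a random UUID, so inserting the same record twice produces two documents. As a minimal sketch of one alternative, the hypothetical helper below (DocumentIds and deterministicId are illustrative names, not part of the book's project) derives a name-based UUID from the record contents using only the JDK, so that an identical record always maps to the same ID.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

// Hypothetical helper for illustration; not part of the original project.
class DocumentIds {

    /**
     * Builds a canonical "key=value" listing of the record (keys sorted so
     * that map iteration order does not matter) and derives a name-based
     * UUID from its UTF-8 bytes.
     */
    static String deterministicId(Map<String, Object> data) {
        StringBuilder canonical = new StringBuilder();
        for (Map.Entry<String, Object> entry : new TreeMap<>(data).entrySet()) {
            canonical.append(entry.getKey())
                     .append('=')
                     .append(entry.getValue())
                     .append('\n');
        }
        byte[] bytes = canonical.toString().getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(bytes).toString();
    }

    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<>();
        data.put("name", "name");
        data.put("add", "add");
        // The same record contents always print the same ID.
        System.out.println(deterministicId(data));
    }
}
```

The trade-off is that two records with identical fields collapse into a single document, which may or may not be what you want; with a random UUID every insert creates a new document.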

4. Create a SampleSpout class in the com.stormadvance.storm_elasticsearch package. This class generates random records and passes them to the next component (the bolt) in the topology. The following is the format of the record generated by the SampleSpout class:

["john","watson","abc"]


The following is the source code of the SampleSpout class:

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class SampleSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector spoutOutputCollector;

    private static final Map<Integer, String> FIRSTNAMEMAP = new HashMap<Integer, String>();
    static {
        FIRSTNAMEMAP.put(0, "john");
        FIRSTNAMEMAP.put(1, "nick");
        FIRSTNAMEMAP.put(2, "mick");
        FIRSTNAMEMAP.put(3, "tom");
        FIRSTNAMEMAP.put(4, "jerry");
    }

    private static final Map<Integer, String> LASTNAME = new HashMap<Integer, String>();
    static {
        LASTNAME.put(0, "anderson");
        LASTNAME.put(1, "watson");
        LASTNAME.put(2, "ponting");
        LASTNAME.put(3, "dravid");
        LASTNAME.put(4, "lara");
    }

    private static final Map<Integer, String> COMPANYNAME = new HashMap<Integer, String>();
    static {
        COMPANYNAME.put(0, "abc");
        COMPANYNAME.put(1, "dfg");
        COMPANYNAME.put(2, "pqr");
        COMPANYNAME.put(3, "ecd");
        COMPANYNAME.put(4, "awe");
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector spoutOutputCollector) {
        // Open the spout and keep the collector for later emits.
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        // Storm calls this method repeatedly to emit a continuous
        // stream of tuples.
        final Random rand = new Random();
        // Generate a random number from 0 to 4; the same index is used
        // for all three lookups.
        int randomNumber = rand.nextInt(5);
        spoutOutputCollector.emit(new Values(FIRSTNAMEMAP.get(randomNumber),
                LASTNAME.get(randomNumber), COMPANYNAME.get(randomNumber)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Emits the fields firstName, lastName and companyName.
        declarer.declare(new Fields("firstName", "lastName", "companyName"));
    }
}
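The selection logic in nextTuple() can be tried out without a Storm cluster. The following sketch reproduces it in plain Java using the same sample values; RecordPicker, recordAt, and nextRecord are hypothetical names introduced here for illustration. Because a single random index drives all three lookups, only five distinct records are ever produced.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical stand-alone version of the spout's record selection.
class RecordPicker {

    static final String[] FIRST_NAMES = {"john", "nick", "mick", "tom", "jerry"};
    static final String[] LAST_NAMES = {"anderson", "watson", "ponting", "dravid", "lara"};
    static final String[] COMPANY_NAMES = {"abc", "dfg", "pqr", "ecd", "awe"};

    /** Returns the record whose three fields all come from the same index. */
    static List<String> recordAt(int index) {
        return Arrays.asList(FIRST_NAMES[index], LAST_NAMES[index], COMPANY_NAMES[index]);
    }

    /** Picks one index from 0 to 4 and returns the matching record. */
    static List<String> nextRecord(Random rand) {
        return recordAt(rand.nextInt(FIRST_NAMES.length));
    }

    public static void main(String[] args) {
        Random rand = new Random();
        for (int i = 0; i < 3; i++) {
            System.out.println(nextRecord(rand));
        }
    }
}
```

If independent combinations were wanted instead, each field would need its own call to rand.nextInt(5); the listing above keeps the single-index behaviour of the original spout.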

5. Create an ESBolt class in the com.stormadvance.storm_elasticsearch package. This bolt receives the tuples emitted by the SampleSpout class, converts each tuple to a Map, and then calls the insert() method of the ElasticSearchOperation class to insert the record into Elasticsearch. The following is the source code of the ESBolt class:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class ESBolt implements IBasicBolt {

    private static final long serialVersionUID = 2L;
    private ElasticSearchOperation elasticSearchOperation;
    private List<String> esNodes;

    /**
     * @param esNodes host names or IP addresses of the Elasticsearch nodes
     */
    public ESBolt(List<String> esNodes) {
        this.esNodes = esNodes;
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        // Copy the tuple fields ("firstName", "lastName", "companyName")
        // into a map and index it under a random document ID.
        Map<String, Object> personalMap = new HashMap<String, Object>();
        personalMap.put("firstName", input.getValueByField("firstName"));
        personalMap.put("lastName", input.getValueByField("lastName"));
        personalMap.put("companyName", input.getValueByField("companyName"));
        elasticSearchOperation.insert(personalMap, "person", "personmapping",
                UUID.randomUUID().toString());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is the end of the topology and emits nothing.
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public void prepare(Map stormConf, TopologyContext context) {
        try {
            // Create the ElasticSearchOperation instance once per bolt task.
            elasticSearchOperation = new ElasticSearchOperation(esNodes);
        } catch (Exception e) {
            // Keep the original exception as the cause.
            throw new RuntimeException(e);
        }
    }

    public void cleanup() {
    }
}
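The tuple-to-Map conversion in execute() names each field by hand. As a rough sketch of a more generic version, the hypothetical helper below (TupleToDocument and toDocument are illustrative names) pairs a list of field names with a list of values using only the JDK. Inside a bolt, such lists could be obtained from the tuple, for example with input.getFields().toList() and input.getValues(); that wiring is left out here so the example runs without Storm.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the original project.
class TupleToDocument {

    /** Pairs each field name with the value at the same position. */
    static Map<String, Object> toDocument(List<String> fieldNames, List<Object> values) {
        if (fieldNames.size() != values.size()) {
            throw new IllegalArgumentException(
                    "Expected " + fieldNames.size() + " values but got " + values.size());
        }
        // LinkedHashMap keeps the fields in declaration order.
        Map<String, Object> document = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            document.put(fieldNames.get(i), values.get(i));
        }
        return document;
    }

    public static void main(String[] args) {
        List<String> fieldNames = Arrays.asList("firstName", "lastName", "companyName");
        List<Object> values = Arrays.<Object>asList("john", "watson", "abc");
        // prints {firstName=john, lastName=watson, companyName=abc}
        System.out.println(toDocument(fieldNames, values));
    }
}
```

The resulting map has the same shape as personalMap in the bolt and could be passed to insert() unchanged.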

6. Create an ESTopology class in the com.stormadvance.storm_elasticsearch package. This class creates an instance of the spout and bolt classes and chains them together using a TopologyBuilder class. The following is the implementation of the main class:

import java.util.ArrayList;
import java.util.List;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

public class ESTopology {

    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();

        // ES node list
        List<String> esNodes = new ArrayList<String>();
        esNodes.add("10.191.209.14");

        // Set the spout class
        builder.setSpout("spout", new SampleSpout(), 2);

        // Set the ES bolt class
        builder.setBolt("bolt", new ESBolt(esNodes), 2)
                .shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(true);

        // Create an instance of the LocalCluster class for
        // executing the topology in local mode.
        LocalCluster cluster = new LocalCluster();

        // ESTopology is the name of the submitted topology.
        cluster.submitTopology("ESTopology", conf, builder.createTopology());

        try {
            Thread.sleep(60000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }

        System.out.println("Stopped Called : ");

        // Kill the topology under the same name it was submitted with.
        cluster.killTopology("ESTopology");

        // Shut down the Storm test cluster.
        cluster.shutdown();
    }
}

To summarize, we covered how to store the data processed by Apache Storm in Elasticsearch by connecting to the Elasticsearch nodes from inside a Storm bolt.

If you enjoyed this post, check out the book Mastering Apache Storm to learn more about the different real-time processing techniques used to create distributed applications.
