String together Storm-RMQ-PubNub sensor data topology
Let's wire in the JSON bolt to parse this JSON payload in our topology and see the execution:
... public class JsonBolt implements IRichBolt { .. private static final long serialVersionUID = 1L; OutputCollector collector; public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) { this.collector = arg2; } public void execute(Tuple arg0) { String jsonInString = arg0.getStringByField("messages"); System.out.println("message read from queue" + jsonInString); JsonConverter jsonconvertor = new JsonConverter(); MySensorData mysensorObj = jsonconvertor.run(jsonInString); this.collector.emit(new Values(mysensorObj)); this.collector.ack(arg0); } .. } .. class JsonConverter { .. public MySensorData run(String jsonInString) { ObjectMapper mapper = new ObjectMapper...