MongoDB Map/Reduce using the User Defined Java Class step and MongoDB Java Driver
In this recipe, we will run a MongoDB Map/Reduce job from PDI. Unfortunately, PDI doesn't provide a dedicated step for this MongoDB feature. However, PDI does provide a step called User Defined Java Class (UDJC), which lets you write Java code to manipulate your data.
We are going to get the total price for all orders for a single client, which we will pass to the transformation as a parameter. We will also get a total for all other clients in the collection. In total, we should get two rows back.
Getting ready
To get ready for this recipe, you need to download the MongoDB Java driver. In this case, we are using version 2.11.1 (mongo-java-driver-2.11.1). You can use the latest version, but the code in this recipe may then be a bit out of date. The driver JAR should live in the lib folder of PDI. Then, you just need to start Spoon, your ETL development environment, and make sure the MongoDB server is running with the data from the last recipe inserted.
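If you want to confirm that the driver is actually visible before building the transformation, a small class-loading check can help. The following is only a sketch: DriverCheck and isClassAvailable are hypothetical names introduced here, and the check is only meaningful when it runs with the same classpath as PDI (that is, with the JARs from the lib folder). The class name com.mongodb.Mongo is the one imported by the code later in this recipe.

```java
class DriverCheck {

    // Returns true if the named class can be found on the current classpath.
    // The class is looked up without being initialized.
    static boolean isClassAvailable(String className) {
        try {
            Class.forName(className, false, DriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // com.mongodb.Mongo is the driver class used by the recipe's UDJC code.
        System.out.println("MongoDB driver on classpath: "
                + isClassAvailable("com.mongodb.Mongo"));
    }
}
```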
How to do it…
In this recipe, we'll write Java code that uses the MongoDB Java driver to connect to the MongoDB database. So, make sure you have the driver in the lib folder of PDI and then perform the following steps:
- Create a new empty transformation.
- Set the transformation name to MongoDB Map/Reduce.
- In the Transformation properties dialog, on the Parameters tab, create a new parameter named CUSTOMER_NAME.
- Save the transformation with the name chapter1-mongodb-map-reduce.
- From the Job category folder, find the Get Variables step and drag and drop it into the working area in the right-side view.
- Double-click on the Get Variables step to open the configuration dialog.
- Set the Step name property to Get Customer Name.
- Add a row with the name as customerName, the variable as ${CUSTOMER_NAME}, and Type set to String.
- From the Scripting category folder, find the User Defined Java Class step and drag and drop it into the working area in the right-hand-side view.
- Create a hop between the Get Customer Name step and the User Defined Java Class step.
- Double-click on the User Defined Java Class step to open the configuration dialog.
- In the Step name field, enter MapReduce.
- In Class code, define the Java code that sends the map and reduce functions to MongoDB with a MapReduce command and then reads back the result:

    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;
    import com.mongodb.MapReduceCommand;
    import com.mongodb.MapReduceOutput;
    import com.mongodb.Mongo;

    private FieldHelper customerNameIn = null;

    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        Object[] r = getRow();
        if (r == null) {
            // No more input rows: tell PDI that this step is finished.
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            customerNameIn = get(Fields.In, "customerName");
        }

        // Resize the row first so that it has room for the new output field.
        r = createOutputRow(r, data.outputRowMeta.size());

        Mongo mongo = null;
        try {
            mongo = new Mongo("localhost", 27017);
            final DB db = mongo.getDB("SteelWheels");
            final DBCollection ordersCol = db.getCollection("Orders");

            // Map: emit each order under the customer's name or under 'Others'.
            final String map = "function() { " +
                "var category; " +
                "if ( this.customer.name == '" + customerNameIn.getString(r) + "' ) " +
                "category = '" + customerNameIn.getString(r) + "'; " +
                "else " +
                "category = 'Others'; " +
                "emit(category, {totalPrice: this.totalPrice, count: 1});}";

            // Reduce: sum the counts and total prices emitted for each key.
            final String reduce = "function(key, values) { " +
                "var n = { count: 0, totalPrice: 0}; " +
                "for ( var i = 0; i < values.length; i++ ) {" +
                "n.count += values[i].count; " +
                "n.totalPrice += values[i].totalPrice; " +
                "} " +
                "return n;} ";

            // INLINE returns the results in the command response
            // instead of writing them to an output collection.
            final MapReduceCommand cmd = new MapReduceCommand(ordersCol, map, reduce,
                null, MapReduceCommand.OutputType.INLINE, null);
            final MapReduceOutput out = ordersCol.mapReduce(cmd);
            get(Fields.Out, "mapReduceJSON").setValue(r, out.toString());
        } catch (Exception e) {
            e.printStackTrace();
            get(Fields.Out, "mapReduceJSON").setValue(r, "");
        } finally {
            // Release the connection opened for this row.
            if (mongo != null) {
                mongo.close();
            }
        }

        putRow(data.outputRowMeta, r);
        return true;
    }

- On the Fields tab, set Fieldname to mapReduceJSON and the Type property to String. This will be the field output from the MapReduce command.
- Click on OK to finish the configuration.
- From the Input category folder, find the Json Input step and drag and drop it into the working area in the right-hand-side view.
- Create a hop between the MapReduce step and the Json Input step.
- Double-click on the JSON Input step to open the configuration dialog.
- Set the Step Name property to Convert JSON.
- On the File tab, check the Source is defined in a field? option. Next, select the mapReduceJSON option in the select box of Get source from field.
- On the Fields tab, we will map the JSON to Fields in the PDI stream. The definition should be like what is shown in this screenshot:
- Click on OK to finish the configuration.
- Now, let's define the fields that we want to see as the output of the transformation. From the Transform category folder, find the Select values step and drag and drop it into the working area in the right-side view.
- Create a hop between the Convert JSON step and the Select values step.
- Double-click on the Select Values step to open the configuration dialog.
- Set the Step Name property to OUTPUT.
- On the Select & Alter tab, click on the Get fields to select button. This will populate the table with all the available fields in the stream. Remove the mapReduceJSON field; it isn't necessary anymore, since we have converted it into individual fields in the PDI stream.
- Click on OK to finish the configuration.
- When you run the transformation, be sure to set the CUSTOMER_NAME parameter in the Run dialog. The Get Customer Name step reads this value, and the map function uses it to decide which orders belong to the named customer and which fall under Others.
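The map function in the MapReduce step is built by concatenating the customer name directly into a single-quoted JavaScript string. A name that contains an apostrophe or a backslash would therefore break the generated function. One possible way to guard against this is to escape the value before embedding it. The sketch below is an assumption rather than part of the original recipe: MapFunctionBuilder, escapeForJsString, and buildMapFunction are hypothetical names, and the sample name is made up for illustration.

```java
class MapFunctionBuilder {

    // Escapes characters that would terminate or corrupt a
    // single-quoted JavaScript string literal.
    static String escapeForJsString(String value) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '\\': sb.append("\\\\"); break;
                case '\'': sb.append("\\'"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    // Builds the same map function text as the recipe,
    // but with the customer name escaped first.
    static String buildMapFunction(String customerName) {
        String safe = escapeForJsString(customerName);
        return "function() { "
            + "var category; "
            + "if ( this.customer.name == '" + safe + "' ) "
            + "category = '" + safe + "'; "
            + "else "
            + "category = 'Others'; "
            + "emit(category, {totalPrice: this.totalPrice, count: 1});}";
    }

    public static void main(String[] args) {
        // Hypothetical customer name containing an apostrophe.
        System.out.println(buildMapFunction("O'Brien Imports"));
    }
}
```

Inside the User Defined Java Class step, the two helper methods could be pasted next to processRow and the map string built from the escaped value instead of the raw field.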
How it works…
In this example, we executed a transformation that takes CUSTOMER_NAME as a parameter. This value is passed to the User Defined Java Class step and used in the Java code within it. That code builds two simple JavaScript functions, map and reduce, and sends them to the MongoDB server, which runs them against the Orders collection.
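To make the logic of the two JavaScript functions easier to follow, here is a plain-Java sketch that imitates them in memory, without MongoDB. It is an illustration only: the Order and Totals classes, the run method, and the sample orders are all hypothetical. Note also that a real MongoDB server may call reduce several times for the same key, or skip it for a key with a single emitted value, which is why reduce returns the same shape that map emits.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MapReduceSketch {

    // Mirrors the {count, totalPrice} object emitted by the map function.
    static final class Totals {
        final int count;
        final double totalPrice;
        Totals(int count, double totalPrice) {
            this.count = count;
            this.totalPrice = totalPrice;
        }
    }

    // Hypothetical in-memory stand-in for a document in the Orders collection.
    static final class Order {
        final String customerName;
        final double totalPrice;
        Order(String customerName, double totalPrice) {
            this.customerName = customerName;
            this.totalPrice = totalPrice;
        }
    }

    // Map phase: choose the key, as the JavaScript map function does.
    static String map(Order order, String customerName) {
        return order.customerName.equals(customerName) ? customerName : "Others";
    }

    // Reduce phase: sum counts and total prices for one key.
    static Totals reduce(List<Totals> values) {
        int count = 0;
        double totalPrice = 0;
        for (Totals v : values) {
            count += v.count;
            totalPrice += v.totalPrice;
        }
        return new Totals(count, totalPrice);
    }

    // Groups the emitted values by key, then reduces each group.
    static Map<String, Totals> run(List<Order> orders, String customerName) {
        Map<String, List<Totals>> emitted = new LinkedHashMap<>();
        for (Order o : orders) {
            emitted.computeIfAbsent(map(o, customerName), k -> new ArrayList<>())
                   .add(new Totals(1, o.totalPrice));
        }
        Map<String, Totals> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Totals>> e : emitted.entrySet()) {
            result.put(e.getKey(), reduce(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample data for illustration.
        List<Order> orders = new ArrayList<>();
        orders.add(new Order("Customer A", 100.0));
        orders.add(new Order("Customer B", 50.0));
        orders.add(new Order("Customer A", 25.0));
        for (Map.Entry<String, Totals> e : run(orders, "Customer A").entrySet()) {
            System.out.println(e.getKey() + ": count=" + e.getValue().count
                    + ", totalPrice=" + e.getValue().totalPrice);
        }
    }
}
```

With one named customer, the result has at most two keys: the customer's name and Others, which matches the two rows the recipe expects.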
The output of this step is a single row containing a JSON string, which needs to be parsed into fields in the PDI stream. To do this, we used the JSON input step and mapped the JSON string to individual stream fields.
If you want to know more about User Defined Java Class, see the documentation at http://wiki.pentaho.com/display/EAI/User+Defined+Java+Class.
There's more…
When we talk about map and reduce functions, it is almost mandatory to talk about Hadoop, an open source software framework for storage and processing of datasets that uses a MapReduce engine.
PDI provides integration with Hadoop through both job entries and transformation steps. You can find more documentation about this on the Pentaho website. Personally, I recommend these two tutorials: