Machine Learning: End-to-End guide for Java developers

Data Analysis, Machine Learning, and Neural Networks simplified

Product type: Course
Published: Oct 2017
Publisher: Packt
ISBN-13: 9781788622219
Length: 1159 pages
Edition: 1st Edition
Authors: Krishna Choppella, Uday Kamath

Chapter 11. Mathematical and Parallel Techniques for Data Analysis

The concurrent execution of a program can result in significant performance improvements. In this chapter, we will address the various techniques that can be used in data science applications. These can range from low-level mathematical calculations to higher-level API-specific options.

Always keep in mind that performance enhancement starts with ensuring that the correct set of application functionality is implemented. If the application does not do what a user expects, then the enhancements are for nought. The architecture of the application and the algorithms used are also more important than code enhancements. Always use the most efficient algorithm. Code enhancement should then be considered. We are not able to address the higher-level optimization issues in this chapter; instead, we will focus on code enhancements.

Many data science applications and supporting APIs use matrix operations to accomplish their tasks. Often these operations are buried within an API, but there are times when we may need to use these directly. Regardless, it can be beneficial to understand how these operations are supported. To this end, we will explain how matrix multiplication is handled using several different approaches.

Concurrent processing can be implemented using Java threads. A developer can use threads and thread pools to improve an application's response time. Many APIs will use threads when multiple CPUs or GPUs are not available, as is the case with Aparapi. We will not illustrate the use of threads here. However, the reader is assumed to have a basic knowledge of threads and thread pools.

The map-reduce algorithm is used extensively for data science applications. We will present a technique for achieving this type of parallel processing using Apache's Hadoop. Hadoop is a framework supporting the manipulation of large datasets, and can greatly decrease the required processing time for large data science projects. We will demonstrate a technique for calculating an average value for a sample set of data.

There are several well-known APIs that support multiple processors, including CUDA and OpenCL. CUDA is supported using Java bindings for CUDA (JCuda) (http://jcuda.org/). We will not demonstrate this technique directly here. However, many of the APIs we will use do support CUDA if it is available, such as DL4J. We will briefly discuss OpenCL and how it is supported in Java. It is worth noting that the Aparapi API provides higher-level support, which may use either multiple CPUs or GPUs. We will demonstrate the use of Aparapi to perform matrix multiplication.

In this chapter, we will examine how multiple CPUs and GPUs can be harnessed to speed up data mining tasks. Many of the APIs we have used already take advantage of multiple processors or at least provide a means to enable GPU usage. We will introduce a number of these options in this chapter.

Concurrent processing is also supported extensively in the cloud. Many of the techniques discussed here are used in the cloud. As a result, we will not explicitly address how to conduct parallel processing in the cloud.

Implementing basic matrix operations

There are several different types of matrix operations, including simple addition, subtraction, scalar multiplication, and various forms of multiplication. To illustrate the matrix operations, we will focus on what is known as the matrix product. This is a common operation that multiplies two matrices to produce a third matrix.

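Consider two matrices, A and B, where matrix A has n rows and m columns and matrix B has m rows and p columns. The product of A and B, written as AB, is a matrix with n rows and p columns. Each entry of AB is obtained by multiplying the m entries of a row of A by the corresponding m entries of a column of B and summing the results. This is shown more explicitly here, where:

\[
A = \begin{pmatrix} A_{11} & \cdots & A_{1m} \\ \vdots & \ddots & \vdots \\ A_{n1} & \cdots & A_{nm} \end{pmatrix},
\qquad
B = \begin{pmatrix} B_{11} & \cdots & B_{1p} \\ \vdots & \ddots & \vdots \\ B_{m1} & \cdots & B_{mp} \end{pmatrix}
\]

The product is defined as follows:

\[
(AB)_{ij} = \sum_{k=1}^{m} A_{ik} B_{kj}, \qquad i = 1, \ldots, n, \quad j = 1, \ldots, p
\]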

We start with the declaration and initialization of the matrices. The variables n, m, p represent the dimensions of the matrices. The A matrix is n by m, the B matrix is m by p, and the C matrix representing the product is n by p:

int n = 4; 
int m = 2; 
int p = 3; 
 
double A[][] = { 
    {0.1950, 0.0311}, 
    {0.3588, 0.2203}, 
    {0.1716, 0.5931}, 
    {0.2105, 0.3242}}; 
double B[][] = { 
    {0.0502, 0.9823, 0.9472}, 
    {0.5732, 0.2694, 0.916}}; 
double C[][] = new double[n][p]; 

The following code sequence illustrates the multiplication operation using nested for loops:

for (int i = 0; i < n; i++) { 
    for (int k = 0; k < m; k++) { 
        for (int j = 0; j < p; j++) { 
            C[i][j] += A[i][k] * B[k][j]; 
        } 
    } 
} 

The following code sequence formats the output to display our matrix:

out.println("\nResult"); 
for (int i = 0; i < n; i++) { 
    for (int j = 0; j < p; j++) { 
        out.printf("%.4f  ", C[i][j]); 
    } 
    out.println(); 
} 

The result appears as follows:

Result
0.0276  0.1999  0.2132  
0.1443  0.4118  0.5417  
0.3486  0.3283  0.7058  
0.1964  0.2941  0.4964
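
The declaration, multiplication, and output steps above can be gathered into one self-contained program. The following is a minimal sketch rather than the book's own listing: the class name MatrixProduct and the multiply helper are illustrative names, and the dimension check is an added assumption about how mismatched input should be handled.

```java
import java.util.Locale;

class MatrixProduct {

    // Returns the n-by-p product of an n-by-m matrix a and an m-by-p matrix b.
    // Assumes both arrays are non-empty and rectangular.
    static double[][] multiply(double[][] a, double[][] b) {
        int n = a.length;
        int m = b.length;
        int p = b[0].length;
        if (a[0].length != m) {
            throw new IllegalArgumentException(
                "Columns of A must equal rows of B");
        }
        double[][] c = new double[n][p];
        for (int i = 0; i < n; i++) {
            for (int k = 0; k < m; k++) {
                for (int j = 0; j < p; j++) {
                    // Accumulates the sum over k of A[i][k] * B[k][j]
                    c[i][j] += a[i][k] * b[k][j];
                }
            }
        }
        return c;
    }

    public static void main(String[] args) {
        double[][] a = {
            {0.1950, 0.0311},
            {0.3588, 0.2203},
            {0.1716, 0.5931},
            {0.2105, 0.3242}};
        double[][] b = {
            {0.0502, 0.9823, 0.9472},
            {0.5732, 0.2694, 0.916}};

        System.out.println("Result");
        for (double[] row : multiply(a, b)) {
            for (double value : row) {
                System.out.printf(Locale.ROOT, "%.4f  ", value);
            }
            System.out.println();
        }
    }
}
```

The i-k-j loop order keeps the innermost loop moving along a single row of both B and C, which suits the way Java stores each row of a two-dimensional array as its own array.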

Later, we will demonstrate several alternative techniques for performing the same operation. Next, we will discuss how to tailor support for multiple processors using DL4J.

Using GPUs with DeepLearning4j

DeepLearning4j works with GPUs such as those provided by NVIDIA. There are options available that enable the use of GPUs, specify how many GPUs should be used, and control the use of GPU memory. In this section, we will show how you can use these options. This type of control is often available with other high-level APIs.

DL4J uses n-dimensional arrays for Java (ND4J) (http://nd4j.org/) to perform numerical computations. This is a library that supports n-dimensional array objects and other numerical computations, such as linear algebra and signal processing. It includes support for GPUs and is also integrated with Hadoop and Spark.

A vector is a one-dimensional array of numbers and is used extensively with neural networks. A vector is a type of mathematical structure called a tensor. A tensor is essentially a multidimensional array: a vector is a tensor with one dimension, a matrix is a tensor with two, and the term is most often used for arrays with three or more dimensions. The number of dimensions is called the rank of the tensor.

There is often a need to map a multidimensional set of numbers to a one-dimensional array. This is done by flattening the array using a defined order. For example, with a two-dimensional array many systems will allocate the members of the array in row-major order. This means the first row is added to the vector, followed by the second row, and then the third, and so forth. We will use this approach in the Using the ND4J API section.
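
The flattening order described above can be sketched in plain Java. This is an illustration only, not ND4J code: the class name RowMajorFlatten and its two helper methods are hypothetical, and the input is assumed to be a non-empty rectangular array.

```java
import java.util.Arrays;

class RowMajorFlatten {

    // Copies a rectangular 2D array into a 1D array, one row after another.
    static double[] flatten(double[][] matrix) {
        int rows = matrix.length;
        int cols = matrix[0].length;
        double[] flat = new double[rows * cols];
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                flat[index(i, j, cols)] = matrix[i][j];
            }
        }
        return flat;
    }

    // Position of element (i, j) in the flattened array.
    static int index(int i, int j, int cols) {
        return i * cols + j;
    }

    public static void main(String[] args) {
        double[][] matrix = {{1, 2, 3}, {4, 5, 6}};
        double[] flat = flatten(matrix);
        System.out.println(Arrays.toString(flat)); // [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
        System.out.println(flat[index(1, 0, 3)]);  // 4.0, the first element of row 1
    }
}
```

The same index formula lets code read element (i, j) directly from the flattened array without rebuilding the two-dimensional form.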

To enable GPU use, the project's POM file needs to be modified. In the properties section of the POM file, the nd4j.backend tag needs to be added or modified, as shown here:

<nd4j.backend>nd4j-cuda-7.5-platform</nd4j.backend> 

Models can be trained in parallel using the ParallelWrapper class. The training task is automatically distributed among available CPUs/GPUs. The model is used as an argument to the ParallelWrapper class' Builder constructor, as shown here:

ParallelWrapper parallelWrapper =  
    new ParallelWrapper.Builder(aModel) 
        // Builder methods... 
        .build(); 

When executed, a copy of the model is used on each GPU. After the number of iterations specified by the averagingFrequency method, the models are averaged and then the training process continues.
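
The averaging step can be pictured with a small plain-Java sketch. This is a conceptual illustration only and is not how DL4J implements ParallelWrapper: the class name ParameterAveraging is hypothetical, and plain double arrays stand in for the parameters of each model copy.

```java
import java.util.Arrays;

class ParameterAveraging {

    // Element-wise mean of the parameter vectors held by each worker's copy.
    // Assumes at least one worker and equal-length parameter vectors.
    static double[] average(double[][] workerParams) {
        int workers = workerParams.length;
        int size = workerParams[0].length;
        double[] mean = new double[size];
        for (double[] params : workerParams) {
            for (int i = 0; i < size; i++) {
                mean[i] += params[i] / workers;
            }
        }
        return mean;
    }

    public static void main(String[] args) {
        // Two hypothetical model copies after training on different batches
        double[][] workerParams = {
            {1.0, 4.0},
            {3.0, 8.0}};
        System.out.println(Arrays.toString(average(workerParams))); // [2.0, 6.0]
    }
}
```

After such a step every copy would continue training from the same averaged parameters until the next averaging point.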

There are various methods that can be used to configure the class, as summarized in the following table:

Method                        Purpose
prefetchBuffer                Specifies the size of a buffer used to pre-fetch data
workers                       Specifies the number of workers to be used
averageUpdaters,              Various methods to control how averaging is achieved
averagingFrequency,
reportScoreAfterAveraging,
useLegacyAveraging

The number of workers should be greater than the number of GPUs available.

As with most computations, using a lower precision value will speed up the processing. This can be controlled using the setDTypeForContext method, as shown next. In this case, half precision is specified:

DataTypeUtil.setDTypeForContext(DataBuffer.Type.HALF); 

This support and more details regarding optimization techniques can be found at http://deeplearning4j.org/gpu.

Using map-reduce

Map-reduce is a model for processing large sets of data in a parallel, distributed manner. This model consists of a map method for filtering and sorting data, and a reduce method for summarizing data. The map-reduce framework is effective because it distributes the processing of a dataset across multiple servers, performing mapping and reduction simultaneously on smaller pieces of the data. Map-reduce provides significant performance improvements when implemented in a multi-threaded manner. In this section, we will demonstrate a technique using Apache's Hadoop implementation. In the Using Java 8 to perform map-reduce section, we will discuss techniques for performing map-reduce using Java 8 streams.

Hadoop is a software ecosystem providing support for parallel computing. Map-reduce jobs can be run on Hadoop servers, generally set up as clusters, to significantly improve processing speeds. Hadoop has trackers that run map-reduce operations on nodes within a Hadoop cluster. Each node operates independently and the trackers monitor the progress and integrate the output of each node to generate the final output. An image demonstrating the basic map-reduce model with trackers can be found at http://www.developer.com/java/data/big-data-tool-map-reduce.html.

Using Apache's Hadoop to perform map-reduce

We are going to show you a very simple example of a map-reduce application here. Before we can use Hadoop, we need to download and extract Hadoop application files. The latest versions can be found at http://hadoop.apache.org/releases.html. We are using version 2.7.3 for this demonstration.

You will need to set your JAVA_HOME environment variable. Additionally, Hadoop is intolerant of long file paths and spaces within paths, so make sure you extract Hadoop to the simplest directory structure possible.

We will be working with a sample text file containing information about books. Each line of our tab-delimited file has the book title, author, and page count:

Moby Dick  Herman Melville  822
Charlotte's Web  E.B. White  189
The Grapes of Wrath  John Steinbeck  212
Jane Eyre  Charlotte Bronte  299
A Tale of Two Cities  Charles Dickens  673
War and Peace  Leo Tolstoy  1032
The Great Gatsby  F. Scott Fitzgerald  275

We are going to use a map function to extract the title and page count information and then a reduce function to calculate the average page count of the books in our dataset. To begin, create a new class, AveragePageCount. We will create two static classes within AveragePageCount, one to handle the map procedure and one to handle the reduction.
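
Before bringing in Hadoop, it may help to see the same map and reduce steps on an in-memory list. The following is a minimal sketch that uses Java 8 streams instead of Hadoop; the class name InMemoryAverage and its helper methods are illustrative and are not part of the AveragePageCount application.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class InMemoryAverage {

    // Map step: extract the page count (third tab-delimited field) of a line.
    static int pagesOf(String line) {
        return Integer.parseInt(line.split("\t")[2].trim());
    }

    // Reduce step: combine all mapped page counts into a single average.
    static double averagePages(List<String> lines) {
        return lines.stream()
                .mapToInt(InMemoryAverage::pagesOf)
                .average()
                .orElse(0.0); // an empty list yields 0.0 in this sketch
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "Moby Dick\tHerman Melville\t822",
            "Charlotte's Web\tE.B. White\t189",
            "The Grapes of Wrath\tJohn Steinbeck\t212",
            "Jane Eyre\tCharlotte Bronte\t299",
            "A Tale of Two Cities\tCharles Dickens\t673",
            "War and Peace\tLeo Tolstoy\t1032",
            "The Great Gatsby\tF. Scott Fitzgerald\t275");
        // The seven page counts sum to 3502, so this prints 500.2857
        System.out.printf(Locale.ROOT, "%.4f%n", averagePages(lines));
    }
}
```

The Hadoop version that follows performs the same two steps, but splits them into separate classes so that the framework can distribute the work.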

Writing the map method

First, we will create the TextMapper class, which will implement the map method. This class inherits from the Mapper class and has two private instance variables, pages and bookTitle. pages is an IntWritable object and bookTitle is a Text object. IntWritable and Text are used because these objects will need to be serialized to a byte stream before they can be transmitted to the servers for processing. These objects take up less space when serialized and transfer faster than the comparable Integer or String objects:

public static class TextMapper 
        extends Mapper<Object, Text, Text, IntWritable> { 
 
    private final IntWritable pages = new IntWritable(); 
    private final Text bookTitle = new Text(); 
 
} 

Within our TextMapper class we create the map method. This method takes three parameters: the key object, a Text object named bookInfo, and the Context. The key allows the tracker to map each particular object back to the correct job. The bookInfo object contains the text or string data about each book. Context holds information about the entire system and allows the method to report on progress and update values within the system.

Within the map method, we use the split method to break each piece of book information into an array of String objects. We set our bookTitle variable to position 0 of the array and set pages to the value stored in position 2, after parsing it as an integer. We can then write out our book title and page count information through the context and update our entire system:

public void map(Object key, Text bookInfo, Context context) 
        throws IOException, InterruptedException { 
    String[] book = bookInfo.toString().split("\t"); 
    bookTitle.set(book[0]); 
    pages.set(Integer.parseInt(book[2])); 
    context.write(bookTitle, pages); 
} 
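
The map method above assumes every input line has three tab-delimited fields and a numeric page count; a blank or malformed line would cause an exception. The parsing step can be made more defensive, as in the following minimal sketch. The class BookLineParser and the parsePages helper are hypothetical names used only for illustration, and returning an empty result for bad lines is an assumption about how such lines should be treated.

```java
import java.util.OptionalInt;

class BookLineParser {

    // Returns the page count from a "title<TAB>author<TAB>pages" line,
    // or an empty result when the line is malformed.
    static OptionalInt parsePages(String line) {
        String[] book = line.split("\t");
        if (book.length < 3) {
            return OptionalInt.empty();
        }
        try {
            return OptionalInt.of(Integer.parseInt(book[2].trim()));
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }

    public static void main(String[] args) {
        // A well-formed line yields its page count
        System.out.println(
            parsePages("Moby Dick\tHerman Melville\t822").getAsInt());
        // A blank line yields no value instead of throwing an exception
        System.out.println(parsePages("").isPresent());
    }
}
```

Inside a mapper, a check of this kind would let the method skip a bad record rather than fail the whole task.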

Writing the reduce method

Next, we will write our AverageReduce class. This class extends the Reducer class and will perform the reduction processes to calculate our average page count. We have created four variables for this class: a FloatWritable object to store our average page count, a float average to hold our temporary average, a float count to count how many books exist in our dataset, and an integer sum to add up the page counts:

public static class AverageReduce 
        extends Reducer<Text, IntWritable, Text, FloatWritable> { 
 
    private final FloatWritable finalAvg = new FloatWritable(); 
    Float average = 0f; 
    Float count = 0f; 
    int sum = 0; 
 
} 

Within our AverageReduce class we will create the reduce method. This method takes as input a Text key, an Iterable object holding writeable integers representing the page counts, and the Context. We use our iterator to process the page counts and add each to our sum. We then calculate the average and set the value of finalAvg. This information is paired with a Text object label and written to the Context:

public void reduce(Text key, Iterable<IntWritable> pageCnts, 
        Context context) 
        throws IOException, InterruptedException { 

    for (IntWritable cnt : pageCnts) { 
        sum += cnt.get(); 
    } 
    count += 1; 
    average = sum / count; 
    finalAvg.set(average); 
    context.write(new Text("Average Page Count = "), finalAvg); 
} 

Creating and executing a new Hadoop job

We are now ready to create our main method in the same class and execute our map-reduce processes. To do this, we need to create a new Configuration object and a new Job. We then set up the significant classes to use in our application.

public static void main(String[] args) throws Exception { 
    Configuration con = new Configuration(); 
    Job bookJob = Job.getInstance(con, "Average Page Count"); 
    ... 
} 

We set our main class, AveragePageCount, in the setJarByClass method. We specify our TextMapper and AverageReduce classes using the setMapperClass and setReducerClass methods, respectively. We also specify that our output will have a text-based key and a writeable integer using the setOutputKeyClass and setOutputValueClass methods:

    bookJob.setJarByClass(AveragePageCount.class); 
    bookJob.setMapperClass(TextMapper.class); 
    bookJob.setReducerClass(AverageReduce.class); 
    bookJob.setOutputKeyClass(Text.class); 
    bookJob.setOutputValueClass(IntWritable.class); 

Finally, we create new input and output paths using the addInputPath and setOutputPath methods. These methods both take our Job object as the first parameter and a Path object representing our input and output file locations as the second parameter. We then call waitForCompletion. Our application exits once this call returns true:

    FileInputFormat.addInputPath(bookJob, new Path("C:/Hadoop/books.txt")); 
    FileOutputFormat.setOutputPath(bookJob, new 
        Path("C:/Hadoop/BookOutput")); 
    if (bookJob.waitForCompletion(true)) { 
        System.exit(0); 
    } 

To execute the application, open a command prompt and navigate to the directory containing our AveragePageCount.class file. We then use the following command to execute our sample application:

hadoop AveragePageCount 

While our task is running, we see updated information about our process output to the screen. A sample of our output is shown as follows:

... 
File System Counters
    FILE: Number of bytes read=1132
    FILE: Number of bytes written=569686
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=7
    Map output records=7
    Map output bytes=136
    Map output materialized bytes=156
    Input split bytes=90
    Combine input records=0
    Combine output records=0
    Reduce input groups=7            
    Reduce shuffle bytes=156           
    Reduce input records=7           
    Reduce output records=7            
    Spilled Records=14            
    Shuffled Maps =1            
    Failed Shuffles=0            
    Merged Map outputs=1            
    GC time elapsed (ms)=11            
    Total committed heap usage (bytes)=536870912            
Shuffle Errors
    BAD_ID=0            
    CONNECTION=0           
    IO_ERROR=0           
    WRONG_LENGTH=0            
    WRONG_MAP=0            
    WRONG_REDUCE=0            
File Input Format Counters
    Bytes Read=249            
File Output Format Counters
     Bytes Written=216           

If we open the BookOutput directory created on our local machine, we find four new files. Use a text editor to open part-r-00000. This file contains information about the average page count as it was calculated using parallel processes. A sample of this output follows:

Average Page Count =  673.0
Average Page Count =   431.0
Average Page Count =   387.0
Average Page Count =   495.75
Average Page Count =   439.0
Average Page Count =   411.66666
Average Page Count =   500.2857

Notice how the average changes from line to line. The reduce method is called once for each key, which here means once for each book title, in sorted key order. Because sum and count are instance variables, they accumulate across these calls. This has the same effect as calculating the average of the first two books first, then adding in the third book, then the fourth, and so on. With this small file a single reducer handles every key, so the last line of the output reflects the correct and final average of all seven page counts. The advantage of Hadoop is that the work can be distributed across the nodes of a cluster. If we had a huge dataset, we should expect to see a noticeable advantage in execution time.
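
The sequence of averages in the output file can be reproduced without Hadoop. The following is a minimal sketch under stated assumptions: the class RunningAverageDemo is a hypothetical name, a TreeMap stands in for the sorted order in which keys reach a reducer, and a single loop stands in for the successive calls to reduce.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RunningAverageDemo {

    // Mimics one reducer whose sum and count persist across reduce calls.
    static List<Float> runningAverages(Map<String, Integer> pagesByTitle) {
        // Sort by title, as keys are presented to a reducer in sorted order
        Map<String, Integer> sorted = new TreeMap<>(pagesByTitle);
        List<Float> averages = new ArrayList<>();
        int sum = 0;
        float count = 0f;
        for (int pages : sorted.values()) {
            sum += pages;              // one key (book) per reduce call
            count += 1;
            averages.add(sum / count); // average of the books seen so far
        }
        return averages;
    }

    public static void main(String[] args) {
        Map<String, Integer> books = new LinkedHashMap<>();
        books.put("Moby Dick", 822);
        books.put("Charlotte's Web", 189);
        books.put("The Grapes of Wrath", 212);
        books.put("Jane Eyre", 299);
        books.put("A Tale of Two Cities", 673);
        books.put("War and Peace", 1032);
        books.put("The Great Gatsby", 275);
        for (float average : runningAverages(books)) {
            System.out.println("Average Page Count = " + average);
        }
    }
}
```

The first value is the page count of A Tale of Two Cities, the title that sorts first, and the last value is the average over all seven books.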

Using Apache's Hadoop to perform map-reduce

We are going to show you a very simple example of a map-reduce application here. Before we can use Hadoop, we need to download and extract Hadoop application files. The latest versions can be found at http://hadoop.apache.org/releases.html. We are using version 2.7.3 for this demonstration.

You will need to set your JAVA_HOME environment variable. Additionally, Hadoop is intolerant of long file paths and spaces within paths, so make sure you extract Hadoop to the simplest directory structure possible.

We will be working with a sample text file containing information about books. Each line of our tab-delimited file has the book title, author, and page count:

Moby Dick  Herman Melville  822

Charlotte's Web  E.B. White  189
The Grapes of Wrath  John Steinbeck  212
Jane Eyre  Charlotte Bronte  299
A Tale of Two Cities  Charles Dickens  673
War and Peace  Leo Tolstoy  1032
The Great Gatsby  F. Scott Fitzgerald  275

We are going to use a map function to extract the title and page count information and then a reduce function to calculate the average page count of the books in our dataset. To begin, create a new class, AveragePageCount. We will create two static classes within AveragePageCount, one to handle the map procedure and one to handle the reduction.

Writing the map method

First, we will create the TextMapper class, which will implement the map method. This class inherits from the Mapper class and has two private instance variables, pages and bookTitle. pages is an IntWritable object and bookTitle is a Text object. IntWritable and Text are used because these objects will need to be serialized to a byte stream before they can be transmitted to the servers for processing. These objects take up less space and transfer faster than the comparable int or String objects:

public static class TextMapper 
        extends Mapper<Object, Text, Text, IntWritable> { 
 
    private final IntWritable pages = new IntWritable(); 
    private final Text bookTitle = new Text(); 
 
} 

Within our TextMapper class we create the map method. This method takes three parameters: the key object, a Text object, bookInfo, and the Context. The key allows the tracker to map each particular object back to the correct job. The bookInfo object contains the text or string data about each book. Context holds information about the entire system and allows the method to report on progress and update values within the system.

Within the map method, we use the split method to break each piece of book information into an array of String objects. We set our bookTitle variable to position 0 of the array and set pages to the value stored in position 2, after parsing it as an integer. We can then write out our book title and page count information through the context and update our entire system:

public void map(Object key, Text bookInfo, Context context) 
 throws IOException, InterruptedException { 
        String[] book = bookInfo.toString().split("\t"); 
        bookTitle.set(book[0]); 
        pages.set(Integer.parseInt(book[2])); 
        context.write(bookTitle, pages); 
    } 

Writing the reduce method

Next, we will write our AverageReduce class. This class extends the Reducer class and will perform the reduction processes to calculate our average page count. We have created four variables for this class: a FloatWritable object to store our average page count, a float average to hold our temporary average, a float count to count how many books exist in our dataset, and an integer sum to add up the page counts:

public static class AverageReduce 
        extends Reducer<Text, IntWritable, Text, FloatWritable> { 
 
    private final FloatWritable finalAvg = new FloatWritable(); 
    Float average = 0f; 
    Float count = 0f; 
    int sum = 0; 
 
} 

Within our AverageReduce class we will create the reduce method. This method takes as input a Text key, an Iterable object holding writeable integers representing the page counts, and the Context. We use our iterator to process the page counts and add each to our sum. We then calculate the average and set the value of finalAvg. This information is paired with a Text object label and written to the Context:

    public void reduce(Text key, Iterable<IntWritable> pageCnts, 
        Context context)  
            throws IOException, InterruptedException { 
 
    for (IntWritable cnt : pageCnts) { 
        sum += cnt.get(); 
    } 
    count += 1; 
    average = sum / count; 
    finalAvg.set(average); 
    context.write(new Text("Average Page Count = "), finalAvg); 
} 

Creating and executing a new Hadoop job

We are now ready to create our main method in the same class and execute our map-reduce processes. To do this, we need to create a new Configuration object and a new Job. We then set up the significant classes to use in our application.

public static void main(String[] args) throws Exception { 
    Configuration con = new Configuration(); 
    Job bookJob = Job.getInstance(con, "Average Page Count"); 
    ... 
} 

We set our main class, AveragePageCount, in the setJarByClass method. We specify our TextMapper and AverageReduce classes using the setMapperClass and setReducerClass methods, respectively. We also specify that our output will have a text-based key and a writeable integer using the setOutputKeyClass and setOutputValueClass methods:

    bookJob.setJarByClass(AveragePageCount.class); 
    bookJob.setMapperClass(TextMapper.class); 
    bookJob.setReducerClass(AverageReduce.class); 
    bookJob.setOutputKeyClass(Text.class); 
    bookJob.setOutputValueClass(IntWritable.class); 

Finally, we create new input and output paths using the addInputPath and setOutputPath methods. These methods both take our Job object as the first parameter and a Path object representing our input and output file locations as the second parameter. We then call waitForCompletion. Our application exits once this call returns true:

    FileInputFormat.addInputPath(bookJob, new Path("C:/Hadoop/books.txt")); 
    FileOutputFormat.setOutputPath(bookJob, new 
        Path("C:/Hadoop/BookOutput")); 
    if (bookJob.waitForCompletion(true)) { 
        System.exit(0); 
    } 

To execute the application, open a command prompt and navigate to the directory containing our AveragePageCount.class file. We then use the following command to execute our sample application:

hadoop AveragePageCount 

While our task is running, we see updated information about our process output to the screen. A sample of our output is shown as follows:

... 
File System Counters
    FILE: Number of bytes read=1132
    FILE: Number of bytes written=569686
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=7
    Map output records=7
    Map output bytes=136
    Map output materialized bytes=156
    Input split bytes=90
    Combine input records=0
    Combine output records=0
    Reduce input groups=7            
    Reduce shuffle bytes=156           
    Reduce input records=7           
    Reduce output records=7            
    Spilled Records=14            
    Shuffled Maps =1            
    Failed Shuffles=0            
    Merged Map outputs=1            
    GC time elapsed (ms)=11            
    Total committed heap usage (bytes)=536870912            
Shuffle Errors
    BAD_ID=0            
    CONNECTION=0           
    IO_ERROR=0           
    WRONG_LENGTH=0            
    WRONG_MAP=0            
    WRONG_REDUCE=0            
File Input Format Counters
    Bytes Read=249            
File Output Format Counters
     Bytes Written=216           

If we open the BookOutput directory created on our local machine, we find four new files. Use a text editor to open part-r-00000. This file contains information about the average page count as it was calculated using parallel processes. A sample of this output follows:

Average Page Count =  673.0
Average Page Count =   431.0
Average Page Count =   387.0
Average Page Count =   495.75
Average Page Count =   439.0
Average Page Count =   411.66666
Average Page Count =   500.2857

Notice how the average changes as each individual process is combined with the other reduction processes. This has the same effect as calculating the average of the first two books first, then adding in the third book, then the fourth, and so on. The advantage here of course is that the averaging is done in a parallel manner. If we had a huge dataset, we should expect to see a noticeable advantage in execution time. The last line of BookOutput reflects the correct and final average of all seven page counts.

Writing the map method

First, we will create the TextMapper class, which will implement the map method. This class inherits from the Mapper class and has two private instance variables, pages and bookTitle. pages is an IntWritable object and bookTitle is a Text object. IntWritable and Text are used because these objects will need to be serialized to a byte stream before they can be transmitted to the servers for processing. These objects take up less space and transfer faster than the comparable int or String objects:

public static class TextMapper 
        extends Mapper<Object, Text, Text, IntWritable> { 
 
    private final IntWritable pages = new IntWritable(); 
    private final Text bookTitle = new Text(); 
 
} 

Within our TextMapper class we create the map method. This method takes three parameters: the key object, a Text object, bookInfo, and the Context. The key allows the tracker to map each particular object back to the correct job. The bookInfo object contains the text or string data about each book. Context holds information about the entire system and allows the method to report on progress and update values within the system.

Within the map method, we use the split method to break each piece of book information into an array of String objects. We set our bookTitle variable to position 0 of the array and set pages to the value stored in position 2, after parsing it as an integer. We can then write out our book title and page count information through the context and update our entire system:

public void map(Object key, Text bookInfo, Context context) 
 throws IOException, InterruptedException { 
        String[] book = bookInfo.toString().split("\t"); 
        bookTitle.set(book[0]); 
        pages.set(Integer.parseInt(book[2])); 
        context.write(bookTitle, pages); 
    } 

Writing the reduce method

Next, we will write our AverageReduce class. This class extends the Reducer class and will perform the reduction processes to calculate our average page count. We have created four variables for this class: a FloatWritable object to store our average page count, a float average to hold our temporary average, a float count to count how many books exist in our dataset, and an integer sum to add up the page counts:

public static class AverageReduce 
        extends Reducer<Text, IntWritable, Text, FloatWritable> { 
 
    private final FloatWritable finalAvg = new FloatWritable(); 
    Float average = 0f; 
    Float count = 0f; 
    int sum = 0; 
 
} 

Within our AverageReduce class we will create the reduce method. This method takes as input a Text key, an Iterable object holding writeable integers representing the page counts, and the Context. We use our iterator to process the page counts and add each to our sum. We then calculate the average and set the value of finalAvg. This information is paired with a Text object label and written to the Context:

    public void reduce(Text key, Iterable<IntWritable> pageCnts, 
        Context context)  
            throws IOException, InterruptedException { 
 
    for (IntWritable cnt : pageCnts) { 
        sum += cnt.get(); 
    } 
    count += 1; 
    average = sum / count; 
    finalAvg.set(average); 
    context.write(new Text("Average Page Count = "), finalAvg); 
} 

Creating and executing a new Hadoop job

We are now ready to create our main method in the same class and execute our map-reduce processes. To do this, we need to create a new Configuration object and a new Job. We then set up the significant classes to use in our application.

public static void main(String[] args) throws Exception { 
    Configuration con = new Configuration(); 
    Job bookJob = Job.getInstance(con, "Average Page Count"); 
    ... 
} 

We set our main class, AveragePageCount, in the setJarByClass method. We specify our TextMapper and AverageReduce classes using the setMapperClass and setReducerClass methods, respectively. We also specify that our output will have a text-based key and a writeable integer using the setOutputKeyClass and setOutputValueClass methods:

    bookJob.setJarByClass(AveragePageCount.class); 
    bookJob.setMapperClass(TextMapper.class); 
    bookJob.setReducerClass(AverageReduce.class); 
    bookJob.setOutputKeyClass(Text.class); 
    bookJob.setOutputValueClass(IntWritable.class); 

Finally, we specify the input and output paths using the addInputPath and setOutputPath methods. Both methods take our Job object as the first parameter and a Path object representing the file location as the second parameter. We then call waitForCompletion, which blocks until the job finishes and returns true on success. The application exits with status 0 if the job succeeded and 1 otherwise:

    FileInputFormat.addInputPath(bookJob, new Path("C:/Hadoop/books.txt")); 
    FileOutputFormat.setOutputPath(bookJob, new 
        Path("C:/Hadoop/BookOutput")); 
    // Report failure to the caller instead of silently falling through 
    System.exit(bookJob.waitForCompletion(true) ? 0 : 1); 

To execute the application, open a command prompt and navigate to the directory containing our AveragePageCount.class file. We then use the following command to execute our sample application:

hadoop AveragePageCount 

While our task is running, we see updated information about our process output to the screen. A sample of our output is shown as follows:

... 
File System Counters
    FILE: Number of bytes read=1132
    FILE: Number of bytes written=569686
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=7
    Map output records=7
    Map output bytes=136
    Map output materialized bytes=156
    Input split bytes=90
    Combine input records=0
    Combine output records=0
    Reduce input groups=7            
    Reduce shuffle bytes=156           
    Reduce input records=7           
    Reduce output records=7            
    Spilled Records=14            
    Shuffled Maps =1            
    Failed Shuffles=0            
    Merged Map outputs=1            
    GC time elapsed (ms)=11            
    Total committed heap usage (bytes)=536870912            
Shuffle Errors
    BAD_ID=0            
    CONNECTION=0           
    IO_ERROR=0           
    WRONG_LENGTH=0            
    WRONG_MAP=0            
    WRONG_REDUCE=0            
File Input Format Counters
    Bytes Read=249            
File Output Format Counters
     Bytes Written=216           

If we open the BookOutput directory created on our local machine, we find four new files. Use a text editor to open part-r-00000. This file contains information about the average page count as it was calculated using parallel processes. A sample of this output follows:

Average Page Count =  673.0
Average Page Count =   431.0
Average Page Count =   387.0
Average Page Count =   495.75
Average Page Count =   439.0
Average Page Count =   411.66666
Average Page Count =   500.2857

Notice how the average changes with each line. Because sum and count are fields of the reducer, they persist across calls to the reduce method, so each line is the cumulative average of the page counts processed so far. This has the same effect as calculating the average of the first two books first, then adding in the third book, then the fourth, and so on. The advantage of map-reduce is that the work can be distributed across parallel processes. If we had a huge dataset, we should expect to see a noticeable advantage in execution time. The last line of part-r-00000 reflects the correct and final average of all seven page counts.
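The running averages in the sample output can be reproduced without Hadoop. The following is a minimal plain-Java sketch, not the book's reducer: the class name RunningAverageSketch is hypothetical, and the seven page counts are an assumption, back-computed from the averages shown above rather than taken from books.txt.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Hadoop) of a cumulative average.
class RunningAverageSketch {

    // Returns the average after each page count is added, mirroring a
    // reducer that keeps sum and count in fields across reduce calls.
    static List<Float> runningAverages(int[] pageCounts) {
        List<Float> averages = new ArrayList<>();
        int sum = 0;
        float count = 0f;
        for (int pages : pageCounts) {
            sum += pages;
            count += 1;
            averages.add(sum / count);
        }
        return averages;
    }

    public static void main(String[] args) {
        // Hypothetical page counts derived from the sample output
        int[] pageCounts = {673, 189, 299, 822, 212, 275, 1032};
        for (float avg : runningAverages(pageCounts)) {
            System.out.println("Average Page Count = " + avg);
        }
    }
}
```

Only the last value is the average of all seven counts. The earlier values are intermediate results.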

Various mathematical libraries

There are numerous mathematical libraries available for Java use. In this section, we will provide a quick and high-level overview of several libraries. These libraries do not necessarily automatically support multiple processors. In addition, the intent of this section is to provide some insight into how these libraries can be used. In most cases, they are relatively easy to use.

A list of Java mathematical libraries is found at https://en.wikipedia.org/wiki/List_of_numerical_libraries#Java and https://java-matrix.org/. We will demonstrate the use of the jblas, Apache Commons Math, and the ND4J libraries.

Using the jblas API

The jblas API (http://jblas.org/) is a math library supporting Java. It is based on Basic Linear Algebra Subprograms (BLAS) (http://www.netlib.org/blas/) and Linear Algebra Package (LAPACK) (http://www.netlib.org/lapack/), which are standard libraries for fast arithmetic calculation. The jblas API provides a wrapper around these libraries.

The following is a demonstration of how matrix multiplication is performed. We start with the matrix definitions:

DoubleMatrix A = new DoubleMatrix(new double[][]{ 
    {0.1950, 0.0311}, 
    {0.3588, 0.2203}, 
    {0.1716, 0.5931}, 
    {0.2105, 0.3242}}); 
 
DoubleMatrix B = new DoubleMatrix(new double[][]{ 
    {0.0502, 0.9823, 0.9472}, 
    {0.5732, 0.2694, 0.916}}); 
DoubleMatrix C; 

The actual statement to perform multiplication is quite short, as shown next. The mmul method is executed against the A matrix, with the B matrix passed as an argument:

C = A.mmul(B); 

The resulting C matrix is then displayed:

for(int i=0; i<C.getRows(); i++) { 
    out.println(C.getRow(i)); 
} 

The output should be as follows:

[0.027616, 0.199927, 0.213192]
[0.144288, 0.411798, 0.541650]
[0.348579, 0.328344, 0.705819]
[0.196399, 0.294114, 0.496353]

This library is fairly easy to use and supports an extensive set of arithmetic operations.
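As a cross-check on the library result, the same product can be computed with a plain triple loop. This is a sketch only: the class name MatrixMultiplySketch is hypothetical, it uses no jblas classes, and it makes no attempt at the optimizations that BLAS-backed libraries provide.

```java
import java.util.Arrays;

// Dependency-free matrix multiplication for checking library output.
class MatrixMultiplySketch {

    // Computes C = A x B, where A is n-by-m and B is m-by-p.
    static double[][] multiply(double[][] a, double[][] b) {
        int rows = a.length;
        int inner = b.length;
        int cols = b[0].length;
        double[][] c = new double[rows][cols];
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                double total = 0.0;
                for (int k = 0; k < inner; k++) {
                    total += a[i][k] * b[k][j];
                }
                c[i][j] = total;
            }
        }
        return c;
    }

    public static void main(String[] args) {
        double[][] a = {
            {0.1950, 0.0311},
            {0.3588, 0.2203},
            {0.1716, 0.5931},
            {0.2105, 0.3242}};
        double[][] b = {
            {0.0502, 0.9823, 0.9472},
            {0.5732, 0.2694, 0.916}};
        for (double[] row : multiply(a, b)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

The printed values should agree with the library output up to rounding and display format.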

Using the Apache Commons Math API

The Apache Commons Math API (http://commons.apache.org/proper/commons-math/) supports a large number of mathematical and statistical operations. The following example illustrates how to perform matrix multiplication.

We start with the declaration and initialization of the A and B matrices:

double[][] A = { 
    {0.1950, 0.0311}, 
    {0.3588, 0.2203}, 
    {0.1716, 0.5931}, 
    {0.2105, 0.3242}}; 
 
double[][] B = { 
    {0.0502, 0.9823, 0.9472}, 
    {0.5732, 0.2694, 0.916}}; 

Apache Commons uses the RealMatrix class to hold a matrix. In the following code sequence, the corresponding matrices for the A and B matrices are created using the Array2DRowRealMatrix constructor:

RealMatrix aRealMatrix = new Array2DRowRealMatrix(A); 
RealMatrix bRealMatrix = new Array2DRowRealMatrix(B); 

The multiplication is straightforward using the multiply method, as shown next:

RealMatrix cRealMatrix = aRealMatrix.multiply(bRealMatrix); 

The following for loop displays the results:

for (int i = 0; i < cRealMatrix.getRowDimension(); i++) { 
    out.println(cRealMatrix.getRowVector(i)); 
} 

The output should be as follows:

{0.02761552; 0.19992684; 0.2131916}
{0.14428772; 0.41179806; 0.54165016}
{0.34857924; 0.32834382; 0.70581912}
{0.19639854; 0.29411363; 0.4963528}

Using the ND4J API

ND4J (http://nd4j.org/) is the library used by DL4J to perform arithmetic operations. The library is also available for direct use. In this section, we will demonstrate how matrix multiplication is performed using the A and B matrices.

Before we can perform the multiplication, we need to flatten the matrices to vectors. The following declares and initializes these vectors:

double[] A = { 
    0.1950, 0.0311, 
    0.3588, 0.2203, 
    0.1716, 0.5931, 
    0.2105, 0.3242}; 
 
double[] B = { 
    0.0502, 0.9823, 0.9472, 
    0.5732, 0.2694, 0.916}; 

The Nd4j class's create method creates an INDArray instance given a vector and dimension information. The first argument of the method is the vector. The second argument specifies the dimensions of the matrix. The last argument specifies the order in which the rows and columns are laid out. This is either row-major order, as used by C and denoted by c, or column-major order, as used by FORTRAN and denoted by f. Row-major order means the first row is allocated to the vector, followed by the second row, and so forth.

In the following code sequence, two INDArray instances are created using the A and B vectors. The first is a 4-row, 2-column matrix using row-major order, as specified by the third argument, c. The second INDArray instance represents the B matrix. If we wanted to use column-major ordering, we would use an f instead.

INDArray aINDArray = Nd4j.create(A,new int[]{4,2},'c'); 
INDArray bINDArray = Nd4j.create(B,new int[]{2,3},'c'); 
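The difference between the two orderings can be illustrated without ND4J. The following sketch, with the hypothetical class ArrayOrderSketch and method reshape, rebuilds a two-dimensional array from a flat vector using the usual row-major and column-major index formulas. It is an illustration of the layout convention, not ND4J's implementation.

```java
import java.util.Arrays;

// Shows how a flat vector maps to a matrix under 'c' and 'f' ordering.
class ArrayOrderSketch {

    // order 'c' reads the vector row by row; any other value is
    // treated as 'f' and reads the vector column by column.
    static double[][] reshape(double[] flat, int rows, int cols, char order) {
        if (flat.length != rows * cols) {
            throw new IllegalArgumentException("Size mismatch");
        }
        double[][] matrix = new double[rows][cols];
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                int index = (order == 'c') ? i * cols + j : j * rows + i;
                matrix[i][j] = flat[index];
            }
        }
        return matrix;
    }

    public static void main(String[] args) {
        double[] a = {
            0.1950, 0.0311,
            0.3588, 0.2203,
            0.1716, 0.5931,
            0.2105, 0.3242};
        System.out.println(Arrays.deepToString(reshape(a, 4, 2, 'c')));
        System.out.println(Arrays.deepToString(reshape(a, 4, 2, 'f')));
    }
}
```

With c ordering, the second row of the matrix begins with the third element of the vector. With f ordering, it begins with the second element.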

The C matrix, represented by cINDArray, is then declared and assigned the result of the multiplication. The mmul method performs the operation:

INDArray cINDArray; 
cINDArray = aINDArray.mmul(bINDArray); 

The following sequence displays the results using the getRow method:

for(int i=0; i<cINDArray.rows(); i++) { 
   out.println(cINDArray.getRow(i)); 
} 

The output should be as follows:

[0.03, 0.20, 0.21]
[0.14, 0.41, 0.54]
[0.35, 0.33, 0.71]
[0.20, 0.29, 0.50]

Next, we will provide an overview of the OpenCL API, which provides support for concurrent operations on a number of platforms.

Using OpenCL

Open Computing Language (OpenCL) (https://www.khronos.org/opencl/) supports programs that execute across heterogeneous platforms, that is, platforms potentially using different vendors and architectures. The platforms can use different processing units, including Central Processing Units (CPUs), Graphics Processing Units (GPUs), Digital Signal Processors (DSPs), Field-Programmable Gate Arrays (FPGAs), and other types of processors.

OpenCL uses a C99-based language to program the devices, providing a standard interface for programming concurrent behavior. OpenCL supports an API that allows code to be written in different languages. For Java, there are several APIs that support the development of OpenCL-based applications.

In addition, Aparapi provides higher-level access to OpenCL, thus avoiding some of the complexity involved in creating OpenCL applications.

Code that runs on a processor is encapsulated in a kernel. Multiple kernels will execute in parallel on different computing devices. There are different levels of memory supported by OpenCL. A specific device may not support each level. The levels include:

  • Global memory - Shared by all computing units
  • Read-only memory - Generally not writable
  • Local memory - Shared by a group of computing units
  • Per-element private memory - Often a register

OpenCL applications require a considerable amount of initial code to be useful. This complexity does not permit us to provide a detailed example of its use. However, the Aparapi section does provide some feel for how OpenCL applications are structured.

Using Aparapi

Aparapi (https://github.com/aparapi/aparapi) is a Java library that supports concurrent operations. The API supports code running on GPUs or CPUs. GPU operations are executed using OpenCL, while CPU operations use Java threads. The user can specify which computing resource to use. However, if GPU support is not available, Aparapi will revert to Java threads.

The API converts Java bytecode to OpenCL at runtime. This makes the API largely independent of the graphics card used. The API was initially developed by AMD but has been released as open source. This is reflected in the basic package name, com.amd.aparapi. Aparapi offers a higher level of abstraction than that provided by OpenCL.

Aparapi code is located in a class derived from the Kernel class. Its execute method will start the operations. This will result in an internal call to a run method, which needs to be overridden. It is within the run method that concurrent code is placed. The run method is executed multiple times on different processors.

Due to OpenCL limitations, we are unable to use inheritance or method overloading. In addition, println should not be used in the run method, since the code may be running on a GPU. Aparapi only supports one-dimensional arrays. Arrays with two or more dimensions need to be flattened into a one-dimensional array. Support for double values depends on the OpenCL version and GPU configuration.

When a Java thread pool is used, it allocates one thread per CPU core. The kernel containing the Java code is cloned, one copy per thread. This avoids the need to access data across threads. Each thread has access to information, such as a global ID, to assist in the code execution. The kernel will wait for all of the threads to complete.

Aparapi downloads can be found at https://github.com/aparapi/aparapi/releases.

Creating an Aparapi application

The basic framework for an Aparapi application is shown next. It consists of a Kernel derived class where the run method is overridden. In this example, the run method will perform scalar multiplication. This operation involves multiplying each element of a vector by some value.

The ScalarMultiplicationKernel extends the Kernel class. It possesses two instance variables used to hold the matrices for input and output. The constructor will initialize the matrices. The run method will perform the actual computations, and the displayResult method will show the results of the multiplication:

public class ScalarMultiplicationKernel extends Kernel { 
    float[] inputMatrix; 
    float outputMatrix []; 
 
    public ScalarMultiplicationKernel(float inputMatrix[]) { 
        ... 
    } 
 
    @Override 
    public void run() { 
        ... 
    } 
 
    public void displayResult() { 
        ... 
    } 
} 

The constructor is shown here:

public ScalarMultiplicationKernel(float inputMatrix[]) { 
    this.inputMatrix = inputMatrix; 
    outputMatrix = new float[this.inputMatrix.length]; 
} 

In the run method, we use a global ID to index into the matrix. This code is executed on each computation unit, for example, a GPU or thread. A unique global ID is provided to each computational unit, allowing the code to access a specific element of the matrix. In this example, each element of the input matrix is multiplied by 2 and then assigned to the corresponding element of the output matrix:

public void run() { 
    int globalID = this.getGlobalId(); 
    outputMatrix[globalID] = 2.0f * inputMatrix[globalID]; 
} 
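The role of the global ID can be illustrated without Aparapi. The following sketch is only an emulation in plain Java, under the assumption that a parallel IntStream index stands in for the global ID; it does not call any Aparapi API, and the names GlobalIdSketch and scale are hypothetical.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Plain-Java emulation of the kernel idea: each index in the range plays
// the role of a global ID and handles exactly one array element.
class GlobalIdSketch {

    static float[] scale(float[] input, float factor) {
        float[] output = new float[input.length];
        IntStream.range(0, input.length)
                .parallel()
                // Each "global ID" writes only to its own slot, so the
                // parallel executions never touch the same element.
                .forEach(globalId -> output[globalId] = factor * input[globalId]);
        return output;
    }

    public static void main(String[] args) {
        float[] inputMatrix = {3, 4, 5, 6, 7, 8, 9};
        System.out.println(Arrays.toString(scale(inputMatrix, 2.0f)));
    }
}
```

Because every execution works on a distinct index, no synchronization is needed, which is the same property that lets the run method execute on many processors at once.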

The displayResult method simply displays the contents of the outputMatrix array:

public void displayResult() { 
    out.println("Result"); 
    for (float element : outputMatrix) { 
        out.printf("%.4f ", element); 
    } 
    out.println(); 
} 

To use this kernel, we need to declare variables for the inputMatrix and its size. The size will be used to control how many kernels to execute:

float inputMatrix[] = {3, 4, 5, 6, 7, 8, 9}; 
int size = inputMatrix.length; 

The kernel is then created using the input matrix, followed by the invocation of the execute method. This method starts the process and will eventually invoke the Kernel class' run method once for each value in the range given by its argument, which here is the size of the input array. The execute method also accepts an optional second argument that specifies the number of passes to make; each pass is identified by a pass ID. While not used in this example, we will use it in the next section. When the process is complete, the resulting output matrix is displayed and the dispose method is called to stop the process and release its resources:

ScalarMultiplicationKernel kernel =  
        new ScalarMultiplicationKernel(inputMatrix); 
kernel.execute(size); 
kernel.displayResult(); 
kernel.dispose(); 

When this application is executed we will get the following output:

Result
6.0000 8.0000 10.0000 12.0000 14.0000 16.0000 18.0000

We can specify the execution mode using the Kernel class' setExecutionMode method, as shown here:

kernel.setExecutionMode(Kernel.EXECUTION_MODE.GPU); 

However, it is best to let Aparapi determine the execution mode. The following table summarizes the execution modes available:

Execution mode | Meaning
Kernel.EXECUTION_MODE.NONE | Does not specify mode
Kernel.EXECUTION_MODE.CPU | Use CPU
Kernel.EXECUTION_MODE.GPU | Use GPU
Kernel.EXECUTION_MODE.JTP | Use Java threads
Kernel.EXECUTION_MODE.SEQ | Use single loop (for debugging purposes)

Next, we will demonstrate how we can use Aparapi to perform dot product matrix multiplication.

Using Aparapi for matrix multiplication

We will use the same matrices as in the Implementing basic matrix operations section. We start with the declaration of the MatrixMultiplicationKernel class, which contains the vector declarations, a constructor, the run method, and a displayResults method. Matrix A (4 x 2) and matrix B (2 x 3) have been flattened to one-dimensional arrays by storing their elements in row-major order, that is, one row after another:

class MatrixMultiplicationKernel extends Kernel { 
    float[] vectorA = { 
        0.1950f, 0.0311f, 0.3588f,  
        0.2203f, 0.1716f, 0.5931f,  
        0.2105f, 0.3242f}; 
    float[] vectorB = { 
        0.0502f, 0.9823f, 0.9472f,  
        0.5732f, 0.2694f, 0.916f}; 
    float[] vectorC; 
    int n; 
    int m; 
    int p; 
 
    @Override 
    public void run() { 
        ... 
    } 
 
    public MatrixMultiplicationKernel(int n, int m, int p) { 
        ... 
    } 
 
    public void displayResults () { 
        ... 
    } 
} 

The MatrixMultiplicationKernel constructor assigns values for the matrices' dimensions and allocates memory for the result stored in vectorC, as shown here:

public MatrixMultiplicationKernel(int n, int m, int p) { 
    this.n = n; 
    this.p = p; 
    this.m = m; 
    vectorC = new float[n * p]; 
} 

The run method uses a global ID and a pass ID to perform the matrix multiplication. The pass ID is specified as the second argument of the Kernel class' execute method, as we will see shortly. This value allows us to advance the column index for vectorC. The vector indexes map to the corresponding row and column positions of the original matrices:

public void run() { 
    int i = getGlobalId(); 
    int j = this.getPassId(); 
    float value = 0; 
    for (int k = 0; k < m; k++) { 
        value += vectorA[k + i * m] * vectorB[k * p + j]; 
    } 
    vectorC[i * p + j] = value; 
} 
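The index arithmetic can be checked without a GPU. The following sketch is a sequential, plain-Java emulation in which two ordinary loops stand in for the pass ID and the global ID; it is not Aparapi code, and the class and method names are hypothetical. It assumes A is n x m and B is m x p, both stored in row-major order, so the inner loop runs over the shared dimension m.

```java
// Sequential emulation of the kernel's index arithmetic (no Aparapi calls).
class FlattenedMultiplySketch {

    // a holds an n x m matrix and b an m x p matrix, both row-major.
    // Returns the n x p product, also row-major.
    static float[] multiply(float[] a, float[] b, int n, int m, int p) {
        float[] c = new float[n * p];
        for (int passId = 0; passId < p; passId++) {           // column of C
            for (int globalId = 0; globalId < n; globalId++) { // row of C
                float value = 0;
                for (int k = 0; k < m; k++) {
                    // Row globalId of A times column passId of B.
                    value += a[globalId * m + k] * b[k * p + passId];
                }
                c[globalId * p + passId] = value;
            }
        }
        return c;
    }

    public static void main(String[] args) {
        float[] vectorA = {0.1950f, 0.0311f, 0.3588f, 0.2203f,
                           0.1716f, 0.5931f, 0.2105f, 0.3242f};
        float[] vectorB = {0.0502f, 0.9823f, 0.9472f,
                           0.5732f, 0.2694f, 0.916f};
        float[] c = multiply(vectorA, vectorB, 4, 2, 3);
        for (int i = 0; i < 4; i++) {
            for (int j = 0; j < 3; j++) {
                System.out.printf("%.4f  ", c[i * 3 + j]);
            }
            System.out.println();
        }
    }
}
```

Running the loops sequentially like this is a convenient way to validate the indexing before moving the same body into a kernel's run method.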

The displayResults method is shown as follows:

public void displayResults() { 
    out.println("Result"); 
    for (int i = 0; i < n; i++) { 
        for (int j = 0; j < p; j++) { 
            out.printf("%.4f  ", vectorC[i * p + j]); 
        } 
        out.println(); 
    } 
} 

The kernel is started in the same way as in the previous section. The execute method is passed the number of kernels that should be created, one per row of the result, and an integer indicating the number of passes to make, one per column of the result. The pass ID is used as the column index into the vectorB and vectorC arrays:

MatrixMultiplicationKernel kernel = 
        new MatrixMultiplicationKernel(n, m, p); 
// n = 4 rows of A (one kernel each), p = 3 columns of B (one pass each) 
kernel.execute(n, p); 
kernel.displayResults(); 
kernel.dispose(); 

When this example is executed, you will get the following output:

Result
0.0276  0.1999  0.2132  
0.1443  0.4118  0.5417  
0.3486  0.3283  0.7058  
0.1964  0.2941  0.4964

Next, we will see how Java 8 additions can contribute to solving math-intensive problems in a parallel manner.

Using Java 8 streams

The release of Java 8 came with a number of important enhancements to the language. The two enhancements of interest to us include lambda expressions and streams. A lambda expression is essentially an anonymous function that adds a functional programming dimension to Java. The concept of streams, as introduced in Java 8, does not refer to IO streams. Instead, you can think of it as a sequence of objects that can be generated and manipulated using a fluent style of programming. This style will be demonstrated shortly.

As with most APIs, programmers must be careful to consider the actual execution performance of their code using realistic test cases and environments. If not used properly, streams may not actually provide performance improvements. In particular, parallel streams, if not crafted carefully, can produce incorrect results.
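One common way a parallel stream goes wrong is by updating shared state from inside a lambda. The following sketch contrasts that pattern with a built-in reduction. The class and method names are hypothetical, and the unsafe variant is included only to illustrate the hazard; its result is not predictable.

```java
import java.util.stream.IntStream;

class ParallelPitfallSketch {

    // Unsafe: several threads update the same array slot without any
    // synchronization, so updates can be lost and the total may be too small.
    static long unsafeSum(int n) {
        long[] total = {0};
        IntStream.rangeClosed(1, n).parallel().forEach(i -> total[0] += i);
        return total[0];
    }

    // Safe: sum() is a reduction managed by the stream itself, so partial
    // results from different threads are combined correctly.
    static long safeSum(int n) {
        return IntStream.rangeClosed(1, n).parallel().asLongStream().sum();
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println("Expected: " + (long) n * (n + 1) / 2);
        System.out.println("Safe:     " + safeSum(n));
        System.out.println("Unsafe:   " + unsafeSum(n)); // may differ between runs
    }
}
```

As a rule of thumb, lambdas passed to a parallel stream should avoid side effects, leaving aggregation to operations such as sum, reduce, or collect.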

We will start with a quick introduction to lambda expressions and streams. If you are familiar with these concepts you may want to skip over the next section.

Understanding Java 8 lambda expressions and streams

A lambda expression can be expressed in several different forms. The following illustrates a simple lambda expression where the symbol, ->, is the lambda operator. This will take some value, e, and return the value multiplied by two. There is nothing special about the name e. Any valid Java variable name can be used:

e -> 2 * e 

It can also be expressed in other forms, such as the following:

(int e) -> 2 * e 
(double e) -> 2 * e 
(int e) -> {return 2 * e;} 

The form used depends on the intended type of e and on whether the body is a single expression or a block of statements. Lambda expressions are frequently used as arguments to a method, as we will see shortly.
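A lambda expression has no type of its own; it takes its type from the functional interface it is assigned to. As a brief sketch, the forms above can be bound to the standard IntUnaryOperator and DoubleUnaryOperator interfaces from java.util.function. The class and constant names used here are hypothetical.

```java
import java.util.function.DoubleUnaryOperator;
import java.util.function.IntUnaryOperator;

class LambdaFormsSketch {

    // Parameter type inferred from the target interface.
    static final IntUnaryOperator INFERRED = e -> 2 * e;

    // Parameter type stated explicitly.
    static final IntUnaryOperator TYPED = (int e) -> 2 * e;

    // The same body applied to a double parameter.
    static final DoubleUnaryOperator TYPED_DOUBLE = (double e) -> 2 * e;

    // Block body with an explicit return statement.
    static final IntUnaryOperator BLOCK = (int e) -> { return 2 * e; };

    public static void main(String[] args) {
        System.out.println(INFERRED.applyAsInt(21));
        System.out.println(TYPED.applyAsInt(21));
        System.out.println(TYPED_DOUBLE.applyAsDouble(1.5));
        System.out.println(BLOCK.applyAsInt(21));
    }
}
```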

A stream can be created using a number of techniques. In the following example, a stream is created from an array. The IntStream interface is a type of stream that uses integers. The Arrays class' stream method converts an array into a stream:

IntStream stream = Arrays.stream(numbers); 

We can then apply various stream methods to perform an operation. In the following statement, the forEach method will simply display each integer in the stream:

stream.forEach(e -> out.printf("%d  ", e)); 

There are a variety of stream methods that can be applied to a stream. In the following example, the mapToDouble method will take an integer, multiply it by 2, and then return it as a double. The forEach method will then display these values. Since a stream can be traversed only once, a new stream is created from the array rather than reusing the stream variable:

Arrays.stream(numbers) 
        .mapToDouble(e -> 2 * e) 
        .forEach(e -> out.printf("%.4f  ", e)); 

The cascading of method invocations is referred to as fluent programming.
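The following self-contained sketch gathers these ideas into runnable form. It collects results into arrays or a single value instead of printing them, which makes the pipelines easier to test. The class and method names are hypothetical.

```java
import java.util.Arrays;

class FluentStreamSketch {

    // Doubles each integer and returns the results as doubles.
    static double[] doubled(int[] numbers) {
        return Arrays.stream(numbers)
                .mapToDouble(e -> 2 * e)
                .toArray();
    }

    // Keeps only the even values and adds them up.
    static int sumOfEvens(int[] numbers) {
        return Arrays.stream(numbers)
                .filter(e -> e % 2 == 0)
                .sum();
    }

    public static void main(String[] args) {
        int[] numbers = {3, 4, 12, 17};
        // A fresh stream is created for each pipeline.
        System.out.println(Arrays.toString(doubled(numbers)));
        System.out.println(sumOfEvens(numbers));
    }
}
```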

Using Java 8 to perform matrix multiplication

Here, we will illustrate how streams can be used to perform matrix multiplication. The definitions of the A, B, and C matrices are the same as declared in the Implementing basic matrix operations section. They are duplicated here for your convenience:

double A[][] = { 
    {0.1950, 0.0311}, 
    {0.3588, 0.2203}, 
    {0.1716, 0.5931}, 
    {0.2105, 0.3242}}; 
double B[][] = { 
    {0.0502, 0.9823, 0.9472}, 
    {0.5732, 0.2694, 0.916}}; 
double C[][] = new double[n][p]; 

The following sequence is a stream implementation of matrix multiplication. A detailed explanation of the code follows:

C = Arrays.stream(A) 
        .parallel() 
        .map(AMatrixRow -> IntStream.range(0, B[0].length) 
                .mapToDouble(i -> IntStream.range(0, B.length) 
                        .mapToDouble(j -> AMatrixRow[j] * B[j][i]) 
                        .sum() 
                ).toArray()).toArray(double[][]::new); 

The Arrays.stream(A) call creates a stream of double vectors representing the 4 rows of the A matrix. The first map method, shown as follows, converts each of these rows into the corresponding row of the C matrix. The range method returns a stream of integers starting at its first argument and ending just before its second argument:

.map(AMatrixRow -> IntStream.range(0, B[0].length) 

The variable i takes the values generated by the first range method, which correspond to the column indexes of the B matrix (0 through 2, since B has 3 columns). The variable j takes the values generated by the second range method, which correspond to the row indexes of the B matrix (0 and 1, since B has 2 rows). These are also the column indexes of the current row of A.

At the heart of the statement is the matrix multiplication, where the sum method calculates the sum:

.mapToDouble(j -> AMatrixRow[j] * B[j][i]) 
.sum() 

The last part of the expression creates the two-dimensional array for the C matrix. The expression double[][]::new is a constructor reference, a form of method reference, and is a shorter way of invoking the new operator to create the array:

).toArray()).toArray(double[][]::new); 

The displayResult method is as follows:

public void displayResult() { 
    out.println("Result"); 
    for (int i = 0; i < n; i++) { 
        for (int j = 0; j < p; j++) { 
            out.printf("%.4f  ", C[i][j]); 
        } 
        out.println(); 
    } 
} 

The output of this sequence follows:

Result
0.0276  0.1999  0.2132  
0.1443  0.4118  0.5417  
0.3486  0.3283  0.7058  
0.1964  0.2941  0.4964
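For readers who want to run the stream version on its own, the following sketch wraps the same pipeline in a method and supplies the matrices locally. The class name and the lambda parameter names (row, col, k) are choices made for this sketch rather than names from the earlier listing.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class StreamMatrixMultiplySketch {

    // Multiplies a (n x m) by b (m x p) and returns the n x p product.
    static double[][] multiply(double[][] a, double[][] b) {
        return Arrays.stream(a)
                .parallel()                                   // one task per row of a
                .map(row -> IntStream.range(0, b[0].length)   // each column of b
                        .mapToDouble(col -> IntStream.range(0, b.length)
                                .mapToDouble(k -> row[k] * b[k][col])
                                .sum())                       // dot product
                        .toArray())
                .toArray(double[][]::new);
    }

    public static void main(String[] args) {
        double[][] a = {
            {0.1950, 0.0311},
            {0.3588, 0.2203},
            {0.1716, 0.5931},
            {0.2105, 0.3242}};
        double[][] b = {
            {0.0502, 0.9823, 0.9472},
            {0.5732, 0.2694, 0.916}};
        for (double[] row : multiply(a, b)) {
            for (double value : row) {
                System.out.printf("%.4f  ", value);
            }
            System.out.println();
        }
    }
}
```

Even though the rows are processed in parallel, toArray preserves the encounter order of the stream, so the rows of the result appear in the same order as the rows of the first matrix.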

Using Java 8 to perform map-reduce

In this section, we will use Java 8 streams to perform a map-reduce operation similar to the one demonstrated using Hadoop in the Using map-reduce section. In this example, we will use a Stream of Book objects. We will then demonstrate how to use the Java 8 reduce and average methods to get our total page count and average page count.

Rather than begin with a text file, as we did in the Hadoop example, we have created a Book class with title, author, and page-count fields. In the main method of the driver class, we have created new instances of Book and added them to an ArrayList called books. We have also created a double value average to hold our average, and initialized our variable totalPg to zero:

ArrayList<Book> books = new ArrayList<>(); 
double average; 
int totalPg = 0; 
 
books.add(new Book("Moby Dick", "Herman Melville", 822)); 
books.add(new Book("Charlotte's Web", "E.B. White", 189)); 
books.add(new Book("The Grapes of Wrath", "John Steinbeck", 212)); 
books.add(new Book("Jane Eyre", "Charlotte Bronte", 299)); 
books.add(new Book("A Tale of Two Cities", "Charles Dickens", 673)); 
books.add(new Book("War and Peace", "Leo Tolstoy", 1032)); 
books.add(new Book("The Great Gatsby", "F. Scott Fitzgerald",    275)); 

Next, we perform a map and reduce operation to calculate the total number of pages in our set of books. To accomplish this in a parallel manner, we use the stream and parallel methods. We then use the map method with a lambda expression to extract the page count from each Book object. Finally, we use the reduce method to merge our page counts into one final value, which is to be assigned to totalPg:

totalPg = books 
        .stream() 
        .parallel() 
        .map((b) -> b.pgCnt) 
        .reduce(totalPg, (accumulator, _item) -> { 
            out.println(accumulator + " " +_item); 
            return accumulator + _item; 
                }); 

Notice in the preceding reduce method we have chosen to print out information about the reduction operation's cumulative value and individual items. The accumulator represents the partial sum of page counts computed so far. The _item represents the next value to be merged into that sum, which is either an individual page count or a partial sum produced by another thread.

In the output that follows, the accumulator value is initially zero as each individual book item is processed. Gradually, the partial sums are combined and the accumulator value increases. Because the stream is parallel, the order of these lines can differ from run to run. In this run, the final operation is the reduction of the values 1223 and 2279. The sum of these two numbers is 3502, the total page count for all of our books:

0 822
0 189
0 299
0 673
0 212
299 673
0 1032
0 275
1032 275
972 1307
189 212
822 401
1223 2279
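The same total can be computed without the diagnostic printing. The sketch below shows two equivalent forms: reduce with an identity value and Integer::sum, and the primitive mapToInt followed by sum. Since the Book class is not listed in this section, a minimal hypothetical version with a pgCnt field is assumed here.

```java
import java.util.Arrays;
import java.util.List;

class PageTotalSketch {

    // Minimal stand-in for the chapter's Book class (assumed shape).
    static class Book {
        final String title;
        final String author;
        final int pgCnt;

        Book(String title, String author, int pgCnt) {
            this.title = title;
            this.author = author;
            this.pgCnt = pgCnt;
        }
    }

    // Map each book to its page count, then reduce with an associative sum.
    static int totalPages(List<Book> books) {
        return books.parallelStream()
                .map(b -> b.pgCnt)
                .reduce(0, Integer::sum);
    }

    // The same computation on a primitive IntStream, which avoids boxing.
    static int totalPagesPrimitive(List<Book> books) {
        return books.parallelStream()
                .mapToInt(b -> b.pgCnt)
                .sum();
    }

    public static void main(String[] args) {
        List<Book> books = Arrays.asList(
                new Book("Moby Dick", "Herman Melville", 822),
                new Book("Charlotte's Web", "E.B. White", 189),
                new Book("The Grapes of Wrath", "John Steinbeck", 212));
        System.out.println(totalPages(books));
        System.out.println(totalPagesPrimitive(books));
    }
}
```

A parallel reduce only gives a reliable answer when the identity value leaves other values unchanged (0 for addition) and the operation is associative, which is why addition works well here.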

Next, we will add code to calculate the average page count of our set of books. We multiply our totalPg value, determined using map-reduce, by 1.0 to prevent truncation when we divide by the integer returned by the size method. We then print out average.

average = 1.0 * totalPg / books.size(); 
out.printf("Average Page Count: %.4f\n", average); 

Our output is as follows:

Average Page Count: 500.2857

We could have used Java 8 streams to calculate the average directly using the map method. Add the following code to the main method. We use parallelStream with our map method to simultaneously get the page count for each of our books. We then use mapToDouble to ensure our data is of the correct type to calculate our average. Finally, we use the average and getAsDouble methods to calculate our average page count:

average = books 
        .parallelStream() 
        .map(b -> b.pgCnt) 
        .mapToDouble(s -> s) 
        .average() 
        .getAsDouble(); 
out.printf("Average Page Count: %.4f\n", average);

Our output, identical to our previous example, is as follows:

Average Page Count: 500.2857
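One detail worth noting is that the average method returns an OptionalDouble, and getAsDouble throws a NoSuchElementException when the stream is empty. The following sketch shows a variant that falls back to a default value instead. It works on a plain list of page counts for brevity; the class and method names, and the choice of 0.0 as the fallback, are assumptions of this sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class AveragePagesSketch {

    // Returns the mean page count, or 0.0 when there are no books.
    static double averagePages(List<Integer> pageCounts) {
        return pageCounts.parallelStream()
                .mapToInt(Integer::intValue)
                .average()      // OptionalDouble: empty for an empty stream
                .orElse(0.0);
    }

    public static void main(String[] args) {
        List<Integer> pageCounts = Arrays.asList(822, 189, 212, 299, 673, 1032, 275);
        System.out.printf("Average Page Count: %.4f%n", averagePages(pageCounts));
        System.out.printf("Average Page Count: %.4f%n",
                averagePages(Collections.<Integer>emptyList()));
    }
}
```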

These techniques made use of Java 8 capabilities related to the map-reduce framework to solve numeric problems. This type of process can also be applied to other types of data, including text-based data. The true benefit is seen when these processes handle extremely large datasets within a greatly reduced time frame.

Understanding Java 8 lambda expressions and streams

A lambda expression can be expressed in several different forms. The following illustrates a simple lambda expression where the symbol, ->, is the lambda operator. This will take some value, e, and return the value multiplied by two. There is nothing special about the name e. Any valid Java variable name can be used:

e -> 2 * e 

It can also be expressed in other forms, such as the following:

(int e) -> 2 * e 
(double e) -> 2 * e 
(int e) -> {return 2 * e; 

The form used depends on the intended value of e. Lambda expressions are frequently used as arguments to a method, as we will see shortly.

A stream can be created using a number of techniques. In the following example, a stream is created from an array. The IntStream interface is a type of stream that uses integers. The Arrays class' stream method converts an array into a stream:

IntStream stream = Arrays.stream(numbers); 

We can then apply various stream methods to perform an operation. In the following statement, the forEach method will simply display each integer in the stream:

stream.forEach(e -> out.printf("%d  ", e)); 

There are a variety of stream methods that can be applied to a stream. In the following example, the mapToDouble method will take an integer, multiply it by 2, and then return it as a double. The forEach method will then display these values:

stream 
        .mapToDouble(e-> 2 * e) 
        .forEach(e -> out.printf("%.4f  ", e)); 

The cascading of method invocations is referred to as fluent programing.

Using Java 8 to perform matrix multiplication

Here, we will illustrate how streams can be used to perform matrix multiplication. The definitions of the A, B, and C matrices are the same as declared in the Implementing basic matrix operations section. They are duplicated here for your convenience:

double A[][] = { 
    {0.1950, 0.0311}, 
    {0.3588, 0.2203}, 
    {0.1716, 0.5931}, 
    {0.2105, 0.3242}}; 
double B[][] = { 
    {0.0502, 0.9823, 0.9472}, 
    {0.5732, 0.2694, 0.916}}; 
double C[][] = new double[n][p]; 

The following sequence is a stream implementation of matrix multiplication. A detailed explanation of the code follows:

C = Arrays.stream(A) 
        .parallel() 
        .map(AMatrixRow -> IntStream.range(0, B[0].length) 
                .mapToDouble(i -> IntStream.range(0, B.length) 
                        .mapToDouble(j -> AMatrixRow[j] * B[j][i]) 
                        .sum() 
                ).toArray()).toArray(double[][]::new); 

The first map method, shown as follows, creates a stream of double vectors representing the 4 rows of the A matrix. The range method will return a list of stream elements ranging from its first argument to the second argument.

.map(AMatrixRow -> IntStream.range(0, B[0].length) 

The variable i corresponds to the numbers generated by the second range method, which corresponds to the number of rows in the B matrix (2). The variable j corresponds to the numbers generated by the third range method, representing the number of columns of the B matrix (3).

At the heart of the statement is the matrix multiplication, where the sum method calculates the sum:

.mapToDouble(j -> AMatrixRow[j] * B[j][i]) 
.sum() 

The last part of the expression creates the two-dimensional array for the C matrix. The operator, ::new, is called a method reference and is a shorter way of invoking the new operator to create a new object:

).toArray()).toArray(double[][]::new); 

The displayResult method is as follows:

public void displayResult() { 
    out.println("Result"); 
    for (int i = 0; i < n; i++) { 
        for (int j = 0; j < p; j++) { 
            out.printf("%.4f  ", C[i][j]); 
        } 
        out.println(); 
    } 
} 

The output of this sequence follows:

Result
0.0276  0.1999  0.2132  
0.1443  0.4118  0.5417  
0.3486  0.3283  0.7058  
0.1964  0.2941  0.4964

Using Java 8 to perform map-reduce

In the next section, we will use Java 8 streams to perform a map-reduce operation similar to the one demonstrated using Hadoop in the Using map-reduce section. In this example, we will use a Stream of Book objects. We will then demonstrate how to use the Java 8 reduce and average methods to get our total page count and average page count.

Rather than begin with a text file, as we did in the Hadoop example, we have created a Book class with title, author, and page-count fields. In the main method of the driver class, we have created new instances of Book and added them to an ArrayList called books. We have also created a double value average to hold our average, and initialized our variable totalPg to zero:

ArrayList<Book> books = new ArrayList<>(); 
double average; 
int totalPg = 0; 
 
books.add(new Book("Moby Dick", "Herman Melville", 822)); 
books.add(new Book("Charlotte's Web", "E.B. White", 189)); 
books.add(new Book("The Grapes of Wrath", "John Steinbeck", 212)); 
books.add(new Book("Jane Eyre", "Charlotte Bronte", 299)); 
books.add(new Book("A Tale of Two Cities", "Charles Dickens", 673)); 
books.add(new Book("War and Peace", "Leo Tolstoy", 1032)); 
books.add(new Book("The Great Gatsby", "F. Scott Fitzgerald",    275)); 

Next, we perform a map and reduce operation to calculate the total number of pages in our set of books. To accomplish this in a parallel manner, we use the stream and parallel methods. We then use the map method with a lambda expression to accumulate all of the page counts from each Book object. Finally, we use the reduce method to merge our page counts into one final value, which is to be assigned to totalPg:

totalPg = books 
        .stream() 
        .parallel() 
        .map((b) -> b.pgCnt) 
        .reduce(totalPg, (accumulator, _item) -> { 
            out.println(accumulator + " " +_item); 
            return accumulator + _item; 
                }); 

Notice in the preceding reduce method we have chosen to print out information about the reduction operation's cumulative value and individual items. The accumulator represents the aggregation of our page counts. The _item represents the individual task within the map-reduce process undergoing reduction at any given moment.

In the output that follows, we will first see the accumulator value stay at zero as each individual book item is processed. Gradually, the accumulator value increases. The final operation is the reduction of the values 1223 and 2279. The sum of these two numbers is 3502, or the total page count for all of our books:

0 822
0 189
0 299
0 673
0 212
299 673
0 1032

0 275
1032 275
972 1307
189 212
822 401
1223 2279

Next, we will add code to calculate the average page count of our set of books. We multiply our totalPg value, determined using map-reduce, by 1.0 to prevent truncation when we divide by the integer returned by the size method. We then print out average.

average = 1.0 * totalPg / books.size(); 
out.printf("Average Page Count: %.4f\n", average); 

Our output is as follows:

Average Page Count: 500.2857

We could have used Java 8 streams to calculate the average directly using the map method. Add the following code to the main method. We use parallelStream with our map method to simultaneously get the page count for each of our books. We then use mapToDouble to ensure our data is of the correct type to calculate our average. Finally, we use the average and getAsDouble methods to calculate our average page count:

average = books 
        .parallelStream() 
        .map(b -> b.pgCnt) 
        .mapToDouble(s -> s) 
        .average() 
        .getAsDouble(); 
out.printf("Average Page Count: %.4f\n", average);

Our output, identical to that of the previous example, is as follows:

Average Page Count: 500.2857
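When both the total and the average are needed, a single pass can produce them together. The sketch below is one possible alternative, not the approach used above: it works on a plain int array of page counts for brevity, and the names PageStatsSketch, stats, and averageOrZero are hypothetical. It relies on IntStream.summaryStatistics and on OptionalDouble.orElse to handle an empty input without throwing:

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

class PageStatsSketch {
    // Collects count, sum, min, max, and average in one parallel pass.
    static IntSummaryStatistics stats(int[] pageCounts) {
        return IntStream.of(pageCounts).parallel().summaryStatistics();
    }

    // Returns 0.0 instead of throwing when there are no page counts.
    static double averageOrZero(int[] pageCounts) {
        return IntStream.of(pageCounts).parallel().average().orElse(0.0);
    }

    public static void main(String[] args) {
        int[] pageCounts = {822, 189, 212, 299, 673, 1032, 275};
        IntSummaryStatistics s = stats(pageCounts);
        System.out.println("Total pages: " + s.getSum());
        System.out.printf("Average Page Count: %.4f%n", s.getAverage());
    }
}
```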

These techniques used Java 8 streams to apply the map-reduce pattern to numeric problems. The same approach can be applied to other types of data, including text-based data. The benefit of parallel execution is seen mainly with very large datasets; for a collection as small as our seven books, the overhead of splitting and merging the work can outweigh any gain.
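As an illustration of the same pattern on text-based data, the following sketch counts word occurrences in a string. It is only an example under simple assumptions: words are split on non-word characters and lowercased, and the names WordCountSketch and countWords are hypothetical. The map step is the split into words, and the reduce step is the grouping collector that counts each word:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class WordCountSketch {
    static Map<String, Long> countWords(String text) {
        return Arrays.stream(text.toLowerCase().split("\\W+"))
                .parallel()
                // split can yield an empty first token when the text starts with punctuation
                .filter(word -> !word.isEmpty())
                // group identical words and count the members of each group
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countWords("It was the best of times, it was the worst of times"));
    }
}
```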

Using Java 8 to perform matrix multiplication

Here, we will illustrate how streams can be used to perform matrix multiplication. The definitions of the A, B, and C matrices are the same as declared in the Implementing basic matrix operations section. They are duplicated here for your convenience:

double A[][] = { 
    {0.1950, 0.0311}, 
    {0.3588, 0.2203}, 
    {0.1716, 0.5931}, 
    {0.2105, 0.3242}}; 
double B[][] = { 
    {0.0502, 0.9823, 0.9472}, 
    {0.5732, 0.2694, 0.916}}; 
double C[][] = new double[n][p]; 

The following sequence is a stream implementation of matrix multiplication. A detailed explanation of the code follows:

C = Arrays.stream(A) 
        .parallel() 
        .map(AMatrixRow -> IntStream.range(0, B[0].length) 
                .mapToDouble(i -> IntStream.range(0, B.length) 
                        .mapToDouble(j -> AMatrixRow[j] * B[j][i]) 
                        .sum() 
                ).toArray()).toArray(double[][]::new); 

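The call to Arrays.stream(A) creates a stream of double arrays representing the 4 rows of the A matrix. The first map method, shown as follows, transforms each of these rows into the corresponding row of the C matrix. The range method returns an IntStream of the integers from its first argument (inclusive) up to its second argument (exclusive).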

.map(AMatrixRow -> IntStream.range(0, B[0].length) 

The variable i corresponds to the numbers generated by the first range method, which run over the columns of the B matrix (3). The variable j corresponds to the numbers generated by the second range method, which run over the rows of the B matrix (2), and therefore also over the elements of the current row of A.
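The index values produced by the two range calls can be inspected directly. The following sketch, using the hypothetical names RangeSketch, columnIndices, and rowIndices, materializes both ranges for the 2 x 3 B matrix defined above:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class RangeSketch {
    // Same bound as the first range call: one index per column of B.
    static int[] columnIndices(double[][] b) {
        return IntStream.range(0, b[0].length).toArray();
    }

    // Same bound as the second range call: one index per row of B.
    static int[] rowIndices(double[][] b) {
        return IntStream.range(0, b.length).toArray();
    }

    public static void main(String[] args) {
        double[][] B = {
            {0.0502, 0.9823, 0.9472},
            {0.5732, 0.2694, 0.916}};
        System.out.println(Arrays.toString(columnIndices(B)));
        System.out.println(Arrays.toString(rowIndices(B)));
    }
}
```

Because the upper bound of range is exclusive, the indices never run past the end of the arrays they address.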

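At the heart of the statement is the matrix multiplication itself. For a fixed column i, each element of the current row of A is multiplied by the matching element of column i of B, and the sum method adds the products: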

.mapToDouble(j -> AMatrixRow[j] * B[j][i]) 
.sum() 

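The last part of the expression creates the two-dimensional array for the C matrix. The inner toArray call turns each DoubleStream into a double[] row, and the outer call collects the rows. The expression double[][]::new is an array constructor reference, a form of method reference that is shorthand for a lambda expression that creates a new array of the requested size: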

).toArray()).toArray(double[][]::new); 
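To show what the constructor reference stands for, the following sketch collects a stream of rows twice, once with double[][]::new and once with the equivalent lambda expression. The names ToArraySketch, withConstructorRef, and withLambda are hypothetical:

```java
import java.util.stream.Stream;

class ToArraySketch {
    // toArray asks the generator for an array of the required length.
    static double[][] withConstructorRef(Stream<double[]> rows) {
        return rows.toArray(double[][]::new);
    }

    // The same generator written out as a lambda expression.
    static double[][] withLambda(Stream<double[]> rows) {
        return rows.toArray(size -> new double[size][]);
    }

    public static void main(String[] args) {
        double[][] c = withConstructorRef(
                Stream.of(new double[]{1.0, 2.0}, new double[]{3.0, 4.0}));
        System.out.println(c.length + " rows, " + c[0].length + " columns");
    }
}
```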

The displayResult method is as follows:

public void displayResult() { 
    out.println("Result"); 
    for (int i = 0; i < n; i++) { 
        for (int j = 0; j < p; j++) { 
            out.printf("%.4f  ", C[i][j]); 
        } 
        out.println(); 
    } 
} 

The output of this sequence follows:

Result
0.0276  0.1999  0.2132  
0.1443  0.4118  0.5417  
0.3486  0.3283  0.7058  
0.1964  0.2941  0.4964
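As a cross-check, the stream version can be compared against a conventional triple loop. The sketch below wraps both in static methods so that it is self-contained; the names StreamMatMulSketch, multiplyWithStreams, and multiplyWithLoops are hypothetical, and the index variables have been renamed for readability:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class StreamMatMulSketch {
    // Stream-based product, structured like the statement discussed above.
    static double[][] multiplyWithStreams(double[][] a, double[][] b) {
        return Arrays.stream(a)
                .parallel()
                .map(row -> IntStream.range(0, b[0].length)
                        .mapToDouble(col -> IntStream.range(0, b.length)
                                .mapToDouble(k -> row[k] * b[k][col])
                                .sum())
                        .toArray())
                .toArray(double[][]::new);
    }

    // Reference implementation using nested loops.
    static double[][] multiplyWithLoops(double[][] a, double[][] b) {
        int n = a.length;
        int m = b.length;
        int p = b[0].length;
        double[][] c = new double[n][p];
        for (int i = 0; i < n; i++) {
            for (int j = 0; j < p; j++) {
                for (int k = 0; k < m; k++) {
                    c[i][j] += a[i][k] * b[k][j];
                }
            }
        }
        return c;
    }

    public static void main(String[] args) {
        double[][] A = {
            {0.1950, 0.0311},
            {0.3588, 0.2203},
            {0.1716, 0.5931},
            {0.2105, 0.3242}};
        double[][] B = {
            {0.0502, 0.9823, 0.9472},
            {0.5732, 0.2694, 0.916}};
        for (double[] row : multiplyWithStreams(A, B)) {
            for (double value : row) {
                System.out.printf("%.4f  ", value);
            }
            System.out.println();
        }
    }
}
```

Only the rows of A are processed in parallel here, so any speed-up depends on A having many rows; for a 4 x 2 matrix the loop version is likely to be at least as fast.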

Summary

Data science uses math extensively to analyze problems. There are numerous Java math libraries available, many of which support concurrent operations. In this chapter, we introduced a number of libraries and techniques to provide some insight into how they can be used to support and improve the performance of applications.

We started with a discussion of how simple matrix multiplication is performed. A basic Java implementation was presented. In later sections, we duplicated the implementation using other APIs and technologies.

Many higher-level APIs, such as DL4J, support a number of useful data analysis techniques. Beneath these APIs often lies concurrent support for multiple CPUs and GPUs. Sometimes this support is configurable, as is the case for DL4J. We briefly discussed how we can configure ND4J to support multiple processors.

The map-reduce algorithm has found extensive use in the data science community. We took advantage of the parallel processing power of this framework to calculate the average of a given set of values, the page counts for a set of books. This technique used Apache's Hadoop to perform the map and reduce functions.

Mathematical techniques are supported by a large number of libraries. Many of these libraries do not directly support parallel operations. However, understanding what is available and how they can be used is important. To that end, we demonstrated how three different Java APIs can be used: jblas, Apache Commons Math, and ND4J.

OpenCL is an API that supports parallel operations on a variety of hardware platforms, processor types, and languages. This support is fairly low level. There are a number of Java bindings for OpenCL, which we reviewed.

Aparapi provides higher-level support for Java that can use CPUs, CUDA, or OpenCL to effect parallel operations. We demonstrated this support using the matrix multiplication example.

We wrapped up our discussion with an introduction to Java 8 streams and lambda expressions. These language elements can support parallel operations to improve an application's performance. In addition, they can often provide a more elegant and more maintainable implementation once the programmer becomes familiar with the techniques. We also demonstrated techniques for performing map-reduce using Java 8.

In the next chapter, we will conclude the book by illustrating how many of the techniques introduced can be used to build a complete application.
