Apache Spark for Data Science Cookbook: Solve real-world analytical problems

Product type: Paperback
Published in: Dec 2016
ISBN-13: 9781785880100
Length: 392 pages
Edition: 1st Edition
Authors (2): Padma Priya Chitturi, Nagamallikarjuna Inelu

Loading and saving data

This recipe shows how Spark supports a wide range of input and output sources. Spark makes it very simple to load and save data in a large number of file formats. Formats range from unstructured, such as text, to semi-structured, such as JSON, to structured, such as SequenceFiles.

Getting ready

To step through this recipe, you will need a running Spark cluster, either in pseudo-distributed mode or in one of the distributed modes (standalone, YARN, or Mesos). You should also be familiar with text files, JSON, CSV, SequenceFiles, and object files.

How to do it…

  1. Load and save a text file as follows:
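          // Load a single text file from HDFS as an RDD[String], one element per line
          val input = sc.textFile("hdfs://namenodeHostName:8020/repos/spark/README.md")
          // Load every file in a directory as (fileName, fileContent) pairs
          val wholeInput = sc.wholeTextFiles("file:///home/padma/salesFiles")
          // Average the whitespace-separated numbers held in each file
          val result = wholeInput.mapValues { value =>
            val nums = value.trim.split("\\s+").map(x => x.toDouble)
            nums.sum / nums.size.toDouble
          }
          // The output path becomes a directory of part files
          result.saveAsTextFile("/home/Padma/outputFile.txt")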
    
  2. To load a JSON file, we use the people.json input file that ships with Spark under SPARK_HOME, at /spark-1.6.0/examples/src/main/resources/people.json. Loading and saving a JSON file looks like this:
            // Loading JSON file
            import com.fasterxml.jackson.module.scala.DefaultScalaModule
            import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
            import com.fasterxml.jackson.databind.ObjectMapper
            import com.fasterxml.jackson.databind.DeserializationFeature
            ...
            case class Person(name:String, age:Int)
            ...
            // mapper (a Jackson ObjectMapper) and outputFile are assumed to be
            // defined in the elided code above
            val jsonInput = sc.textFile("hdfs://namenode:9000/data/people.json")
            // Parse each line into a Person; records that fail to parse are dropped
            val result = jsonInput.flatMap(record => {
              try {
                Some(mapper.readValue(record, classOf[Person]))
              } catch {
                case e: Exception => None
              }
            })
            // Keep people older than 15 and write them back as JSON strings
            result.filter(person => person.age > 15)
              .map(mapper.writeValueAsString(_))
              .saveAsTextFile(outputFile)
    
  3. To load and save a CSV file, let's take the stocks data:
           IBM,20160113,133.5,134.279999,131.100006,131.169998,4672300 
           GOOG,20160113,730.849976,734.73999,698.609985,700.559998,2468300 
           MSFT,20160113,53.799999,54.07,51.299999,51.639999,66119000 
           MSFT,20160112,52.759998,53.099998,52.060001,52.779999,35650700 
           YHOO,20160113,30.889999,31.17,29.33,29.440001,16593700 
           . 
           . 
           // Loading and saving the CSV file
           import java.io.{StringReader, StringWriter}
           import scala.collection.JavaConverters._
           import au.com.bytecode.opencsv.{CSVReader, CSVWriter}
           ...
           case class Stocks(name:String, totalPrice:Long)
           ...
           val input = sc.textFile("hdfs://namenodeHostName:8020/data/stocks.txt")
           // Parse each line with opencsv; keep the symbol (column 0) and the last column (6)
           val result = input.flatMap { line =>
             val reader = new CSVReader(new StringReader(line))
             reader.readAll().asScala.map(x => Stocks(x(0), x(6).toLong))
           }
           // Write the records of each partition into one CSV-formatted string
           result.map(stock => Array(stock.name, stock.totalPrice.toString))
             .mapPartitions { stocks =>
               val stringWriter = new StringWriter
               val csvWriter = new CSVWriter(stringWriter)
               csvWriter.writeAll(stocks.toList.asJava)
               Iterator(stringWriter.toString)
             }.saveAsTextFile("hdfs://namenode:9000/CSVOutputFile")
    
  4. Now, let's see how a SequenceFile is loaded and saved:
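           import org.apache.hadoop.io.{IntWritable, Text}
           // Load: convert the Writable keys and values to plain Scala types
           // (inputFile is assumed to hold the path of an existing SequenceFile)
           val data = sc.sequenceFile(inputFile, classOf[Text], classOf[IntWritable])
             .map { case (x, y) => (x.toString, y.get()) }
           // Save: a pair RDD of (String, Int) is written as Text/IntWritable
           val input = sc.parallelize(List(("Panda",3),("Kay",6),("Snail",2)))
           input.saveAsSequenceFile("hdfs://namenode:9000/sequenceOutputFile")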
    

How it works…

Calling textFile() on the SparkContext with the path to a file loads the text file as an RDD with one element per line. If the input consists of multiple files in a directory, we can use SparkContext.wholeTextFiles(), which returns a pair RDD with the name of each input file as the key and its content as the value. To handle JSON files, the data is loaded as a text file and then parsed using a JSON parser. There are a number of JSON libraries available, but in the example we used the Jackson (http://bit.ly/17k6vli) library as it is relatively simple to use.
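The difference between the two loading calls can be seen in a small local run. The following is a minimal sketch, not the book's code: the object name TextLoadingSketch, its helper functions, the sample file contents, and the local[2] master are all assumptions made for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for this illustration.
object TextLoadingSketch {
  // Writes two small files of space-separated numbers into a temporary directory.
  def writeSampleFiles(): Path = {
    val dir = Files.createTempDirectory("salesFiles")
    Files.write(dir.resolve("a.txt"), "1 2 3".getBytes(StandardCharsets.UTF_8))
    Files.write(dir.resolve("b.txt"), "10 20".getBytes(StandardCharsets.UTF_8))
    dir
  }

  // textFile: one RDD element per line, across all files in the directory.
  def lineCount(sc: SparkContext, dir: String): Long =
    sc.textFile(dir).count()

  // wholeTextFiles: one (filePath, fileContent) pair per file.
  // Here each file's numbers are averaged and keyed by the bare file name.
  def averages(sc: SparkContext, dir: String): Map[String, Double] =
    sc.wholeTextFiles(dir)
      .map { case (path, content) =>
        val nums = content.trim.split("\\s+").map(_.toDouble)
        (path.substring(path.lastIndexOf('/') + 1), nums.sum / nums.length)
      }
      .collect()
      .toMap

  def main(args: Array[String]): Unit = {
    // Local master chosen for the sketch; a real job would get it from spark-submit.
    val conf = new SparkConf().setMaster("local[2]").setAppName("text-loading-sketch")
    val sc = new SparkContext(conf)
    try {
      val dir = writeSampleFiles().toUri.toString
      println(s"lines: ${lineCount(sc, dir)}")
      averages(sc, dir).foreach { case (file, avg) => println(s"$file -> $avg") }
    } finally sc.stop()
  }
}
```

In this sketch, textFile() treats the directory as one collection of lines, while wholeTextFiles() keeps each file together, which is what makes a per-file aggregate possible.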

Tip

Please refer to other JSON libraries, such as this one: http://bit.ly/1xP8JFK

Loading CSV/TSV data is similar to loading JSON data: the data is first loaded as text and then processed. As with JSON, there are various CSV libraries; for Scala, we used opencsv (http://opencsv.sourceforge.net). CSVReader parses the records, which are then mapped to a case class. When saving the file, CSVWriter is used to format the output.
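One reason to use a CSV library rather than splitting on commas is quoted fields. The sketch below runs without Spark and assumes the opencsv 2.x package (au.com.bytecode.opencsv) used in the recipe; the object CsvLineSketch, its helpers, and the sample record are hypothetical.

```scala
import java.io.{StringReader, StringWriter}

import au.com.bytecode.opencsv.{CSVReader, CSVWriter}

// Hypothetical helper object, used only for this illustration.
object CsvLineSketch {
  // Parses one CSV line into its fields using opencsv.
  def parseLine(line: String): Array[String] = {
    val reader = new CSVReader(new StringReader(line))
    try reader.readNext() finally reader.close()
  }

  // Renders the fields as one CSV line, dropping the trailing line separator.
  def formatLine(fields: Array[String]): String = {
    val out = new StringWriter
    val writer = new CSVWriter(out)
    writer.writeNext(fields)
    writer.close()
    out.toString.trim
  }

  def main(args: Array[String]): Unit = {
    // Made-up record whose first field contains a comma inside quotes.
    val line = "\"Acme, Inc.\",20160113,4672300"
    println(line.split(",").length)  // a naive split breaks the quoted field apart: 4 pieces
    println(parseLine(line).length)  // opencsv keeps it together: 3 fields
    println(formatLine(parseLine(line)))
  }
}
```

Inside a Spark job, a function such as parseLine could be passed to map() on the RDD returned by textFile(), which is the same pattern as in step 3.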

SequenceFile is a popular Hadoop format composed of flat files with key/value pairs. The keys and values of a sequence file implement Hadoop's Writable interface. SparkContext.sequenceFile() is the API to load a sequence file; the parameters classOf[Text] and classOf[IntWritable] indicate the keyClass and valueClass.
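A save-then-load round trip shows how the Writable types map to Scala types. This is a minimal local sketch under stated assumptions: the object SequenceFileSketch, the roundTrip helper, the temporary output path, and the local[2] master are illustrative and not part of the recipe.

```scala
import java.nio.file.Files

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for this illustration.
object SequenceFileSketch {
  // Saves (String, Int) pairs as a SequenceFile and reads them back as Scala types.
  def roundTrip(sc: SparkContext, pairs: Seq[(String, Int)], path: String): Map[String, Int] = {
    // On save, Spark converts String keys to Text and Int values to IntWritable.
    sc.parallelize(pairs).saveAsSequenceFile(path)
    sc.sequenceFile(path, classOf[Text], classOf[IntWritable])
      // Hadoop reuses Writable instances, so copy plain values out before collecting.
      .map { case (k, v) => (k.toString, v.get()) }
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("sequence-file-sketch")
    val sc = new SparkContext(conf)
    try {
      // The output directory must not exist yet, so use a fresh child of a temp directory.
      val path = Files.createTempDirectory("seq").resolve("out").toUri.toString
      println(roundTrip(sc, Seq(("Panda", 3), ("Kay", 6), ("Snail", 2)), path))
    } finally sc.stop()
  }
}
```

The map step right after sequenceFile() matters: converting to String and Int before collect() avoids holding references to Writable objects that the record reader reuses.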

There's more…

As Spark is built on the ecosystem of Hadoop, it can access data through the InputFormat and OutputFormat interfaces used by Hadoop MapReduce, which are available for many common file formats and storage systems (for example, S3, HDFS, Cassandra, HBase, and so on).

Spark can also interact with any Hadoop-supported format, for both the old and new Hadoop file APIs, using hadoopFile and newAPIHadoopFile respectively. newAPIHadoopFile takes a path and three classes: the first class represents the input format, the next is the class of the key, and the final one is the class of the value. The Spark SQL module provides a more efficient API for structured data sources, including JSON and Hive.
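As an illustration of the path-plus-three-classes call, the sketch below reads tab-separated key/value lines through Hadoop's KeyValueTextInputFormat. The choice of input format, the object HadoopFormatSketch, the sample data, and the local[2] master are assumptions made for this example, not something the recipe prescribes.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for this illustration.
object HadoopFormatSketch {
  // Reads "key<TAB>value" lines through a new-API Hadoop InputFormat.
  def readKeyValues(sc: SparkContext, path: String): Map[String, String] =
    sc.newAPIHadoopFile[Text, Text, KeyValueTextInputFormat](
        path,
        classOf[KeyValueTextInputFormat], // input format class
        classOf[Text],                    // key class
        classOf[Text])                    // value class
      // Copy the contents out of the reused Writable objects before collecting.
      .map { case (k, v) => (k.toString, v.toString) }
      .collect()
      .toMap

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("hadoop-format-sketch")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample data: stock symbol, a tab, then a number.
      val file = Files.createTempFile("kv", ".txt")
      Files.write(file, "IBM\t4672300\nGOOG\t2468300\n".getBytes(StandardCharsets.UTF_8))
      println(readKeyValues(sc, file.toUri.toString))
    } finally sc.stop()
  }
}
```

The same shape should apply to other new-API input formats: swap in the format class and the key and value classes it produces.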
