Working with higher-order functions (HOFs)

In Scala, functions are first-class citizens, which means function values can be assigned to variables, passed to functions as arguments, or returned by a function as a value. HOFs take one or more functions as arguments or return a function as a value.
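
All three properties are easy to demonstrate. Here is a minimal sketch (the increment and adder names are ours, chosen for illustration):

// assign a function value to a variable
val increment: Int => Int = _ + 1
// pass it as an argument (map is covered later in this section)
List(1, 2, 3).map(increment)   // List(2, 3, 4)
// return a function from a function
def adder(n: Int): Int => Int = x => x + n
val addTen = adder(10)
addTen(5)                      // 15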

A method can also be passed as an argument to an HOF because the Scala compiler will automatically convert (eta-expand) the method into a function of the required type. For example, let’s define a function literal and a method, both of which take a pair of integers, perform an operation, and then return an integer:

// a function literal
val add: (Int, Int) => Int = (x, y) => x + y
// a method
def multiply(x: Int, y: Int): Int = x * y

Example 1.19

Let’s now define a method that takes two integer arguments and performs an operation, op, on them:

def op(x: Int, y: Int)(f: (Int, Int) => Int): Int = f(x, y)

Example 1.20

We can pass any function (or method) of type (Int, Int) => Int to op, as the following example illustrates:

scala> op(1,2)(add)
res15: Int = 3
scala> op(2,3)(multiply)
res16: Int = 6

Example 1.21

This ability to pass functions as parameters is extremely powerful as it allows us to write generic code that can execute arbitrary user-supplied functions. In fact, many of the methods defined in the Scala collection library require functions as arguments, as we will see in the next section.

Examples of HOFs from the Scala collection library

Scala collections provide transformers that take a base collection, run some transformations over each of the collection’s elements, and return a new collection. For example, we can transform a list of integers by doubling each of its elements using the map method, which we will cover in a bit:

scala> List(1,2,3,4).map(_ * 2)
res17: List[Int] = List(2, 4, 6, 8)

Example 1.22

The Traversable trait, which is the base trait for all kinds of Scala collections, implements the behaviors common to all collections in terms of a foreach method with the following signature:

def foreach[U](f: A => U): Unit

Example 1.23

The argument f is a function of type A => U, which is shorthand for Function1[A,U], and thus foreach is an HOF. This is an abstract method that needs to be implemented by all classes that mix in Traversable. The return type is Unit, which means this method does not return any meaningful value and is primarily used for side effects.
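
To make the Function1[A,U] equivalence concrete, here is a small sketch (the printer name is ours) that passes an explicitly constructed Function1 instance to foreach:

// an explicit Function1[Int, Unit], equivalent to (i: Int) => println(i)
val printer: Function1[Int, Unit] = new Function1[Int, Unit] {
  def apply(i: Int): Unit = println(i)
}
List(1,2,3,4).foreach(printer)   // prints 1, 2, 3, and 4, one per line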

Here is an example that prints the elements of a List:

scala> /** let's start with a foreach call that prints the numbers in a list
     |   * List(1,2,3,4).foreach((i: Int) => println(i))
     |   * we can skip the type annotation and let Scala infer it
     |   * List(1,2,3,4).foreach(i => println(i))
     |   * Scala provides the shorthand _ to replace an argument
     |   * that is used only once on the right-hand side
     |   * List(1,2,3,4).foreach(println(_))
     |   * finally, Scala allows us to leave out the argument altogether
     |   * when there is only one argument and it is used once on the right-hand side
     |   */
     | List(1,2,3,4).foreach(println)
1
2
3
4

Example 1.24

For the rest of the examples, we will continue to use the List collection type, but these methods are also available for other kinds of collections, such as Array, Map, and Set.
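
For instance, the same map call works across the standard collection types (a quick sketch, not from the REPL session above):

Set(1,2,3).map(_ * 2)                                     // Set(2, 4, 6)
Array(1,2,3).map(_ * 2)                                   // Array(2, 4, 6)
Map("a" -> 1, "b" -> 2).map { case (k, v) => (k, v * 2) } // Map(a -> 2, b -> 4)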

map is similar to foreach, but instead of returning Unit, it returns a new collection formed by applying the function f to each element of the base collection. Here is the signature for List[A]:

final def map[B](f: (A) => B): List[B]

Example 1.25

Using the list from the previous example, if we want to double each element but return a list of Doubles instead of Ints, we can do the following:

scala> List(1,2,3,4).map(_ * 2.0)
res22: List[Double] = List(2.0, 4.0, 6.0, 8.0)

Example 1.26

The preceding expression returns a list of Double and can be chained with foreach to print the values contained in the list:

scala> List(1,2,3,4).map(_ * 2.0).foreach(println)
2.0
4.0
6.0
8.0

Example 1.27

A close cousin of map is flatMap, which combines two operations: map and flatten. Before looking into flatMap, let’s look at flatten:

// converts a list of traversable collections into a single list
// formed by the elements of those collections
def flatten[B]: List[B]

Example 1.28

As the name suggests, it flattens the inner collections:

scala> List(Set(1,2,3), Set(4,5,6)).flatten
res24: List[Int] = List(1, 2, 3, 4, 5, 6)

Example 1.29
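
flatten is not limited to nested collections of the same kind; anything traversable works for the inner elements. For example, a list of Options flattens to just the present values (a sketch):

// Option behaves here like a collection of at most one element
List(Some(1), None, Some(3)).flatten   // List(1, 3)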

Now that we have seen what flatten does, let’s go back to flatMap.

Let’s say that for each element of List(1,2,3,4), we want to create a list of the elements from 0 to that number (both inclusive) and then combine all of those individual lists into a single list. A first pass would look like the following:

scala> List(1,2,3,4).map(0 to _).flatten
res25: List[Int] = List(0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3, 4)

Example 1.30

With flatMap, we can achieve the same result in one step:

scala> List(1,2,3,4).flatMap(0 to _)
res26: List[Int] = List(0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3, 4)

Example 1.31
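
The same map-then-flatten behavior applies whenever the supplied function itself returns a collection. Here is a sketch that splits strings into their characters:

// each String yields a List[Char]; flatMap concatenates the results
List("ab", "cd").flatMap(_.toList)   // List(a, b, c, d)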

Scala collections also provide filter, which accepts a predicate (a function returning a Boolean) and uses it to select elements of a given collection:

def filter(p: (A) => Boolean): List[A]

Example 1.32

For example, to select all of the even integers from a list of the numbers 1 to 100, try the following:

scala> List.tabulate(100)(_ + 1).filter(_ % 2 == 0)
res27: List[Int] = List(2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100)

Example 1.33

There is also withFilter, which provides performance benefits over filter through the lazy evaluation of intermediate collections. It is part of the TraversableLike trait, with the FilterMonadic trait providing the abstract definition:

trait FilterMonadic[+A, +Repr] extends Any {
  // map, flatMap, and foreach are also declared here but skipped
  def withFilter(p: A => Boolean): FilterMonadic[A, Repr]
}

Example 1.34

TraversableLike defines the withFilter method through a member class, WithFilter, that extends FilterMonadic:

def withFilter(p: A => Boolean): FilterMonadic[A, Repr] = new WithFilter(p)

class WithFilter(p: A => Boolean) extends FilterMonadic[A, Repr] {
  // implementations of map, flatMap, and foreach skipped here
  def withFilter(q: A => Boolean): WithFilter =
    new WithFilter(x => p(x) && q(x))
}

Example 1.35

Please note that withFilter returns an object of type FilterMonadic, which defines only map, flatMap, foreach, and withFilter; these are the only methods that can be chained after a call to withFilter. For example, the following will not compile because forall is not among them:

List.tabulate(50)(_ + 1).withFilter(_ % 2 == 0).forall(_ % 2 == 0)

Example 1.36
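
To call a method such as forall after the lazy filtering step, either use filter (which materializes a List) or put one of the four permitted methods in between; a sketch:

// filter returns a List, so any List method can follow
List.tabulate(50)(_ + 1).filter(_ % 2 == 0).forall(_ % 2 == 0)                    // true
// map is one of the methods FilterMonadic exposes, so this compiles as well
List.tabulate(50)(_ + 1).withFilter(_ % 2 == 0).map(identity).forall(_ % 2 == 0)  // true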

It is quite common to have a sequence of flatMap, filter, and map calls chained together, and Scala provides syntactic sugar for this pattern through for comprehensions. To see it in action, let’s consider the following Person class and its instances:

case class Person(firstName: String, isFemale: Boolean, children: Person*)
val bob = Person("Bob", false)
val jennette = Person("Jennette", true)
val laura = Person("Laura", true)
val jean = Person("Jean", true, bob, laura)
val persons = List(bob, jennette, laura, jean)

Example 1.37

Person* denotes a variable-length argument (vararg) of type Person. A vararg parameter of type T must be the last parameter in a class definition or method signature and accepts zero, one, or more instances of type T.
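
Here is a quick sketch of varargs in action (the sum name is ours):

// xs is visible inside the method as a Seq[Int]
def sum(xs: Int*): Int = xs.sum
sum()                  // 0
sum(1, 2, 3)           // 6
// an existing sequence can be expanded into varargs with : _*
sum(List(4, 5, 6): _*) // 15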

Now say we want to get pairs of mother and child, which would be (Jean, Bob) and (Jean, Laura). Using flatMap, filter, and map we can write it as follows:

scala> persons.filter(_.isFemale).flatMap(p => p.children.map(c => (p.firstName, c.firstName)))
res32: List[(String, String)] = List((Jean,Bob), (Jean,Laura))

Example 1.38

The preceding expression does its job, but it is not easy to see at a glance what is happening. This is where the for comprehension comes to the rescue:

scala> for {
     |   p <- persons
     |   if p.isFemale
     |   c <- p.children
     | } yield (p.firstName, c.firstName)
res33: List[(String, String)] = List((Jean,Bob), (Jean,Laura))

Example 1.39

It is much easier to understand what this snippet of code does. Behind the scenes, the Scala compiler converts this expression into the earlier one (the only difference being that filter is replaced with withFilter).
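
In other words, the compiler rewrites the for comprehension in Example 1.39 into roughly the following (a sketch of the desugared form):

persons
  .withFilter(_.isFemale)
  .flatMap(p => p.children.map(c => (p.firstName, c.firstName)))
// List((Jean,Bob), (Jean,Laura))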

Scala also provides methods to combine the elements of a collection using the fold and reduce families of functions. The primary difference between the two can be understood by comparing the signatures of foldLeft and reduceLeft:

def foldLeft[B](z: B)(op: (B, A) => B): B
def reduceLeft[A1 >: A](op: (A1, A1) => A1): A1

Example 1.40

Both of these methods take a binary operator that combines the elements from left to right. However, foldLeft takes an initial value, z, of type B (this value is returned if the list is empty), and the result type can differ from the type of the elements in the list. reduceLeft, on the other hand, requires A1 to be a supertype of A (>: signifies a lower bound). So, we can sum up a List[Int] and return the value as a Double using foldLeft, as follows:

scala> List(1,2,3,4).foldLeft[Double](0) ( _ + _ )
res34: Double = 10.0

Example 1.41

We cannot do the same with reduce or reduceLeft (since Double is not a supertype of Int). Trying to do so raises a compile-time error, type arguments [Double] do not conform to method reduce's type parameter bounds [A1 >: Int]:

scala> List(1,2,3,4).reduce[Double] ( _ + _ )
<console>:12: error: type arguments [Double] do not conform to method reduce's type parameter bounds [A1 >: Int]
       List(1,2,3,4).reduce[Double] ( _ + _ )
                           ^

Example 1.42
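
If a Double result is what we need, the usual workaround is to convert the elements before reducing (a sketch):

// map each Int to Double first; now the bound A1 >: Double is satisfied
List(1,2,3,4).map(_.toDouble).reduce(_ + _)   // 10.0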

foldRight and reduceRight combine the elements of a collection from right to left. There are also fold and reduce; for both, the order in which the elements are combined is unspecified and may be nondeterministic.
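
Here is a brief sketch of these variants, using subtraction to make the association order visible:

List(1,2,3,4).reduceLeft(_ - _)    // ((1 - 2) - 3) - 4 = -8
List(1,2,3,4).reduceRight(_ - _)   // 1 - (2 - (3 - 4)) = -2
List(1,2,3,4).foldRight(0)(_ - _)  // 1 - (2 - (3 - (4 - 0))) = -2
List(1,2,3,4).fold(0)(_ + _)       // 10; order is unspecified, so prefer an associative operator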

In this section, we have seen several examples of HOFs from the Scala collection library. By now, you should have noticed that each of these functions uses type parameters. These are called polymorphic functions, which is what we will cover next.
