Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Save more on your purchases! discount-offer-chevron-icon
Savings automatically calculated. No voucher code required.
Arrow left icon
Explore Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Newsletter Hub
Free Learning
Arrow right icon
timer SALE ENDS IN
0 Days
:
00 Hours
:
00 Minutes
:
00 Seconds

Big Data

Save for later
  • 24 min read
  • 02 Sep 2015

article-image

 In this article by Henry Garner, author of the book Clojure for Data Science, we'll be working with a relatively modest dataset of only 100,000 records. This isn't big data (at 100 MB, it will fit comfortably in the memory of one machine), but it's large enough to demonstrate the common techniques of large-scale data processing. Using Hadoop (the popular framework for distributed computation) as its case study, this article will focus on how to scale algorithms to very large volumes of data through parallelism.

Before we get to Hadoop and distributed data processing though, we'll see how some of the same principles that enable Hadoop to be effective at a very large scale can also be applied to data processing on a single machine, by taking advantage of the parallel capacity available in all modern computers.

(For more resources related to this topic, see here.)

The reducers library

The count operation we implemented previously is a sequential algorithm. Each line is processed one at a time until the sequence is exhausted. But there is nothing about the operation that demands that it must be done in this way.

We could split the number of lines into two sequences (ideally of roughly equal length) and reduce over each sequence independently. When we're done, we would just add together the total number of lines from each sequence to get the total number of lines in the file:

big-data-img-0

If each Reduce ran on its own processing unit, then the two count operations would run in parallel. All the other things being equal, the algorithm would run twice as fast. This is one of the aims of the clojure.core.reducers library—to bring the benefit of parallelism to algorithms implemented on a single machine by taking advantage of multiple cores.

Parallel folds with reducers

The parallel implementation of reduce implemented by the reducers library is called fold. To make use of a fold, we have to supply a combiner function that will take the results of our reduced sequences (the partial row counts) and return the final result. Since our row counts are numbers, the combiner function is simply +.

Reducers are a part of Clojure's standard library, they do not need to be added as an external dependency.

The adjusted example, using clojure.core.reducers as r, looks like this:

(defn ex-5-5 []
  (->> (io/reader "data/soi.csv")
       (line-seq)
       (r/fold + (fn [i x]
                   (inc i)))))

The combiner function, +, has been included as the first argument to fold and our unchanged reduce function is supplied as the second argument. We no longer need to pass the initial value of zero—fold will get the initial value by calling the combiner function with no arguments. Our preceding example works because +, called with no arguments, already returns zero:

(defn ex-5-6 []
  (+))
;; 0

To participate in folding then, it's important that the combiner function have two implementations: one with zero arguments that returns the identity value and another with two arguments that combines the arguments. Different folds will, of course, require different combiner functions and identity values. For example, the identity value for multiplication is 1.

We can visualize the process of seeding the computation with an identity value, iteratively reducing over the sequence of xs and combining the reductions into an output value as a tree:

big-data-img-1

There may be more than two reductions to combine, of course. The default implementation of fold will split the input collection into chunks of 512 elements. Our 166,000-element sequence will therefore generate 325 reductions to be combined. We're going to run out of page real estate quite quickly with a tree representation diagram, so let's visualize the process more schematically instead—as a two-step reduce and combine process.

The first step performs a parallel reduce across all the chunks in the collection. The second step performs a serial reduce over the intermediate results to arrive at the final result:

big-data-img-2

The preceding representation shows reduce over several sequences of xs, represented here as circles, into a series of outputs, represented here as squares. The squares are combined serially to produce the final result, represented by a star.

Loading large files with iota

Calling fold on a lazy sequence requires Clojure to realize the sequence into memory and then chunk the sequence into groups for parallel execution. For situations where the calculation performed on each row is small, the overhead involved in coordination outweighs the benefit of parallelism. We can improve the situation slightly by using a library called iota (https://github.com/thebusby/iota).

The iota library loads files directly into the data structures suitable for folding over with reducers that can handle files larger than available memory by making use of memory-mapped files.

With iota in the place of our line-seq function, our line count simply becomes:

(defn ex-5-7 []
  (->> (iota/seq "data/soi.csv")
       (r/fold + (fn [i x]
                   (inc i)))))

So far, we've just been working with the sequences of unformatted lines, but if we're going to do anything more than counting the rows, we'll want to parse them into a more useful data structure. This is another area in which Clojure's reducers can help make our code more efficient.

Creating a reducers processing pipeline

We already know that the file is comma-separated, so let's first create a function to turn each row into a vector of fields. All fields except the first two contain numeric data, so let's parse them into doubles while we're at it:

(defn parse-double [x]
  (Double/parseDouble x))

(defn parse-line [line]
  (let [[text-fields double-fields] (->> (str/split line #",")
                                         (split-at 2))]
    (concat text-fields
            (map parse-double double-fields))))

We're using the reducers version of map to apply our parse-line function to each of the lines from the file in turn:

(defn ex-5-8 []
   (->> (iota/seq "data/soi.csv")
        (r/drop 1)
        (r/map parse-line)
        (r/take 1)
        (into [])))

;; [("01" "AL" 0.0 1.0 889920.0 490850.0 ...)]

The final into function call converts the reducers' internal representation (a reducible collection) into a Clojure vector. The previous example should return a sequence of 77 fields, representing the first row of the file after the header.

We're just dropping the column names at the moment, but it would be great if we could make use of these to return a map representation of each record, associating the column name with the field value. The keys of the map would be the column headings and the values would be the parsed fields. The clojure.core function zipmap will create a map out of two sequences—one for the keys and one for the values:

(defn parse-columns [line]
  (->> (str/split line #",")
       (map keyword)))

(defn ex-5-9 []
  (let [data (iota/seq "data/soi.csv")
        column-names (parse-columns (first data))]
    (->> (r/drop 1 data)
         (r/map parse-line)
         (r/map (fn [fields]
                  (zipmap column-names fields)))
         (r/take 1)
         (into []))))

This function returns a map representation of each row, a much more user-friendly data structure:

[{:N2 1505430.0, :A19300 181519.0, :MARS4 256900.0 ...}]

A great thing about Clojure's reducers is that in the preceding computation, calls to r/map, r/drop and r/take are composed into a reduction that will be performed in a single pass over the data. This becomes particularly valuable as the number of operations increases.

Let's assume that we'd like to filter out zero ZIP codes. We could extend the reducers pipeline like this:

(defn ex-5-10 []
  (let [data (iota/seq "data/soi.csv")
        column-names (parse-columns (first data))]
    (->> (r/drop 1 data)
         (r/map parse-line)
         (r/map (fn [fields]
                  (zipmap column-names fields)))
         (r/remove (fn [record]
                     (zero? (:zipcode record))))
         (r/take 1)
         (into []))))

The r/remove step is now also being run together with the r/map, r/drop and r/take calls. As the size of the data increases, it becomes increasingly important to avoid making multiple iterations over the data unnecessarily. Using Clojure's reducers ensures that our calculations are compiled into a single pass.

Curried reductions with reducers

To make the process clearer, we can create a curried version of each of our previous steps. To parse the lines, create a record from the fields and filter zero ZIP codes. The curried version of the function is a reduction waiting for a collection:

(def line-formatter
  (r/map parse-line))

(defn record-formatter [column-names]
  (r/map (fn [fields]
           (zipmap column-names fields))))

(def remove-zero-zip
  (r/remove (fn [record]
              (zero? (:zipcode record)))))

In each case, we're calling one of reducers' functions, but without providing a collection. The response is a curried version of the function that can be applied to the collection at a later time. The curried functions can be composed together into a single parse-file function using comp:

(defn load-data [file]
  (let [data (iota/seq file)
        col-names  (parse-columns (first data))
        parse-file (comp remove-zero-zip
                         (record-formatter col-names)
                         line-formatter)]
    (parse-file (rest data))))

It's only when the parse-file function is called with a sequence that the pipeline is actually executed.

Statistical folds with reducers

With the data parsed, it's time to perform some descriptive statistics. Let's assume that we'd like to know the mean number of returns (column N1) submitted to the IRS by ZIP code. One way of doing this—the way we've done several times throughout the book—is by adding up the values and dividing it by the count. Our first attempt might look like this:

(defn ex-5-11 []
  (let [data (load-data "data/soi.csv")
        xs (into [] (r/map :N1 data))]
    (/ (reduce + xs)
       (count xs))))

;; 853.37

While this works, it's comparatively slow. We iterate over the data once to create xs, a second time to calculate the sum, and a third time to calculate the count. The bigger our dataset gets, the larger the time penalty we'll pay. Ideally, we would be able to calculate the mean value in a single pass over the data, just like our parse-file function previously. It would be even better if we can perform it in parallel too.

Associativity

Before we proceed, it's useful to take a moment to reflect on why the following code wouldn't do what we want:

(defn mean
  ([] 0)
  ([x y] (/ (+ x y) 2)))

Our mean function is a function of two arities. Without arguments, it returns zero, the identity for the mean computation. With two arguments, it returns their mean:

(defn ex-5-12 []
  (->> (load-data "data/soi.csv")
       (r/map :N1)
       (r/fold mean)))

;; 930.54

The preceding example folds over the N1 data with our mean function and produces a different result from the one we obtained previously. If we could expand out the computation for the first three xs, we might see something like the following code:

(mean (mean (mean 0 a) b) c)

This is a bad idea, because the mean function is not associative. For an associative function, the following holds true:

big-data-img-3

Addition is associative, but multiplication and division are not. So the mean function is not associative either. Contrast the mean function with the following simple addition:

(+ 1 (+ 2 3))

This yields an identical result to:

(+ (+ 1 2) 3)

It doesn't matter how the arguments to + are partitioned. Associativity is an important property of functions used to reduce over a set of data because, by definition, the results of a previous calculation are treated as inputs to the next.

The easiest way of converting the mean function into an associative function is to calculate the sum and the count separately. Since the sum and the count are associative, they can be calculated in parallel over the data. The mean function can be calculated simply by dividing one by the other.

Multiple regression with gradient descent

The normal equation uses matrix algebra to very quickly and efficiently arrive at the least squares estimates. Where all data fits in memory, this is a very convenient and concise equation. Where the data exceeds the memory available to a single machine however, the calculation becomes unwieldy. The reason for this is matrix inversion. The calculation of  is not something that can be accomplished on a fold over the data—each cell in the output matrix depends on many others in the input matrix. These complex relationships require that the matrix be processed in a nonsequential way.

An alternative approach to solve linear regression problems, and many other related machine learning problems, is a technique called gradient descent. Gradient descent reframes the problem as the solution to an iterative algorithm—one that does not calculate the answer in one very computationally intensive step, but rather converges towards the correct answer over a series of much smaller steps.

The gradient descent update rule

Gradient descent works by the iterative application of a function that moves the parameters in the direction of their optimum values. To apply this function, we need to know the gradient of the cost function with the current parameters.

Calculating the formula for the gradient involves calculus that's beyond the scope of this book. Fortunately, the resulting formula isn't terribly difficult to interpret:

big-data-img-4

 is the partial derivative, or the gradient, of our cost function J(β) for the parameter at index j. Therefore, we can see that the gradient of the cost function with respect to the parameter at index j is equal to the difference between our prediction and the true value of y multiplied by the value of x at index j.

Since we're seeking to descend the gradient, we want to subtract some proportion of the gradient from the current parameter values. Thus, at each step of gradient descent, we perform the following update:

big-data-img-5

Unlock access to the largest independent learning library in Tech for FREE!
Get unlimited access to 7500+ expert-authored eBooks and video courses covering every tech area you can think of.
Renews at $19.99/month. Cancel anytime

Here, := is the assigment operator and α is a factor called the learning rate. The learning rate controls how large an adjustment we wish make to the parameters at each iteration as a fraction of the gradient. If our prediction ŷ nearly matches the actual value of y, then there would be little need to change the parameters. In contrast, a larger error will result in a larger adjustment to the parameters. This rule is called the Widrow-Hoff learning rule or the Delta rule.

The gradient descent learning rate

As we've seen, gradient descent is an iterative algorithm. The learning rate, usually represented by α, dictates the speed at which the gradient descent converges to the final answer. If the learning rate is too small, convergence will happen very slowly. If it is too large, gradient descent will not find values close to the optimum and may even diverge from the correct answer:

big-data-img-6

In the preceding chart, a small learning rate leads to a show convergence over many iterations of the algorithm. While the algorithm does reach the minimum, it does so over many more steps than is ideal and, therefore, may take considerable time. By contrast, in following diagram, we can see the effect of a learning rate that is too large. The parameter estimates are changed so significantly between iterations that they actually overshoot the optimum values and diverge from the minimum value:

big-data-img-7

The gradient descent algorithm requires us to iterate repeatedly over our dataset. With the correct version of alpha, each iteration should successively yield better approximations of the ideal parameters. We can choose to terminate the algorithm when either the change between iterations is very small or after a predetermined number of iterations.

Feature scaling

As more features are added to the linear model, it is important to scale features appropriately. Gradient descent will not perform very well if the features have radically different scales, since it won't be possible to pick a learning rate to suit them all.

A simple scaling we can perform is to subtract the mean value from each of the values and divide it by the standard-deviation. This will tend to produce values with zero mean that generally vary between -3 and 3:

(

defn feature-scales [features]
  (->> (prepare-data)
       (t/map #(select-keys % features))
       (t/facet)
       (t/fuse {:mean (m/mean)
                :sd   (m/standard-deviation)})))

The feature-factors function in the preceding code uses t/facet to calculate the mean value and standard deviation of all the input features:

(defn ex-5-24 []
  (let [data (iota/seq "data/soi.csv")
        features [:A02300 :A00200 :AGI_STUB :NUMDEP :MARS2]]
    (->> (feature-scales features)
         (t/tesser (chunks data)))))

;; {:MARS2 {:sd 533.4496892658647, :mean 317.0412009748016}...}

If you run the preceding example, you'll see the different means and standard deviations returned by the feature-scales function. Since our feature scales and input records are represented as maps, we can perform the scale across all the features at once using Clojure's merge-with function:

(defn scale-features [factors]
  (let [f (fn [x {:keys [mean sd]}]
            (/ (- x mean) sd))]
    (fn [x]
      (merge-with f x factors))))

Likewise, we can perform the all-important reversal with unscale-features:

(defn unscale-features [factors]
  (let [f (fn [x {:keys [mean sd]}]
            (+ (* x sd) mean))]
    (fn [x]
      (merge-with f x factors))))

Let's scale our features and take a look at the very first feature. Tesser won't allow us to execute a fold without a reduce, so we'll temporarily revert to using Clojure's reducers:

(defn ex-5-25 []
  (let [data     (iota/seq "data/soi.csv")
        features [:A02300 :A00200 :AGI_STUB :NUMDEP :MARS2]
        factors (->> (feature-scales features)
                     (t/tesser (chunks data)))]
    (->> (load-data "data/soi.csv")
         (r/map #(select-keys % features ))
         (r/map (scale-features factors))
         (into [])
         (first))))

;; {:MARS2 -0.14837567114357617, :NUMDEP 0.30617757526890155,
;;  :AGI_STUB -0.714280814223704, :A00200 -0.5894942801950217,
;;  :A02300 0.031741856083514465}

This simple step will help gradient descent perform optimally on our data.

Feature extraction

Although we've used maps to represent our input data in this article, it's going to be more convenient when running gradient descent to represent our features as a matrix. Let's write a function to transform our input data into a map of xs and y. The y axis will be a scalar response value and xs will be a matrix of scaled feature values.

We're adding a bias term to the returned matrix of features:

(defn feature-matrix [record features]
  (let [xs (map #(% record) features)]
    (i/matrix (cons 1 xs))))

(defn extract-features [fy features]
  (fn [record]
    {:y  (fy record)
     :xs (feature-matrix record features)}))

Our feature-matrix function simply accepts an input of a record and the features to convert into a matrix. We call this from within extract-features, which returns a function that we can call on each input record:

(defn ex-5-26 []
  (let [data     (iota/seq "data/soi.csv")
        features [:A02300 :A00200 :AGI_STUB :NUMDEP :MARS2]
        factors (->> (feature-scales features)
                     (t/tesser (chunks data)))]
    (->> (load-data "data/soi.csv")
         (r/map (scale-features factors))
         (r/map (extract-features :A02300 features))
         (into [])
         (first))))

;; {:y 433.0, :xs  A 5x1 matrix
;;  -------------
;;  1.00e+00
;; -5.89e-01
;; -7.14e-01
;;  3.06e-01
;; -1.48e-01
;; }

The preceding example shows the data converted into a format suitable to perform gradient descent: a map containing the y response variable and a matrix of values, including the bias term.

Applying a single step of gradient descent

The objective of calculating the cost is to determine the amount by which to adjust each of the coefficients. Once we've calculated the average cost, as we did previously, we need to update the estimate of our coefficients β. Together, these steps represent a single iteration of gradient descent:

big-data-img-8

We can return the updated coefficients in a post-combiner step that makes use of the average cost, the value of alpha, and the previous coefficients. Let's create a utility function update-coefficients, which will receive the coefficients and alpha and return a function that will calculate the new coefficients, given a total model cost:

(defn update-coefficients [coefs alpha]
  (fn [cost]
    (->> (i/mult cost alpha)
         (i/minus coefs))))

With the preceding function in place, we have everything we need to package up a batch gradient descent update rule:

(defn gradient-descent-fold [{:keys [fy features factors
                                     coefs alpha]}]
  (let [zeros-matrix (i/matrix 0 (count features) 1)]
    (->> (prepare-data)
         (t/map (scale-features factors))
         (t/map (extract-features fy features))
         (t/map (calculate-error (i/trans coefs)))
         (t/fold (matrix-mean (inc (count features)) 1))
         (t/post-combine (update-coefficients coefs alpha)))))

(defn ex-5-31 []
  (let [features [:A00200 :AGI_STUB :NUMDEP :MARS2]
        fcount   (inc (count features))
        coefs    (vec (replicate fcount 0))
        data     (chunks (iota/seq "data/soi.csv"))
        factors  (->> (feature-scales features)
                      (t/tesser data))
        options {:fy :A02300 :features features
                 :factors factors :coefs coefs :alpha 0.1}]
    (->> (gradient-descent-fold options)
         (t/tesser data))))

;; A 6x1 matrix
;; -------------
;; -4.20e+02
;; -1.38e+06
;; -5.06e+07
;; -9.53e+02
;; -1.42e+06
;; -4.86e+05

The resulting matrix represents the values of the coefficients after the first iteration of gradient descent.

Running iterative gradient descent

Gradient descent is an iterative algorithm, and we will usually need to run it many times to convergence. With a large dataset, this can be very time-consuming.

To save time, we've included a random sample of soi.csv in the data directory called soi-sample.csv. The smaller size allows us to run iterative gradient descent in a reasonable timescale. The following code runs gradient descent for 100 iterations, plotting the values of the parameters between each iteration on an xy-plot:

(defn descend [options data]
  (fn [coefs]
    (->> (gradient-descent-fold (assoc options :coefs coefs))
         (t/tesser data))))

(defn ex-5-32 []
  (let [features [:A00200 :AGI_STUB :NUMDEP :MARS2]
        fcount   (inc (count features))
        coefs    (vec (replicate fcount 0))
        data     (chunks (iota/seq "data/soi-sample.csv"))
        factors  (->> (feature-scales features)
                      (t/tesser data))
        options  {:fy :A02300 :features features
                  :factors factors :coefs coefs :alpha 0.1}
        iterations 100
        xs (range iterations)
        ys (->> (iterate (descend options data) coefs)
                (take iterations))]
    (-> (c/xy-plot xs (map first ys)
                   :x-label "Iterations"
                   :y-label "Coefficient")
        (c/add-lines xs (map second ys))
        (c/add-lines xs (map #(nth % 2) ys))
        (c/add-lines xs (map #(nth % 3) ys))
        (c/add-lines xs (map #(nth % 4) ys))
        (i/view))))

If you run the example, you should see a chart similar to the following:

big-data-img-9

In the preceding chart, you can see how the parameters converge to relatively stable the values over the course of 100 iterations.

Scaling gradient descent with Hadoop

The length of time each iteration of batch gradient descent takes to run is determined by the size of your data and by how many processors your computer has. Although several chunks of data are processed in parallel, the dataset is large and the processors are finite. We've achieved a speed gain by performing calculations in parallel, but if we double the size of the dataset, the runtime will double as well.

Hadoop is one of several systems that has emerged in the last decade which aims to parallelize work that exceeds the capabilities of a single machine. Rather than running code across multiple processors, Hadoop takes care of running a calculation across many servers. In fact, Hadoop clusters can, and some do, consist of many thousands of servers.

Hadoop consists of two primary subsystems— the Hadoop Distributed File System (HDFS)—and the job processing system, MapReduce. HDFS stores files in chunks. A given file may be composed of many chunks and chunks are often replicated across many servers. In this way, Hadoop can store quantities of data much too large for any single server and, through replication, ensure that the data is stored reliably in the event of hardware failure too. As the name implies, the MapReduce programming model is built around the concept of map and reduce steps. Each job is composed of at least one map step and may optionally specify a reduce step. An entire job may consist of several map and reduce steps chained together.

big-data-img-10

In the respect that reduce steps are optional, Hadoop has a slightly more flexible approach to distributed calculation than Tesser.

Gradient descent on Hadoop with Tesser and Parkour

Tesser's Hadoop capabilities are available in the tesser.hadoop namespace, which we're including as h. The primary public API function in the Hadoop namespace is h/fold.

The fold function expects to receive at least four arguments, representing the configuration of the Hadoop job, the input file we want to process, a working directory for Hadoop to store its intermediate files, and the fold we want to run, referenced as a Clojure var. Any additional arguments supplied will be passed as arguments to the fold when it is executed.

The reason for using a var to represent our fold is that the function call initiating the fold may happen on a completely different computer than the one that actually executes it. In a distributed setting, the var and arguments must entirely specify the behavior of the function. We can't, in general, rely on other mutable local state (for example, the value of an atom, or the value of variables closing over the function) to provide any additional context.

Parkour distributed sources and sinks

The data which we want our Hadoop job to process may exist on multiple machines too, stored distributed in chunks on HDFS. Tesser makes use of a library called Parkour (https://github.com/damballa/parkour/) to handle accessing potentially distributed data sources.

Although Hadoop is designed to be run and distributed across many servers, it can also run in local mode. Local mode is suitable for testing and enables us to interact with the local filesystem as if it were HDFS. Another namespace we'll be using from Parkour is the parkour.conf namespace. This will allow us to create a default Hadoop configuration and operate it in local mode:

(defn ex-5-33 []
  (->> (text/dseq "data/soi.csv")
       (r/take 2)
       (into [])))

In the preceding example, we use Parkour's text/dseq function to create a representation of the IRS input data. The return value implements Clojure's reducers protocol, so we can use r/take on the result.

Running a feature scale fold with Hadoop

Hadoop needs a location to write its temporary files while working on a task, and will complain if we try to overwrite an existing directory. Since we'll be executing several jobs over the course of the next few examples, let's create a little utility function that returns a new file with a randomly-generated name.

(defn rand-file [path]
  (io/file path (str (long (rand 0x100000000)))))

(defn ex-5-34 []
  (let [conf     (conf/ig)
        input    (text/dseq "data/soi.csv")
        workdir  (rand-file "tmp")
        features [:A00200 :AGI_STUB :NUMDEP :MARS2]]
    (h/fold conf input workdir #'feature-scales features)))

Parkour provides a default Hadoop configuration object with the shorthand (conf/ig). This will return an empty configuration. The default value is enough, we don't need to supply any custom configuration.

All of our Hadoop jobs will write their temporary files to a random directory inside the project's tmp directory. Remember to delete this folder later, if you're concerned about preserving disk space.

If you run the preceding example now, you should get an output similar to the following:

;; {:MARS2 317.0412009748016, :NUMDEP 581.8504423822615,
;; :AGI_STUB 3.499939975269811, :A00200 37290.58880658831}

Although the return value is identical to the values we got previously, we're now making use of Hadoop behind the scenes to process our data. In spite of this, notice that Tesser will return the response from our fold as a single Clojure data structure.

Running gradient descent with Hadoop

Since tesser.hadoop folds return Clojure data structures just like tesser.core folds, defining a gradient descent function that makes use of our scaled features is very simple:

(defn hadoop-gradient-descent [conf input-file workdir]
  (let [features [:A00200 :AGI_STUB :NUMDEP :MARS2]
        fcount  (inc (count features))
        coefs   (vec (replicate fcount 0))
        input   (text/dseq input-file)
        options {:column-names column-names
                 :features features
                 :coefs coefs
                 :fy :A02300
                 :alpha 1e-3}
        factors (h/fold conf input (rand-file workdir)
                        #'feature-scales
                        features)
        descend (fn [coefs]
                  (h/fold conf input (rand-file workdir)
                          #'gradient-descent-fold
                          (merge options {:coefs coefs
                                          :factors factors})))]
    (take 5 (iterate descend coefs))))

The preceding code defines a hadoop-gradient-descent function that iterates a descend function 5 times. Each iteration of descend calculates the improved coefficients based on the gradient-descent-fold function. The final return value is a vector of coefficients after 5 iterations of a gradient descent.

We run the job on the full IRS data in the following example:

(

defn ex-5-35 []
  (let [workdir  "tmp"
        out-file (rand-file workdir)]
    (hadoop-gradient-descent (conf/ig) "data/soi.csv" workdir)))

After several iterations, you should see an output similar to the following:

;; ([0 0 0 0 0]
;; (20.9839310796048 46.87214911003046 -7.363493937722712
;;  101.46736841329326 55.67860863427868)
;; (40.918665605227744 56.55169901254631 -13.771345753228694
;;  162.1908841131747 81.23969785586247)
;; (59.85666340457121 50.559130068258995 -19.463888245285332
;;  202.32407094149158 92.77424653758085)
;; (77.8477613139478 38.67088624825574 -24.585818946408523
;;  231.42399118694212 97.75201693843269))

We've seen how we're able to calculate gradient descent using distributed techniques locally. Now, let's see how we can run this on a cluster of our own.

Summary

In this article, we learned some of the fundamental techniques of distributed data processing and saw how the functions used locally for data processing, map and reduce, are powerful ways of processing even very large quantities of data. We learned how Hadoop can scale unbounded by the capabilities of any single server by running functions on smaller subsets of the data whose outputs are themselves combined to finally produce a result. Once you understand the tradeoffs, this "divide and conquer" approach toward processing data is a simple and very general way of analyzing data on a large scale.

We saw both the power and limitations of simple folds to process data using both Clojure's reducers and Tesser. We've also begun exploring how Parkour exposes more of Hadoop's underlying capabilities.

Resources for Article:


Further resources on this subject: