Chapter 03: Working with Big Data Frameworks
Activity 8: Parsing Text
Read the text files into the Spark object using the text method:
rdd_df = spark.read.text("/localdata/myfile.txt").rdd
To parse the file that we are reading, we will use lambda functions and Spark operations such as map, flatMap, and reduceByKey. flatMap applies a function to all elements of an RDD, flattens the results, and returns the transformed RDD. reduceByKey merges the values for each key, combining them with the given function. With these operations, we can count the number of lines and words in the text.
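To make the difference between these operations concrete, here is a minimal sketch on a toy RDD; the sample sentences are made up for illustration:

# A toy RDD of two made-up sentences (hypothetical sample data)
toy = spark.sparkContext.parallelize(["big data", "big frameworks"])

# map produces one output element per input element, so we get nested lists
toy.map(lambda s: s.split(' ')).collect()      # [['big', 'data'], ['big', 'frameworks']]

# flatMap flattens the nested lists into a single list of words
toy.flatMap(lambda s: s.split(' ')).collect()  # ['big', 'data', 'big', 'frameworks']

# reduceByKey combines the values for each key with the given function
pairs = toy.flatMap(lambda s: s.split(' ')).map(lambda w: (w, 1))
pairs.reduceByKey(lambda a, b: a + b).collect()
# e.g. [('big', 2), ('data', 1), ('frameworks', 1)] -- order may vary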
Extract the lines from the text using the following command:
lines = rdd_df.map(lambda line: line[0])
This extracts the string value from each Row, so each line of the file becomes an entry in the RDD. To check the result, you can use the collect method, which gathers all data back to the driver process:
lines.collect()
Now, let's count the number of lines, using the count method:
lines.count()
Note
Be careful when using the collect method! If the DataFrame or RDD being collected is larger than the memory of the local driver, Spark will throw an error.
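If you only need to inspect a sample, take is a safer alternative to collect, since it returns just the first few elements instead of the whole dataset; a minimal sketch:

# Bring back only the first five lines instead of the entire RDD
lines.take(5)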
Now, let's split each line into words, breaking it on the spaces and flattening the results, and then convert every word to lowercase:
splits = lines.flatMap(lambda x: x.split(' '))
lower_splits = splits.map(lambda x: x.lower())
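Note that splitting on spaces alone leaves punctuation attached to words, so, for example, "data," and "data" would be counted separately. One alternative, sketched here with Python's re module, is to split on runs of non-word characters instead; any empty strings it produces at line boundaries are dropped by the filter in the next step:

import re

# Split on runs of non-word characters so punctuation is discarded
splits = lines.flatMap(lambda x: re.split(r'\W+', x))
lower_splits = splits.map(lambda x: x.lower())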
Let's also remove the stop words. We could use a more comprehensive stop-word list from NLTK, but for now, we will roll our own:
stop_words = ['of', 'a', 'and', 'to']
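If you prefer the NLTK list mentioned above, a minimal sketch follows; it assumes the nltk package is installed and downloads the stop-word corpus on first use:

import nltk
nltk.download('stopwords')  # one-time download of the stop-word corpus

from nltk.corpus import stopwords
stop_words = stopwords.words('english')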
Use the following command to remove the stop words from our token list:
tokens = lower_splits.filter(lambda x: x and x not in stop_words)
We can now process our token list and count the unique words. The idea is to generate a list of tuples, where the first element is the token and the second element is the count of that particular token.
First, let's map our token to a list:
token_list = tokens.map(lambda x: [x, 1])
Use the reduceByKey operation, which merges the pairs key by key, summing the counts, and then sort the result by count in descending order. Note that add must be imported from the operator module:
from operator import add

count = token_list.reduceByKey(add).sortBy(lambda x: x[1], ascending=False)
count.collect()
Remember, collect brings all the data back to the driver node! Always check whether the driver has enough memory by using tools such as top and htop.
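For reference, here is the whole pipeline assembled as one script. It is a minimal sketch: the appName is an arbitrary choice, the input path is the same hypothetical one used above, and take is used to inspect only the ten most frequent tokens rather than collecting everything:

from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parse-text").getOrCreate()  # hypothetical app name

# Read the text file; each element of the RDD is a Row with one string field
rdd_df = spark.read.text("/localdata/myfile.txt").rdd
lines = rdd_df.map(lambda line: line[0])

# Split into words, lowercase, and drop empty strings and stop words
stop_words = ['of', 'a', 'and', 'to']
splits = lines.flatMap(lambda x: x.split(' '))
lower_splits = splits.map(lambda x: x.lower())
tokens = lower_splits.filter(lambda x: x and x not in stop_words)

# Count each token and sort by frequency, highest first
token_list = tokens.map(lambda x: (x, 1))
count = token_list.reduceByKey(add).sortBy(lambda x: x[1], ascending=False)

# Inspect only the ten most frequent tokens instead of collecting everything
print(count.take(10))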