Data processing with Hadoop
In the remaining chapters of this book, we will introduce the core components of the Hadoop ecosystem as well as a number of third-party tools and libraries that will make writing robust, distributed code an accessible and hopefully enjoyable task. While reading this book, you will learn how to collect, process, store, and extract information from large amounts of structured and unstructured data.
We will use a dataset generated from Twitter's (http://www.twitter.com) real-time fire hose. This approach will allow us to experiment with relatively small datasets locally and, once ready, scale the examples up to production-level data sizes.
Why Twitter?
Thanks to its programmatic APIs, Twitter provides an easy way to generate datasets of arbitrary size and inject them into our local- or cloud-based Hadoop clusters. Other than the sheer size, the dataset that we will use has a number of properties that fit several interesting data modeling and processing use cases.
Twitter data possesses the following properties:
- Unstructured: each status update is a text message that can contain references to media content such as URLs and images
- Structured: tweets are timestamped, sequential records
- Graph: relationships such as replies and mentions can be modeled as a network of interactions
- Geolocated: the location where a tweet was posted or where a user resides
- Real time: all data generated on Twitter is available via a real-time fire hose
These properties are reflected in the types of application that we can build with Hadoop. These include examples of sentiment analysis, social network analysis, and trend analysis.
Building our first dataset
Twitter's terms of service prohibit redistribution of user-generated data in any form; for this reason, we cannot make available a common dataset. Instead, we will use a Python script to programmatically access the platform and create a dump of user tweets collected from a live stream.
One service, multiple APIs
Twitter users share more than 200 million tweets, also known as status updates, a day. The platform offers access to this corpus of data via four types of APIs, each of which represents a facet of Twitter and aims at satisfying specific use cases:
- Twitter for Products: linking and interacting with Twitter content from third-party sources
- REST: programmatic access to specific users' or sites' content
- Search: search capabilities across users' or sites' timelines
- Streaming: access to all content created on the Twitter network in real time
The Streaming API allows direct access to the Twitter stream, tracking keywords, retrieving geotagged tweets from a certain region, and much more. In this book, we will make use of this API as a data source to illustrate both the batch and real-time capabilities of Hadoop. We will not, however, interact with the API itself; rather, we will make use of third-party libraries to offload chores such as authentication and connection management.
Anatomy of a Tweet
Each tweet object returned by a call to the real-time APIs is represented as a serialized JSON string that contains a set of attributes and metadata in addition to a textual message. This additional content includes a numerical ID that uniquely identifies the tweet, the location where the tweet was shared, the user who shared it (user object), whether it was republished by other users (retweeted) and how many times (retweet count), the machine-detected language of its text, whether the tweet was posted in reply to someone and, if so, the user and tweet IDs it replied to, and so on.
The structure of a Tweet, and any other object exposed by the API, is constantly evolving. An up-to-date reference can be found at https://dev.twitter.com/docs/platform-objects/tweets.
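To make this concrete, the following is an illustrative and heavily abridged sketch of such a JSON object. The field names follow the version 1.1 platform objects, but the values shown here are made up and many attributes are omitted:
{
  "created_at": "Thu Jun 19 12:00:00 +0000 2014",
  "id": 479651366414233600,
  "text": "Learning all about Hadoop 2!",
  "user": {
    "id": 19025957,
    "screen_name": "some_user"
  },
  "in_reply_to_status_id": null,
  "retweet_count": 0,
  "coordinates": null,
  "lang": "en"
}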
Twitter credentials
Twitter makes use of the OAuth protocol to authenticate and authorize access from third-party software to its platform.
Through an external channel, for instance a web form, the application obtains the following pair of credentials:
- Consumer key
- Consumer secret
The consumer secret is never directly transmitted to the third party; instead, it is used to sign each request.
The user authorizes the application to access the service via a three-legged process that, once completed, grants the application a token consisting of the following:
- Access token
- Access secret
Similarly to the consumer secret, the access secret is never directly transmitted to the third party; it, too, is used to sign each request.
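As an aside, the following minimal sketch shows how an OAuth 1.0a HMAC-SHA1 signature can be computed from these credentials. The function name and parameters are our own illustration; in practice, tweepy performs the equivalent signing steps for us behind the scenes:
import base64
import hashlib
import hmac
import urllib

def sign_request(method, url, params, consumer_secret, access_secret):
    # Percent-encode, sort, and join all request parameters
    pairs = sorted(
        (urllib.quote(str(k), safe=''), urllib.quote(str(v), safe=''))
        for k, v in params.items())
    param_string = '&'.join('%s=%s' % (k, v) for k, v in pairs)
    # The signature base string ties together the HTTP method,
    # the request URL, and the parameters
    base_string = '&'.join([
        method.upper(),
        urllib.quote(url, safe=''),
        urllib.quote(param_string, safe='')])
    # Both secrets form the signing key; neither is ever sent over the wire
    signing_key = '%s&%s' % (
        urllib.quote(consumer_secret, safe=''),
        urllib.quote(access_secret, safe=''))
    digest = hmac.new(signing_key, base_string, hashlib.sha1).digest()
    return base64.b64encode(digest)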
In order to use the Streaming API, we will first need to register an application and grant it programmatic access to the system. If you require a new Twitter account, proceed to the signup page at https://twitter.com/signup, and fill in the required information. Once this step is completed, we need to create a sample application that will access the API on our behalf and grant it the proper authorization rights. We will do so using the web form found at https://dev.twitter.com/apps.
When creating a new app, we are asked to give it a name, a description, and a URL. The following screenshot shows the settings of a sample application named Learning Hadoop 2 Book Dataset. For the purpose of this book, we do not need to specify a valid URL, so we used a placeholder instead.
Once the form is filled in, we need to review and accept the terms of service and click on the Create Application button in the bottom-left corner of the page.
We are now presented with a page that summarizes our application details as seen in the following screenshot; the authentication and authorization credentials can be found under the OAuth Tool tab.
We are finally ready to generate our very first Twitter dataset.
Programmatic access with Python
In this section, we will use Python and the tweepy library, found at https://github.com/tweepy/tweepy, to collect Twitter's data. The stream.py file found in the ch1 directory of the book code archive instantiates a listener to the real-time fire hose, grabs a data sample, and echoes each tweet's text to standard output.
The tweepy library can be installed using either the easy_install or pip package managers, or by cloning the repository at https://github.com/tweepy/tweepy. On the CDH QuickStart VM, we can install tweepy using the following command line:
$ pip install tweepy
When invoked with the -j parameter, the script will output a JSON tweet to standard output; -t extracts and prints the text field. We specify how many tweets to print with -n <num tweets>. When -n is not specified, the script will run indefinitely. Execution can be terminated by pressing Ctrl + C.
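For example, assuming the credentials described next have been set, illustrative invocations such as the following would print ten raw JSON objects and ten plain-text messages, respectively:
$ python stream.py -j -n 10
$ python stream.py -t -n 10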
The script expects OAuth credentials to be stored as shell environment variables; the following credentials will have to be set in the terminal session from where stream.py will be executed.
$ export TWITTER_CONSUMER_KEY="your_consumer_key"
$ export TWITTER_CONSUMER_SECRET="your_consumer_secret"
$ export TWITTER_ACCESS_KEY="your_access_key"
$ export TWITTER_ACCESS_SECRET="your_access_secret"
Once the required dependency has been installed and the OAuth credentials have been set in the shell environment, we can run the program as follows:
$ python stream.py -t -n 1000 > tweets.txt
We are relying on Linux's shell I/O to redirect the output of stream.py with the > operator to a file called tweets.txt. If everything was executed correctly, you should see a wall of text, where each line is a tweet.
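As a quick sanity check, standard shell tools can verify the dump; the line count should roughly match the number of tweets requested:
$ wc -l tweets.txt
$ head -n 3 tweets.txt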
Notice that in this example, we did not make use of Hadoop at all. In the next chapters, we will show how to import a dataset generated from the Streaming API into Hadoop and analyze its content on the local cluster and Amazon EMR.
For now, let's take a look at the source code of stream.py, which can be found at https://github.com/learninghadoop2/book-examples/blob/master/ch1/stream.py:
import tweepy
import os
import json
import argparse

# OAuth credentials are read from the shell environment
consumer_key = os.environ['TWITTER_CONSUMER_KEY']
consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
access_key = os.environ['TWITTER_ACCESS_KEY']
access_secret = os.environ['TWITTER_ACCESS_SECRET']

class EchoStreamListener(tweepy.StreamListener):
    def __init__(self, api, dump_json=False, numtweets=0):
        self.api = api
        self.dump_json = dump_json
        self.count = 0
        self.limit = int(numtweets)
        super(tweepy.StreamListener, self).__init__()

    def on_data(self, tweet):
        # Called for every message delivered on the stream
        tweet_data = json.loads(tweet)
        if 'text' in tweet_data:
            if self.dump_json:
                print tweet.rstrip()
            else:
                print tweet_data['text'].encode("utf-8").rstrip()
            self.count = self.count+1
        # Returning False tells tweepy to disconnect from the stream
        return False if self.count == self.limit else True

    def on_error(self, status_code):
        return True

    def on_timeout(self):
        return True

…

if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args()

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_key, access_secret)
    api = tweepy.API(auth)

    # Attach EchoStreamListener to the real-time sample stream
    sapi = tweepy.streaming.Stream(
        auth, EchoStreamListener(
            api=api,
            dump_json=args.json,
            numtweets=args.numtweets))
    sapi.sample()
First, we import our dependencies: the tweepy library, along with the os, json, and argparse modules, which come with the Python interpreter version 2.7 or greater.
We then define a class, EchoStreamListener, that inherits and extends StreamListener from tweepy. As the name suggests, StreamListener listens for events and tweets being published on the real-time stream and performs actions accordingly.
Whenever a new event is detected, it triggers a call to on_data(). In this method, we extract the text field from a tweet object and print it to standard output with UTF-8 encoding. Alternatively, if the script is invoked with -j, we print the whole JSON tweet. When the script is executed, we instantiate a tweepy.OAuthHandler object with the OAuth consumer credentials that identify our Twitter account, and we then pass it the access token and secret via set_access_token(). Finally, we use the auth object to create an instance of the tweepy.API class (api).
Upon successful authentication, we tell Python to listen for events on the real-time stream using EchoStreamListener.
An HTTP GET request to the statuses/sample endpoint is performed by sample(). The request returns a random sample of all public statuses.
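If we wanted a targeted stream rather than a random sample, the same Stream object also exposes a filter() method; a hypothetical variation of the last line of stream.py that tracks tweets containing given keywords might look as follows:
# Track keywords rather than sampling at random;
# the keyword list here is purely illustrative
sapi.filter(track=['hadoop', 'bigdata'])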
Note
Beware! By default, sample() will run indefinitely. Remember to explicitly terminate the method call by pressing Ctrl + C.