In this article by Andrew Morgan, Antoine Amend, Matthew Hallett, David George, the author of the book Mastering Spark for Data Science, readers will learn how to construct a content registerand use it to track all input loaded to the system, and to deliver metrics on ingestion pipelines, so that these flows can be reliably run as an automated, lights-out process.
Readers will learn how to construct a content registerand use it to track all input loaded to the system, and to deliver metrics on ingestion pipelines, so that these flows can be reliably run as an automated, lights-out process.
In this article we will cover the following topics:
(For more resources related to this topic, see here.)
Even with the most basic of analytics, we always require some data. In fact, finding the right data is probably among the hardest problems to solve in data science (but that’s a whole topic for another book!). We have already seen that the way in which we obtain our data can be as simple or complicated as is needed. In practice, we can break this decision into two distinct areas: Ad-hoc and scheduled.
It’s clear we need a flexible approach to data acquisition that supports a variety of procurement options.
There are many ways to approach data acquisition ranging from home grown bash scripts through to high-end commercial tools. The aim of this section is to introduce a highly flexible framework that we can use for small scale data ingest, and then grow as our requirements change - all the way through to a full corporately managed workflow if needed - that framework will be build using Apache NiFi. NiFi enables us to build large-scale integrated data pipelines that move data around the planet. In addition, it’s also incredibly flexible and easy to build simple pipelines - usually quicker even than using Bash or any other traditional scripting method.
If an ad-hoc approach is taken to source the same dataset on a number of occasions, then some serious thought should be given as to whether it falls into the scheduled category, or at least whether a more robust storage and versioning setup should be introduced.
We have chosen to use Apache NiFi as it offers a solution that provides the ability to create many, varied complexity pipelines that can be scaled to truly Big Data and IoT levels, and it also provides a great drag & drop interface (using what’s known as flow-based programming[1]). With patterns, templates and modules for workflow production, it automatically takes care of many of the complex features that traditionally plague developers such as multi-threading, connection management and scalable processing. For our purposes it will enable us to quickly build simple pipelines for prototyping, and scale these to full production where required.
It’s pretty well documented and easy to get running https://nifi.apache.org/download.html, it runs in a browser and looks like this:
https://en.wikipedia.org/wiki/Flow-based_programming
We leave the installation of NiFi as an exercise for the reader - which we would encourage you to do - as we will be using it in the following section.
Hopefully, we have NiFi up and running now and can start to ingest some data. So let’s start with some global news media data from GDELT. Here’s our brief, taken from the GDELT website http://blog.gdeltproject.org/gdelt-2-0-our-global-world-in-realtime/:
“Within 15 minutes of GDELT monitoring a news report breaking anywhere the world, it has translated it, processed it to identify all events, counts, quotes, people, organizations, locations, themes, emotions, relevant imagery, video, and embedded social media posts, placed it into global context, and made all of this available via a live open metadata firehose enabling open research on the planet itself.
[As] the single largest deployment in the world of sentiment analysis, we hope that by bringing together so many emotional and thematic dimensions crossing so many languages and disciplines, and applying all of it in realtime to breaking news from across the planet, that this will spur an entirely new era in how we think about emotion and the ways in which it can help us better understand how we contextualize, interpret, respond to, and understand global events.”
In order to start consuming this open data, we’ll need to hook into that metadata firehose and ingest the news streams onto our platform. How do we do this? Let’s start by finding out what data is available.
GDELT publish a list of the latest files on their website - this list is updated every 15 minutes. In NiFi, we can setup a dataflow that will poll the GDELT website, source a file from this list and save it to HDFS so we can use it later.
Inside the NiFi dataflow designer, create a HTTP connector by dragging a processor onto the canvas and selecting GetHTTP.
To configure this processor, you’ll need to enter the URL of the file list as:
http://data.gdeltproject.org/gdeltv2/lastupdate.txt
And also provide a temporary filename for the file list you will download. In the example below, we’ve used the NiFi’s expression language to generate a universally unique key so that files are not overwritten (UUID()).
It’s worth noting that with this type of processor (GetHTTP), NiFi supports a number of scheduling and timing options for the polling and retrieval. For now, we’re just going to use the default options and let NiFi manage the polling intervals for us.
An example of latest file list from GDELT is shown below.
Next, we will parse the URL of the GKG news stream so that we can fetch it in a moment. Create a Regular Expression parser by dragging a processor onto the canvas and selecting ExtractText. Now position the new processor underneath the existing one and drag a line from the top processor to the bottom one. Finish by selecting the success relationship in the connection dialog that pops up.
This is shown in the example below.
Next, let’s configure the ExtractText processor to use a regular expression that matches only the relevant text of the file list, for example:
([^ ]*gkg.csv.*)
From this regular expression, NiFi will create a new property (in this case, called url) associated with the flow design, which will take on a new value as each particular instance goes through the flow. It can even be configured to support multiple threads.
Again, this is example is shown below.
It’s worth noting here that while this is a fairly specific example, the technique is deliberately general purpose and can be used in many situations.
Now that we have the URL of the GKG feed, we fetch it by configuring an InvokeHTTP processor to use the url property we previously created as it’s remote endpoint, and dragging the line as before.
All that remains is to decompress the zipped content with a UnpackContent processor (using the basic zip format) and save to HDFS using a PutHDFS processor, like so:
So far, this flow looks very “point-to-point”, meaning that if we were to introduce a new consumer of data, for example, a Spark-streaming job, the flow must be changed. For example, the flow design might have to change to look like this:
If we add yet another, the flow must change again. In fact, each time we add a new consumer, the flow gets a little more complicated, particularly when all the error handling is added. This is clearly not always desirable, as introducing or removing consumers (or producers) of data, might be something we want to do often, even frequently. Plus, it’s also a good idea to try to keep your flows as simple and reusable as possible.
Therefore, for a more flexible pattern, instead of writing directly to HDFS, we can publish to Apache Kafka. This gives us the ability to add and remove consumers at any time without changing the data ingestion pipeline. We can also still write to HDFS from Kafka if needed, possibly even by designing a separate NiFi flow, or connect directly to Kafka using Spark-streaming.
To do this, we create a Kafka writer by dragging a processor onto the canvas and selecting PutKafka.
We now have a simple flow that continuously polls for an available file list, routinely retrieving the latest copy of a new stream over the web as it becomes available, decompressing the content and streaming it record-by-record into Kafka, a durable, fault-tolerant, distributed message queue, for processing by spark-streaming or storage in HDFS. And what’s more, without writing a single line of bash!
We have seen in this article that data ingestion is an area that is often overlooked, and that its importance cannot be underestimated. At this point we have a pipeline that enables us to ingest data from a source, schedule that ingest and direct the data to our repository of choice. But the story does not end there. Now we have the data, we need to fulfil our data management responsibilities. Enter the content registry.
We’re going to build an index of metadata related to that data we have ingested. The data itself will still be directed to storage (HDFS, in our example) but, in addition, we will store metadata about the data, so that we can track what we’ve received and understand basic information about it, such as, when we received it, where it came from, how big it is, what type it is, etc.
The choice of which technology we use to store this metadata is, as we have seen, one based upon knowledge and experience. For metadata indexing, we will require at least the following attributes:
There are many ways to meet these requirements, for example we could write the metadata to Parquet, store in HDFS and search using Spark SQL. However, here we will use Elasticsearch as it meets the requirements a little better, most notably because it facilitates low latency queries of our metadata over a REST API - very useful for creating dashboards. In fact, Elasticsearch has the advantage of integrating directly with Kibana, meaning it can quickly produce rich visualizations of our content registry. For this reason, we will proceed with Elasticsearch in mind.
Using our current NiFi pipeline flow, let’s fork the output from “Fetch GKG files from URL” to add an additional set of steps to allow us to capture and store this metadata in Elasticsearch. These are:
Here’s what this looks like in NiFi:
So, the first step here is to define our metadata model. And there are many areas we could consider, but let’s select a set that helps tackle a few key points from earlier discussions. This will provide a good basis upon which further data can be added in the future, if required. So, let’s keep it simple and use the following three attributes:
These will provide basic registration of received files.
Next, inside the NiFi flow, we’ll need to replace the actual data content with this new metadata model. An easy way to do this, is to create a JSON template file from our model. We’ll save it to local disk and use it inside a FetchFile processor to replace the flow’s content with this skeleton object. This template will look something like:
{
"FileSize": SIZE,
"FileName": "FILENAME",
"IngestedDate": "DATE"
}
Note the use of placeholder names (SIZE, FILENAME, DATE) in place of the attribute values. These will be substituted, one-by-one, by a sequence of ReplaceText processors, that swap the placeholder names for an appropriate flow attribute using regular expressions provided by the NiFi Expression Language, for example DATE becomes ${now()}.
The last step is to output the new metadata payload to Elasticsearch. Once again, NiFi comes ready with a processor for this; the PutElasticsearch processor.
An example metadata entry in Elasticsearch:
{
"_index": "gkg",
"_type": "files",
"_id": "AVZHCvGIV6x-JwdgvCzW",
"_score": 1,
"source": {
"FileSize": 11279827,
"FileName": "20150218233000.gkg.csv.zip",
"IngestedDate": "2016-08-01T17:43:00+01:00"
}
}
Now that we have added the ability to collect and interrogate metadata, we now have access to more statistics that can be used for analysis. This includes:
If there is a particular analytic that is required, the NIFI metadata component can be adjusted to provide the relevant data points. Indeed, an analytic could be built to look at historical data and update the index accordingly if the metadata does not exist in current data.
We have mentioned Kibana a number of times in this article, now that we have an index of metadata in Elasticsearch, we can use the tool to visualize some analytics. The purpose of this brief section is to demonstrate that we can immediately start to model and visualize our data. In this simple example we have completed the following steps:
Added the Elasticsearch index for our GDELT metadata to the “Settings” tab
The resultant graph displays the file size distribution:
From here we are free to create new visualizations or even a fully featured dashboard that can be used to monitor the status of our file ingest. By increasing the variety of metadata written to Elasticsearch from NiFi, we can make more fields available in Kibana and even start our data science journey right here with some ingest based actionable insights.
Now that we have a fully-functioning data pipeline delivering us real-time feeds of data, how do we ensure data quality of the payload we are receiving? Let’s take a look at the options.
With an initial data ingestion capability implemented, and data streaming onto your platform, you will need to decide how much quality assurance is required at the front door. It’s perfectly viable to start with no initial quality controls and build them up over time (retrospectively scanning historical data as time and resources allow). However, it may be prudent to install a basic level of verification to begin with. For example, basic checks such as file integrity, parity checking, completeness, checksums, type checking, field counting, overdue files, security field pre-population, denormalization, etc.
You should take care that your up-front checks do not take too long. Depending on the intensity of your examinations and the size of your data, it’s not uncommon to encounter a situation where there is not enough time to perform all processing before the next dataset arrives. You will always need to monitor your cluster resources and calculate the most efficient use of time.
Here are some examples of the type of rough capacity planning calculation you can perform:
There are 10 minutes of resources available for other tasks.
As there are no other users on the cluster, this is satisfactory - no action needs to be taken.
There is only 1 minute of resource available for other tasks.
We probably need to consider, either:
There are 6 minutes of resources available for other tasks (15 - 1 - (4 * (100 / 50))). Since there are other users there is a danger that, at least some of the time, we will not be able to complete our processing and a backlog of jobs will occur.
When you run into timing issues, you have a number of options available to you in order to circumvent any backlog:
In any case, you will eventually get a good idea of how the various parts to your jobs perform and will then be in a position to calculate what changes could be made to improve efficiency. There’s always the option of throwing more resources at the problem, especially when using a cloud provider, but we would certainly encourage the intelligent use of existing resources - this is far more scalable, cheaper and builds data expertise.
In this article we walked through the full setup of an Apache NiFi GDELT ingest pipeline, complete with metadata forks and a brief introduction to visualizing the resultant data. This section is particularly important as GDELT is used extensively throughout the book and the NiFi method is a highly effective way to source data in a scalable and modular way.
Further resources on this subject: