Chapter 5: Real-Time Data Insights Using Amazon Kinesis
Solution for Activity 6: Performing Data Transformations for Incoming Data
Start by creating a Kinesis Data Firehose delivery stream, following the steps that we completed in the last exercise.
We disabled data transformation using Lambda in the last exercise. This time, enable the Transform source records with AWS Lambda option.
Once enabled, create a Lambda function to do the data transformation for incoming data:
Figure 5.54: The Transform source records with AWS Lambda window
Amazon already provides some sample functions. Click on Create New and it will open the list of transformation blueprints provided by AWS. Let's choose General Firehose Processing:
Figure 5.55: The Choose Lambda blueprint window
This opens up the Lambda function window. Here, you need to provide the name of the function, along with the IAM role information:
Figure 5.56: The Basic information window
Edit the code inline and replace the existing code with the code provided in the json2csv_transform.js file, under the code section. Keep the rest of the settings as they are:
Figure 5.57: Window showing code of index.js
Once the Lambda function has been created, go back to the Firehose screen and configure the rest of the settings, such as the Amazon S3 bucket, which works the same way as the Firehose destination that we configured in the last exercise:
Figure 5.58: The Convert record format window
Also, once the Lambda function has been created, update the IAM role in the Firehose configuration to grant the access required to invoke the Lambda function:
Figure 5.59: The Test with demo data window
Everything else remains the same as in the last exercise.
Send the test data from the Test with demo data section by clicking on Start sending demo data:
Figure 5.60: Window showing the Start sending demo data button
Go to the S3 location that we configured earlier to receive the data and you should see the data file, as shown here:
Figure 5.61: Window showing the data file added successfully
Upon downloading this data file and opening it with Notepad, you should see the data in CSV format, as shown here:
Figure 5.62: Screenshot showing data in the CSV format
Solution for Activity 7: Adding Reference Data to the Application and Creating an Output, Joining Real-Time Data with the Reference Data
Ensure that your Kinesis Data Analytics application is in working condition and that you are able to perform real-time analysis, as we did in the last exercise:
Figure 5.63: The kinesis-data-analytics page
Create an S3 bucket and upload the ka-reference-data.json file into the bucket:
Figure 5.64: Screenshot showing the ka-reference-data.json file added to the S3 bucket
Go to the Kinesis Data Analytics application page and click on Connect reference data. Provide the bucket, S3 object, and table details, and populate the schema using schema discovery:
Figure 5.65: The Connect reference data source page
You will notice in the preceding screenshot that the Kinesis application creates the IAM role with the required access.
Schema discovery will detect the schema for the reference data file and show you the sample data:
Figure 5.66: The Schema section
Click on the Save and close button. You will have successfully added the reference data:
Figure 5.67: Page showing the referenced data added successfully
Now, you should have both the real-time streaming data and the reference data available in the Kinesis Data Analytics application. The first of the following screenshots shows the real-time streaming data; the second shows the added reference data:
Figure 5.68: The Real-time analytics section
Figure 5.69: The Source data section
Go to the SQL prompt and write a SQL statement that joins the real-time streaming data with the reference data and outputs the details of the companies whose names are provided in the reference file.
Run the following query in the SQL prompt. In this query, we perform a left join of SOURCE_SQL_STREAM_001 with the ka_reference_data dataset and filter out rows where the company name is null:
CREATE STREAM "KINESIS_SQL_STREAM" (
    ticker_symbol VARCHAR(14),
    "Company_Name" VARCHAR(30),
    sector VARCHAR(22),
    change DOUBLE,
    price DOUBLE);

CREATE PUMP "STREAM_PUMP" AS
INSERT INTO "KINESIS_SQL_STREAM"
SELECT STREAM ticker_symbol, "kar"."Company", sector, change, price
FROM "SOURCE_SQL_STREAM_001"
LEFT JOIN "ka_reference_data" AS "kar"
    ON "SOURCE_SQL_STREAM_001".ticker_symbol = "kar"."Ticker"
WHERE "kar"."Company" IS NOT NULL;
Note
You can use an inner join and remove the WHERE clause to achieve the same results.
Figure 5.70: The result page for real-time analytics
You should be able to see the output with both the ticker symbol and the company name in real time. It should refresh every few minutes:
Figure 5.71: Output showing both the ticker symbol and company name
This concludes our activity on adding reference data and using it to perform real-time data analytics on Amazon Kinesis Data Analytics.