AWS provides hybrid capabilities for networking, storage, databases, application development, and management tools for secure and seamless integration. In today's tutorial, we will integrate applications with two popular AWS services, namely Amazon DynamoDB and Amazon Kinesis.
Amazon DynamoDB is a fast, fully managed, highly available, and scalable NoSQL database service from AWS. DynamoDB uses key-value and document store data models. Amazon Kinesis is used to collect, process, and analyze real-time streaming data.
This article is an excerpt from a book 'Expert AWS Development' written by Atul V. Mistry. By the end of this tutorial, you will know how to integrate applications with the relevant AWS services and apply best practices.
The Amazon DynamoDB service falls under the Database category. It is a fast NoSQL database service from Amazon. It is highly durable, as it replicates data across three geographically distinct facilities within an AWS Region. It's great for web, mobile, gaming, and IoT applications.
DynamoDB takes care of software patching, hardware provisioning, setup, configuration, cluster scaling, and replication. You can create a database table and store and retrieve any amount and variety of data. It can automatically delete expired items from the table, which reduces storage usage and the cost of keeping data that is no longer needed.
Amazon DynamoDB Accelerator (DAX) is a highly available, fully managed, in-memory cache. Even at millions of requests per second, it reduces response times from milliseconds to microseconds.
DynamoDB can store large text and binary objects up to 400 KB per item. It uses SSD storage to provide high I/O performance.
The following diagram provides a high-level overview of integration between your application and DynamoDB:
Please perform the following steps to understand this integration:
The AWS SDK provides three kinds of interfaces to connect with DynamoDB: the low-level interface, the document interface, and the object persistence interface.
Let's explore all three interfaces. The following diagram is the Movies table, which is created in DynamoDB and used in all our examples:
AWS SDK programming languages provide low-level interfaces for DynamoDB. These SDKs provide methods that are similar to low-level DynamoDB API requests.
The following example uses the Java language for the low-level interface of AWS SDKs. Here you can use Eclipse IDE for the example.
In this Java program, we request getItem from the Movies table, pass the movie name as an attribute, and print the movie release year:
AmazonDynamoDBClient is used to create the DynamoDB client instance, and AttributeValue is used to construct the request data. In an AttributeValue, the field name indicates the data type (for example, S for string or N for number) and the value holds the data:
static AmazonDynamoDBClient dynamoDB;
String tableName = "Movies";

HashMap<String, AttributeValue> key = new HashMap<String, AttributeValue>();
key.put("name", new AttributeValue().withS("Airplane"));

GetItemRequest request = new GetItemRequest()
        .withTableName(tableName)
        .withKey(key);
GetItemResult result = dynamoDB.getItem(request);

if (result.getItem() != null) {
    AttributeValue yearObj = result.getItem().get("year");
    System.out.println("The movie was released in " + yearObj.getN());
} else {
    System.out.println("No matching movie was found");
}
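The snippets above assume that the dynamoDB client has already been initialized with credentials and a region. A minimal sketch of one way to do that with the SDK's client builder follows; the region and the credentials profile name are assumptions, and the builder returns the AmazonDynamoDB interface:

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;

// Build a DynamoDB client; the region and credentials profile name are placeholders.
AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard()
        .withRegion(Regions.US_EAST_1)
        .withCredentials(new ProfileCredentialsProvider("default"))
        .build();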
This interface enables you to do Create, Read, Update, and Delete (CRUD) operations on tables and indexes. With this interface, the data type is inferred from the data itself, so you do not need to specify it.
The AWS SDKs for Java, Node.js, JavaScript, and .NET provide support for the document interface.
The following example uses the Java language for the document interface in AWS SDKs. Here you can use the Eclipse IDE for the example.
In this Java program, we will create a table object from the Movies table, pass the movie name as an attribute, and print the movie release year.
We have to import a few classes. DynamoDB is the entry point for using this library in your class. GetItemOutcome is used to get items from the DynamoDB table. Table is used to get table details:
static AmazonDynamoDB client;
The preceding code will create the client instance. You have to assign the credentials and region to this instance:
String tableName = "Movies"; DynamoDB docClient = new DynamoDB(client); Table movieTable = docClient.getTable(tableName);
The DynamoDB class creates the docClient instance from the client instance; it is the entry point for the document interface library. The docClient instance then fetches the table details by passing tableName and assigns them to the movieTable instance:
GetItemOutcome outcome = movieTable.getItemOutcome("name", "Airplane");
int yearObj = outcome.getItem().getInt("year");
System.out.println("The movie was released in " + yearObj);
getItemOutcome creates the outcome instance from movieTable by passing the key attribute name and the movie name as parameters. The year is then retrieved from the outcome's item, stored in the yearObj variable, and printed.
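The document interface also supports writes through the same Table object. As a complementary sketch, here is how a new item might be put into the Movies table; the movie name and year used are only illustrative:

import com.amazonaws.services.dynamodbv2.document.Item;

// Build an item with the hash key and a numeric attribute, then write it to the table.
Item newMovie = new Item()
        .withPrimaryKey("name", "Airplane II")   // hypothetical movie name
        .withNumber("year", 1982);               // illustrative value
movieTable.putItem(newMovie);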
In the object persistence interface, you will not perform any CRUD operations directly on the data; instead, you have to create objects which represent DynamoDB tables and indexes and perform operations on those objects. It will allow you to write object-centric code and not database-centric code.
Let's create a model class for the AWS SDK for Java's DynamoDBMapper. It will represent the data in the Movies table. This is the MovieObjectMapper.java class. Here you can use the Eclipse IDE for the example.
You need to import a few classes for the annotations. DynamoDBAttribute can be applied to the getter method or to the class field; if it is applied to the field, the getter and setter must be declared in the same class. The DynamoDBHashKey annotation marks the property as the hash key for the modeled class. The DynamoDBTable annotation identifies the DynamoDB table name for the class:
@DynamoDBTable(tableName="Movies")
It specifies the table name:
@DynamoDBHashKey(attributeName="name")
public String getName() { return name; }
public void setName(String name) { this.name = name; }
@DynamoDBAttribute(attributeName = "year")
public int getYear() { return year; }
public void setYear(int year) { this.year = year; }
In the preceding code, DynamoDBHashKey marks the name attribute as the hash key, via its getter and setter methods, and DynamoDBAttribute maps the year attribute to its getter and setter methods.
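Putting these annotations together, the complete MovieObjectMapper class might look like the following minimal sketch; the field visibility and layout are assumptions:

import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;

// Maps items in the Movies table to plain Java objects for DynamoDBMapper.
@DynamoDBTable(tableName = "Movies")
public class MovieObjectMapper {

    private String name;   // hash key attribute
    private int year;      // regular attribute

    @DynamoDBHashKey(attributeName = "name")
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @DynamoDBAttribute(attributeName = "year")
    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
}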
Now create MovieObjectPersistenceExample.java to retrieve the movie year:
static AmazonDynamoDB client;
The preceding code will create the client instance. You have to assign the credentials and region to this instance. You need to import DynamoDBMapper, which will be used to fetch the year from the Movies table:
DynamoDBMapper mapper = new DynamoDBMapper(client);
MovieObjectMapper movieObjectMapper = new MovieObjectMapper();
movieObjectMapper.setName("Airplane");
The mapper object will be created from DynamoDBMapper by passing the client.
The movieObjectMapper object will be created from the POJO class, which we created earlier. In this object, set the movie name as the parameter:
MovieObjectMapper result = mapper.load(movieObjectMapper);
if (result != null) {
    System.out.println("The movie was released in " + result.getYear());
}
Create the result object by calling the DynamoDBMapper object's load method. If the result is not null, it prints the year returned by the result's getYear() method.
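The same mapper can also write objects back to the table. A minimal sketch of saving a new item with DynamoDBMapper's save method follows; the movie name and year are only illustrative:

// Populate a POJO and persist it to the Movies table.
MovieObjectMapper newMovie = new MovieObjectMapper();
newMovie.setName("Airplane II");   // hypothetical movie name
newMovie.setYear(1982);            // illustrative value
mapper.save(newMovie);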
This API is a protocol-level interface which converts every HTTP or HTTPS request into the correct format with a valid digital signature. It uses JavaScript Object Notation (JSON) as its wire format. The AWS SDK constructs these requests on your behalf, which lets you concentrate on the application/business logic.
The AWS SDK will send a request in JSON format to DynamoDB and DynamoDB will respond in JSON format back to the AWS SDK API. DynamoDB will not persist data in JSON format.
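For illustration, a low-level GetItem call for the Movies table travels over the wire roughly as the following JSON; the response values shown here are only illustrative:

Request sent by the SDK:
{
    "TableName": "Movies",
    "Key": {
        "name": { "S": "Airplane" }
    }
}

Response returned by DynamoDB:
{
    "Item": {
        "name": { "S": "Airplane" },
        "year": { "N": "1980" }
    }
}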
The following are common problems and their solutions:
Now let's move on to Amazon Kinesis, which will help to collect and process real-time streaming data.
The Amazon Kinesis service is under the Analytics product category. This is a fully managed, real-time, highly scalable service. You can easily send data to other AWS services such as Amazon DynamoDB, Amazon S3, and Amazon Redshift.
You can ingest real-time data such as application logs, website clickstream data, IoT data, and social stream data into Amazon Kinesis. You can process and analyze data as it arrives and respond immediately, instead of waiting until all the data has been collected before processing begins.
Now, let's explore an example of using Kinesis Streams and Kinesis Firehose with the AWS SDK for Java.
In this example, we will create the stream if it does not exist and then we will put the records into the stream. Here you can use Eclipse IDE for the example.
You need to import a few classes. AmazonKinesis and AmazonKinesisClientBuilder are used to create the Kinesis client. CreateStreamRequest will help to create the stream. DescribeStreamRequest will describe the stream request. PutRecordRequest will put a record into the stream and PutRecordResult holds the result of that operation. ResourceNotFoundException is thrown when the stream does not exist. StreamDescription provides the stream description:
static AmazonKinesis kinesisClient;
kinesisClient is the instance of AmazonKinesis. You have to assign the credentials and region to this instance:
final String streamName = "MyExampleStream";
final Integer streamSize = 1;
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(streamName);
Here you are creating an instance of describeStreamRequest. For that, you will pass streamName as a parameter to the withStreamName() method:
StreamDescription streamDescription = kinesisClient.describeStream(describeStreamRequest).getStreamDescription();
It will create an instance of streamDescription. You can get information such as the stream name, stream status, and shards from this instance:
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
createStreamRequest.setStreamName(streamName);
createStreamRequest.setShardCount(streamSize);
kinesisClient.createStream(createStreamRequest);
The createStreamRequest instance will help to create a stream request. You can set the stream name, shard count, and SDK request timeout. In the createStream method, you will pass the createStreamRequest:
long createTime = System.currentTimeMillis();
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(streamName);
putRecordRequest.setData(ByteBuffer.wrap(String.format("testData-%d", createTime).getBytes()));
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", createTime));
Here we are creating a record request that will be put into the stream. We set the data and the partition key on this instance:
PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
The record is written by calling the putRecord method and passing putRecordRequest as a parameter:
System.out.printf("Success : Partition key "%s", ShardID "%s" and SequenceNumber "%s".n", putRecordRequest.getPartitionKey(), putRecordResult.getShardId(), putRecordResult.getSequenceNumber());
It will print the output on the console as follows:
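Note that createStream is asynchronous, and records can only be put once the stream reaches the ACTIVE state. A minimal sketch of polling DescribeStream until the stream is ready follows; the ten-minute timeout and 20-second polling interval are arbitrary assumptions:

// Poll DescribeStream until the stream reaches the ACTIVE state.
long endTime = System.currentTimeMillis() + (10 * 60 * 1000);   // wait up to 10 minutes (arbitrary)
while (System.currentTimeMillis() < endTime) {
    try {
        Thread.sleep(20 * 1000);   // 20-second polling interval (arbitrary)
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
    }
    String status = kinesisClient.describeStream(describeStreamRequest)
            .getStreamDescription().getStreamStatus();
    if ("ACTIVE".equals(status)) {
        break;   // the stream is now ready to accept records
    }
}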
The following are common problems and their solutions:
Check whether the producer is receiving throughput exceptions from the service, and validate which API operations are being throttled.
You can also check the Amazon Kinesis Streams limits, since different limits apply depending on the call. If calls are not the issue, check that you have selected a partition key that distributes put operations evenly across all shards, and that no single partition key is bumping into the service limits while the rest are not. This requires you to measure peak throughput and to consider the number of shards in your stream.
Producers are either large or small: a large producer typically runs on an EC2 instance or on-premises, while a small producer runs from a web client, mobile app, or IoT device. Customers can use different strategies to reduce latency: the Kinesis Producer Library or multiple threads are useful for buffering/micro-batching records, PutRecords for multi-record operations, and PutRecord for single-record operations, as shown in the sketch below.
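A minimal sketch of a multi-record PutRecords call with the AWS SDK for Java follows; the stream name reuses MyExampleStream from the earlier example and the record payloads are only illustrative:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;

// Build a batch of entries and send them in a single PutRecords call.
List<PutRecordsRequestEntry> entries = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
    entry.setData(ByteBuffer.wrap(String.format("testData-%d", i).getBytes()));
    entry.setPartitionKey(String.format("partitionKey-%d", i));   // spreads records across shards
    entries.add(entry);
}

PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName("MyExampleStream");   // stream name from the earlier example
putRecordsRequest.setRecords(entries);

PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
System.out.println("Failed record count: " + putRecordsResult.getFailedRecordCount());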
A shard iterator expires if GetRecords has not been called with it for more than 5 minutes, or if you have restarted your consumer application.
If the shard iterator expires immediately, before you can use it, this might indicate that the DynamoDB table used by Kinesis does not have enough capacity to store the data. This is more likely if you have a large number of shards. To solve this, increase the write capacity assigned to that table.
The following are common reasons for read throughput being slower than expected:
We have covered Amazon Kinesis streams. Now, we will cover Kinesis Firehose.
Amazon Kinesis Firehose is a fully managed, highly available, and durable service to load real-time streaming data easily into AWS services such as Amazon S3, Amazon Redshift, or Amazon Elasticsearch Service. It replicates your data synchronously at three different facilities. It automatically scales to match your data throughput. You can compress your data into different formats and also encrypt it before loading.
The AWS SDKs for Java, Node.js, Python, .NET, and Ruby can be used to send data to a Kinesis Firehose delivery stream using the Kinesis Firehose API.
The Kinesis Firehose API provides two operations to send data to a Kinesis Firehose delivery stream: PutRecord, which sends one record per call, and PutRecordBatch, which sends multiple records per call.
Let's explore an example using PutRecord. In this example, the MyFirehoseStream stream has been created. Here you can use Eclipse IDE for the example.
You need to import a few classes such as AmazonKinesisFirehoseClient, which will help to create the client for accessing Firehose. PutRecordRequest and PutRecordResult are used to send the record request to the delivery stream and to capture its result:
private static AmazonKinesisFirehoseClient client;
AmazonKinesisFirehoseClient is used to create the client instance. You have to assign the credentials and region to this instance:
String data = "My Kinesis Firehose data"; String myFirehoseStream = "MyFirehoseStream"; Record record = new Record(); record.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)));
As mentioned earlier, myFirehoseStream has already been created.
A record is the unit of data in a delivery stream. In the setData method, we pass a data blob; before the request is sent to the AWS service, the SDK performs Base64 encoding on this field.
Now you have to specify the name of the delivery stream and the data record in order to create the PutRecordRequest instance:
PutRecordRequest putRecordRequest = new PutRecordRequest()
        .withDeliveryStreamName(myFirehoseStream)
        .withRecord(record);
PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
System.out.println("Put Request Record ID: " + putRecordResult.getRecordId());
The putRecord call writes a single record into the delivery stream by passing putRecordRequest; we then take the result and print the record ID:
PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest()
        .withDeliveryStreamName("MyFirehoseStream")
        .withRecords(getBatchRecords());
You have to specify the name of the delivery stream and the data records in order to create the PutRecordBatchRequest instance. The getBatchRecords method has been created to supply multiple records, as shown in the next step:
JSONObject jsonObject = new JSONObject();
jsonObject.put("userid", "userid_1");
jsonObject.put("password", "password1");
Record record = new Record().withData(ByteBuffer.wrap(jsonObject.toString().getBytes()));
records.add(record);
In the getBatchRecords method, you create the jsonObject and put data into it, then pass jsonObject to create the record. Each record is added to a list of records, which is returned (a full sketch of this helper appears after the batch example):
PutRecordBatchResult putRecordBatchResult = client.putRecordBatch(putRecordBatchRequest);
for (int i = 0; i < putRecordBatchResult.getRequestResponses().size(); i++) {
    System.out.println("Put Batch Request Record ID :" + i + ": "
            + putRecordBatchResult.getRequestResponses().get(i).getRecordId());
}
The putRecordBatch call writes multiple records into the delivery stream by passing putRecordBatchRequest; we then take the result and print each record ID. You will see output like the following:
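For completeness, the getBatchRecords helper referenced above might look like the following minimal sketch; the field values and the batch size of three are illustrative assumptions:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.json.JSONObject;
import com.amazonaws.services.kinesisfirehose.model.Record;

// Builds a small list of Firehose records, each wrapping a JSON payload.
private static List<Record> getBatchRecords() {
    List<Record> records = new ArrayList<>();
    for (int i = 1; i <= 3; i++) {                    // arbitrary batch size of 3
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("userid", "userid_" + i);       // illustrative field
        jsonObject.put("password", "password" + i);    // illustrative field
        records.add(new Record().withData(ByteBuffer.wrap(jsonObject.toString().getBytes())));
    }
    return records;
}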
Sometimes data is not delivered to the specified destinations. The following are steps to solve common issues while working with Kinesis Firehose:
In this tutorial, we covered implementations, examples, and best practices for the Amazon DynamoDB and Amazon Kinesis services using the AWS SDK.
If you found this post useful, do check out the book 'Expert AWS Development' to learn application integration with other AWS services like AWS Lambda, Amazon SQS, and Amazon SWF.