Cassandra in action
There is no better way to learn a technology than by building a proof of concept with it. In this section, we will work on a very simple application to get you familiar with Cassandra. We will build the backend of a simple blogging application, where a user can perform the following tasks:
- Create a blogging account
- Publish posts
- Tag posts, so that posts can be searched by tag
- Have people comment on those posts
- Have people upvote or downvote a post or a comment
Modeling data
In the RDBMS world, you would glance over the entities and think about relations while modeling the application, and then join tables to get the required data. There is no join in Cassandra, so we will have to denormalize things. Looking at the previously mentioned specifications, we can say that:
- We need a blogs table to store the blog name and other global information, such as the blogger's username and password
- We will have to pull posts for the blog, ideally, sorted in reverse chronological order
- We will also have to pull all the comments for each post, when we see the post page
- We will have to maintain tags in such a way that tags can be used to pull all the posts with the same tag
- We will also need counters for the upvotes and downvotes on posts and comments
With the preceding details, let's see the tables we need:
- blogs: This table will hold the global blog metadata and user information, such as the blog name, the blogger's username, password, and other details.
- posts: This table will hold individual posts. At first glance, posts seems to be an ordinary table whose primary key is the post ID plus a reference to the blog it belongs to. The problem arises when we add the requirement that posts be pulled sorted in reverse chronological order. Unlike an RDBMS, Cassandra cannot perform an ORDER BY across partitions. The workaround is a composite key: a partition key, which determines where a row is stored, plus one or more clustering columns, which determine the relative ordering of the rows inserted under that partition key. Remember that a partition is stored entirely on one node. The benefit is that fetches are fast, but a partition is limited to 2 billion cells in total, and putting everything in one partition can also funnel most requests to just a couple of nodes (the replicas), making them hotspots in the cluster, which is not good. You can avoid both problems with some sort of bucketing, such as including the month and year in the partition key; the partition then rolls over every month and holds only one month's worth of records, which solves both the cap on the number of records and the hotspot issue. You would, however, still need a way to order the buckets. For this example, we will keep all the posts in one partition just to keep things simple; we will tackle bucketing in Chapter 3, Effective CQL. A sketch after this list shows how time-series data can be grouped using a composite key.
- comments: Comments have similar properties to posts, except that each comment is linked to a post instead of to a blog.
- tags: Tags are part of a post; we use the set data type to represent the tags on a post. One of the features we mentioned earlier is the ability to search posts by tag. The natural way to do that is to create an index on the tags column and make it searchable. Unfortunately, indexes on collection data types are not supported until Cassandra Version 2.1 (https://issues.apache.org/jira/browse/CASSANDRA-4511). In our case, we will have to create and manage this sort of index manually, so we will create a table (named categories in the schema that follows) with a compound primary key whose components are the tag and the blog ID.
- counters: Ideally, you would put the upvote and downvote counters in the column definitions of the posts and comments tables, but Cassandra does not allow counter columns to be mixed with columns of any other type: apart from the primary key columns, every column in a counter table must be a counter. So, in our case, we will create two new tables just to keep track of votes.
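To make the bucketing idea concrete, here is one possible month-bucketed layout. This is only a sketch under the assumptions just discussed; the table name and the year_month column and its encoding are illustrative and are not part of this chapter's schema:

cqlsh:weblog> CREATE TABLE posts_by_month (
                blog_id uuid,
                year_month int,   -- for example, 201408; rotates the partition every month
                id timeuuid,
                title text,
                content text,
                tags set<varchar>,
                PRIMARY KEY ((blog_id, year_month), id)
              ) WITH CLUSTERING ORDER BY (id DESC);

Within each monthly partition, posts stay sorted by their timeuuid; to read across months, the application walks the year_month buckets in descending order.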
With this, we are done with data modeling. The next step is inserting and getting data back.
Writing code
Time to start something tangible! In this section, we will create the schema, insert data, and make interesting queries to retrieve it. In a real application, you would have a GUI with buttons and links to log in, post, comment, upvote, downvote, and navigate. Here, we will stick to what happens in the backend when you perform those actions; this keeps the discussion free of clutter from other software components. Also, this section uses Cassandra Query Language (CQL), a SQL-like query language for Cassandra, so you can just copy these statements and paste them into your CQL shell ($CASSANDRA_HOME/bin/cqlsh
) to see it working. If you want to build an application using these statements, you should be able to just use these statements in your favorite language via the CQL driver library that you can find at http://www.datastax.com/download#dl-datastax-drivers. You can also download a simple Java application that is built using these statements from my GitHub account (https://github.com/naishe/mastering-cassandra-v2).
Setting up
Setting up a project involves creating a keyspace and tables. This can be done via the CQL shell or from your favorite programming language.
Here are the statements to create the schema:
cqlsh> CREATE KEYSPACE weblog WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> USE weblog;
cqlsh:weblog> CREATE TABLE blogs (
                id uuid PRIMARY KEY,
                blog_name varchar,
                author varchar,
                email varchar,
                password varchar
              );
cqlsh:weblog> CREATE TABLE posts (
                id timeuuid,
                blog_id uuid,
                posted_on timestamp,
                title text,
                content text,
                tags set<varchar>,
                PRIMARY KEY(blog_id, id)
              );
cqlsh:weblog> CREATE TABLE categories (
                cat_name varchar,
                blog_id uuid,
                post_id timeuuid,
                post_title text,
                PRIMARY KEY(cat_name, blog_id, post_id)
              );
cqlsh:weblog> CREATE TABLE comments (
                id timeuuid,
                post_id timeuuid,
                title text,
                content text,
                posted_on timestamp,
                commenter varchar,
                PRIMARY KEY(post_id, id)
              );
cqlsh:weblog> CREATE TABLE post_votes (post_id timeuuid PRIMARY KEY, upvotes counter, downvotes counter);
cqlsh:weblog> CREATE TABLE comment_votes (comment_id timeuuid PRIMARY KEY, upvotes counter, downvotes counter);
Note
Universally unique identifiers: uuid and timeuuid

In the preceding CQL statements, there are two interesting data types: uuid and timeuuid. uuid stands for universally unique identifier; there are five versions of them. One of these, timeuuid, is essentially a version 1 UUID that takes a timestamp as its first component. This means it can be used to sort things by time, which is exactly what we want in this example: sorting posts by the time they were published.

A uuid column, on the other hand, accepts any of the five versions as long as the value follows the standard UUID format.

In Cassandra, if you have chosen the uuid type for a column, you will need to pass a UUID while inserting data. With timeuuid, just passing a timestamp is enough.
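One handy consequence: CQL's dateOf() function extracts the timestamp component of a timeuuid. As a quick illustration (it assumes the posts table defined above, after some rows have been inserted in the next section), the following query would show when each post ID was generated:

cqlsh:weblog> SELECT id, dateOf(id) FROM posts;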
The first statement requests Cassandra to create a keyspace named weblog with a replication factor of 1, because we are running a single-node Cassandra on a local machine. Here are a couple of things to notice:
- The tags column in the posts table is a set of strings.
- The primary key for posts, categories, and comments has more than one component. The first component is the partition key. Rows with the same partition key reside on the same machine (or, if the replication factor is greater than one, on that many replicas), so all the post records belonging to one blog stay together. This is true for all the tables with composite keys.
- categories has three components in its primary key: the category name, which is the partition key; the blog ID; and the post ID. One could argue that including the post ID in the primary key is unnecessary and that the category name and blog ID would suffice. The reason to include the post ID is to enable sorting by the post ID.
- Note that some of the IDs in the table definitions are timeuuid. The timeuuid data type is an interesting ID-generation mechanism: it generates a UUID from a timestamp (provided by you) that is both unique and sortable, so you can use it wherever you want things ordered chronologically.
Inserting records
This section demonstrates inserting records into the schema. Unlike an RDBMS, you will find that there is some redundancy in the system, and you may notice that Cassandra enforces very few rules for you. It is up to the developer to make sure that records are inserted into, updated in, and deleted from all the appropriate places.
Note
Note that the CQL output shown here is for instructional purposes and is only a snippet. Your output may vary.
We will now look at a simple INSERT example:
cqlsh:weblog> INSERT INTO blogs (id, blog_name, author, email, password)
          VALUES (blobAsUuid(timeuuidAsBlob(now())), 'Random Ramblings', 'JRR Rowling', 'rowling@yahoo.com', 'someHashed#passwrd');
cqlsh:weblog> SELECT * FROM blogs;

 id       | author      | blog_name        | email             | password
----------+-------------+------------------+-------------------+--------------------
 83cec... | JRR Rowling | Random Ramblings | rowling@yahoo.com | someHashed#passwrd

(1 rows)
The application would generate the uuid, or you would get the uuid from an existing record in the blogs table based on a user's e-mail address or some other criterion. Here, just to be concise, the uuid generation is left to Cassandra, and the value is retrieved by running the SELECT statement. Let's insert some posts into this blog:
# First post
cqlsh:weblog> INSERT INTO posts (id, blog_id, title, content, tags, posted_on)
          VALUES (now(), 83cec740-22b1-11e4-a4f0-7f1a8b30f852, 'first post', 'hey howdy!', {'random','welcome'}, 1407822921000);
cqlsh:weblog> SELECT * FROM posts;

 blog_id  | id       | content    | posted_on                | tags                  | title
----------+----------+------------+--------------------------+-----------------------+------------
 83cec... | 04722... | hey howdy! | 2014-08-12 11:25:21+0530 | {'random', 'welcome'} | first post

(1 rows)

cqlsh:weblog> INSERT INTO categories (cat_name, blog_id, post_id, post_title)
          VALUES ('random', 83cec740-22b1-11e4-a4f0-7f1a8b30f852, 047224f0-22b2-11e4-a4f0-7f1a8b30f852, 'first post');
cqlsh:weblog> INSERT INTO categories (cat_name, blog_id, post_id, post_title)
          VALUES ('welcome', 83cec740-22b1-11e4-a4f0-7f1a8b30f852, 047224f0-22b2-11e4-a4f0-7f1a8b30f852, 'first post');

# Second post
cqlsh:weblog> INSERT INTO posts (id, blog_id, title, content, tags, posted_on)
          VALUES (now(), 83cec740-22b1-11e4-a4f0-7f1a8b30f852, 'Fooled by randomness...', 'posterior=(prior*likelihood)/evidence', {'random','maths'}, 1407823189000);
cqlsh:weblog> select * from posts;

 blog_id  | id       | content                               | posted_on                | tags                  | title
----------+----------+---------------------------------------+--------------------------+-----------------------+-------------------------
 83cec... | 04722... | hey howdy!                            | 2014-08-12 11:25:21+0530 | {'random', 'welcome'} | first post
 83cec... | c06a4... | posterior=(prior*likelihood)/evidence | 2014-08-12 11:29:49+0530 | {'maths', 'random'}   | Fooled by randomness...

(2 rows)

cqlsh:weblog> INSERT INTO categories (cat_name, blog_id, post_id, post_title)
          VALUES ('random', 83cec740-22b1-11e4-a4f0-7f1a8b30f852, c06a42f0-22b2-11e4-a4f0-7f1a8b30f852, 'Fooled by randomness...');
cqlsh:weblog> INSERT INTO categories (cat_name, blog_id, post_id, post_title)
          VALUES ('maths', 83cec740-22b1-11e4-a4f0-7f1a8b30f852, c06a42f0-22b2-11e4-a4f0-7f1a8b30f852, 'Fooled by randomness...');
Note
You may want to insert more rows so that we can experiment with pagination in the upcoming sections.
You may notice that the primary key, which is of type timeuuid, is created using Cassandra's built-in now() function, and that we repeated the title in the categories table. The rationale behind the repetition is that we may want to display the titles of all the posts that match a tag the user clicked, with each title linking to its post (a post can be retrieved by the blog ID and post ID). Cassandra does not support joining two tables, so you cannot join categories and posts to display the title. The other option would be to use the blog ID and post ID to fetch each post's title, but that is more work and somewhat inefficient.
Let's insert some comments and upvote and downvote some posts and comments:
# Insert some comments
cqlsh:weblog> INSERT INTO comments (id, post_id, commenter, title, content, posted_on)
          VALUES (now(), c06a42f0-22b2-11e4-a4f0-7f1a8b30f852, 'liz@gmail.com', 'Thoughful article but...', 'It is too short to describe the complexity.', 1407868973000);
cqlsh:weblog> INSERT INTO comments (id, post_id, commenter, title, content, posted_on)
          VALUES (now(), c06a42f0-22b2-11e4-a4f0-7f1a8b30f852, 'tom@gmail.com', 'Nice!', 'Thanks, this is good stuff.', 1407868975000);
cqlsh:weblog> INSERT INTO comments (id, post_id, commenter, title, content, posted_on)
          VALUES (now(), c06a42f0-22b2-11e4-a4f0-7f1a8b30f852, 'g@ouv.com', 'Follow my blog', 'Please follow my blog.', 1407868979000);
cqlsh:weblog> INSERT INTO comments (id, post_id, commenter, title, content, posted_on)
          VALUES (now(), 047224f0-22b2-11e4-a4f0-7f1a8b30f852, 'liz@gmail.com', 'New blogger?', 'Welcome to weblog application.', 1407868981000);

# Insert some votes
cqlsh:weblog> UPDATE comment_votes SET upvotes = upvotes + 1 WHERE comment_id = be127d00-22c2-11e4-a4f0-7f1a8b30f852;
cqlsh:weblog> UPDATE comment_votes SET upvotes = upvotes + 1 WHERE comment_id = be127d00-22c2-11e4-a4f0-7f1a8b30f852;
cqlsh:weblog> UPDATE comment_votes SET downvotes = downvotes + 1 WHERE comment_id = be127d00-22c2-11e4-a4f0-7f1a8b30f852;
cqlsh:weblog> UPDATE post_votes SET downvotes = downvotes + 1 WHERE post_id = d44e0440-22c2-11e4-a4f0-7f1a8b30f852;
cqlsh:weblog> UPDATE post_votes SET upvotes = upvotes + 1 WHERE post_id = d44e0440-22c2-11e4-a4f0-7f1a8b30f852;
Counters are always inserted or updated using the UPDATE statement.
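Decrements work the same way. For example, if a user withdraws an upvote (a small illustrative example reusing a comment ID from the inserts above):

cqlsh:weblog> UPDATE comment_votes SET upvotes = upvotes - 1 WHERE comment_id = be127d00-22c2-11e4-a4f0-7f1a8b30f852;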
Retrieving data
Now that we have data inserted for our application, we need to retrieve it. In blogging applications, the blog name usually serves as the primary key in the database: when you request cold-caffein.blogspot.com, the blog metadata is looked up with cold-caffein as its ID. We, on the other hand, will use the blog's uuid to serve the contents, so we will assume that the blog ID is at hand.
Let's display the posts. We should not load all the posts for the user upfront; it is bad from a usability point of view, demands more bandwidth, and generates a lot of reads on Cassandra. So, to start with, let's pull two posts at a time, newest first:
cqlsh:weblog> select * from posts where blog_id = 83cec740-22b1-11e4-a4f0-7f1a8b30f852 order by id desc limit 2;

 blog_id  | id       | content                               | posted_on                | tags                  | title
----------+----------+---------------------------------------+--------------------------+-----------------------+-------------------------
 83cec... | c2240... | posterior=(prior*likelihood)/evidence | 2014-08-12 11:29:49+0530 | {'maths', 'random'}   | Fooled by randomness...
 83cec... | 965a2... | hey howdy!                            | 2014-08-12 11:25:21+0530 | {'random', 'welcome'} | first post

(2 rows)
This was the first page. For the next page, we can use an anchor: the last post's ID, since a timeuuid increases monotonically with time. Posts older than it will have post IDs with smaller values, so the last ID works as our anchor:
cqlsh:weblog> select * from posts where blog_id = 83cec740-22b1-11e4-a4f0-7f1a8b30f852 and id < 8eab0c10-2314-11e4-bac7-3f5f68a133d8 order by id desc limit 2;

 blog_id  | id       | content         | posted_on                | tags                  | title
----------+----------+-----------------+--------------------------+-----------------------+--------------
 83cec... | 83f16... | random content8 | 2014-08-13 23:33:00+0530 | {'garbage', 'random'} | random post8
 83cec... | 76738... | random content7 | 2014-08-13 23:32:58+0530 | {'garbage', 'random'} | random post7

(2 rows)
You can retrieve the posts on the next page as follows:
cqlsh:weblog> select * from posts where blog_id = 83cec740-22b1-11e4-a4f0-7f1a8b30f852 and id < 76738dc0-2314-11e4-bac7-3f5f68a133d8 order by id desc limit 2;

 blog_id  | id       | content         | posted_on                | tags                  | title
----------+----------+-----------------+--------------------------+-----------------------+--------------
 83cec... | 6f85d... | random content6 | 2014-08-13 23:32:56+0530 | {'garbage', 'random'} | random post6
 83cec... | 684c5... | random content5 | 2014-08-13 23:32:54+0530 | {'garbage', 'random'} | random post5

(2 rows)
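In application code, the same anchored pagination is one parameterized query. Here is a minimal sketch using the DataStax Java driver; it assumes the CassandraConnection helper shown later in this chapter, and that blogId and lastSeenPostId were captured from the previous page (neither variable comes from the book's repository):

// Fetch the next page: two posts older than the anchor, newest first.
Session session = CassandraConnection.getSession();
ResultSet page = session.execute(
    "SELECT * FROM weblog.posts WHERE blog_id = ? AND id < ? ORDER BY id DESC LIMIT 2",
    blogId, lastSeenPostId);
for (Row row : page) {
    System.out.println(row.getString("title"));  // keep the last id as the next anchor
}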
Now, for each post, we need to perform the following tasks:
- Pull the list of comments
- Pull the upvote and downvote counts

Load the comments as follows:
cqlsh:weblog> select * from comments where post_id = c06a42f0-22b2-11e4-a4f0-7f1a8b30f852 order by id desc;

 post_id  | id       | commenter     | content                                     | posted_on                | title
----------+----------+---------------+---------------------------------------------+--------------------------+--------------------------
 c06a4... | cd5a8... | g@ouv.com     | Please follow my blog.                      | 2014-08-13 00:12:59+0530 | Follow my blog
 c06a4... | c6aff... | tom@gmail.com | Thanks, this is good stuff.                 | 2014-08-13 00:12:55+0530 | Nice!
 c06a4... | be127... | liz@gmail.com | It is too short to describe the complexity. | 2014-08-13 00:12:53+0530 | Thoughful article but...
Individually fetch the counters for each post and comment as follows:
cqlsh:weblog> select * from comment_votes where comment_id = be127d00-22c2-11e4-a4f0-7f1a8b30f852;

 comment_id | downvotes | upvotes
------------+-----------+---------
 be127...   |         1 |       6

(1 rows)

cqlsh:weblog> select * from post_votes where post_id = c06a42f0-22b2-11e4-a4f0-7f1a8b30f852;

 post_id  | downvotes | upvotes
----------+-----------+---------
 c06a4... |         2 |       7

(1 rows)
Now, we want to let the users of our blogging website click on a tag and see a list of all the posts carrying that tag. Here is what we do:
cqlsh:weblog> select * from categories where cat_name = 'maths' and blog_id = 83cec740-22b1-11e4-a4f0-7f1a8b30f852 order by blog_id desc;

 cat_name | blog_id  | post_id  | post_title
----------+----------+----------+-------------------------
 maths    | 83cec... | a865c... | YARA
 maths    | 83cec... | c06a4... | Fooled by randomness...

(2 rows)
We can obviously apply pagination and sorting here as well; you get the idea.
Sometimes, it is nice to see what a person generally comments on, so it would be great if we could find all the comments by a user. To make a non-primary-key column searchable in Cassandra, you need to create an index on that column. So, let's do that:
cqlsh:weblog> CREATE INDEX commenter_idx ON comments (commenter);
cqlsh:weblog> select * from comments where commenter = 'liz@gmail.com';

 post_id  | id       | commenter     | content                                     | posted_on                | title
----------+----------+---------------+---------------------------------------------+--------------------------+--------------------------
 04722... | d44e0... | liz@gmail.com | Welcome to weblog application.              | 2014-08-13 00:13:01+0530 | New blogger?
 c06a4... | be127... | liz@gmail.com | It is too short to describe the complexity. | 2014-08-13 00:12:53+0530 | Thoughful article but...

(2 rows)
This completes all the requirements we stated. We did not cover the update and delete operations; they follow the same pattern as inserting records. The developer needs to make sure that the data is updated or deleted in every place it lives. So, if you want to update a post's title, it needs to be done in both the posts and categories tables.
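For example, renaming the first post takes one UPDATE per table that stores the title. This is a small illustrative sketch reusing the blog, post, and tag values from the earlier inserts:

cqlsh:weblog> UPDATE posts SET title = 'first post, revised'
          WHERE blog_id = 83cec740-22b1-11e4-a4f0-7f1a8b30f852 AND id = 047224f0-22b2-11e4-a4f0-7f1a8b30f852;
cqlsh:weblog> UPDATE categories SET post_title = 'first post, revised'
          WHERE cat_name = 'random' AND blog_id = 83cec740-22b1-11e4-a4f0-7f1a8b30f852 AND post_id = 047224f0-22b2-11e4-a4f0-7f1a8b30f852;
cqlsh:weblog> UPDATE categories SET post_title = 'first post, revised'
          WHERE cat_name = 'welcome' AND blog_id = 83cec740-22b1-11e4-a4f0-7f1a8b30f852 AND post_id = 047224f0-22b2-11e4-a4f0-7f1a8b30f852;

Miss either categories row and the tag listing will show a stale title; this is the price of denormalization.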
Writing your application
Cassandra provides drivers for almost all the mainstream programming languages. Developing an application for Cassandra is little more than executing CQL through an API and collecting the result set or iterator for the query. This section will give you a glimpse of the Java code for the example we discussed earlier. It uses the DataStax Java driver for Cassandra. The full code is available at https://github.com/naishe/mastering-cassandra-v2.
Getting the connection
An application creates a single instance of the Cluster object and keeps it for its life cycle. Every time you want to execute a query or a batch of queries, you ask the Cluster object for a Session object. In a way, it is like a connection pool. Let's take a look at the following example:
public class CassandraConnection {

  private static Cluster cluster = getCluster();

  public static final Session getSession() {
    if (cluster == null) {
      cluster = getCluster();
    }
    return cluster.connect();
  }

  private static Cluster getCluster() {
    Cluster clust = Cluster
        .builder()
        .addContactPoint(Constants.HOST)
        .build();
    return clust;
  }

[-- snip --]
Executing queries
Query execution is barely different from what we did at the command prompt earlier:
private static final String BLOGS_TABLE_DEF =
    "CREATE TABLE IF NOT EXISTS " + Constants.KEYSPACE + ".blogs " +
    "(" +
    "id uuid PRIMARY KEY, " +
    "blog_name varchar, " +
    "author varchar, " +
    "email varchar, " +
    "password varchar" +
    ")";

[-- snip --]

Session conn = CassandraConnection.getSession();
[-- snip --]
conn.execute(BLOGS_TABLE_DEF);
[-- snip --]
conn.close();
Object mapping
The DataStax Java driver provides an easy-to-use, annotation-based object mapper, which can save you a lot of code bloat and marshalling effort. Here is an example of the Blog object that maps to the blogs table:
@Table(keyspace = Constants.KEYSPACE, name = "blogs")
public class Blog extends AbstractVO<Blog> {

  @PartitionKey
  private UUID id;

  @Column(name = "blog_name")
  private String blogName;

  private String author;
  private String email;
  private String password;

  public UUID getId() { return id; }
  public void setId(UUID id) { this.id = id; }

  public String getBlogName() { return blogName; }
  public void setBlogName(String blogName) { this.blogName = blogName; }

  public String getAuthor() { return author; }
  public void setAuthor(String author) { this.author = author; }

  public String getEmail() { return email; }
  public void setEmail(String email) { this.email = email; }

  public String getPassword() { return password; }

  public void setPassword(String password) {
    /* Ideally, you'd use a unique salt with this hashing */
    this.password = Hashing
        .sha256()
        .hashString(password, Charsets.UTF_8)
        .toString();
  }

  @Override
  public boolean equals(Object that) {
    return this.getId().equals(((Blog) that).getId());
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(getId(), getEmail(), getAuthor(), getBlogName());
  }

  @Override
  protected Blog getInstance() { return this; }

  @Override
  protected Class<Blog> getType() { return Blog.class; }

  // ----- ACCESS VIA QUERIES -----
  public static Blog getBlogByName(String blogName, SessionWrapper sessionWrapper)
      throws BlogNotFoundException {
    AllQueries queries = sessionWrapper.getAllQueries();
    Result<Blog> rs = queries.getBlogByName(blogName);
    if (rs.isExhausted()) {
      throw new BlogNotFoundException();
    }
    return rs.one();
  }
}
For now, forget about the AbstractVO superclass; it is just an abstraction into which common functionality is pushed. You can see the annotations that state which keyspace and table this class maps to. Each instance variable maps to a column in the table; for any column whose name differs from the attribute name in the class, you have to declare the mapping explicitly. Getters and setters do not have to be dumb, either: you can get creative in there. For example, the setPassword setter takes a plaintext password and hashes it before storing it. Note that you must mark which field acts as the partition key; you do not have to annotate every component of the primary key, just the first one. Now you can use DataStax's mapper to create, retrieve, update, and delete an object without having to marshal result sets into objects yourself. Here is an example:
Blog blog = new MappingManager(session)
    .mapper(Blog.class)
    .get(blogUUID);
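The same mapper handles writes and deletes as well. A brief sketch, where blog is assumed to be a populated Blog instance:

Mapper<Blog> mapper = new MappingManager(session).mapper(Blog.class);
mapper.save(blog);    // issues an INSERT built from the annotations
mapper.delete(blog);  // issues a DELETE keyed on the @PartitionKey field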
You can also execute arbitrary queries and map the results to an object. To do that, you write an interface whose method signatures declare what the query consumes as its arguments and what it returns as the method's return type, as follows:
@Accessor
public interface AllQueries {

  [--snip--]

  @Query("SELECT * FROM " + Constants.KEYSPACE + ".blogs WHERE blog_name = :blogName")
  public Result<Blog> getBlogByName(@Param("blogName") String blogName);

  [-- snip --]
Tip
This interface is annotated with Accessor, and its methods satisfy the Query annotations they carry. The Blog class snippet shown earlier uses this method to retrieve a blog by its name.
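To call such a method, you ask MappingManager to manufacture an implementation of the interface. A brief usage sketch (the book's repository wraps this behind SessionWrapper, but the direct form looks like this):

AllQueries queries = new MappingManager(session).createAccessor(AllQueries.class);
Result<Blog> matches = queries.getBlogByName("Random Ramblings");
Blog blog = matches.one();  // null if no blog has that name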