The real power of a broker lies in its topics, namely the queues it hosts. Now that we have two brokers running, let's create a Kafka topic on them.
Kafka, like almost all modern infrastructure projects, offers three ways of building things: through the command line, through programming, and through a web console (in this case, the Confluent Control Center). Kafka brokers can be managed (created, modified, and destroyed) by programs written in most modern programming languages. If a language is not supported, the brokers can still be managed through the Kafka REST API. The previous section showed how to build a broker using the command line. In later chapters, we will see how to do this through programming.
Is this kind of management (creation, modification, and destruction) limited to brokers? No, we can also manage topics in the same ways, including through the command line. Kafka ships with pre-built utilities to manage brokers, as we already saw, and to manage topics, as we will see next.
To create a topic called amazingTopic in our running cluster, use the following command:
> <confluent-path>/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic amazingTopic
The output should be as follows:
Created topic amazingTopic
Here, the kafka-topics command is used. The --create parameter specifies that we want to create a new topic. The --topic parameter sets the name of the topic, in this case, amazingTopic.
Do you remember the terms parallelism and redundancy? Well, the --partitions parameter controls the parallelism and the --replication-factor parameter controls the redundancy.
The --replication-factor parameter is fundamental, as it specifies on how many servers of the cluster the topic is going to be replicated (that is, running). Note that one broker can hold just one replica of a given partition.
Obviously, if the value specified is greater than the number of running servers in the cluster, the command fails (you don't believe me? Try it in your environment). The error will look like this:
Error while executing topic command: replication factor: 3 larger than available brokers: 2
[2018-09-01 07:13:31,350] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 3 larger than available brokers: 2
(kafka.admin.TopicCommand$)
To count as available, a broker must be running (don't be shy and test all this theory in your environment).
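The rule behind this error can be sketched in a few lines of Python. This is only a simplified model of the check, not Kafka's actual implementation; the function name validate_replication_factor and the list of running broker IDs are hypothetical and used purely for illustration.

```python
def validate_replication_factor(replication_factor, running_broker_ids):
    """Raise ValueError if the topic asks for more replicas than running brokers.

    running_broker_ids is a hypothetical list of the IDs of brokers that are
    currently up; stopped brokers are simply not in the list.
    """
    available = len(running_broker_ids)
    if replication_factor > available:
        # Message modeled on the error text shown above.
        raise ValueError(
            f"replication factor: {replication_factor} "
            f"larger than available brokers: {available}"
        )


# Two running brokers accept a replication factor of 2 ...
validate_replication_factor(2, [1, 2])

# ... but reject a replication factor of 3.
try:
    validate_replication_factor(3, [1, 2])
except ValueError as error:
    print(error)
```

With two brokers in the list, the second call reproduces the situation from the error message: three replicas requested, two brokers available.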
The --partitions parameter, as its name implies, sets how many partitions the topic will have. This number determines the parallelism that can be achieved on the consumer's side. This parameter is very important when doing cluster fine-tuning.
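To see why the partition count limits consumer parallelism, recall that within a consumer group each partition is read by at most one consumer. The following Python sketch models that with a simple round-robin distribution. It is an illustration only: Kafka's real assignment strategies are more elaborate, and the function name assign_partitions and the consumer names are hypothetical.

```python
def assign_partitions(num_partitions, consumers):
    """Distribute partition numbers over consumers in round-robin order.

    Each partition gets exactly one owner, so consumers beyond the
    partition count end up with an empty list (they stay idle).
    """
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# One partition, two consumers: only one consumer has work to do.
print(assign_partitions(1, ["consumer-a", "consumer-b"]))
# {'consumer-a': [0], 'consumer-b': []}

# Four partitions, two consumers: both consumers work in parallel.
print(assign_partitions(4, ["consumer-a", "consumer-b"]))
# {'consumer-a': [0, 2], 'consumer-b': [1, 3]}
```

With a single partition, as in amazingTopic, adding a second consumer to the group does not increase throughput; adding partitions does.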
Finally, as expected, the --zookeeper parameter indicates where the ZooKeeper cluster is running.
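As a recap of the four parameters, the command line can be assembled by a small helper. This is a minimal Python sketch that only builds the argument list and uses only the flags shown above; the function name build_create_topic_command is hypothetical, and the confluent_path argument stands for the <confluent-path> placeholder of your own installation.

```python
def build_create_topic_command(confluent_path, topic, partitions=1,
                               replication_factor=1,
                               zookeeper="localhost:2181"):
    """Return the kafka-topics --create command as a list of arguments."""
    return [
        f"{confluent_path}/bin/kafka-topics",
        "--create",
        "--zookeeper", zookeeper,                        # where ZooKeeper runs
        "--replication-factor", str(replication_factor), # redundancy
        "--partitions", str(partitions),                 # parallelism
        "--topic", topic,                                # name of the topic
    ]


command = build_create_topic_command("<confluent-path>", "amazingTopic")
print(" ".join(command))
```

A list like this could be handed to Python's subprocess.run to execute the command, assuming the path points to a real installation and the cluster is up.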
When a topic is created, the output in the broker log is something like this:
[2018-09-01 07:05:53,910] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions amazingTopic-0 (kafka.server.ReplicaFetcherManager)
[2018-09-01 07:05:53,950] INFO Completed load of log amazingTopic-0 with 1 log segments and log end offset 0 in 21 ms (kafka.log.Log)
In short, this message tells us that a new topic has been born in our cluster.
How can I check my new and shiny topic? By using the same command: kafka-topics.
There are more parameters than --create. To list the topics in the cluster, run the kafka-topics command with the --list parameter, as follows:
> <confluent-path>/bin/kafka-topics --list --zookeeper localhost:2181
The output, the list of topics, is as follows:
amazingTopic
This command returns the names of all of the topics in the cluster.
How can I get details of a topic? Using the same command: kafka-topics.
For a particular topic, run the kafka-topics command with the --describe parameter, as follows:
> <confluent-path>/bin/kafka-topics --describe --zookeeper localhost:2181 --topic amazingTopic
The command output is as follows:
Topic:amazingTopic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: amazingTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Here is a brief explanation of the output:
- PartitionCount: Number of partitions on the topic (parallelism)
- ReplicationFactor: Number of replicas on the topic (redundancy)
- Leader: Node responsible for reading and writing operations of a given partition
- Replicas: List of brokers replicating this topic data; some of these might even be dead
- Isr: List of nodes that are currently in-sync replicas
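If you want to inspect these fields from a script, the per-partition line can be parsed with a regular expression. The following Python sketch assumes the output looks exactly like the sample shown in this section; other Kafka versions may print it differently. The names PARTITION_LINE and parse_partition_line are hypothetical.

```python
import re

# Pattern for a per-partition line of the --describe output shown above.
PARTITION_LINE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+"
    r"Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>-?\d+)\s+"
    r"Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def parse_partition_line(line):
    """Return the fields of a partition line as a dict, or None if no match."""
    match = PARTITION_LINE.search(line)
    if match is None:
        return None
    return {
        "topic": match.group("topic"),
        "partition": int(match.group("partition")),
        "leader": int(match.group("leader")),
        "replicas": [int(b) for b in match.group("replicas").split(",")],
        "isr": [int(b) for b in match.group("isr").split(",") if b],
    }


line = "Topic: amazingTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1"
print(parse_partition_line(line))
# {'topic': 'amazingTopic', 'partition': 0, 'leader': 1, 'replicas': [1], 'isr': [1]}
```

The summary line (the one with PartitionCount and ReplicationFactor) does not match the pattern, so the function returns None for it.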
Let's create a topic with multiple replicas (that is, one that runs on more than one broker in the cluster); we type the following:
> <confluent-path>/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic redundantTopic
The output is as follows:
Created topic redundantTopic
Now, call the kafka-topics command with the --describe parameter to check the topic details, as follows:
> <confluent-path>/bin/kafka-topics --describe --zookeeper localhost:2181 --topic redundantTopic
Topic:redundantTopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: redundantTopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
As you can see, the Replicas and Isr lists are the same; we infer that all of the replicas are in sync.
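The same comparison can be done in code: any broker that appears in Replicas but not in Isr is lagging behind or down. Here is a minimal Python sketch, where the function name out_of_sync_replicas is hypothetical and the broker IDs are the ones from the output above.

```python
def out_of_sync_replicas(replicas, isr):
    """Return the broker IDs listed in Replicas that are missing from Isr."""
    in_sync = set(isr)
    return [broker for broker in replicas if broker not in in_sync]


# redundantTopic as shown above: every replica is in sync.
print(out_of_sync_replicas([1, 2], [1, 2]))  # []

# Hypothetical state after broker 2 is stopped: it drops out of the Isr list.
print(out_of_sync_replicas([1, 2], [1]))     # [2]
```

An empty result means the partition is fully replicated; a non-empty one tells you which brokers to look at.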
Your turn: play with the kafka-topics command, and try to create replicated topics on dead brokers and see the output. Also, create topics on running servers and then kill them to see the results. Was the output what you expected?
As mentioned before, all of these commands executed through the command line can be executed programmatically or performed through the Confluent Control Center web console.