In today's tutorial, we will talk about the Confluent Platform and how to get started with organizing and managing data from several sources in one high-performance and reliable system.
The Confluent Platform is a full stream data system: it enables you to organize and manage data from several sources in one high-performance and reliable system. As mentioned in the first few chapters, the goal of an enterprise service bus is not only to provide the system with a means to transport messages and data, but also to provide all the tools required to connect the data origins (data sources), applications, and data destinations (data sinks) to the platform.
The Confluent Platform has these parts:
- Confluent Platform open source
- Confluent Platform enterprise

The Confluent Platform open source has the following components:
- Apache Kafka core
- Kafka Streams
- Kafka Connect
- Kafka clients
- Kafka REST Proxy
- Kafka Schema Registry

The Confluent Platform enterprise has the following components:
- All the open source components
- Confluent Control Center
- Confluent support, professional services, and consulting

All the components are open source except the Confluent Control Center, which is proprietary to Confluent Inc.

An explanation of each component is as follows:
- Kafka core: the Kafka brokers, responsible for storing and transporting messages
- Kafka Streams: the Kafka library used to build stream processing applications
- Kafka Connect: the framework used to connect Kafka with databases, key-value stores, and filesystems
- Kafka clients: the libraries for writing and reading messages to and from Kafka
- Kafka REST Proxy: allows applications that cannot use the native clients to produce and consume messages over HTTP
- Kafka Schema Registry: the central repository used to store, validate, and manage the evolution of message schemas
In order to use the REST Proxy and the Schema Registry, we need to install the Confluent Platform. Also, the Confluent Platform has important administration, operation, and monitoring features fundamental for modern Kafka production systems.
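As a preview of what the REST Proxy enables, here is a minimal sketch that publishes a JSON message to a topic over HTTP, assuming the REST Proxy is running on its default port 8082 and that a topic named test_topic already exists:

$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records":[{"value":{"name": "Alice", "age": 27}}]}' \
  http://localhost:8082/topics/test_topic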
At the time of writing this book, the Confluent Platform version is 4.0.0. Currently, the supported operating systems are:
- Debian 8
- Red Hat Enterprise Linux
- CentOS 6.8 or 7.2
- Ubuntu 14.04 LTS and 16.04 LTS
macOS is currently supported only for testing and development purposes, not for production environments. Windows is not yet supported. Oracle Java 1.7 or higher is required.
The default ports for the components are:
- 2181: Apache ZooKeeper
- 8081: Schema Registry (REST API)
- 8082: Kafka REST Proxy
- 8083: Kafka Connect (REST API)
- 9021: Confluent Control Center
- 9092: Apache Kafka brokers
It is important to have these ports, or the ports where the components are going to run, open.
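Before starting anything, it is worth checking that no other process is already occupying those ports. A quick sketch using nc (netcat), assuming the default ports listed above:

$ for port in 2181 8081 8082 8083 9021 9092; do nc -z localhost $port && echo "$port in use" || echo "$port free"; done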
There are two ways to install the Confluent Platform: downloading the compressed files or using the apt-get command.
To install from the compressed files:
1. Download the Confluent Platform archive from https://www.confluent.io/download/.
2. Extract the downloaded archive into your installation directory.
3. Start the Confluent Platform:
$ <confluent-path>/bin/confluent start
The output should be as follows:
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
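The confluent CLI that ships with the platform also provides commands to inspect and shut down the stack. A few useful ones, available in Confluent Platform 4.0 alongside confluent start:

$ <confluent-path>/bin/confluent status    # show the [UP]/[DOWN] state of every service
$ <confluent-path>/bin/confluent stop      # stop all services in reverse order
$ <confluent-path>/bin/confluent destroy   # stop all services and delete their local data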
To install with the apt-get command (on Debian and Ubuntu):
$ wget -qO - http://packages.confluent.io/deb/4.0/archive.key |sudo apt-key add -
$ sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/4.0 stable main"
To install the Confluent Platform open source:
$ sudo apt-get update && sudo apt-get install confluent-platform-oss-2.11
To install the Confluent Platform enterprise:
$ sudo apt-get update && sudo apt-get install confluent-platform-2.11
The end of the package name specifies the Scala version. Currently, the supported versions are 2.11 (recommended) and 2.10.
The Confluent Platform provides the system and component packages. The commands in this recipe are for installing all components of the platform. To install individual components, follow the instructions on this page: https://docs.confluent.io/current/installation/available_packages.html#available-packages.
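For example, assuming the package naming used by the Confluent 4.0 apt repository (check the page above for the exact package names), installing only the Kafka broker and the Schema Registry would look like this:

$ sudo apt-get install confluent-kafka-2.11
$ sudo apt-get install confluent-schema-registry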
With the Confluent Platform installed, the administration, operation, and monitoring of Kafka become very simple. Let's review how to operate Kafka with the Confluent Platform.
For this recipe, Confluent should be installed, up, and running.
The commands in this section should be executed from the directory where the Confluent Platform is installed:
$ confluent start schema-registry
The output of this command should be:
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
To execute the commands outside the installation directory, add Confluent's bin directory to PATH:
$ export PATH=<path_to_confluent>/bin:$PATH
Alternatively, each service can be started individually with its own properties file:
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
$ ./bin/kafka-server-start ./etc/kafka/server.properties
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
Note that the syntax of all the commands is exactly the same as always but without the .sh extension.
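For example, where a plain Apache Kafka installation uses kafka-topics.sh to list topics, the Confluent Platform distribution uses:

$ ./bin/kafka-topics --zookeeper localhost:2181 --list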
Now, create a topic called test_topic:
$ ./bin/kafka-topics --zookeeper localhost:2181 --create --topic test_topic --partitions 1 --replication-factor 1
Next, start an Avro console producer, specifying the message schema inline, and type a few records:
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_topic --property value.schema='{"name":"person","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
{"name": "Alice", "age": 27}
{"name": "Bob", "age": 30}
{"name": "Charles", "age":57}
To read those messages back, run the Avro console consumer in another terminal window:
$ ./bin/kafka-avro-console-consumer --topic test_topic --zookeeper localhost:2181 --from-beginning
The messages produced in the previous step are written to the console in the same format in which they were entered.
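Assuming the three records above were the only messages produced, the consumer output should look like this:

{"name": "Alice", "age": 27}
{"name": "Bob", "age": 30}
{"name": "Charles", "age": 57}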
Now, to see the Schema Registry enforce compatibility, try to produce to the same topic with a different, incompatible schema:
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_topic --property value.schema='{"type":"string"}'
As soon as a message is sent, the producer fails with an error like the following:
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409
  at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:146)
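The same compatibility check can be performed without a producer by asking the Schema Registry directly over its REST API, here assuming the default subject name test_topic-value and the default port 8081:

$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "\"string\""}' \
  http://localhost:8081/compatibility/subjects/test_topic-value/versions/latest

This should return something like {"is_compatible":false}, which is exactly what the 409 error above reports.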
With the Confluent Platform, it is possible to manage all of the Kafka system through Kafka operations.
This recipe shows you how to use the metrics reporter of the Confluent Control Center.
The execution of the previous recipe is needed. Before starting the Control Center, configure the metrics reporter. First, add the following lines at the end of <confluent_path>/etc/kafka/server.properties:
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=localhost:9092
confluent.metrics.reporter.topic.replicas=1
Then, add the following lines at the end of <confluent_path>/etc/schema-registry/connect-avro-distributed.properties:
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
Start (or restart) the Confluent Platform so the changes take effect:
$ <confluent_path>/bin/confluent start
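To confirm that the metrics reporter is publishing data, one option is to read a single record from its internal topic (by default it writes to _confluent-metrics; the records are binary-encoded, so the goal is only to see that data arrives, not to read it):

$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic _confluent-metrics --max-messages 1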
Before starting the Control Center, change its configuration. Add the following lines at the end of <confluent_path>/etc/confluent-control-center/control-center.properties:
confluent.controlcenter.internal.topics.partitions=1
confluent.controlcenter.internal.topics.replication=1
confluent.controlcenter.command.topic.replication=1
confluent.monitoring.interceptor.topic.partitions=1
confluent.monitoring.interceptor.topic.replication=1
confluent.metrics.topic.partitions=1
confluent.metrics.topic.replication=1
Finally, start the Control Center:
$ <confluent_path>/bin/control-center-start
To test Kafka Connect through the Control Center, first create a topic for the test:
$ <confluent_path>/bin/kafka-topics --zookeeper localhost:2181 --create --topic test_topic --partitions 1 --replication-factor 1
Then, to create a new source, follow these steps:
1. Open the Control Center web interface at http://localhost:9021/.
2. Click on the Kafka Connect tab.
3. Click on the New source button.
4. From the Connector Class drop-down menu, select SchemaSourceConnector. Specify the connection name as Schema-Avro-Source.
5. In the topic name field, specify test_topic.
6. Click on Continue, and then click on the Save & Finish button to apply the configuration.
Creating a new sink is analogous: from the Kafka Connect tab, click on the New sink button, choose test_topic as the input topic, select a sink connector class, and save the configuration.
Click on the Data streams tab to see a chart of the total number of messages produced and consumed on the cluster.
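The monitoring data behind that chart travels through internal Kafka topics created by the interceptors and the metrics reporter. Listing the topics should reveal several names starting with _confluent, which is a quick sanity check that monitoring is wired up, assuming the defaults used in this recipe:

$ ./bin/kafka-topics --zookeeper localhost:2181 --list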
To summarize, we discussed how to get started with the Confluent Platform for Apache Kafka.
If you liked our post, please be sure to check out Apache Kafka 1.0 Cookbook, which contains useful recipes for working with your Apache Kafka installation.