Solution
We will get started as follows:
- First, we will import our libraries. Note that you must first install confluent_kafka using pip:
from confluent_kafka.schema_registry import SchemaRegistryClient
import ssl
from pyspark.sql.functions import from_json
from pyspark.sql.functions import udf, col, expr
from pyspark.sql.types import StringType
- Next, we set up the variables for our initial Kafka connection. Replace each placeholder with the corresponding value from your Confluent account:
kafka_cluster = "see confluent website"
kafka_api_key = "see confluent website"
kafka_api_secret = "see confluent website"
kafka_topic = "chapter_5"
boostrap_server = "see confluent website"
schema_reg_url = "see confluent website"
schema_api_key = "see confluent website"
schema_api_secret = "see confluent website"
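These variables usually end up in two places: Spark's Kafka source, as reader options, and the Schema Registry client, as a config dict. The following is a minimal sketch under stated assumptions, not the recipe's own code. It assumes a Confluent Cloud cluster that accepts SASL_SSL with the PLAIN mechanism. The helper names build_kafka_read_options and build_schema_registry_conf are hypothetical, and so are the sample host names and keys. On Databricks, the login module class is shaded as kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule, so you would pass that instead of the default.

```python
# Hypothetical helpers (not from the original recipe) that turn the connection
# variables into Spark Kafka reader options and a Schema Registry config dict.

DEFAULT_LOGIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"


def build_kafka_read_options(bootstrap_server, api_key, api_secret, topic,
                             login_module=DEFAULT_LOGIN_MODULE):
    # Assumption: the cluster uses SASL over TLS with the PLAIN mechanism,
    # authenticating with the cluster API key (username) and secret (password).
    jaas_config = (
        f"{login_module} required "
        f'username="{api_key}" password="{api_secret}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap_server,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas_config,
        "subscribe": topic,  # topic to read from
    }


def build_schema_registry_conf(url, api_key, api_secret):
    # SchemaRegistryClient takes a dict; basic auth is passed as "key:secret".
    return {"url": url, "basic.auth.user.info": f"{api_key}:{api_secret}"}


# Example with made-up values; in the recipe you would pass boostrap_server,
# kafka_api_key, kafka_api_secret and kafka_topic instead.
kafka_options = build_kafka_read_options(
    "example.confluent.cloud:9092", "MY_KEY", "MY_SECRET", "chapter_5"
)
schema_conf = build_schema_registry_conf(
    "https://example-registry.confluent.cloud", "SR_KEY", "SR_SECRET"
)

# With a SparkSession named `spark` and confluent_kafka installed, these would
# be used roughly like this:
# df = spark.readStream.format("kafka").options(**kafka_options).load()
# schema_registry_client = SchemaRegistryClient(schema_conf)
```

Keeping the option building in plain functions lets you check the generated JAAS string without starting Spark.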
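- We will create a UDF that reads a binary value as an unsigned big-endian integer and returns that number as a string. A typical input is the 4-byte schema ID that Confluent places at the start of each message:
# Return None for null inputs instead of raising a TypeError
binary_to_string = udf(lambda x: str(int.from_bytes(x, byteorder='big')) if x is not None else None, StringType())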
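To see what the UDF's lambda computes, the same logic can be run in plain Python on a hypothetical message value. This sketch assumes the Confluent wire format: a zero magic byte, then a 4-byte big-endian schema ID, then the serialized payload. The sample bytes and the helper name bytes_to_int_string are made up for illustration.

```python
def bytes_to_int_string(data):
    # Same logic as the lambda inside binary_to_string: interpret the bytes
    # as one unsigned big-endian integer and return its decimal string.
    if data is None:
        return None
    return str(int.from_bytes(data, byteorder="big"))


# Hypothetical Kafka message value in the Confluent wire format:
# byte 0 = magic byte (0), bytes 1-4 = schema ID (big-endian), rest = payload.
message_value = bytes([0x00, 0x00, 0x00, 0x01, 0x86]) + b'{"id": 1}'

schema_id = bytes_to_int_string(message_value[1:5])  # the 4-byte schema ID field
payload = message_value[5:]                          # the serialized record itself

print(schema_id)  # 390
print(payload)    # b'{"id": 1}'
```

In Spark, the equivalent would be to apply the UDF to a slice of the binary column, for example binary_to_string(expr("substring(value, 2, 4)")). Spark SQL's substring is 1-based and also accepts binary input, so this selects the same four bytes as message_value[1:5] above.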
- Now, we will...