Solution
Now, let’s explore the solutions to the aforementioned problems.
Solution 1
Here, we set up the initial read of our test data, dynamically loading the schema from the file. That is fine in test cases, but in production workloads it is a very bad practice; I recommend writing out your schema statically:
location = "dbfs:/tmp/chapter_4_lab_test_data"
fmt = "parquet"
schema = spark.read.format(fmt).load(location).schema
users = spark.readStream.schema(schema).format(fmt).load(location)
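To give a sense of what a statically declared schema could look like, here is a minimal sketch. The column names and types are assumptions made for illustration (the actual columns of the test data aren't shown here), so they would need to match your dataset:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Hypothetical schema -- replace the fields with the real columns of the test data
static_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event", StringType(), True),
    StructField("event_time", TimestampType(), True),
])

# With a static schema, no extra batch read is needed just to infer types
users = (
    spark.readStream
    .schema(static_schema)
    .format(fmt)
    .load(location)
)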
Now, the code populates a bronze table from the data being loaded, and the write uses append output mode:
bronze_schema = users.schema
bronze_location = "dbfs:/tmp/chapter_4_lab_bronze"
checkpoint_location = f"{bronze_location}/_checkpoint"
output_mode = "append"
bronze_query = users.writeStream.format("delta").trigger(once=True).option("checkpointLocation", bronze_location...
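The snippet above is cut off mid-call. As a sketch of how the write could be finished, assuming the checkpoint is meant to live at checkpoint_location and the stream writes to bronze_location, it might look like this:

# Sketch of a completed write; wiring (checkpoint_location, start path) is assumed, not from the original
bronze_query = (
    users.writeStream
    .format("delta")
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_location)
    .outputMode(output_mode)
    .start(bronze_location)
)

# Block until the run-once trigger finishes processing the available data
bronze_query.awaitTermination()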