Weaviate is gaining popularity as a vector database for semantic search, while PySpark is a well-established data processing framework used for handling large datasets efficiently.
The integration of Weaviate and Spark enables the processing of large volumes of data, which can be stored in unstructured blob storages like S3. This integration allows for batch processing to structure the data to suit specific requirements. Subsequently, it empowers users to perform similarity searches and build contexts for applications based on Large Language Models (LLMs).
In this article, we will explore how to integrate Weaviate and PySpark, with a particular emphasis on leveraging their capabilities for similarity searches using Large Language Models (LLMs).
Before we delve into the integration of Weaviate and PySpark, let's start with a brief overview. We will import a subset of the Sphere dataset, a substantial 100k lines of data, into a newly initiated Spark Session and then write it to Weaviate, which will give us a concrete, end-to-end view of how the two systems work together.
Let's take a closer look at the components we'll be working with. In this scenario, we will use Docker Compose to deploy Spark, Jupyter, Weaviate, and a Transformers container in a local environment. The Transformers container will be responsible for creating the embeddings.
To get started, we'll walk through setting up the Docker Compose environment that ties these services together for the integration of Weaviate and PySpark.
version: '3'
services:
  spark-master:
    image: bitnami/spark:latest
    hostname: spark-master
    environment:
      - INIT_DAEMON_STEP=setup_spark
  jupyter:
    build: .
    ports:
      - "8888:8888"
    volumes:
      - ./local_lake:/home/jovyan/work
      - ./notebooks:/home/jovyan/
    depends_on:
      - spark-master
    command: "start-notebook.sh --NotebookApp.token='' --NotebookApp.password=''"
  weaviate:
    image: semitechnologies/weaviate:latest
    restart: on-failure:0
    environment:
      QUERY_DEFAULTS_LIMIT: 20
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: "./data"
      DEFAULT_VECTORIZER_MODULE: text2vec-transformers
      ENABLE_MODULES: text2vec-transformers
      TRANSFORMERS_INFERENCE_API: http://t2v-transformers:8080
      CLUSTER_HOSTNAME: 'node1'
  t2v-transformers:
    image: semitechnologies/transformers-inference:sentence-transformers-multi-qa-MiniLM-L6-cos-v1
    environment:
      ENABLE_CUDA: 0 # set to 1 to enable
      # NVIDIA_VISIBLE_DEVICES: all # enable if running with CUDA
volumes:
  myvol:
This Docker Compose configuration sets up a few different services:
- spark-master: runs the Bitnami Spark image, sets its hostname to "spark-master", and defines an environment variable for initialization.
- jupyter: is built from the local Dockerfile, exposes port 8888, and mounts the "local_lake" directory to the working directory inside the container and the "notebooks" directory to the home directory of the container. It depends on the "spark-master" service and runs a command to start Jupyter Notebook with an empty token and password.
- weaviate: runs the Weaviate image with anonymous access enabled, text2vec-transformers configured as the default vectorizer module, and the inference API pointed at the t2v-transformers service.
- t2v-transformers: runs the sentence-transformers inference container that Weaviate calls to create embeddings, with CUDA disabled by default.
Additionally, there is a volume defined named "myvol" for potential data storage. This Docker Compose configuration essentially sets up an environment where Spark, Jupyter, Weaviate, and Transformers can work together, each with its specific configuration and dependencies.
To start integrating Spark and Weaviate, you'll need to install two important components: the weaviate-client Python package and the PySpark framework. You can get both dependencies by running the following command with pip3:
pip3 install pyspark weaviate-client
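As a quick, optional sanity check, you can confirm that both packages import correctly and that Weaviate is reachable once the Docker Compose stack is running. This is a minimal sketch; the URL assumes the code runs from the Jupyter container on the Compose network, where the hostname weaviate resolves.
# Optional sanity check: confirm the dependencies import and Weaviate is reachable.
import pyspark
import weaviate

print("PySpark version:", pyspark.__version__)

# The hostname "weaviate" resolves inside the Docker Compose network (e.g. from
# the Jupyter container); adjust the URL if you connect from elsewhere.
client = weaviate.Client("http://weaviate:8080")
print("Weaviate ready:", client.is_ready())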
To get the Weaviate Spark Connector, execute the following command in your terminal, which will download the JAR file that the Spark Session uses:
curl https://github.com/weaviate/spark-connector/releases/download/v1.2.8/spark-connector-assembly-1.2.8.jar --output spark-connector-assembly-1.2.8.jar
Keep in mind that Java 8+ and Scala 2.12 are prerequisites for a seamless integration experience, so please make sure these components are installed on your system before proceeding. While we demonstrate Spark running locally with Docker here, refer to the Apache Spark documentation or your cloud platform's resources for guidance on installing and deploying a Spark cluster in other environments, such as EMR on AWS or Dataproc on GCP. Additionally, make sure to verify the compatibility of your chosen language runtime with your selected environment.
The way in which Spark and Weaviate work together is through the Spark Connector. This connector serves as a bridge, allowing data to flow from Spark to Weaviate. It's especially important for tasks like Extract, Transform, Load (ETL) processes, where it lets you process data with Spark and then populate Weaviate vector databases. One of its key features is its ability to automatically figure out the correct data type based on your Weaviate schema, making data transfer more straightforward. Another feature is that you can choose to vectorize data as you send it to Weaviate, or you can provide existing vectors. By default, Weaviate generates document IDs for new documents, but you can also supply your own IDs within the data frame. These capabilities can all be configured as options within the Spark Connector.
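To make these options concrete, here is a small illustrative sketch of what a write with pre-computed vectors could look like. The DataFrame and column names (df_with_vectors, doc_id, embedding) are hypothetical, and the vector option name should be verified against the connector version you use; the actual write used in this article appears later and lets Weaviate vectorize the data instead.
# Illustrative sketch only: writing a DataFrame that already contains embeddings.
# "df_with_vectors", "doc_id" and "embedding" are assumed names for this example.
(
    df_with_vectors.write.format("io.weaviate.spark.Weaviate")
    .option("scheme", "http")
    .option("host", "weaviate:8080")
    .option("className", "Sphere")
    .option("id", "doc_id")         # supply your own document IDs
    .option("vector", "embedding")  # supply pre-computed vectors (verify the option name for your connector version)
    .option("batchSize", 200)
    .mode("append")
    .save()
)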
In the next sections, we will dive into the practical implementation of the integration, walking through the PySpark notebook that we can run in Jupyter, with code snippets to guide us through each step. We will load the Sphere dataset, containing 100k lines of data, into our Spark Session and insert it into the running Weaviate instance, which will create the embeddings using the Transformers container.
To begin, we initialize the Spark Session using the SparkSession.builder module. This code snippet configures the session with the necessary settings, including spark-connector-assembly-1.2.8.jar, the Weaviate Spark Connector JAR file, passed via the spark.jars option. We set the session's master to local[*] and define the application name as weaviate. The .getOrCreate() function ensures the session is created or retrieved as needed. To keep the output readable, we reduce log verbosity by setting the log level to WARN.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.config(
        "spark.jars",
        "spark-connector-assembly-1.2.8.jar",  # specify the spark connector JAR
    )
    .master("local[*]")
    .appName("weaviate")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
Remember that the connector JAR needs to be at the path given in spark.jars (here, the notebook's working directory) so the Spark Session can load it.
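If the session fails to pick up the connector, a quick check like the one below can help. This is a small sketch that assumes the JAR was downloaded next to the notebook; passing an absolute path to spark.jars avoids any dependence on the current working directory.
import os

# Resolve the connector JAR relative to the notebook and fail early if it is missing.
jar_path = os.path.abspath("spark-connector-assembly-1.2.8.jar")
assert os.path.exists(jar_path), f"Connector JAR not found at {jar_path}"

# The absolute path can then be passed to spark.jars instead of the bare filename.
print(jar_path)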
Now we can load the dataset using the .load() function, specifying the format as JSON. This reads the data into a DataFrame named df, which is then displayed using .show().
df = spark.read.load("sphere.100k.jsonl", format="json")
df.show()
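Before going further, it can be useful to confirm what was loaded. The short snippet below simply inspects the schema Spark inferred from the JSONL file and counts the rows; with the subset used here it should report roughly 100k records.
# Inspect the inferred schema and verify the number of records that were loaded.
df.printSchema()
print("Number of records:", df.count())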
The next step involves preparing the data for the integration with Weaviate. We first drop the vector column from the DataFrame, since we will let Weaviate generate fresh embeddings through the Transformers container.
df = df.drop(*["vector"])
df.show()
To interact with Weaviate, we use the weaviate Python package. The code initializes the Weaviate client, specifying the base URL and setting timeout configurations. We then delete any existing schema and proceed to create a new class named Sphere with specific properties, including raw, sha, title, and url. The vectorizer is set to text2vec-transformers.
import weaviate
import json

# initiate the Weaviate client
client = weaviate.Client("http://weaviate:8080")
client.timeout_config = (3, 200)

# empty schema and create new schema
client.schema.delete_all()
client.schema.create_class(
    {
        "class": "Sphere",
        "properties": [
            {
                "name": "raw",
                "dataType": ["string"],
            },
            {
                "name": "sha",
                "dataType": ["string"],
            },
            {
                "name": "title",
                "dataType": ["string"],
            },
            {
                "name": "url",
                "dataType": ["string"],
            },
        ],
        "vectorizer": "text2vec-transformers",
    }
)
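Once the class has been created, we can read the schema back from Weaviate to confirm that the Sphere class and its properties are in place. This is a small optional check using the same client.
# Read the schema back and list the classes and their properties.
schema = client.schema.get()
for cls in schema["classes"]:
    print(cls["class"], [prop["name"] for prop in cls["properties"]])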
Now we can start writing data from Spark to Weaviate. The code renames the id column to uuid and uses the .write.format() function to specify the Weaviate format for writing. Various options, such as batchSize, scheme, host, id, and className, can be set to configure the write process. We call .mode("append") because append is currently the only write mode the connector supports. Both batch operations and streaming writes are supported.
df.limit(1500).withColumnRenamed("id", "uuid").write.format("io.weaviate.spark.Weaviate") \
    .option("batchSize", 200) \
    .option("scheme", "http") \
    .option("host", "weaviate:8080") \
    .option("id", "uuid") \
    .option("className", "Sphere") \
    .mode("append").save()
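After the write completes, a quick aggregate query shows how many objects landed in the Sphere class; given the .limit(1500) above, we would expect around 1,500 objects, minus any objects rejected during batching.
# Count the objects now stored in the Sphere class.
result = client.query.aggregate("Sphere").with_meta_count().do()
print(result["data"]["Aggregate"]["Sphere"][0]["meta"]["count"])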
Now we can conclude this hands-on section by showcasing how to query Weaviate for data insights. The code snippet demonstrates querying the Sphere class for the title and raw properties using the .get() and .with_near_text() functions. The concepts parameter is set to ["animals"], and additional information such as distance is requested. A limit of 5 results is set using .with_limit(5), and the query is executed with .do().
client.query \
    .get("Sphere", ["title", "raw"]) \
    .with_near_text({
        "concepts": ["animals"]
    }) \
    .with_additional(["distance"]) \
    .with_limit(5) \
    .do()
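Running the same query while capturing the response makes the results easier to inspect. The loop below walks the standard shape returned by the Python client for Get queries and prints each hit's title alongside its distance to the query concept.
response = (
    client.query
    .get("Sphere", ["title", "raw"])
    .with_near_text({"concepts": ["animals"]})
    .with_additional(["distance"])
    .with_limit(5)
    .do()
)

# Print each result's vector distance and title.
for obj in response["data"]["Get"]["Sphere"]:
    print(obj["_additional"]["distance"], obj["title"])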
These guided steps provide a comprehensive view of the integration process, showcasing the seamless data transfer from Spark to Weaviate and enabling data analysis with enhanced insights.
In conclusion, the integration of Weaviate and PySpark represents the convergence of technologies that offer innovative solutions for data analysis and exploration. By combining the capabilities of Weaviate, a vector database, and PySpark, a versatile data processing framework, we enable exciting new applications for querying and extracting insights from our data.
Throughout this article, we started by explaining the Docker Compose environment that orchestrates the components and introduced the Spark Connector, setting the stage for efficient data flow and analysis. The Spark Connector enables the transfer of data from Spark to Weaviate, and its flexibility in adapting to various data types and schema configurations makes it valuable for ETL processes and data interaction. We then continued with a hands-on exploration that guided us through the integration process, offering practical insights into initializing the Spark Session, loading and preparing data, configuring the Weaviate client, and orchestrating the data transfer.
In essence, the integration of Weaviate and PySpark not only simplifies data transfer but also unlocks enhanced data insights and analysis. This collaboration underscores the transformative potential of harnessing advanced technologies to extract meaningful insights from large datasets. As the realm of data analysis continues to evolve, the integration of Weaviate and PySpark emerges as a promising avenue for innovation and exploration.
Alan Bernardo Palacio is a data scientist and an engineer with vast experience in different engineering fields. His focus has been the development and application of state-of-the-art data products and algorithms in several industries. He has worked for companies such as Ernst and Young, and Globant, and now holds a data engineer position at Ebiquity Media helping the company to create a scalable data pipeline. Alan graduated with a Mechanical Engineering degree from the National University of Tucuman in 2015, participated as the founder of startups, and later on earned a Master's degree from the faculty of Mathematics at the Autonomous University of Barcelona in 2017. Originally from Argentina, he now works and resides in the Netherlands.