Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Arrow left icon
Explore Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Conferences
Free Learning
Arrow right icon

Weaviate and PySpark for LLM Similarity Search

Save for later
  • 12 min read
  • 12 Sep 2023

article-image

Introduction

Weaviate is gaining popularity as a semantic graph database, while PySpark is a well-established data processing framework used for handling large datasets efficiently.

The integration of Weaviate and Spark enables the processing of large volumes of data, which can be stored in unstructured blob storages like S3. This integration allows for batch processing to structure the data to suit specific requirements. Subsequently, it empowers users to perform similarity searches and build contexts for applications based on Large Language Models (LLMs).

In this article, we will explore how to integrate Weaviate and PySpark, with a particular emphasis on leveraging their capabilities for similarity searches using Large Language Models (LLMs).

Before we delve into the integration of Weaviate and PySpark, let's start with a brief overview. We will begin by seamlessly importing a subset of the Sphere dataset, which contains a substantial 100k lines of data, into our newly initiated Spark Session. This dataset will provide valuable insights and nuances, enhancing our understanding of the collaboration between Weaviate and PySpark. Let's get started.

Preparing the Docker Compose Environment

Before we delve into the integration of Weaviate and PySpark, let's take a closer look at the components we'll be working with. In this scenario, we will utilize Docker Compose to deploy Spark, Jupyter, Weaviate, and the Transformers container in a local environment. The Transformers container will be instrumental in creating embeddings.

To get started, we'll walk you through the process of setting up the Docker Compose environment, making it conducive for seamlessly integrating Weaviate and PySpark.

version: '3'

services:
  spark-master:
    image: bitnami/spark:latest
    hostname: spark-master
    environment:
      - INIT_DAEMON_STEP=setup_spark

  jupyter:
    build: .
    ports:
      - "8888:8888"
    volumes:
      - ./local_lake:/home/jovyan/work
      - ./notebooks:/home/jovyan/
    depends_on:
      - spark-master
    command: "start-notebook.sh --NotebookApp.token='' --NotebookApp.password=''"

  weaviate:
    image: semitechnologies/weaviate:latest
    restart: on-failure:0
    environment:
      QUERY_DEFAULTS_LIMIT: 20
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: "./data"
      DEFAULT_VECTORIZER_MODULE: text2vec-transformers
      ENABLE_MODULES: text2vec-transformers
      TRANSFORMERS_INFERENCE_API: <http://t2v-transformers:8080>
      CLUSTER_HOSTNAME: 'node1'
  t2v-transformers:
    image: semitechnologies/transformers-inference:sentence-transformers-multi-qa-MiniLM-L6-cos-v1
    environment:
      ENABLE_CUDA: 0 # set to 1 to enable
      # NVIDIA_VISIBLE_DEVICES: all # enable if running with CUDA

volumes:
  myvol:

This Docker Compose configuration sets up a few different services:

  1. spark-master: This service uses the latest Bitnami Spark image. It sets the hostname to "spark-master" and defines an environment variable for initialization.
  2. jupyter: This service is built from the current directory and exposes port 8888. It also sets up volumes to link the local "local_lake" directory to the working directory inside the container and the "notebooks" directory to the home directory of the container. It depends on the "spark-master" service and runs a command to start Jupyter Notebook with certain configurations.
  3. weaviate: This service uses the latest Weaviate image. It specifies some environment variables for configuration, like setting query defaults, enabling anonymous access, defining data persistence paths, and configuring vectorizers.
  4. t2v-transformers: This service uses a specific image for transformers vector embedding creation. It also sets environment variables, including one for enabling CUDA if needed.

Additionally, there is a volume defined named "myvol" for potential data storage. This Docker Compose configuration essentially sets up an environment where Spark, Jupyter, Weaviate, and Transformers can work together, each with its specific configuration and dependencies.

Enabling Seamless Integration with the Spark Connector

The way in which Spark and Weaviate work together is through the Spark Connector. This connector serves as a bridge, allowing data to flow from Spark to Weaviate. It's especially important for tasks like Extract, Transform, Load (ETL) processes, where it allows to processing of the data with Spark and then populating Weaviate vector databases. One of its key features is its ability to automatically figure out the correct data type in Spark based on your Weaviate schema, making data transfer more straightforward. Another feature is that you can choose to vectorize data as you send it to Weaviate, or you can provide existing vectors. By default, Weaviate generates document IDs for new documents, but you can also supply your own IDs within the data frame. These capabilities can all be configured as options within the Spark Connector.

To start integrating Spark and Weaviate, you'll need to install two important components: the weaviate-client Python package and the essential PySpark framework. You can easily get these dependencies by running the following command with pip3:

pip3 install pyspark weaviate-client

To get the Weaviate Spark Connector, we can execute the following command in your terminal, which will download the JAR file that is used by the Spark Session:

curl <https://github.com/weaviate/spark-connector/releases/download/v1.2.8/spark-connector-assembly-1.2.8.jar> --output spark-connector-assembly-1.2.8.jar

Keep in mind that Java 8+ and Scala 2.12 are prerequisites for a seamless integration experience so please make sure that these components are installed on your system before proceeding. While here we demonstrate Spark's local operation using Docker, consider referring to the Apache Spark documentation or your cloud platform's resources for guidance on installation and deploying a Spark cluster in different environments, like EMR on AWS or Dataproc in GCP. Additionally, make sure to verify the compatibility of your chosen language runtime with your selected environment.

The way in which Spark and Weaviate work together is through the Spark Connector. This connector serves as a bridge, allowing data to flow from Spark to Weaviate. It's especially important for tasks like Extract, Transform, Load (ETL) processes, where it allows to processing of the data with Spark and then populate Weaviate vector databases. One of its key features is its ability to automatically figure out the correct data type in Spark based on your Weaviate schema, making data transfer more straightforward. Another feature is that you can choose to vectorize data as you send it to Weaviate, or you can provide existing vectors. By default, Weaviate generates document IDs for new documents, but you can also supply your own IDs within the data frame. These capabilities can all be configured as options within the Spark Connector.

Unlock access to the largest independent learning library in Tech for FREE!
Get unlimited access to 7500+ expert-authored eBooks and video courses covering every tech area you can think of.
Renews at R$50/month. Cancel anytime

In the next sections, we will dive into the practical implementation of the integration, showing the PySpark notebook that we can run in Jupyter with code snippets to guide us through each step of the implementation. In this case, we will be using the Sphere dataset – housing a robust 100k lines of data – in our Spark Session, and we will insert it into the running Weaviate dataset which will create embeddings by using the Transformers container.

Initializing the Spark Session and Loading Data

To begin, we initialize the Spark Session using the SparkSession.builder module. This code snippet configures the session with the necessary settings, including the specification of the spark-connector-assembly-1.2.8.jar – the Weaviate Spark Connector JAR file. We set the session's master to local[*] and define the application name as weaviate. The .getOrCreate() function ensures the session is created or retrieved as needed. To maintain clarity, we suppress log messages with a level of "WARN."

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.config(
        "spark.jars",
        "spark-connector-assembly-1.2.8.jar",  # specify the spark connector JAR
    )
    .master("local[*]")
    .appName("weaviate")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

Remember that in this case, the connector needs to be in the proper location to be utilized by the Spark Session.
Now we can proceed to load the dataset using the .load() function, specifying the format as JSON. This command fetches the data into a DataFrame named df, which is then displayed using .show().
df = spark.read.load("sphere.100k.jsonl", format="json")
df.show()
weaviate-and-pyspark-for-llm-similarity-search-img-0

The next steps involve preparing the data for the integration with Weaviate. We first drop the vector column from the DataFrame, as it's not needed for our integration purpose.

df = df.drop(*["vector"])
df.show()
weaviate-and-pyspark-for-llm-similarity-search-img-1

To interact with Weaviate, we use the weaviate Python package. The code initializes the Weaviate client, specifying the base URL and setting timeout configurations. We then delete any existing schema and proceed to create a new class named Sphere with specific properties, including raw, sha, title, and url. The vectorizer is set to text2vec-transformers.

import weaviate
import json

# initiate the Weaviate client
client = weaviate.Client("<http://weaviate:8080>")
client.timeout_config = (3, 200)

# empty schema and create new schema
client.schema.delete_all()

client.schema.create_class(
    {
        "class": "Sphere",
        "properties": [
            {
                "name": "raw",
                "dataType": ["string"]
            },
            {
                "name": "sha",
                "dataType": ["string"]
            },
            {
                "name": "title",
                "dataType": ["string"]
            },
            {
                "name": "url",
                "dataType": ["string"]
            },
        ],
     "vectorizer":"text2vec-transformers"
    }
)

Now we can start the process of writing data from Spark to Weaviate. The code renames the id column to uuid and uses the .write.format() function to specify the Weaviate format for writing. Various options, such as batchSize, scheme, host, id, and className, can be set to configure the write process. The .mode("append") ensures that only the append write mode is currently supported. Additionally, the code highlights that both batch operations and streaming writes are supported.

df.limit(1500).withColumnRenamed("id", "uuid").write.format("io.weaviate.spark.Weaviate") \\\\
    .option("batchSize", 200) \\\\
    .option("scheme", "http") \\\\
    .option("host", "weaviate:8080") \\\\
    .option("id", "uuid") \\\\
    .option("className", "Sphere") \\\\
    .mode("append").save()

Querying Weaviate for Data Insights

Now we can conclude this hands-on section by showcasing how to query Weaviate for data insights. The code snippet demonstrates querying the Sphere class for title and raw properties, using the .get() and .with_near_text() functions. The concept parameter includes animals, and additional information like distance is requested. A limit of 5 results is set using .with_limit(5), and the query is executed with .do().

client.query\\\\
    .get("Sphere", ["title","raw"])\\\\
    .with_near_text({
        "concepts": ["animals"]
    })\\\\
    .with_additional(["distance"])\\\\
    .with_limit(5)\\\\
    .do()
weaviate-and-pyspark-for-llm-similarity-search-img-2

These guided steps provide a comprehensive view of the integration process, showcasing the seamless data transfer from Spark to Weaviate and enabling data analysis with enhanced insights.

Conclusion

In conclusion, the integration of Weaviate and PySpark represents the convergence of technologies to offer innovative solutions for data analysis and exploration. By integrating the capabilities of Weaviate, a semantic graph database, and PySpark, a versatile data processing framework, we enable new exciting possible applications to query and extract insights from our data.

Throughout this article, we started by explaining the Docker Compose environment, orchestrated the components, and introduced the Spark Connector, we set the stage for efficient data flow and analysis. The Spark Connector enables to transfer of data from Spark to Weaviate. Its flexibility in adapting to various data types and schema configurations showcased its significance in ETL processes and data interaction. Next, we continued with a hands-on exploration that guided us through the integration process, offering practical insights into initializing the Spark Session, loading and preparing data, configuring the Weaviate client, and orchestrating seamless data transfer.

In essence, the integration of Weaviate and PySpark not only simplifies data transfer but also unlocks enhanced data insights and analysis. This collaboration underscores the transformative potential of harnessing advanced technologies to extract meaningful insights from large datasets. As the realm of data analysis continues to evolve, the integration of Weaviate and PySpark emerges as a promising avenue for innovation and exploration.

Author Bio

Alan Bernardo Palacio is a data scientist and an engineer with vast experience in different engineering fields. His focus has been the development and application of state-of-the-art data products and algorithms in several industries. He has worked for companies such as Ernst and Young, and Globant, and now holds a data engineer position at Ebiquity Media helping the company to create a scalable data pipeline. Alan graduated with a Mechanical Engineering degree from the National University of Tucuman in 2015, participated as the founder of startups, and later on earned a Master's degree from the faculty of Mathematics at the Autonomous University of Barcelona in 2017. Originally from Argentina, he now works and resides in the Netherlands.

LinkedIn