Md Mehrajul Islam

Stress Testing Vector Databases: Dockerizing a Flask App with Chroma DB, pgvector, and Weaviate Running Locally (Part 1)

Introduction

In the world of modern AI applications, handling and searching large volumes of data through vector embeddings has become a critical task. Vector databases are specifically designed for this, providing efficient ways to store and search embeddings. But with several options available, how do you know which one performs best under real-world conditions?

In this two-part blog series, I'll dive into a hands-on comparison of three popular vector databases: Chroma DB, pgvector, and Weaviate. The goal is to stress test each database to determine how well it handles the workload in terms of embedding storage and retrieval efficiency.

You can also check out the GitHub repo for this test:
https://github.com/codermehraj/vectordb-stress-test

IN PART ONE
We’ll focus on building the infrastructure needed for this comparison. Specifically, I’ll walk you through how to dockerize a Flask app that will:

  • Read a PDF file
  • Split the extracted text into smaller chunks
  • Generate an embedding for each chunk and store it in each of the three vector databases running locally
  • Provide endpoints to query for the closest matching embeddings

(Figure: summary of the service built in this part of the blog)

IN PART TWO
We’ll focus on the stress testing itself, where we’ll simulate various workloads to see how each database holds up under pressure.

Now, let’s jump into Part 1, where we set up our Dockerized Flask app and integrate it with Chroma DB, pgvector, and Weaviate.

Prerequisites and Setup

Before we dive into the code, let’s make sure we have the necessary tools in place to get started. For this project, you’ll need the following installed:

  • Docker: We will be containerizing the Flask app along with the vector databases, so Docker is essential for creating isolated environments.
  • Python 3.9: The core of the application will be written in Python using Flask.
  • Flask: A lightweight web framework that will allow us to build the endpoints for storing and querying embeddings.

Here’s how you can set up your environment:

1. Install Docker

If you don’t already have Docker installed, you can get it from the official Docker website. Follow the installation instructions for your operating system.

2. Set Up a Flask Application

We will use Flask to create an API that interacts with the vector databases. Start by setting up a basic Flask project: create a new folder for the project (e.g., flask-service), and in that folder create a requirements.txt file with the following dependencies:

Flask
psycopg2
chromadb
weaviate-client
transformers
torch
pymupdf

Next, create a Python file (let’s call it app.py) to define the structure of the Flask app:

from flask import Flask, request, jsonify
import logging

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

@app.route('/')
def index():
    return "Welcome to the Vector DB Stress Test!", 200


if __name__ == '__main__':   
    logging.info("Starting the Vector DB Stress Test service")     
    app.run(host='0.0.0.0', port=5001, debug=True)

This sets up a simple Flask app with one route (a home page) to check if everything is running correctly. I have also added logging for better observability.

3. Dockerize the Flask App

Once we have the basics in place, the next step is to Dockerize the app so that it can run in a container. To do this, we need to create a Dockerfile that will define how the app is built.

Create a file named Dockerfile in your project folder with the following contents:

# Use an official Python runtime as the base image
FROM python:3.9-slim

# Set the working directory in the container
WORKDIR /app

# Copy the dependency list first so it exists before pip runs
# (and so this layer is cached between builds)
COPY requirements.txt .

# Install any necessary packages
RUN pip install --no-cache-dir -r requirements.txt

# Copy the current directory contents into the container at /app
COPY . /app

# Make port 5001 available to the world outside this container
EXPOSE 5001

# Define the command to run the app
CMD ["python", "app.py"]

This Dockerfile builds a container image for the Flask app: it copies in the dependency list, installs the Python packages, copies in the application code, and exposes port 5001 so you can access the app externally.

4. Running the Flask App in Docker

Now that we have the Dockerfile, we can build and run the Flask app in Docker. In your terminal, navigate to your project folder and run the following commands:

# Build the Docker image
docker build -t flask-vector-db .

# Run the Flask app in a container
docker run -p 5001:5001 flask-vector-db

You should now see your Flask app running in the terminal, accessible at http://localhost:5001. You can also run the server locally (outside Docker) with python app.py.
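If you want a quick programmatic smoke test, a GET against the home route should return the welcome message. A minimal sketch using the requests library (not in requirements.txt, so install it separately with pip install requests):

import requests

# Smoke test: the home route should answer with the welcome message
response = requests.get("http://localhost:5001/")
print(response.status_code, response.text)
# Expected: 200 Welcome to the Vector DB Stress Test!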

Additional Setup: Handling File Uploads and Text Chunking

Before we start integrating the vector databases, we need some additional setup for processing files and generating embeddings. We’ll create a temporary folder for uploaded files, define a method to chunk the text, and project our embeddings from 384 dimensions to 512 dimensions. This breaks down into the following steps:

1. Loading the Pre-trained Model and Tokenizer

We begin by loading a pre-trained model and tokenizer from the transformers library. In this case, we are using the sentence-transformers/all-MiniLM-L6-v2 model, which converts text into vector embeddings. The model and tokenizer will help us process the text and generate embeddings.

from transformers import AutoTokenizer, AutoModel
import torch

# Load the pre-trained model and tokenizer
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')

Here,

  • AutoTokenizer tokenizes input text into a format that the model can understand.
  • AutoModel loads the pre-trained transformer model that converts text into embeddings.

2. Defining a Projection Matrix

Next, we define a projection matrix that will transform the 384-dimensional embeddings generated by the model into 512-dimensional embeddings. This step ensures that the embeddings have a consistent shape before they are stored in the vector database.

import numpy as np

# Define a projection matrix to transform the 384-dim embedding to 512-dim
projection_matrix = np.random.rand(384, 512)

Here,

  • A projection matrix is created using np.random.rand(). It randomly initializes the matrix with dimensions 384x512. This will project the 384-dimensional embeddings produced by the transformer model into a 512-dimensional space, matching the vector(512) column we’ll define for pgvector and keeping all three databases on the same vector size (a seeded variant is sketched below).
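One caveat worth noting: np.random.rand() produces a different matrix every time the process starts, so embeddings stored in one run would not be comparable to query embeddings generated in another. A minimal sketch of a seeded variant (my suggestion, not part of the original setup), plus a shape sanity check:

import numpy as np

# Seed the generator so the same projection is reused across restarts;
# otherwise stored vectors and query vectors would live in different spaces.
rng = np.random.default_rng(42)
projection_matrix = rng.random((384, 512))

# Sanity check: a 384-dim embedding projects to 512 dims
dummy = rng.random(384)
assert (dummy @ projection_matrix).shape == (512,)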

3. Creating a Temporary Folder for Uploaded Files

We create a temporary folder to store files that users upload. This folder will be used to store PDFs and other files while they are being processed by the Flask app.

import os

# Create a temporary folder to store uploaded files
try:
    path = os.path.dirname(os.path.abspath(__file__))  # Get the current file's path
    upload_folder = os.path.join(path.replace("/file_folder", ""), "tmp")  # Set the 'tmp' folder as the upload folder
    os.makedirs(upload_folder, exist_ok=True)  # Create the folder if it doesn't already exist
    app.config['uploads'] = upload_folder  # Configure the Flask app to use this folder for uploads
except Exception as e:
    app.logger.info('An error occurred while creating temp folder')
    app.logger.error('Exception occurred: {}'.format(e))

Here,

  • We use os.path.abspath(__file__) to get the current file's directory and create a tmp folder within it.
  • os.makedirs() ensures the folder is created, and if it already exists, no error is raised.
  • The Flask app’s configuration is updated to use this folder for file uploads.

4. Function to Split Text into Chunks

The chunk_text function is responsible for splitting a given text into smaller chunks. This is particularly useful for long text documents, where processing smaller portions of the text (chunks) is necessary for memory efficiency.

def chunk_text(text, chunk_size):
    words = text.split()  # Split the text into words
    chunks = []

    for i in range(0, len(words), chunk_size):
        chunk = " ".join(words[i:i + chunk_size])  # Create text chunks
        chunks.append(chunk)

    return chunks

Here,

  • The chunk_text function splits the text into smaller chunks by separating it into words. The chunk_size parameter controls how many words are included in each chunk.
  • This method ensures that long text documents are processed in manageable portions for generating embeddings.
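For example, with a chunk size of three words:

sample = "one two three four five six seven"
print(chunk_text(sample, chunk_size=3))
# ['one two three', 'four five six', 'seven']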

5. Function to Generate Embeddings and Project to 512 Dimensions

The generate_embedding function takes in text, processes it through the pre-trained model to generate an embedding, and then projects that embedding from 384 dimensions to 512 dimensions using the previously defined projection matrix.

def generate_embedding(text):
    inputs = tokenizer(text, return_tensors='pt', max_length=512, truncation=True)
    with torch.no_grad():
        embeddings = model(**inputs).last_hidden_state.mean(dim=1)
        embedding = embeddings.squeeze().numpy()

    # Project the 384-dim embedding to 512-dim using the projection matrix
    projected_embedding = np.dot(embedding, projection_matrix)

    return projected_embedding.tolist()

Here,

  • Tokenization: The input text is tokenized using tokenizer(). The result is passed as input to the pre-trained model.
  • Embedding Generation: The model generates embeddings for the input, which is extracted as the mean of the last hidden state.
  • Projection: The generated 384-dimensional embeddings are projected into a 512-dimensional space using matrix multiplication with the projection matrix.
  • Returning the Result: The function returns the projected embedding as a list, making it suitable for storage in databases.
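A quick check ties the pieces together: the function should hand back a plain Python list of 512 floats, ready for any of the three stores:

embedding = generate_embedding("Vector databases make similarity search fast.")
print(type(embedding), len(embedding))
# <class 'list'> 512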

Integrating Vector Databases

Now that our Flask app is set up and running in Docker, let’s dive into the core part of our project—integrating the vector databases. We’ll be using Chroma DB, pgvector, and Weaviate to handle and store the embeddings. Each database will have its own endpoint for processing and querying vector data.

1. Chroma DB Integration

First, we’ll start with Chroma DB. Chroma DB is a high-performance, open-source vector database built for AI applications. It allows us to efficiently store and query vector embeddings.

Setting up Chroma DB

We will initialize the Chroma client with a persistent storage path:

import chromadb
from chromadb.config import Settings, DEFAULT_TENANT, DEFAULT_DATABASE

chroma_client = chromadb.PersistentClient(
    path="test",  # Path to store Chroma DB data
    settings=Settings(),  # Use default settings
    tenant=DEFAULT_TENANT,
    database=DEFAULT_DATABASE
)

# Create or get a collection named 'embeddings'
collection = chroma_client.get_or_create_collection("embeddings")

Explanation:

  • chroma_client: Initializes the Chroma DB client with persistent storage.
  • collection: A collection to store document embeddings. We either get an existing collection or create a new one named 'embeddings'.

1. PDF Upload and Embedding Extraction Endpoint (/upload/chroma)

This API allows users to upload a PDF document, extract its text, generate embeddings, and store them in Chroma DB.

import uuid
import pymupdf

@app.route('/upload/chroma', methods=['POST'])
def upload_pdf_chroma():
    """
    API to upload a PDF document and extract embeddings using Chroma DB.
    Expects a PDF file in the 'pdf' form-data field.
    """
    if 'pdf' not in request.files:
        return jsonify({"error": "No pdf given"}), 400

    file = request.files['pdf']

    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400

    try:
        # Save the PDF file to disk
        file_path = os.path.join(app.config.get('uploads'), file.filename)
        file.save(file_path)

        # Open the PDF and extract text
        doc = pymupdf.open(file_path)
        full_text = ""
        for page in doc:
            full_text += page.get_text()

        # Chunk the text and generate embeddings
        chunks = chunk_text(full_text, chunk_size=50)
        for i, chunk in enumerate(chunks):
            embedding = generate_embedding(chunk)
            embedding_id = str(uuid.uuid4())

            # Add the embedding to Chroma DB
            collection.add(
                embeddings=[embedding],
                metadatas=[{"fileName": file.filename, "chunkNo": i + 1}],
                ids=[embedding_id],
                documents=[chunk]
            )
        return jsonify({"message": "PDF uploaded and embeddings extracted successfully"}), 201

    except Exception as e:
        return jsonify({"error": str(e)}), 500

Here,

  • Upload File: The API checks for a PDF file in the request. If missing, it returns an error.
  • Save File: The uploaded PDF is saved to a temporary folder.
  • Extract Text: Text is extracted from the PDF using pymupdf.
  • Chunk Text: The full text is split into smaller chunks for easier embedding generation.
  • Generate Embeddings: Each chunk of text is passed through the embedding generator, creating a vector representation.
  • Store in Chroma DB: The embeddings are stored in the 'embeddings' collection in Chroma DB, along with metadata like the file name and chunk number.
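To exercise the endpoint, you can post any local PDF. A small sketch with the requests library (sample.pdf below is a placeholder for your own file):

import requests

# Upload a PDF to the Chroma-backed endpoint
with open("sample.pdf", "rb") as f:
    response = requests.post("http://localhost:5001/upload/chroma", files={"pdf": f})

print(response.status_code, response.json())
# Expected: 201 {'message': 'PDF uploaded and embeddings extracted successfully'}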

2. Search Query Endpoint (/search/chroma)

This API allows users to search for the top N embeddings that closely match a query.

@app.route('/search/chroma', methods=['POST'])
def search_query_chroma():
    """
    API to search for the top N embeddings that match a given query embedding.
    Expects a JSON payload with a 'query' field and an optional 'top_k'.
    """
    data = request.get_json()

    if not data:
        return jsonify({"error": "Invalid JSON payload"}), 400

    query = data.get('query')
    top_k = data.get('top_k', 2)  # Default to top 2 matches

    if not query:
        return jsonify({"error": "Missing 'query' parameter"}), 400

    try:
        # Generate an embedding for the query text
        query_embedding = generate_embedding(query)

        # Perform a similarity search in Chroma DB
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            include=['documents', 'metadatas']
        )

        # Extract and format results. Chroma returns one list per query
        # embedding; we sent a single query, so take the first element.
        matched_metadatas = results['metadatas'][0]
        matched_documents = results['documents'][0]

        data = []
        for meta, doc in zip(matched_metadatas, matched_documents):
            data.append({"metadata": meta, "document": doc})

        return jsonify({"matches": data}), 200

    except Exception as e:
        return jsonify({"error": str(e)}), 500

Here,

  • Get Query: The API receives a JSON payload with the query text and the number of top results (top_k).
  • Generate Query Embedding: The query text is transformed into an embedding using the same embedding generation process as before.
  • Perform Similarity Search: Chroma DB searches for the most similar embeddings to the query embedding, returning the top N matches.
  • Return Matches: The results include the matched documents and their associated metadata, which are returned in the response.
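A matching query call looks like this (top_k is optional and defaults to 2):

import requests

response = requests.post(
    "http://localhost:5001/search/chroma",
    json={"query": "what is this document about?", "top_k": 3},
)
print(response.json())
# {'matches': [{'metadata': {...}, 'document': '...'}, ...]}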

2. pgvector Integration

In this step, we will integrate pgvector into our application using Docker, Flask, and PostgreSQL.

1. Docker Setup for pgvector

We will manage the PostgreSQL database (with the pgvector extension) and Flask using Docker Compose. First, create a directory (e.g., postgres) to hold the DockerFile and init_pgvector.sql for the Postgres service. The init_pgvector.sql script sets up the initial schema and enables the vector extension in the database:


CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE items (id bigserial PRIMARY KEY, document TEXT, embedding vector(512), FileName TEXT, chunkNo INT);


The DockerFile then contains the following:


FROM postgres:17

# Install dependencies for building pgvector
RUN apt-get update && apt-get install -y \
    git \
    make \
    gcc \
    postgresql-server-dev-17

# Clone the pgvector repository and install the extension
RUN cd /tmp && \
    git clone --branch v0.7.4 https://github.com/pgvector/pgvector.git && \
    cd pgvector && \
    make && \
    make install

# Set environment variables for PostgreSQL user, password, and database
ENV POSTGRES_USER=myuser
ENV POSTGRES_PASSWORD=mypassword
ENV POSTGRES_DB=mydb

# Copy initialization SQL scripts
# Ensure the init_pgvector.sql script is in the same directory as the Dockerfile
COPY ./init_pgvector.sql /docker-entrypoint-initdb.d/

# Clean up the build environment to reduce the image size
RUN rm -rf /tmp/pgvector && apt-get clean && rm -rf /var/lib/apt/lists/*

# Expose PostgreSQL port
EXPOSE 5432


Lastly, below is the necessary configuration in the docker-compose.yml file to set up the services.

version: "3"

services:
  postgres:
    build:
      context: ./postgres  # Update this to point to the directory where the Dockerfile is located
      dockerfile: DockerFile
    ports:
      - "5432:5432"
    volumes:
      - ./data:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypassword
      POSTGRES_DB: mydb
    networks:
      - mynetwork

  flask-service:
    build:
      context: ./flask-service      
      dockerfile: DockerFile
    ports:
      - "5001:5001"
    volumes:
      - ./flask-service:/test
    networks:
      - mynetwork
    restart: always
    environment:
      - PG_URL=postgres      
    depends_on:
      - postgres

volumes:
  flask-service:

networks:
  mynetwork:
    driver: bridge

Here,

  • PostgreSQL Service:

    • A PostgreSQL service is set up with a user (myuser), password (mypassword), and database (mydb).
    • The exposed port is 5432, which is mapped from the container to the host.
    • Data is persisted via the ./data volume.
  • Flask Service:

    • The flask-service will interact with PostgreSQL. It depends on PostgreSQL and communicates over the same network.
    • The environment variable PG_URL=postgres is used to connect Flask to the PostgreSQL service.

2. Connecting to pgvector from Flask

To establish a connection to the PostgreSQL database (with pgvector) from Flask, we use the following code snippet:

# The host of the PostgreSQL server (e.g., "localhost", "postgres", "service_name")
pg_host = os.getenv("PG_URL", "localhost") 
print("Connecting to PostgreSQL at", pg_host)

pgvector_conn = psycopg2.connect(
    user="myuser",
    password="mypassword",
    host=pg_host,
    port=5432,  # The port exposed in docker-compose.yml
    database="mydb"
)
  • The psycopg2 library is used to connect to PostgreSQL, and pgvector-specific queries can now be executed.
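As an optional aside (not used in this project’s code): the companion pgvector Python package ships a psycopg2 adapter so that lists and numpy arrays round-trip as native vector values without manual casts. If you add pgvector to requirements.txt, registering it is one call:

from pgvector.psycopg2 import register_vector

# Teach psycopg2 to send/receive pgvector's vector type directly
register_vector(pgvector_conn)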

3. PDF Upload and Embedding Extraction Endpoint (/upload/pgvector)

This endpoint allows users to upload a PDF, extract its text, and generate embeddings, which are stored in the pgvector-enabled PostgreSQL database.

@app.route('/upload/pgvector', methods=['POST'])
def upload_pdf_pgvector():
    """
    API to upload a PDF document and extract embeddings using pgvector DB.
    Expects a PDF file in the 'pdf' form-data field.
    """
    if 'pdf' not in request.files:
        logging.error("No pdf given")
        return jsonify({"error": "No pdf given"}), 400

    file = request.files['pdf']

    if file.filename == '':
        logging.error("No selected file")
        return jsonify({"error": "No selected file"}), 400

    try:
        # Save the PDF file to disk
        logging.info("Uploading PDF file: " + file.filename)
        file_path = os.path.join(app.config.get('uploads'), file.filename)
        file.save(file_path)

        doc = pymupdf.open(file_path)

        # Extract text from the PDF
        full_text = ""
        for page in doc:
            text = page.get_text()
            full_text += text

        short_text = " ".join(full_text.split()[:20]) + "..."
        logging.info("Full text extracted from PDF: " + short_text)

        # Chunk the text into smaller parts
        chunk_size = 50
        chunks = chunk_text(full_text, chunk_size)

        # Reuse a single cursor for all inserts
        pg_cursor = pgvector_conn.cursor()

        for i, chunk in enumerate(chunks):
            logging.info("Embedding Chunk: " + chunk[:50])

            # Generate embedding for the text chunk
            embedding = generate_embedding(chunk)

            # Add the embedding to pgvector
            pg_cursor.execute(
                "INSERT INTO items (document, embedding, FileName, chunkNo) VALUES (%s, %s, %s, %s)",
                (chunk, embedding, file.filename, i + 1)
            )

        pgvector_conn.commit()  # Persist the inserts; psycopg2 does not autocommit
        pg_cursor.close()

        return jsonify({"message": "PDF uploaded and embeddings extracted successfully"}), 201

    except Exception as e:
        logging.error("Error while storing embedding in pgvector: " + str(e))
        return jsonify({"error": str(e)}), 500

Here,

  • File Upload: The endpoint accepts a PDF file from the form data.
  • Text Extraction: Extracts text from the PDF file.
  • Text Chunking: Splits the text into smaller chunks (e.g., 50-word chunks).
  • Generate Embedding: Embeddings are generated for each chunk of text.
  • Store in pgvector: Embeddings are inserted into the PostgreSQL database using pgvector for vector-based search.
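After an upload, a quick count confirms the rows actually landed (this relies on the commit shown above; without it, psycopg2 rolls the inserts back when the connection closes):

pg_cursor = pgvector_conn.cursor()
pg_cursor.execute("SELECT COUNT(*), COUNT(DISTINCT FileName) FROM items;")
chunks, files = pg_cursor.fetchone()
pg_cursor.close()
print(f"{chunks} chunks stored across {files} files")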

4. Search Query Endpoint (/search/pgvector)

This endpoint allows users to search for the top N similar embeddings in pgvector based on a query text.

@app.route('/search/pgvector', methods=['POST'])
def search_query_pgvector():
    """
    API to search for the top N embeddings that match a given query embedding.
    Expects a JSON payload with a 'query' field and an optional 'top_k'.
    """
    data = request.get_json()

    if not data:
        logging.error("Invalid JSON payload")
        return jsonify({"error": "Invalid JSON payload"}), 400

    query = data.get('query')
    top_k = data.get('top_k', 2)  # Default to top 2 matches

    if not query:
        logging.error("Missing 'query' parameter")
        return jsonify({"error": "Missing 'query' parameter"}), 400

    try:
        # Generate embedding for the query text
        query_embedding = generate_embedding(query)

        pg_cursor = pgvector_conn.cursor()

        # Perform cosine similarity search
        pg_cursor.execute(
            """SELECT id, document, FileName, chunkNo, 1 - (embedding <=> %s::vector) AS cosine_similarity
               FROM items
               ORDER BY cosine_similarity DESC LIMIT %s""",
            (query_embedding, top_k)
        )

        results = []
        for row in pg_cursor.fetchall():
            results.append({
                "id": row[0],
                "document": row[1],
                "FileName": row[2],
                "chunkNo": row[3],
                "cosine_similarity": row[4]
            })

        pg_cursor.close()

        return jsonify({"matches": results}), 200

    except Exception as e:
        return jsonify({"error": str(e)}), 500

Here,

  • Query Embedding: The provided query text is transformed into an embedding.
  • Cosine Similarity Search: A cosine similarity search is performed on the embeddings stored in pgvector.
  • Return Matches: The top N matching embeddings are returned, along with their metadata (e.g., file name, chunk number).
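For reference, <=> is pgvector’s cosine distance operator, so the SQL expression 1 - (embedding <=> query) is ordinary cosine similarity. The numpy equivalent would be:

import numpy as np

def cosine_similarity(a, b):
    # Mirrors the SQL expression 1 - (embedding <=> query)
    a, b = np.asarray(a), np.asarray(b)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))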

3. Setting up Weaviate

Weaviate is a vector search engine that allows for storing and querying data based on similarity. Now we’ll cover how to set up Weaviate using Docker Compose, connect it with a Flask service, and implement API endpoints for uploading documents and querying similar embeddings.

Step 1: Docker Compose Configuration

Make sure your docker-compose.yml file includes the Weaviate service. Here’s the relevant section for Weaviate:

weaviate:
  command:
  - --host
  - 0.0.0.0
  - --port
  - '8080'
  - --scheme
  - http
  image: cr.weaviate.io/semitechnologies/weaviate:1.26.5
  ports:
  - 8080:8080
  - 50051:50051
  volumes:
  - weaviate_data:/var/lib/weaviate
  restart: on-failure:0
  environment:
    QUERY_DEFAULTS_LIMIT: 25
    AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
    PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
    DEFAULT_VECTORIZER_MODULE: 'none'
    ENABLE_API_BASED_MODULES: 'true'
    CLUSTER_HOSTNAME: 'node1'
  networks:
    - mynetwork
  • Ports: Exposes Weaviate on port 8080 (HTTP) and 50051 (gRPC, used by the v4 Python client).
  • Volumes: Stores data persistently in the weaviate_data named volume; remember to also declare it under the top-level volumes: key of the compose file.
  • Environment Variables: Configure Weaviate’s behavior, such as enabling anonymous access and setting the persistence data path.

Step 2: Connecting to Weaviate

In your Flask application, you need to connect to the Weaviate instance. Use the following code to establish the connection:

import os
import time
import logging
import weaviate

weaviate_client = None

weaviate_host = os.getenv("WEAVIATE_URL", "localhost")  # Default to localhost or get from ENV
logging.basicConfig(level=logging.INFO)

try:
    weaviate_client = weaviate.connect_to_local(host=weaviate_host)
    logging.info("Weaviate client connected successfully")
except Exception as e:
    logging.error("[Retrying] Error connecting to Weaviate client:", e)
    time.sleep(5)
    weaviate_client = weaviate.connect_to_local(host=weaviate_host)
    logging.info("Weaviate client connected successfully")

  • Create Client: connect_to_local() opens a connection to the Weaviate host we resolved from the environment.
  • Error Handling: If the first attempt fails, we wait five seconds and retry once.
  • Logging: Connection status is logged for debugging purposes.

Step 3: Creating a Collection

You need to create a collection in Weaviate to store your documents. Use the following code:

from weaviate.classes.config import Property, DataType

try:
    weaviate_collection = weaviate_client.collections.create(
        name="DocumentSearch",
        properties=[
            Property(name="document", data_type=DataType.TEXT),
            Property(name="fileName", data_type=DataType.TEXT),
            Property(name="chunkNo", data_type=DataType.INT),
            # Note: vectors are attached per object at insert time via the
            # vector= argument, not declared as a property.
        ]
    )
    logging.info("Collection created successfully")
except Exception as e:
    if "Collection already exists" in str(e):
        weaviate_collection = weaviate_client.collections.get(name="DocumentSearch")
        logging.info("Collection already exists")
    else:
        raise  # Any other failure is a real error
  • Properties: Defines the schema for the collection. You can store document text, file names, and chunk numbers.
  • Create Collection: Creates a collection named DocumentSearch with document, fileName & chunkNo as its fields. If the collection already exists, it grabs a reference to the previously created one instead.
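To verify the collection is reachable, a quick object count works. A small sketch using the v4 client’s aggregate API:

# Count how many objects the collection currently holds
collection = weaviate_client.collections.get(name="DocumentSearch")
count = collection.aggregate.over_all(total_count=True).total_count
print(f"DocumentSearch holds {count} objects")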

Step 4: Uploading PDF Documents

Implement an endpoint to upload PDF documents and store their embeddings in Weaviate:

@app.route('/upload/weviate', methods=['POST'])
def upload_pdf_weviate():
    """
    API to upload a PDF document and extract embeddings using Weaviate.
    Expects a PDF file in the 'pdf' form-data field.
    """
    if 'pdf' not in request.files:
        logging.error("No pdf given")
        return jsonify({"error": "No pdf given"}), 400

    file = request.files['pdf']

    if file.filename == '':
        logging.error("No selected file")
        return jsonify({"error": "No selected file"}), 400

    try:
        # Save the PDF file to disk
        logging.info("Uploading PDF file:")
        logging.info(file.filename)

        file_path = os.path.join(app.config.get('uploads'), file.filename)
        file.save(file_path)

        logging.info("PDF uploaded successfully")

        doc = pymupdf.open(file_path)

        # Extract text from the PDF
        full_text = ""
        for page in doc:
            text = page.get_text()
            full_text += text

        short_text = " ".join(full_text.split()[:20]) + "..."
        logging.info("Full text extracted from PDF:")
        logging.info(short_text)

        # Chunk the text into smaller parts
        chunk_size = 50
        chunks = chunk_text(full_text, chunk_size)  # use the full document text, not just the last page

        for i, chunk in enumerate(chunks):
            short_text = str(i + 1) + ") " + " ".join(chunk.split()[:10]) + "..."
            logging.info("Embedding Chunk: ")
            logging.info(short_text)

            # Generate an embedding for the text
            embedding = generate_embedding(chunk)

            # Add the embedding to Weaviate
            try:
                weaviate_collection = weaviate_client.collections.get(name="DocumentSearch")
                weaviate_collection.data.insert({"document": chunk, "fileName": file.filename, "chunkNo": i + 1}, vector=embedding)

                logging.info("Embedding added successfully for chunk")
            except Exception as e:
                logging.error("ERROR WHILE STORING IN WEAVIATE => " + str(e))
                return jsonify({"error": str(e)}), 500

        return jsonify({"message": "PDF uploaded and embeddings extracted successfully"}), 201

    except Exception as e:
        logging.error("ERROR WHILE STORING EMBEDDING FROM PDF => ", str(e))
        return jsonify({"error": str(e)}), 500
  • File Handling: Saves the uploaded PDF and extracts text for processing.
  • Chunking: Splits the text into smaller parts for embedding and storage.
  • Embedding Storage: Inserts the chunk along with its metadata into the Weaviate collection.

Step 5: Searching for Similar Embeddings

Create an endpoint to search for similar embeddings based on a query:

from weaviate.classes.query import MetadataQuery

@app.route('/search/weviate', methods=['POST'])
def search_query_weviate():
    """
    API to search for the top N embeddings that match a given query embedding.
    Expects a JSON payload with a 'query' field and an optional 'top_k'.
    """
    data = request.get_json()

    if not data:
        logging.error("Invalid JSON payload")
        return jsonify({"error": "Invalid JSON payload"}), 400

    query = data.get('query')
    top_k = data.get('top_k', 2)

    logging.info("Received query:", query)

    if not query:
        logging.error("Missing 'query' parameter")
        return jsonify({"error": "Missing 'query' parameter"}), 400

    try:
        # Generate an embedding for the query text
        query_embedding = generate_embedding(query)

        logging.info("Generated embedding for query")
        logging.info("Searching for similar embeddings in DB")

        # Perform a similarity search in Weaviate
        weaviate_collection = weaviate_client.collections.get(name="DocumentSearch")

        result = weaviate_collection.query.near_vector(
            near_vector=query_embedding,
            limit=top_k,
            return_metadata=MetadataQuery(distance=True)
        )

        results = []

        for o in result.objects:
            results.append({
                "properties": o.properties,
                "distance": o.metadata.distance
            })

        logging.info("DB QUERY SUCCESSFUL")

        return jsonify({"matches": results}), 200

    except Exception as e:
        logging.error("ERROR WHILE SEARCHING IN WEVIATE => ", str(e))
        return jsonify({"error": str(e)}), 500

Here,

  • Query Handling: Takes a query from the user, generates its embedding, and performs a similarity search in the Weaviate collection.
  • Response Formatting: Returns matched results along with their properties and distances.

Conclusion

In conclusion, our exploration into vector databases through this hands-on comparison of Chroma DB, pgvector, and Weaviate illustrates the diverse capabilities and considerations each offers for modern AI applications. As we’ve seen, setting up a robust infrastructure to manage and query vector embeddings is essential for efficiently handling large datasets.

In Part 1, we successfully dockerized a Flask application to read PDF files, extract embeddings, and integrate them with the three databases. This foundational setup not only equips us for the next phase of stress testing but also highlights the practical implementation of vector databases in real-world scenarios.

In Part 2, we will delve deeper into the performance aspects of these databases under various workloads. By simulating different scenarios, we will uncover which database excels in terms of speed, efficiency, and scalability. Ultimately, this analysis will provide valuable insights for developers and data scientists seeking to optimize their systems for handling embedding storage and retrieval effectively.

Stay tuned for the next installment, where we put these databases to the test and reveal the results of our stress tests!
