Team Timescale for Timescale

Tutorial: How We Built a Reverse Video Search System Directly in PostgreSQL

Table of Contents

  1. TL;DR
  2. Prerequisites
  3. How the Reverse Video Search System Works
  4. Implementation
  5. Conclusion

TL;DR

Learn how to build a reverse video search system using Mixpeek for video processing & embedding generation, combined with PostgreSQL as a vector database (powered by pgvector and pgvectorscale) hosted on Timescale Cloud.

With this system, you can query video data using both video and text queries to retrieve relevant video segments based on semantic similarity.

Prerequisites

  • PostgreSQL database (We're using Timescale Cloud)
  • Python 3.7+
  • Basic understanding of vector embeddings
  • Mixpeek API key (sign up at mixpeek.com)

How the Reverse Video Search System Works

Before we jump into the implementation details, let's review the system architecture.

Reverse video search system architecture

Video ingestion process

The ingestion process involves inputting the source video data into a vector database. Here's how it works:

  • Source video: The process starts with uploading a source video.
  • Video chunks: The video is split into chunks for optimized embedding creation using Mixpeek’s video indexing tool.
  • Embeddings: Mixpeek’s indexing tool extracts video features from each video chunk to generate vector embeddings using one of the embedding models integrated with it.
  • Vector database: The generated vector embeddings are stored in a database that supports vector similarity search—in this case, PostgreSQL with pgvector and pgvectorscale and hosted on Timescale Cloud. This database not only handles vector similarity searches but also enables you to store metadata, such as start and end times, alongside their embeddings.

Video retrieval process

  • Query: The user submits a text or video query.
  • Query embeddings: Mixpeek converts the query into embeddings to capture its semantic meaning.
  • Vector search: These embeddings are then compared against the stored embeddings in the vector database to retrieve the closest matches using vector similarity search.

The Timescale and Mixpeek tech stacks complement each other. Mixpeek generates the vector embeddings, while Timescale’s PostgreSQL database—powered by pgvector and pgvectorscale—ensures optimized storage, management, and retrieval of the video data and its embeddings.
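Conceptually, the retrieval step boils down to a single similarity query against the stored embeddings: embed the query with Mixpeek, then order the stored chunks by cosine distance and take the top results. Here is a minimal sketch (assuming the video_embeddings table we create later in this tutorial; the full helper function appears in the Implementation section):

import psycopg2

def find_similar_segments(connection_string, query_embedding, k=5):
    # Order stored chunks by cosine distance (<=>) to the query embedding
    with psycopg2.connect(connection_string) as conn:
        with conn.cursor() as curs:
            curs.execute('''
                SELECT start_time, end_time
                FROM video_embeddings
                ORDER BY embedding <=> %s::vector
                LIMIT %s;
            ''', (query_embedding, k))
            return curs.fetchall()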

Implementation

This tutorial is divided into several sections. You can also follow along with this notebook.

Imports and setup

Setting up environment variables

In this section, we first create a .env file to store our environment variables. Let’s first get these variables:

  • Obtain a Mixpeek API key from your Mixpeek account (see the Mixpeek documentation: https://docs.mixpeek.com/overview/introduction). If you don’t have an account yet, you can sign up for one through this page.
  • Then, create a PostgreSQL service for your data and embeddings by signing up for Timescale Cloud (it’s free for 30 days). Follow the setup instructions in the Getting Started with Timescale guide.

After creating your database service, get the connection string provided in the dashboard.

Database connection dashboard in the Timescale UI

Store these variables in the .env file as follows:

MIXPEEK_API_KEY='...'
DATABASE_CONNECTION_STRING='...'

Installing libraries

To get started, let’s install the required libraries:

%pip install psycopg2-binary python-dotenv requests

psycopg2 enables the connection to PostgreSQL, python-dotenv lets you read the values stored in your environment, and requests allows you to send HTTP requests easily.

Then, you can import the libraries and load the environment variables as follows:

import json
import os
import psycopg2
import requests
import time
from dotenv import load_dotenv

load_dotenv()

MIXPEEK_API_KEY = os.environ["MIXPEEK_API_KEY"]
DATABASE_CONNECTION_STRING = os.environ["DATABASE_CONNECTION_STRING"]

Mixpeek workflow

In this section, we define the functions for video indexing, feature extraction, and retrieving video chunks and their embeddings using Mixpeek’s API. Then, we demonstrate how to get embeddings for a video.

Video indexing and feature extraction

In the index_video_file function, we use Mixpeek’s Index Video Url endpoint to process the source video and divide it into chunks. For each video chunk, this tool does the following:

  • Reads on-screen text using the video-descriptor-v1 model.
  • Generates a 1408-dimensional vector embedding with the multimodal-v1 model.
  • Transcribes the audio in the video chunk using the polyglot-v1 model.
  • Creates a comprehensive description of the chunk, including the screenplay and sound details.

BASE_URL = "https://api.mixpeek.com"

headers = {
    'Authorization': f'Bearer {MIXPEEK_API_KEY}',
    'Content-Type': 'application/json'
}

def index_video_file(video_url, video_name, chunking_interval):
    payload = json.dumps({
        "url": video_url,
        "collection_id": "mixpeek_timescaledb",
        "metadata": {
            "name": video_name
        },
        "video_settings": [
            {
                "interval_sec": chunking_interval,
                "read": {"model_id": "video-descriptor-v1"},
                "embed": {"model_id": "multimodal-v1"},
                "transcribe": {"model_id": "polyglot-v1"},
                "describe": {
                    "model_id": "video-descriptor-v1",
                    "prompt": "Create a holistic description of the video, include sounds and screenplay"
                },
            }
        ]
    })

    indexing_response = requests.post(url=f"{BASE_URL}/index/videos/url", 
                                      headers=headers, 
                                      data=payload)

    task_id = indexing_response.json()["task_id"]
    print(f"Indexing started. Task ID: {task_id}")
    return task_id
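This sketch assumes the indexing request succeeds. In practice, you may want to fail fast on HTTP errors before reading task_id from the response, for example by adding a call to requests' raise_for_status:

indexing_response = requests.post(url=f"{BASE_URL}/index/videos/url",
                                  headers=headers,
                                  data=payload)
# Raise an exception early if the API returned an error status code
indexing_response.raise_for_status()
task_id = indexing_response.json()["task_id"]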

Let's use the task_id associated with the indexing process to check its status through the Get Task endpoint.

def check_task_status(task_id):
    response = requests.get(f"{BASE_URL}/tasks/{task_id}", headers=headers)
    return response.json()["status"]

def get_asset_id(task_id):
    # poll task status every 5 seconds until video processing is done.
    while True:
        status = check_task_status(task_id)
        print(f"Current task status: {status}")
        if status == "DONE":
            break
        time.sleep(5)

    get_task_response = requests.get(url=f"{BASE_URL}/tasks/{task_id}", 
                                     headers=headers)

    asset_id = get_task_response.json()["asset_id"]
    print(f"Task Done. Asset ID: {asset_id}")
    return asset_id
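The polling loop above runs until the task reports DONE. If you want to guard against stalled or failed tasks, a bounded variant could look like the sketch below (the "FAILED" status value is an assumption, so check Mixpeek's documentation for the actual task states):

def get_asset_id_with_timeout(task_id, max_wait_sec=600, poll_interval_sec=5):
    # Poll the task status, but give up after max_wait_sec instead of looping forever
    waited = 0
    while waited < max_wait_sec:
        status = check_task_status(task_id)
        print(f"Current task status: {status}")
        if status == "DONE":
            break
        if status == "FAILED":  # assumed failure state; confirm against Mixpeek's docs
            raise RuntimeError(f"Indexing task {task_id} failed")
        time.sleep(poll_interval_sec)
        waited += poll_interval_sec
    else:
        raise TimeoutError(f"Task {task_id} did not finish within {max_wait_sec} seconds")

    get_task_response = requests.get(url=f"{BASE_URL}/tasks/{task_id}", headers=headers)
    return get_task_response.json()["asset_id"]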

Retrieving video chunks and embeddings

In this part, we access the metadata (start_time and end_time) and feature_ids of the video chunks through the Get Asset With Features endpoint, using the asset_id returned by the Get Task endpoint.

def retrieve_video_chunks(asset_id):
    get_asset_response = requests.get(url=f"{BASE_URL}/assets/{asset_id}/features", 
                                      headers=headers)
    return get_asset_response.json()["features"]["video"]

Then using the feature_id, let's access the generated vector embedding from each video chunk through the Get Feature endpoint.

def retrieve_video_chunks_embeddings(video_chunks):
    chunks_embeddings = []
    for chunk in video_chunks:
        get_feature_response = requests.get(url=f"{BASE_URL}/features/{chunk['feature_id']}", 
                                            headers=headers, 
                                            params={"include_vectors":True})

        chunks_embeddings.append({
            "start_time": chunk["start_time"],
            "end_time": chunk["end_time"],
            "embedding": get_feature_response.json()["vectors"]["multimodal-v1"]
        })
    return chunks_embeddings

Example

Let's now combine all these parts into one function, get_mixpeek_embeddings, and demonstrate using a video file.

def get_mixpeek_embeddings(video_url, video_name, chunking_interval):
    task_id = index_video_file(video_url, video_name, chunking_interval)
    asset_id = get_asset_id(task_id)
    video_chunks = retrieve_video_chunks(asset_id)
    return retrieve_video_chunks_embeddings(video_chunks)
source_video = "https://mixpeek-public-demo.s3.us-east-2.amazonaws.com/starter/jurassic_park_trailer.mp4"
source_video_embeddings = get_mixpeek_embeddings(video_url=source_video, 
                                                 video_name="source_video", 
                                                 chunking_interval=10)

PostgreSQL workflow

Connecting to PostgreSQL using Timescale Cloud

In this tutorial, we'll use PostgreSQL with the pgvector and pgvectorscale extensions as our vector database. This database instance is hosted on Timescale Cloud.

The pgvectorscale extension builds on top of pgvector, enabling PostgreSQL to efficiently store and query vector embeddings. You might wonder why you should upgrade from PostgreSQL with pgvector to Timescale Cloud’s AI stack (pgai, pgvectorscale, and pgai Vectorizer). Here’s why:

  • Faster and more cost-effective ANN search: PostgreSQL, powered by pgvector and pgvectorscale, is a faster, more accurate, and more affordable vector database. Compared to popular vector databases like Pinecone, PostgreSQL with pgvector and pgvectorscale achieved 28x lower p95 query latency and 16x higher query throughput for approximate nearest neighbor (ANN) queries at 99% recall—at only 25% of Pinecone’s monthly cost.

  • High-performance and scalability for your AI Applications: pgvectorscale boosts PostgreSQL’s ANN capabilities with StreamingDiskANN, a disk-based ANN algorithm that outperforms memory-based indexes like pgvector’s IVFFlat. With no ef_search cutoffs and its streaming model, it enhances query speed and accuracy, continuously retrieving the “next closest” item, potentially even traversing the entire graph!

  • A simplified AI stack: Timescale’s AI stack integrates vector embeddings, relational data, and time-series data in one place. This consolidation significantly reduces the complexity of infrastructure management and data synchronization, allowing you to focus on building AI applications.

  • Seamless PostgreSQL compatibility: Since Timescale inherits PostgreSQL’s syntax and robustness, developers with PostgreSQL experience can integrate AI capabilities in their application development without a steep learning curve.

Use the code below to connect to your database service and confirm database access:

def connect_db():
    return psycopg2.connect(DATABASE_CONNECTION_STRING)

# Ensures database access
with connect_db() as conn:
    with conn.cursor() as curs:
        curs.execute("SELECT 'hello world';")
        print(curs.fetchone())

Creating a table for video chunks

Since we are working with embedding data, we need to ensure our PostgreSQL service can support it. Therefore, we install the pgvector and pgvectorscale extensions before creating the table, video_embeddings, that stores information about video segments (or chunks) and their embeddings.

with connect_db() as conn:
    with conn.cursor() as curs:
        # Installs both pgvector and pgvectorscale
        curs.execute("CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;")

    with conn.cursor() as curs:
        curs.execute("""
            CREATE TABLE IF NOT EXISTS video_embeddings(
                id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
                embedding VECTOR(1408),
                start_time NUMERIC,
                end_time NUMERIC
            ); 
        """)

Here's a breakdown of the columns:

  • id: a unique identifier for each video chunk.
  • embedding: a 1408-dimensional vector embedding of the video chunk.
  • start_time: the starting time of the video chunk. For example, if a video is split into segments, this could be each segment's start time (in seconds or another unit).
  • end_time: the ending time of the video chunk, indicating when the segment finishes.

Data insertion

Let’s ingest the video chunks and their embeddings into our database.

with connect_db() as conn:
    with conn.cursor() as curs:
        for chunk in source_video_embeddings:
            curs.execute('''
                INSERT INTO video_embeddings (embedding, start_time, end_time)
                VALUES (%(embedding)s, %(start_time)s, %(end_time)s);
            ''',chunk)

    with conn.cursor() as curs:
        curs.execute('''
            SELECT start_time, end_time, vector_dims(embedding) 
            FROM video_embeddings;
        ''')
        for row in curs.fetchall():
            print(f"start_time: {row[0]}, end_time: {row[1]}, embedding_dimensions: {row[2]}")
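For a single trailer, the row-by-row insert above is fine. If you later index many videos, batching the inserts reduces round trips to the database; here is a sketch using psycopg2's execute_values helper:

from psycopg2.extras import execute_values

with connect_db() as conn:
    with conn.cursor() as curs:
        # Insert all chunks in a single statement instead of one INSERT per chunk
        execute_values(
            curs,
            "INSERT INTO video_embeddings (embedding, start_time, end_time) VALUES %s",
            [(c["embedding"], c["start_time"], c["end_time"]) for c in source_video_embeddings]
        )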

Creating the index on embeddings

Vector search queries will primarily target the embedding column, so we create an index on this column using StreamingDiskANN, which significantly speeds up vector similarity searches.

with connect_db() as conn:
    with conn.cursor() as curs:
        curs.execute('''
                CREATE INDEX video_embeddings_idx 
                ON video_embeddings
                USING diskann (embedding);
        ''')
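The default diskann settings are a reasonable starting point. pgvectorscale also exposes build-time parameters such as num_neighbors and search_list_size that trade index size and build time for recall; the values below are illustrative only, so check the pgvectorscale documentation for the current parameter list and defaults before tuning:

with connect_db() as conn:
    with conn.cursor() as curs:
        # Illustrative tuned index; parameter names and values per the pgvectorscale docs
        curs.execute('''
                CREATE INDEX IF NOT EXISTS video_embeddings_tuned_idx
                ON video_embeddings
                USING diskann (embedding)
                WITH (num_neighbors = 50, search_list_size = 100);
        ''')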

Search functions

In this section, we demonstrate two search functions for retrieving relevant video chunks: one based on video input and the other based on text query. The idea is to search for similar video chunks stored in the database by comparing embeddings, allowing us to match the content of the video query or find similar scenes based on descriptive text.

For each query, we first generate vector embeddings and then use them to search the stored source video embeddings for the closest video chunks, ranking results by cosine distance. Let’s first define a helper function for vector similarity search.

# helper function for vector similarity search
def retrieve_closest_video_chunks(query_embedding, limit):
    with connect_db() as conn:
        with conn.cursor() as curs:
            curs.execute('''
                SELECT start_time, end_time
                FROM video_embeddings
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            ''', (query_embedding['embedding'], limit))

            print("CLOSEST VIDEO CHUNKS:")
            closest_video_chunks = []
            for row in curs.fetchall():
                print(f"start_time: {row[0]}, end_time: {row[1]}")
                closest_video_chunks.append({
                    "start_time": row[0],
                    "end_time": row[1]
                })
            return closest_video_chunks

Video query search

video_query = "https://mixpeek-public-demo.s3.us-east-2.amazonaws.com/starter/jurassic_bunny.mp4" 
video_query_embeddings = get_mixpeek_embeddings(video_url=video_query, 
                                                video_name="video_query", 
                                                chunking_interval=5)


retrieve_closest_video_chunks(video_query_embeddings[0], 2)

Here are the results of this query:

CLOSEST VIDEO CHUNKS:
start_time: 60.0, end_time: 70.0
start_time: 20.0, end_time: 30.0

![One of the frames in the video segment (start_time: 60.0, end_time: 70.0)](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/hp5hm8xh5rz8t939329b.jpg)
One of the frames in the video_segment (start_time: 60.0, end_time: 70.0)

Text query search

In this part, let's use the Index Text endpoint to generate embeddings for the text query and then use them to perform a vector similarity search.

text_query = "two people in a car"

payload = json.dumps({
    "text": text_query,
    "collection_id": "mixpeek_timescale",
    "metadata": {
        "author": "user"
    },
    "text_settings": {
        "embed": {"model_id": "multimodal-v1"}
    }
})

index_text_response = requests.post(url=f"{BASE_URL}/index/text", 
                                    headers=headers, 
                                    data=payload)

task_id = index_text_response.json()["task_id"]
print(f"Indexing started. Task ID: {task_id}")

# retrieve feature extracted from the text query
asset_id = get_asset_id(task_id)
get_asset_response = requests.get(url=f"{BASE_URL}/assets/{asset_id}/features", 
                                  headers=headers)
text_asset = get_asset_response.json()["features"]["text"]

# extract the generated text embedding
get_feature_response = requests.get(url=f"{BASE_URL}/features/{text_asset[0]['feature_id']}", 
                                    headers=headers, 
                                    params={"include_vectors":True})

text_query_embedding = {
    "embedding": get_feature_response.json()["vectors"]['multimodal-v1']
}


retrieve_closest_video_chunks(text_query_embedding, 2)

Here are the results of this query:

CLOSEST VIDEO CHUNKS:
start_time: 30.0, end_time: 40.0
start_time: 40.0, end_time: 50.0

One of the frames from the video segment (start_time: 30.0, end_time: 40.0)

This demo uses a single video. However, we can extend the same approach to handle a collection of videos.
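One way to do that, sketched below, is to store an identifier for the source video alongside each chunk and return it with the search results (the video_name column and the modified helper are illustrative additions, not part of the schema used above):

# Add a column identifying the source video of each chunk (illustrative)
with connect_db() as conn:
    with conn.cursor() as curs:
        curs.execute("ALTER TABLE video_embeddings ADD COLUMN IF NOT EXISTS video_name TEXT;")

def retrieve_closest_video_chunks_multi(query_embedding, limit):
    # Same vector search as before, but also report which video each chunk came from
    with connect_db() as conn:
        with conn.cursor() as curs:
            curs.execute('''
                SELECT video_name, start_time, end_time
                FROM video_embeddings
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            ''', (query_embedding['embedding'], limit))
            return [
                {"video_name": row[0], "start_time": row[1], "end_time": row[2]}
                for row in curs.fetchall()
            ]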

Conclusion

In this article, we covered how to build a reverse video search engine using Mixpeek and Timescale Cloud’s mature PostgreSQL cloud platform. This stack paves the way for many enhancements in multimodal video analysis and retrieval. We can extend the current system by, for example, integrating AI-generated sentiment analysis or supporting queries in several languages.

AI is still in its early stages. Video search and understanding will continue to evolve. If you're interested in implementing these solutions, check out Mixpeek’s API documentation and Timescale’s AI stack to start building your own advanced video search engine.

Further reading: timescale/pgai on GitHub, a suite of tools to develop RAG, semantic search, and other AI applications more easily with PostgreSQL.
