Table of Contents
TL;DR
Learn how to build a reverse video search system using Mixpeek for video processing & embedding generation, combined with PostgreSQL as a vector database (powered by pgvector and pgvectorscale) hosted on Timescale Cloud.
With this system, you can query video data using both video and text queries to retrieve relevant video segments based on semantic similarity.
Prerequisites
- PostgreSQL database (We're using Timescale Cloud)
- Python 3.7+
- Basic understanding of vector embeddings
- Mixpeek API key (sign up at mixpeek.com)
How The Reverse Video System Works
Before we jump in to implementation details, let's review the system architecture.
Reverse video search system architecture
Video ingestion process
The ingestion process involves inputting the source video data into a vector database. Here's how it works:
- Source video: The process starts with uploading a source video.
- Video chunks: The video is split into chunks for optimized embedding creation using Mixpeek’s video indexing tool.
- Embeddings: Mixpeek’s indexing tool extracts video features from each video chunk to generate vector embeddings using one of the embedding models integrated with it.
- Vector database: The generated vector embeddings are stored in a database that supports vector similarity search—in this case, PostgreSQL with pgvector and pgvectorscale and hosted on Timescale Cloud. This database not only handles vector similarity searches but also enables you to store metadata, such as start and end times, alongside their embeddings.
Video retrieval process
- Query: the user submits a text or video query.
- Query embeddings: Mixpeek converts the query into embeddings to capture its semantic meaning.
- Vector search: these embeddings are then compared against the stored embeddings in the vector database to retrieve the closest matches using vector similarity search.
The Timescale and Mixpeek tech stacks complement each other. Mixpeek generates the vector embeddings, while Timescale’s PostgreSQL database—powered by pgvector and pgvectorscale—ensures optimized storage, management, and retrieval of the video data and its embeddings.
Implementation
This tutorial is divided into several sections. You can also follow along with this notebook.
Imports and setup
Setting up environment variables
In this section, we first create a .env
file to store our environment variables. Let’s first get these variables:
- Obtain a
https://docs.mixpeek.com/overview/introduction
from your Mixpeek account. If you don’t have an account yet, you can sign up for one through this page. - Then, create a PostgreSQL service for your data and embeddings by signing up for Timescale Cloud (it’s free for 30 days). Follow the setup instructions in the Getting Started with Timescale guide.
After creating your database service, get the connection string provided in the dashboard.
Database connection dashboard in the Timescale UI
Store these variables in the .env
file as follows:
MIXPEEK_API_KEY='...'
DATABASE_CONNECTION_STRING='...'
Installing libraries
To get started, let’s install the required libraries:
%pip install psycopg2-binary python-dotenv requests
psycopg2
enables the connection to PostgreSQL. python-dotenv
lets you read the values stored in your environment while requests allows you to send HTTP requests easily.
Then, you can import the libraries and load the environment variables as follows:
import json
import os
import psycopg2
import requests
import time
from dotenv import load_dotenv
load_dotenv()
MIXPEEK_API_KEY= os.environ["MIXPEEK_API_KEY"]
DATABASE_CONNECTION_STRING= os.environ["DATABASE_CONNECTION_STRING"]
Mixpeek workflow
In this section, we define different functions related to video indexing, feature extraction, and retrieving video chunks & their embeddings using Mixpeek’s API. Then, we demonstrate how to get embeddings using a video.
Video indexing and feature extraction
In the index_video_file
function, we use Mixpeek’s Index Video Url endpoint to process the source video and divide it into chunks. For each video chunk, this tool does the following:
- Reads on-screen text using the
video-descriptor-v1
model. - Generates a 1408-dimensional vector embedding with the
multimodal-v1
model - Transcribes the audio in the video chunk using the
polyglot-v1
model - Creates a comprehensive description of the chunk, including the screenplay and sound details
BASE_URL = "https://api.mixpeek.com"
headers = {
'Authorization': f'Bearer {MIXPEEK_API_KEY}',
'Content-Type': 'application/json'
}
def index_video_file(video_url, video_name, chunking_interval):
payload = json.dumps({
"url": video_url,
"collection_id": "mixpeek_timescaledb",
"metadata": {
"name": video_name
},
"video_settings": [
{
"interval_sec": chunking_interval,
"read": {"model_id": "video-descriptor-v1"},
"embed": {"model_id": "multimodal-v1"},
"transcribe": {"model_id": "polyglot-v1"},
"describe": {
"model_id": "video-descriptor-v1",
"prompt": "Create a holistic description of the video, include sounds and screenplay"
},
}
]
})
indexing_response = requests.post(url=f"{BASE_URL}/index/videos/url",
headers=headers,
data=payload)
task_id = indexing_response.json()["task_id"]
print(f"Indexing started. Task ID: {task_id}")
return task_id
Let's use the task_id
associated with the indexing process to check its status through the Get Task endpoint.
def check_task_status(task_id):
response = requests.get(f"{BASE_URL}/tasks/{task_id}", headers=headers)
return response.json()["status"]
def get_asset_id(task_id):
# poll task status every 5 seconds until video processing is done.
while True:
status = check_task_status(task_id)
print(f"Current task status: {status}")
if status == "DONE":
break
time.sleep(5)
get_task_response = requests.get(url=f"{BASE_URL}/tasks/{task_id}",
headers=headers)
asset_id = get_task_response.json()["asset_id"]
print(f"Task Done. Asset ID: {asset_id}")
return asset_id
Retrieving video chunks and embeddings
In this part, we access the metadata (start_time
and end_time
) and feature_ids
of the video chunks created using the Get Asset With Features endpoint and the asset_id
from the response from the Get Task endpoint.
def retrieve_video_chunks(asset_id):
get_asset_response = requests.get(url=f"{BASE_URL}/assets/{asset_id}/features",
headers=headers)
return get_asset_response.json()["features"]["video"]
Then using the feature_id
, let's access the generated vector embedding from each video chunk through the Get Feature endpoint.
def retrieve_video_chunks_embeddings(video_chunks):
chunks_embeddings = []
for chunk in video_chunks:
get_feature_response = requests.get(url=f"{BASE_URL}/features/{chunk['feature_id']}",
headers=headers,
params={"include_vectors":True})
chunks_embeddings.append({
"start_time": chunk["start_time"],
"end_time": chunk["end_time"],
"embedding": get_feature_response.json()["vectors"]["multimodal-v1"]
})
return chunks_embeddings
Example
Let's now combine all these parts into one function, get_mixpeek_embeddings
, and demonstrate using a video file.
def get_mixpeek_embeddings(video_url, video_name, chunking_interval):
task_id = index_video_file(video_url, video_name, chunking_interval)
asset_id = get_asset_id(task_id)
video_chunks = retrieve_video_chunks(asset_id)
return retrieve_video_chunks_embeddings(video_chunks)
source_video = "https://mixpeek-public-demo.s3.us-east-2.amazonaws.com/starter/jurassic_park_trailer.mp4"
source_video_embeddings = get_mixpeek_embeddings(video_url=source_video,
video_name="source_video",
chunking_interval=10)
PostgreSQL workflow
Connecting to PostgreSQL using Timescale Cloud
In this tutorial, we'll use PostgreSQL with the pgvector and pgvectorscale extensions as our vector database. This database instance is hosted on Timescale Cloud.
The pgvectorscale extension builds on top of pgvector, enabling PostgreSQL to efficiently store and query vector embeddings. You might wonder why you should upgrade from PostgreSQL with pgvector to Timescale Cloud’s AI stack (pgai, pgvectorscale, and pgai Vectorizer). Here’s why:
Faster and more cost-effective ANN search: PostgreSQL, powered by pgvector and pgvectorscale, is a faster, more accurate, and more affordable vector database. Compared to popular vector databases like Pinecone, PostgreSQL with pgvector and pgvectorscale achieved 28x lower p95 query latency and 16x higher query throughput for approximate nearest neighbor (ANN) queries at 99 % recall—at only 25 % of Pinecone’s monthly cost.
High-performance and scalability for your AI Applications: pgvectorscale boosts PostgreSQL’s ANN capabilities with StreamingDiskANN, a disk-based ANN algorithm that outperforms memory-based indexes like pgvector’s IVFFlat. With no ef_search cutoffs and its streaming model, it enhances query speed and accuracy, continuously retrieving the “next closest” item, potentially even traversing the entire graph!
A simplified AI stack: Timescale’s AI stack integrates vector embeddings, relational data, and time-series data in one place. This consolidation significantly reduces the complexity of infrastructure management and data synchronization, allowing you to focus on building AI applications.
Seamless PostgreSQL compatibility: Since Timescale inherits PostgreSQL’s syntax and robustness, developers with PostgreSQL experience can integrate AI capabilities in their application development without a steep learning curve.
Use the code below to connect to your database service and confirm database access:
def connect_db():
return psycopg2.connect(DATABASE_CONNECTION_STRING)
# Ensures database access
with connect_db() as conn:
with conn.cursor() as curs:
curs.execute("SELECT 'hello world'; ")
print(curs.fetchone())
Creating a table for video chunks
Since we are working with embedding data, we need to ensure our PostgreSQL service can support it. Therefore, we install the pgvector and pgvectorscale extensions before creating the table, video_embeddings
, that stores information about video segments (or chunks) and their embeddings.
with connect_db() as conn:
with conn.cursor() as curs:
# Installs both pgvector and pgvectorscale
curs.execute("CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;")
with conn.cursor() as curs:
curs.execute("""
CREATE TABLE IF NOT EXISTS video_embeddings(
id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
embedding VECTOR(1408),
start_time NUMERIC,
end_time NUMERIC
);
""")
Here's a breakdown of the columns:
-
id
: a unique identifier for each video chunk. -
embedding
: a 1408-dimensional vector embedding of the video chunk. -
start_time
: the starting time of the video chunk. For example, if a video is split into segments, this could be each segment's start time (in seconds or another unit). -
end_time
: the ending time of the video chunk, indicating when the segment finishes.
Data insertion
Let’s ingest the video chunks and their embeddings into our database.
with connect_db() as conn:
with conn.cursor() as curs:
for chunk in source_video_embeddings:
curs.execute('''
INSERT INTO video_embeddings (embedding, start_time, end_time)
VALUES (%(embedding)s, %(start_time)s, %(end_time)s);
''',chunk)
with conn.cursor() as curs:
curs.execute('''
SELECT start_time, end_time, vector_dims(embedding)
FROM video_embeddings;
''')
for row in curs.fetchall():
print(f"start_time: {row[0]}, end_time: {row[1]}, embedding_dimensions: {row[2]}")
Creating the index on embeddings
Vector search queries will primarily target the embedding
column, so we create an index on this column using StreamingDiskANN. It significantly speeds up vector similarity searches.
with connect_db() as conn:
with conn.cursor() as curs:
curs.execute('''
CREATE INDEX video_embeddings_idx
ON video_embeddings
USING diskann (embedding);
''')
Search functions
In this section, we demonstrate two search functions for retrieving relevant video chunks: one based on video input and the other based on text query. The idea is to search for similar video chunks stored in the database by comparing embeddings, allowing us to match the content of the video query or find similar scenes based on descriptive text.
For each query, we first generate vector embeddings and then use them to search for the closest video chunks through the source video embeddings, ranking results by cosine distance. Let’s first define a helper function for vector similarity search.
# helper function for vector similarity search
def retrieve_closest_video_chunks(query_embedding, limit):
with connect_db() as conn:
with conn.cursor() as curs:
curs.execute('''
SELECT start_time, end_time
FROM video_embeddings
ORDER BY embedding <=> %s::vector
LIMIT %s
''', (query_embedding['embedding'], limit))
print("CLOSEST VIDEO CHUNKS:")
closest_video_chunks = []
for row in curs.fetchall():
print(f"start_time: {row[0]}, end_time: {row[1]}")
closest_video_chunks.append({
"start_time": row[0],
"end_time": row[1]
})
Video query search
video_query = "https://mixpeek-public-demo.s3.us-east-2.amazonaws.com/starter/jurassic_bunny.mp4"
video_query_embeddings = get_mixpeek_embeddings(video_url=video_query,
video_name="video_query",
chunking_interval=5)
retrieve_closest_video_chunks(video_query_embeddings[0], 2)
Here are the results of this query:
CLOSEST VIDEO CHUNKS:
start_time: 60.0, end_time: 70.0
start_time: 20.0, end_time: 30.0
![One of the frames in the video_segment (start_time: 60.0, end_time: 70.0)(https://dev-to-uploads.s3.amazonaws.com/uploads/articles/hp5hm8xh5rz8t939329b.jpg)
One of the frames in the video_segment (start_time: 60.0, end_time: 70.0)
Text query search
In this part, let's use the Index Text endpoint to generate embeddings for the text query and then use them to perform a vector similarity search.
text_query = "two people in a car"
payload = json.dumps({
"text": text_query,
"collection_id": "mixpeek_timescale",
"metadata": {
"author": "user"
},
"text_settings": {
"embed": {"model_id": "multimodal-v1"}
}
})
index_text_response = requests.post(url=f"{BASE_URL}/index/text",
headers=headers,
data=payload)
task_id = index_text_response.json()["task_id"]
print(f"Indexing started. Task ID: {task_id}")
# retrieve feature extracted from the text query
asset_id = get_asset_id(index_text_response.json()["task_id"])
get_asset_response = requests.get(url=f"{BASE_URL}/assets/{asset_id}/features",
headers=headers)
text_asset = get_asset_response.json()["features"]["text"]
# extract the generated text embedding
get_feature_response = requests.get(url=f"{BASE_URL}/features/{text_asset[0]['feature_id']}",
headers=headers,
params={"include_vectors":True})
text_query_embedding = {
"embedding": get_feature_response.json()["vectors"]['multimodal-v1']
}
retrieve_closest_video_chunks(text_query_embedding, 2)
Here are the results of this query:
CLOSEST VIDEO CHUNKS:
start_time: 30.0, end_time: 40.0
start_time: 40.0, end_time: 50.0
One of the frames from video segments (start_time: 30.0, end_time: 40.0)
This demo uses a single video. However, we can extend the same approach to handle a collection of videos.
Conclusion
In this article, we covered how to build a reverse video search engine using Mixpeek and Timescale Cloud’s mature PostgreSQL cloud platform. This stack potentially paves the way for many enhancements in multi-modal video analysis and retrieval. We can deploy add-ons to the current system, for example, integrating AI-generated sentiment analysis or treating support queries in several languages.
AI is still in its early stages. Video search and understanding will continue to evolve. If you're interested in implementing these solutions, check out Mixpeek’s API documentation and Timescale’s AI stack to start building your own advanced video search engine.
Further reading:
Pgvector Is Now Faster than Pinecone at 75% Less Cost
Building an AI Image Gallery: Advanced RAG With Pgvector and Claude Sonnet 3.5
timescale / pgai
A suite of tools to develop RAG, semantic search, and other AI applications more easily with PostgreSQL
pgai is a PostgreSQL extension that simplifies data storage and retrieval for Retrieval Augmented Generation (RAG), and other AI applications In particular, it automates the creation and sync of embeddings for your data stored in PostgreSQL, simplifies semantic search, and allows you to call LLM models from SQL.
Docker
See the install via docker guide for docker compose files and detailed container instructions.
Timescale Cloud
Try pgai on cloud by creating a free trial account on Timescale Cloud.
Installing pgai into an existing PostgreSQL instance (Linux / MacOS)
See the install from source guide for instructions on how to install pgai from source.
Quick Start
This section will walk you through the steps to get started with pgai and Ollama using docker and show you the major features of pgai. We also have a quick start with OpenAI and a quick start with…
Top comments (0)