Introduction
If you love Streamlit in Snowflake and build apps with it day after day, you may soon find yourself with a large number of apps in your Snowflake account. That has certainly happened to me. As the number of apps grows, it becomes hard to keep track of what each app is for and how often it is used.
That's why I created an app catalog for Streamlit in Snowflake using Streamlit in Snowflake! Today, I'd like to introduce this self-made app. While it may not solve all problems, I hope it can serve as a useful method for managing apps in Snowflake. If you find it helpful or interesting, I'd be delighted to receive your feedback or a 👍!
Note: This article represents my personal views and not those of Snowflake.
Feature Overview
List of Features
App Portal Functionality
- Search and Filtering
- Hybrid search combining keyword and vector search
- Sorting by app execution count and likes
- Filtering by tags
- Application List Display
- Toggle between simple card view and detailed list view
- Like feature for apps and display of like count
- Display of app execution count for the past 30 days
- Display of app tags
- Display of app descriptions
- Direct navigation to applications
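The search and tag-filter rules listed above can be sketched in plain Python. This is a minimal illustration, not the app's actual function: the helper names `matches_search` and `has_all_tags` are hypothetical, and the 0.4 similarity cut-off is taken from the app code shown later in this article. The similarity score is assumed to have been computed already by the vector search.

```python
from typing import List, Optional

# Cut-off taken from the app code later in this article.
SIMILARITY_THRESHOLD = 0.4


def matches_search(query: str, title: str, description: Optional[str],
                   vector_similarity: float) -> bool:
    """Return True when an app should appear in the search results.

    An app is shown if the keyword appears in its title or description,
    OR if its vector similarity reaches the threshold (hybrid search).
    """
    needle = query.strip().lower()
    if not needle:
        return True  # an empty query shows every app
    keyword_match = (
        needle in title.lower()
        or needle in (description or "").lower()
    )
    return keyword_match or vector_similarity >= SIMILARITY_THRESHOLD


def has_all_tags(app_tags: List[str], selected_tags: List[str]) -> bool:
    """Tag filtering is an AND condition: every selected tag must be present."""
    return all(tag in app_tags for tag in selected_tags)
```

Combining the two checks with `and` gives the final decision for each app: it must pass the search rule and carry every selected tag.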
Admin Panel
- User-editable app descriptions
- User-editable app tags (supports multiple tags)
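Multiple tags per app are stored in a single comma-separated text column in the app code later in this article. The round trip between a tag list and that column can be sketched as follows; the helper names `tags_to_string` and `string_to_tags` are hypothetical, and the whitespace stripping on the way in is an assumption added for robustness.

```python
from typing import List, Optional


def tags_to_string(tags: List[str]) -> str:
    """Serialize a list of tags into one comma-separated string."""
    return ",".join(tag.strip() for tag in tags if tag and tag.strip())


def string_to_tags(value: Optional[str]) -> List[str]:
    """Parse the stored string back into a list, ignoring empty entries."""
    if not value:
        return []
    return [tag.strip() for tag in value.split(",") if tag.strip()]
```

One consequence of this storage format is that a tag name must not contain a comma.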
Automatic Description Generation
- AI-powered generation of app descriptions from source code
- Bulk deletion of app descriptions
Automatic Tag Generation
- Editing of categories for tags
- AI-powered generation of tags from app descriptions
- Bulk deletion of tags
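The app code later in this article reads the classification result defensively, because it may arrive as a JSON string, a single object with a `label` key, or a list of such objects. A minimal sketch of that normalization step follows; the function name `extract_labels` is hypothetical.

```python
import json
from typing import Any, List


def extract_labels(result: Any) -> List[str]:
    """Normalize a classification result into a flat list of tag labels."""
    # The result may come back as a JSON string; parse it first.
    if isinstance(result, str):
        result = json.loads(result)
    # A list of {"label": ...} objects yields one tag per object.
    if isinstance(result, list):
        return [
            item["label"]
            for item in result
            if isinstance(item, dict) and "label" in item
        ]
    # A single {"label": ...} object yields exactly one tag.
    if isinstance(result, dict) and "label" in result:
        return [result["label"]]
    # Anything else is treated as "no tag".
    return []
```

Returning an empty list for unexpected shapes lets the caller skip the update instead of overwriting existing tags with nothing.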
Automatic Vector Data Generation
- AI-powered generation of vector data from app descriptions
- Bulk deletion of vector data
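The vector data is compared with the search query by cosine similarity, which the app delegates to Snowflake. For intuition, here is the same measure in plain Python. The function name is hypothetical, and returning 0.0 for a zero-length vector is an assumption of this sketch, not documented Snowflake behavior.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors of equal dimension."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # assumption: treat a zero vector as "no similarity"
    return dot / (norm_a * norm_b)
```

Vectors pointing in the same direction score 1.0 and orthogonal vectors score 0.0, which is why a fixed threshold can serve as a relevance cut-off.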
Finished Product Images
Prerequisites
Snowflake account
- A Snowflake account with access to Cortex LLM functions (now available in most clouds and regions thanks to cross-region inference)
Streamlit in Snowflake installation package
- Python 3.11 or later (no additional packages required)
Steps
Create a New Streamlit in Snowflake App
Click on "Streamlit" in the left pane of Snowsight, then click the "+ Streamlit" button to create a SiS app.
Run the Streamlit in Snowflake App
In the Streamlit in Snowflake app editing screen, copy and paste the following code to complete the setup. (The code is quite long.)
import streamlit as st
from snowflake.snowpark.context import get_active_session
import pandas as pd
import json
# Page configuration
st.set_page_config(page_title="Streamlit in Snowflake App Catalog", layout='wide')
# Snowflake account information (please adjust according to your environment)
orgname = "<your_org_name>"
accountname = "<your_account_name>"
# Get Snowflake session
snowflake_session = get_active_session()
@st.cache_data
def get_streamlit_apps_info():
    """
    Retrieves Streamlit in Snowflake app information for the entire account.
    Native Apps are excluded.
    """
    query = "SHOW STREAMLITS IN ACCOUNT"
    result = snowflake_session.sql(query).collect()
    df = pd.DataFrame(result)
    return df[df['owner_role_type'] == 'ROLE']
@st.cache_data
def create_app_descriptions_table():
    """
    Creates APP_DESCRIPTIONS table and APP_CATEGORIES table if they don't exist.
    Also, bulk inserts initial data for apps not present in APP_DESCRIPTIONS.
    """
    # Table for app descriptions
    snowflake_session.sql("""
        CREATE TABLE IF NOT EXISTS APP_DESCRIPTIONS (
            database_name VARCHAR,
            schema_name VARCHAR,
            app_name VARCHAR,
            description VARCHAR,
            tags VARCHAR,
            embedding VECTOR(FLOAT, 1024),
            views INTEGER DEFAULT 0,
            likes INTEGER DEFAULT 0,
            last_updated TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
        )
    """).collect()
    # Table for category management
    snowflake_session.sql("""
        CREATE TABLE IF NOT EXISTS APP_CATEGORIES (
            category STRING PRIMARY KEY,
            created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
        )
    """).collect()
    # Add default categories (bulk insert)
    default_categories = [
        'Data Analysis', 'Data Engineering', 'Dashboard', 'BI',
        'Report', 'Management Tool', 'Generative AI', 'Machine Learning'
    ]
    categories_values = ", ".join([f"('{category}')" for category in default_categories])
    snowflake_session.sql(f"""
        INSERT INTO APP_CATEGORIES (category)
        SELECT category FROM (VALUES {categories_values}) AS v(category)
        WHERE category NOT IN (SELECT category FROM APP_CATEGORIES)
    """).collect()
    # Get current Streamlit app list and update in bulk
    apps_df = get_streamlit_apps_info()
    if not apps_df.empty:
        # Build a VALUES list from the app list
        apps_values = ", ".join([
            f"('{app['database_name']}', '{app['schema_name']}', '{app['name']}')"
            for _, app in apps_df.iterrows()
        ])
        # Execute SQL for bulk update
        snowflake_session.sql(f"""
            INSERT INTO APP_DESCRIPTIONS (database_name, schema_name, app_name)
            SELECT src.* FROM (
                VALUES {apps_values}
            ) AS src(database_name, schema_name, app_name)
            WHERE NOT EXISTS (
                SELECT 1
                FROM APP_DESCRIPTIONS dest
                WHERE dest.database_name = src.database_name
                AND dest.schema_name = src.schema_name
                AND dest.app_name = src.app_name
            )
        """).collect()
def update_app_views():
    """
    Aggregates app execution count from QUERY_HISTORY and updates APP_DESCRIPTIONS table.
    """
    snowflake_session.sql("""
        MERGE INTO APP_DESCRIPTIONS target
        USING (
            SELECT
                REGEXP_SUBSTR(QUERY_TEXT, '"([^"]+)"\\."([^"]+)"\\."([^"]+)"', 1, 1, 'e', 1) as database_name,
                REGEXP_SUBSTR(QUERY_TEXT, '"([^"]+)"\\."([^"]+)"\\."([^"]+)"', 1, 1, 'e', 2) as schema_name,
                REGEXP_SUBSTR(QUERY_TEXT, '"([^"]+)"\\."([^"]+)"\\."([^"]+)"', 1, 1, 'e', 3) as app_name,
                COUNT(*) as view_count
            FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
            WHERE QUERY_TEXT LIKE 'execute streamlit%'
            AND START_TIME >= DATEADD(month, -1, CURRENT_TIMESTAMP())
            GROUP BY 1, 2, 3
        ) as source
        ON target.database_name = source.database_name
        AND target.schema_name = source.schema_name
        AND target.app_name = source.app_name
        WHEN MATCHED THEN
            UPDATE SET views = source.view_count
    """).collect()
def get_app_data():
    """
    Retrieves app information and metadata.
    Fetches from database only if cache doesn't exist.
    """
    if 'apps_df' not in st.session_state or 'app_descriptions' not in st.session_state:
        # Update app execution count
        update_app_views()
        # Retrieve app information
        apps_df = get_streamlit_apps_info()
        app_data = snowflake_session.sql("""
            SELECT
                DATABASE_NAME,
                SCHEMA_NAME,
                APP_NAME,
                DESCRIPTION,
                TAGS,
                EMBEDDING,
                VIEWS,
                LIKES,
                LAST_UPDATED
            FROM APP_DESCRIPTIONS
        """).collect()
        # Organize app metadata
        app_descriptions = {}
        for row in app_data:
            # Convert tags to array
            current_tags = []
            if row['TAGS']:
                current_tags = [tag.strip() for tag in row['TAGS'].split(',') if tag.strip()]
            app_descriptions[f"{row['DATABASE_NAME']}.{row['SCHEMA_NAME']}.{row['APP_NAME']}"] = {
                'description': row['DESCRIPTION'],
                'tags': current_tags,  # Always store as array
                'views': row['VIEWS'] if row['VIEWS'] is not None else 0,
                'likes': row['LIKES'] if row['LIKES'] is not None else 0,
                'last_updated': row['LAST_UPDATED']
            }
        # Save data to session_state
        st.session_state['apps_df'] = apps_df
        st.session_state['app_descriptions'] = app_descriptions
    return st.session_state['apps_df'], st.session_state['app_descriptions']
def clear_app_cache():
    """
    Clears the cache of app data.
    """
    if 'apps_df' in st.session_state:
        del st.session_state['apps_df']
    if 'app_descriptions' in st.session_state:
        del st.session_state['app_descriptions']
def update_app_description(database, schema, app_name, description, tags):
    """
    Updates or inserts app description and tags.
    """
    # Convert tags to comma-separated string
    tags_str = ','.join([tag['label'] if isinstance(tag, dict) else tag for tag in tags])
    snowflake_session.sql("""
        MERGE INTO APP_DESCRIPTIONS target
        USING (
            SELECT
                ? AS database_name,
                ? AS schema_name,
                ? AS app_name,
                ? AS description,
                ? AS tags
        ) AS source
        ON target.database_name = source.database_name
        AND target.schema_name = source.schema_name
        AND target.app_name = source.app_name
        WHEN MATCHED THEN
            UPDATE SET
                description = source.description,
                tags = source.tags,
                last_updated = CURRENT_TIMESTAMP()
        WHEN NOT MATCHED THEN
            INSERT (database_name, schema_name, app_name, description, tags)
            VALUES (
                source.database_name,
                source.schema_name,
                source.app_name,
                source.description,
                source.tags
            )
    """, params=[
        database,
        schema,
        app_name,
        description,
        tags_str
    ]).collect()
    # Clear cache to allow fetching of latest data
    clear_app_cache()
@st.fragment
def render_like_button(app, likes_key):
    """
    Function to render the like button
    """
    # Manage like state
    if "like_states" not in st.session_state:
        st.session_state.like_states = {}
    current_likes = st.session_state[likes_key]
    # Disable form style
    st.markdown("""
        <style>
        [data-testid="stForm"] {
            border: none;
            padding: 0;
            margin: 0;
        }
        </style>
    """, unsafe_allow_html=True)

    # Handle button click
    def on_like_click():
        # Update likes
        snowflake_session.sql("""
            UPDATE APP_DESCRIPTIONS
            SET likes = likes + 1,
                last_updated = CURRENT_TIMESTAMP()
            WHERE database_name = ?
            AND schema_name = ?
            AND app_name = ?
        """, params=[
            app['database_name'],
            app['schema_name'],
            app['name']
        ]).collect()
        # Update like count in session_state
        st.session_state[likes_key] = current_likes + 1
        # Clear cache in background
        clear_app_cache()
        # Update like state for this button
        st.session_state.like_states[likes_key] = True

    # Display like button (disabled once this session has liked the app)
    st.button(
        f"👍 {current_likes}",
        key=f"like_button_{likes_key}",
        on_click=on_like_click,
        disabled=st.session_state.like_states.get(likes_key, False)
    )
@st.fragment
def render_app_card_view(app, metadata, app_key):
    """
    Function to render app information in card format
    """
    app_title = app['title'] if app['title'] is not None else app['name']
    app_description = metadata['description'] if metadata['description'] is not None else "No description"
    app_url = f"https://app.snowflake.com/{orgname}/{accountname}/#/streamlit-apps/{app['database_name']}.{app['schema_name']}.{app['name']}"
    # Key for managing like state
    likes_key = f"likes_{app_key}"
    if likes_key not in st.session_state:
        st.session_state[likes_key] = metadata.get('likes', 0)
    # Card container
    tile = st.container(height=160, border=True)
    # Title and basic information
    col1, col2 = tile.columns([3, 1])
    with col1:
        # Display title as a link (even smaller)
        st.markdown(f"###### [{app_title}]({app_url})")
        # Display metrics and tags side by side
        st.caption(f"👁 {metadata.get('views', 0)}")
        if metadata.get('tags'):
            tags = metadata['tags']
            if isinstance(tags, str):
                tags = [tag.strip() for tag in tags.split(',') if tag.strip()]
            st.caption(" ".join([f"#{tag}" for tag in tags]))
    with col2:
        # Like button
        render_like_button(app, likes_key)
    # Display description at the end (full text)
    tile.caption(app_description)
@st.fragment
def generate_description_with_progress(model_name):
    """
    Performs description generation process and displays progress.
    """
    with st.spinner("Generating descriptions..."):
        apps_processed = generate_description_for_all_apps(model_name)
    if apps_processed > 0:
        st.success("Generated descriptions for all apps.")
@st.fragment
def description_generation_form():
    """
    Displays the form for description generation.
    """
    with st.form("description_generation_form"):
        models = [
            'claude-3-5-sonnet',
            'mistral-large2',
            'llama3.3-70b',
            'snowflake-llama-3.1-405b', 'snowflake-llama-3.3-70b',
            'llama3.2-3b', 'llama3.2-1b',
            'llama3.1-405b', 'llama3.1-8b',
            'reka-core', 'reka-flash',
            'snowflake-arctic',
            'jamba-1.5-large', 'jamba-1.5-mini'
        ]
        selected_model = st.selectbox("Select model to use", models)
        submitted = st.form_submit_button("Generate descriptions in bulk")
        if submitted:
            generate_description_with_progress(selected_model)
@st.fragment
def tag_generation_with_progress():
    """
    Performs tag generation process and displays progress.
    """
    with st.spinner("Generating tags..."):
        generate_tags_for_all_apps()
    st.success("Generated tags for all apps.")
@st.fragment
def category_list():
    """
    Displays the category list and allows deletion of selected categories.
    """
    if "category_message" not in st.session_state:
        st.session_state.category_message = None
    # Get category list
    categories = [
        row['CATEGORY'] for row in
        snowflake_session.sql("SELECT category FROM APP_CATEGORIES ORDER BY category").collect()
    ]
    with st.form("category_list_form", clear_on_submit=True):
        selected = st.selectbox(
            "Select category",
            categories,
            key=f"category_selector_{len(categories)}"
        )
        submitted = st.form_submit_button("Delete selected category")
        if submitted and selected:
            snowflake_session.sql(
                "DELETE FROM APP_CATEGORIES WHERE category = ?",
                params=[selected]
            ).collect()
            st.session_state.category_message = f"Deleted category '{selected}'."
            st.rerun()
    # Placeholder for message display (moved below the form)
    if st.session_state.category_message:
        st.success(st.session_state.category_message)
        st.session_state.category_message = None
@st.fragment
def add_category_form():
    """
    Displays a form to add a new category.
    """
    if "add_category_message" not in st.session_state:
        st.session_state.add_category_message = None
    with st.form("add_category_form", clear_on_submit=True):
        new_category = st.text_input("New category name")
        submitted = st.form_submit_button("Add")
        if submitted and new_category:
            snowflake_session.sql(
                "INSERT INTO APP_CATEGORIES (category) SELECT ? WHERE NOT EXISTS (SELECT 1 FROM APP_CATEGORIES WHERE category = ?)",
                params=[new_category, new_category]
            ).collect()
            st.session_state.add_category_message = f"Added category '{new_category}'."
            st.rerun()
    # Placeholder for message display (moved below the form)
    if st.session_state.add_category_message:
        st.success(st.session_state.add_category_message)
        st.session_state.add_category_message = None
@st.fragment
def category_management():
    """
    Provides category management functionality.
    """
    st.write("### Category Management")
    st.write("Manage categories used for tag generation.")
    col1, col2 = st.columns(2)
    with col1:
        st.write("#### Current Category List")
        category_list()
    with col2:
        st.write("#### Add New Category")
        add_category_form()
def description_generation_page():
    """
    Displays the automatic description generation page.
    """
    st.title("Automatic Description Generation")
    # Bulk description generation
    st.write("### Bulk Description Generation")
    st.write("Automatically generate descriptions for all apps.")
    description_generation_form()
    st.divider()
    # Bulk description deletion
    st.write("### Bulk Description Deletion")
    st.write("Delete descriptions for all apps.")
    confirm_and_delete_descriptions()
@st.fragment
def tag_generation_form():
    """
    Displays the form for tag generation.
    """
    if "tag_generation_message" not in st.session_state:
        st.session_state.tag_generation_message = None
    with st.form("tag_generation_form"):
        st.write("Automatically generate tags for all apps using their descriptions and the set categories.")
        submitted = st.form_submit_button("Generate tags in bulk")
        if submitted:
            with st.spinner("Generating tags..."):
                apps_processed = generate_tags_for_all_apps()
            if apps_processed > 0:
                st.success("Generated tags for all apps.")
def tag_generation_page():
    """
    Displays the automatic tag generation page.
    """
    st.title("Automatic Tag Generation")
    # Category management
    category_management()
    st.divider()
    # Bulk tag generation
    st.write("### Bulk Tag Generation")
    tag_generation_form()
    st.divider()
    # Bulk tag deletion
    st.write("### Bulk Tag Deletion")
    st.write("Delete tags for all apps.")
    confirm_and_delete_tags()
def main_page():
    """
    Displays the main page.
    """
    st.title("Streamlit in Snowflake App Catalog")
    # Get cached data
    apps_df, app_descriptions = get_app_data()
    # Select display mode
    display_mode = st.radio("Display mode", ["Card view", "List view"], horizontal=True)
    # Search and filter controls
    search_query, sort_by, selected_tags, _ = admin_search_filter()
    # Display filtered apps
    filtered_apps = filter_apps(apps_df, app_descriptions, search_query, selected_tags)
    # Sort
    if sort_by == "Popularity":
        filtered_apps.sort(key=lambda x: x[1].get('views', 0), reverse=True)
    elif sort_by == "Rating":
        filtered_apps.sort(key=lambda x: x[1].get('likes', 0), reverse=True)
    # Display app list
    if display_mode == "Card view":
        # Create 3-column grid layout for card view
        cols = st.columns(3)
        for i, (app, metadata) in enumerate(filtered_apps):
            with cols[i % 3]:
                render_app_card_view(app, metadata, f"{app['database_name']}.{app['schema_name']}.{app['name']}")
                if search_query.strip() and metadata.get('similarity', 0.0) > 0:
                    st.caption(f"Match rate: {metadata['similarity']:.1%}")
    else:
        # List view
        for app, metadata in filtered_apps:
            app_title = app['title'] if app['title'] is not None else app['name']
            app_url = f"https://app.snowflake.com/{orgname}/{accountname}/#/streamlit-apps/{app['database_name']}.{app['schema_name']}.{app['name']}"
            app_key = f"{app['database_name']}.{app['schema_name']}.{app['name']}"
            # Key for managing like state
            likes_key = f"likes_{app_key}"
            if likes_key not in st.session_state:
                st.session_state[likes_key] = metadata.get('likes', 0)
            # App information container
            with st.container():
                # Title and basic information
                col1, col2 = st.columns([3, 1])
                with col1:
                    st.markdown(f"### [{app_title}]({app_url})")
                    if search_query.strip() and metadata.get('similarity', 0.0) > 0:
                        st.caption(f"Match rate: {metadata['similarity']:.1%}")
                with col2:
                    # Metrics
                    st.caption(f"👁 {metadata.get('views', 0)}")
                    # Like button
                    render_like_button(app, likes_key)
                # Display tags
                if metadata.get('tags'):
                    tags = metadata['tags']
                    if isinstance(tags, str):
                        tags = [tag.strip() for tag in tags.split(',') if tag.strip()]
                    st.caption(" ".join([f"#{tag}" for tag in tags]))
                # Display detailed information in an expander
                with st.expander("Detailed information"):
                    # Description
                    if metadata.get('description'):
                        st.markdown(metadata['description'])
                    else:
                        st.markdown("*No description*")
                    # Detailed information table
                    details = {
                        "App name": app['name'],
                        "Database": app['database_name'],
                        "Schema": app['schema_name'],
                        "Owner": app.get('owner', 'Unknown'),
                        "Created on": str(app.get('created_on', 'Unknown'))[:19] if app.get('created_on') else 'Unknown',
                        "URL": app_url
                    }
                    st.table(pd.DataFrame([details]).T.rename(columns={0: "Value"}))
                st.divider()
@st.fragment
def render_app_metadata_editor(app, metadata, categories):
    """
    Displays the metadata editing form for an app.
    """
    app_key = f"{app['database_name']}.{app['schema_name']}.{app['name']}"
    app_title = app['title'] if app['title'] is not None else app['name']
    st.subheader(f"{app_title} ({app['name']})")
    with st.form(key=f"metadata_form_{app_key}"):
        col1, col2 = st.columns([3, 1])
        with col1:
            new_desc = st.text_area("Description", metadata['description'] or "", key=f"desc_{app_key}")
            selected_app_tags = st.multiselect(
                "Tags",
                categories,
                default=metadata['tags'],
                key=f"tags_{app_key}"
            )
            if metadata['last_updated']:
                st.caption(f"Last updated: {metadata['last_updated']}")
        with col2:
            submitted = st.form_submit_button("Update")
        if submitted:
            update_app_description(app['database_name'], app['schema_name'], app['name'], new_desc, selected_app_tags)
            st.success(f"Metadata for {app_title} has been updated.")
    st.divider()
def admin_search_filter():
    """
    Displays search and filter controls for the admin page.
    """
    with st.form("search_form"):
        col1, col2 = st.columns([2, 1])
        with col1:
            search_query = st.text_input("Search apps", "")
        with col2:
            sort_by = st.selectbox("Sort by", ["Popularity", "Rating"])
        # Get tag list
        all_tags = set()
        app_data = snowflake_session.sql("""
            SELECT DISTINCT TRIM(value) as tag
            FROM APP_DESCRIPTIONS,
            LATERAL FLATTEN(input => SPLIT(tags, ',')) as t
            WHERE tags IS NOT NULL
            ORDER BY tag
        """).collect()
        for row in app_data:
            if row['TAG']:
                all_tags.add(row['TAG'])
        selected_tags = st.multiselect("Filter by tags", sorted(list(all_tags)))
        search_button = st.form_submit_button("Search")
    return search_query, sort_by, selected_tags, search_button
@st.cache_data
def get_cached_categories():
    """
    Retrieves and caches the category list.
    """
    return [row['CATEGORY'] for row in snowflake_session.sql("SELECT category FROM APP_CATEGORIES ORDER BY category").collect()]
def filter_apps(apps_df, app_descriptions, search_query, selected_tags):
    """
    Filters the app list.
    Performs a hybrid search combining keyword search and vector search.
    """
    filtered_apps = []
    similarity_dict = {}
    if search_query.strip():
        # Get vector search results. The search text is passed as a bind
        # parameter rather than interpolated into the SQL string.
        vector_results = snowflake_session.sql("""
            WITH search_vector AS (
                SELECT SNOWFLAKE.CORTEX.EMBED_TEXT_1024('voyage-multilingual-2', ?) as query_embedding
            )
            SELECT
                a.database_name,
                a.schema_name,
                a.app_name,
                VECTOR_COSINE_SIMILARITY(a.embedding, s.query_embedding) as similarity
            FROM
                APP_DESCRIPTIONS a,
                search_vector s
            WHERE
                a.embedding IS NOT NULL
            ORDER BY
                similarity DESC
        """, params=[search_query]).collect()
        # Convert vector search results to dictionary
        similarity_dict = {
            f"{row['DATABASE_NAME']}.{row['SCHEMA_NAME']}.{row['APP_NAME']}": row['SIMILARITY']
            for row in vector_results
        }
    # Filter apps
    for _, app in apps_df.iterrows():
        app_key = f"{app['database_name']}.{app['schema_name']}.{app['name']}"
        metadata = app_descriptions.get(app_key, {'description': "", 'tags': [], 'last_updated': None})
        app_title = app['title'] if app['title'] is not None else app['name']
        # Search and tag filtering conditions
        show_app = True
        similarity = 0.0
        if search_query.strip():
            # Keyword search
            keyword_match = (
                search_query.lower() in app_title.lower() or
                search_query.lower() in (metadata['description'] or "").lower()
            )
            # Get vector search similarity
            vector_similarity = similarity_dict.get(app_key, 0.0)
            # Show if keyword matches or similarity is 0.4 or higher
            show_app = keyword_match or vector_similarity >= 0.4
            similarity = vector_similarity if vector_similarity >= 0.4 else 0.0
        if selected_tags:
            # Convert tags to array if it's a string
            app_tags = metadata['tags']
            if isinstance(app_tags, str):
                app_tags = [tag.strip() for tag in app_tags.split(',') if tag.strip()]
            # Check if all selected tags are included (AND condition)
            show_app = show_app and all(tag in app_tags for tag in selected_tags)
        if show_app:
            metadata['similarity'] = similarity
            filtered_apps.append((app, metadata))
    return filtered_apps
def admin_page():
    """
    Admin page: Manually edit app metadata.
    """
    st.title("Admin Page: Edit App Metadata")
    # Get cached data
    apps_df, app_descriptions = get_app_data()
    categories = get_cached_categories()
    # Search and filter controls
    search_query, sort_by, selected_tags, _ = admin_search_filter()
    # Display filtered apps
    filtered_apps = filter_apps(apps_df, app_descriptions, search_query, selected_tags)
    # Sort
    if sort_by == "Popularity":
        filtered_apps.sort(key=lambda x: x[1].get('views', 0), reverse=True)
    elif sort_by == "Rating":
        filtered_apps.sort(key=lambda x: x[1].get('likes', 0), reverse=True)
    # Display app list
    for app, metadata in filtered_apps:
        render_app_metadata_editor(app, metadata, categories)
def get_app_source_code(database, schema, app_name):
    """
    Retrieves the source code of an app.
    """
    try:
        # Get app details
        app_details = snowflake_session.sql(f"""
            DESCRIBE STREAMLIT {database}.{schema}.{app_name}
        """).collect()
        app_details_df = pd.DataFrame(app_details)
        # Get root_location
        root_location = app_details_df['root_location'].iloc[0]
        if not root_location:
            return None
        # Get stage file list
        files_df = pd.DataFrame(snowflake_session.sql(f"LIST '{root_location}'").collect())
        # Find .py files
        py_files = files_df[files_df['name'].str.endswith('.py')]['name'].tolist()
        if not py_files:
            return None
        # Get contents of the first .py file
        file_path = py_files[0]
        stage_parts = root_location.lstrip('@').split('.')
        if len(stage_parts) >= 3:
            database = stage_parts[0]
            schema = stage_parts[1]
            stage = '.'.join(stage_parts[2:])
            # Remove stage name if included
            stage_suffix = stage.strip('"').lower()
            clean_file_path = file_path
            if file_path.lower().startswith(stage_suffix + '/'):
                clean_file_path = file_path[len(stage_suffix + '/'):]
            # Construct fully qualified stage path
            stage_path = f"@{database}.{schema}.{stage}/{clean_file_path}"
            # Get file contents
            with snowflake_session.file.get_stream(stage_path) as file_stream:
                content = file_stream.read()
            # Convert binary data to string
            try:
                return content.decode('utf-8')
            except UnicodeDecodeError:
                try:
                    return content.decode('shift-jis')
                except UnicodeDecodeError:
                    st.error("Unable to determine file encoding")
                    return None
        return None
    except Exception as e:
        st.error(f"An error occurred while retrieving source code: {str(e)}")
        return None
def generate_description_for_all_apps(model_name):
    """
    Automatically generates descriptions for apps without descriptions.
    Returns: Number of processed apps
    """
    # Get all apps
    apps_df = get_streamlit_apps_info()
    # Get apps with existing descriptions
    existing_descriptions = snowflake_session.sql("""
        SELECT
            database_name || '.' || schema_name || '.' || app_name as app_key
        FROM APP_DESCRIPTIONS
        WHERE description IS NOT NULL
        AND TRIM(description) != ''
    """).collect()
    # Convert existing app keys to a set
    existing_app_keys = {row['APP_KEY'] for row in existing_descriptions}
    # Filter apps without descriptions
    apps_to_process = []
    for _, app in apps_df.iterrows():
        app_key = f"{app['database_name']}.{app['schema_name']}.{app['name']}"
        if app_key not in existing_app_keys:
            apps_to_process.append(app)
    total_apps = len(apps_to_process)
    if total_apps == 0:
        st.info("All apps have descriptions set.")
        return 0
    progress_bar = st.progress(0)
    status_text = st.empty()
    for i, app in enumerate(apps_to_process):
        progress = (i + 1) / total_apps
        progress_bar.progress(progress)
        status_text.text(f"Processing... {i + 1}/{total_apps} apps ({int(progress * 100)}%)")
        app_title = app['title'] if app['title'] is not None else app['name']
        # Get app source code
        source_code = get_app_source_code(app['database_name'], app['schema_name'], app['name'])
        # Create prompt (pass source code as a separate parameter)
        prompt_template = """Generate a description for the following Streamlit app.
- Output only the description in your response.
- Do not include phrases like "Certainly, I understand" in your response.
- Output in plain text, not markdown format.
- Summarize the description concisely in about 100 characters.
App name: {app_title} ({app_name})"""
        try:
            # Pass prompt and source code as separate parameters
            prompt_params = {
                'app_title': app_title,
                'app_name': app['name'],
                'source_code': source_code if source_code else 'Source code could not be retrieved.'
            }
            # Use bind parameters to prevent SQL injection
            description = snowflake_session.sql("""
                SELECT SNOWFLAKE.CORTEX.COMPLETE(?,
                    CONCAT(
                        ?,
                        '\n\nSource code:\n',
                        ?
                    )
                )
            """, params=[
                model_name,
                prompt_template.format(app_title=app_title, app_name=app['name']),
                prompt_params['source_code']
            ]).collect()[0][0]
            # Update description
            snowflake_session.sql("""
                MERGE INTO APP_DESCRIPTIONS
                USING (SELECT
                    ? AS db,
                    ? AS sc,
                    ? AS app,
                    ? AS desc
                ) AS src
                ON APP_DESCRIPTIONS.database_name = src.db
                AND APP_DESCRIPTIONS.schema_name = src.sc
                AND APP_DESCRIPTIONS.app_name = src.app
                WHEN MATCHED THEN
                    UPDATE SET
                        description = src.desc,
                        last_updated = CURRENT_TIMESTAMP()
                WHEN NOT MATCHED THEN
                    INSERT (database_name, schema_name, app_name, description)
                    VALUES (src.db, src.sc, src.app, src.desc)
            """, params=[
                app['database_name'],
                app['schema_name'],
                app['name'],
                description
            ]).collect()
        except Exception as e:
            st.error(f"An error occurred while processing app '{app_title}': {str(e)}")
            continue
    # Clear cache after processing
    clear_app_cache()
    progress_bar.progress(1.0)
    status_text.text(f"Completed! Generated descriptions for {total_apps} apps.")
    return total_apps
@st.fragment
def confirm_and_delete_descriptions():
    """
    Confirms and executes bulk deletion of descriptions.
    """
    if "show_desc_confirm" not in st.session_state:
        st.session_state.show_desc_confirm = False
    if "desc_message" not in st.session_state:
        st.session_state.desc_message = None
    container = st.container()
    with container:
        button_placeholder = st.empty()
        message_placeholder = st.empty()
        confirm_placeholder = st.empty()
        if button_placeholder.button("Delete all descriptions", type="secondary") or st.session_state.show_desc_confirm:
            st.session_state.show_desc_confirm = True
            with confirm_placeholder.container():
                st.warning("Are you sure you want to delete all descriptions?", icon="⚠️")
                col1, col2 = st.columns(2)
                with col1:
                    if st.button("Delete", key="confirm_delete_desc", type="primary"):
                        with st.spinner("Deleting descriptions..."):
                            clear_all_descriptions()
                        st.session_state.desc_message = "success"
                        st.session_state.show_desc_confirm = False
                with col2:
                    if st.button("Cancel", key="cancel_delete_desc"):
                        st.session_state.desc_message = "cancel"
                        st.session_state.show_desc_confirm = False
        if st.session_state.desc_message == "success":
            message_placeholder.success("Deleted descriptions for all apps.")
            confirm_placeholder.empty()
            st.session_state.desc_message = None
        elif st.session_state.desc_message == "cancel":
            message_placeholder.info("Deletion cancelled.")
            confirm_placeholder.empty()
            st.session_state.desc_message = None
def clear_all_descriptions():
    """
    Deletes descriptions for all apps.
    """
    snowflake_session.sql("""
        UPDATE APP_DESCRIPTIONS
        SET description = NULL,
            last_updated = CURRENT_TIMESTAMP()
    """).collect()
    # Clear cache
    clear_app_cache()
def clear_all_tags():
    """
    Deletes tags for all apps.
    """
    snowflake_session.sql("""
        UPDATE APP_DESCRIPTIONS
        SET tags = NULL,
            last_updated = CURRENT_TIMESTAMP()
    """).collect()
    # Clear cache
    clear_app_cache()
@st.fragment
def confirm_and_delete_tags():
    """
    Confirms and executes bulk deletion of tags.
    """
    if "show_tags_confirm" not in st.session_state:
        st.session_state.show_tags_confirm = False
    if "tags_message" not in st.session_state:
        st.session_state.tags_message = None
    container = st.container()
    with container:
        button_placeholder = st.empty()
        message_placeholder = st.empty()
        confirm_placeholder = st.empty()
        if button_placeholder.button("Delete all tags", type="secondary") or st.session_state.show_tags_confirm:
            st.session_state.show_tags_confirm = True
            with confirm_placeholder.container():
                st.warning("Are you sure you want to delete all tags?", icon="⚠️")
                col1, col2 = st.columns(2)
                with col1:
                    if st.button("Delete", key="confirm_delete_tags", type="primary"):
                        with st.spinner("Deleting tags..."):
                            clear_all_tags()
                        st.session_state.tags_message = "success"
                        st.session_state.show_tags_confirm = False
                with col2:
                    if st.button("Cancel", key="cancel_delete_tags"):
                        st.session_state.tags_message = "cancel"
                        st.session_state.show_tags_confirm = False
        if st.session_state.tags_message == "success":
            message_placeholder.success("Deleted tags for all apps.")
            confirm_placeholder.empty()
            st.session_state.tags_message = None
        elif st.session_state.tags_message == "cancel":
            message_placeholder.info("Deletion cancelled.")
            confirm_placeholder.empty()
            st.session_state.tags_message = None
def generate_tags_for_all_apps():
    """
    Automatically generates tags for apps without tags.
    Returns: Number of processed apps
    """
    # Check the number of apps with descriptions
    apps_with_description = snowflake_session.sql("""
        SELECT COUNT(*) as count
        FROM APP_DESCRIPTIONS
        WHERE description IS NOT NULL
        AND TRIM(description) != ''
    """).collect()[0]['COUNT']
    if apps_with_description == 0:
        st.warning("To generate tags, please set descriptions for the apps first.")
        return 0
    # Get apps without tags
    app_data = snowflake_session.sql("""
        SELECT
            database_name,
            schema_name,
            app_name,
            description
        FROM APP_DESCRIPTIONS
        WHERE (tags IS NULL OR TRIM(tags) = '') -- Tags missing or empty
        AND description IS NOT NULL
        AND TRIM(description) != ''
    """).collect()
    total_apps = len(app_data)
    if total_apps == 0:
        st.info("All apps with descriptions have tags set.")
        return 0
    # Get category list
    categories = [row['CATEGORY'] for row in snowflake_session.sql("SELECT category FROM APP_CATEGORIES ORDER BY category").collect()]
    categories_json = json.dumps(categories, ensure_ascii=False)
    progress_bar = st.progress(0)
    status_text = st.empty()
    for i, row in enumerate(app_data):
        try:
            progress = (i + 1) / total_apps
            progress_bar.progress(progress)
            status_text.text(f"Processing... {i + 1}/{total_apps} apps ({int(progress * 100)}%)")
            # Get CLASSIFY_TEXT function result
            result = snowflake_session.sql("""
                SELECT SNOWFLAKE.CORTEX.CLASSIFY_TEXT(?, PARSE_JSON(?))
            """, params=[row['DESCRIPTION'], categories_json]).collect()[0][0]
            # Parse result if it's a JSON string
            if isinstance(result, str):
                result = json.loads(result)
            # Extract tags (handle both single tag and list of tags)
            if isinstance(result, list):
                tags = [item['label'] for item in result if isinstance(item, dict) and 'label' in item]
            elif isinstance(result, dict) and 'label' in result:
                tags = [result['label']]
            else:
                tags = []
            # Update tags
            if tags:  # Only update if tags are not empty
                snowflake_session.sql("""
                    UPDATE APP_DESCRIPTIONS
                    SET
                        tags = ?,
                        last_updated = CURRENT_TIMESTAMP()
                    WHERE database_name = ?
                    AND schema_name = ?
                    AND app_name = ?
                """, params=[
                    ','.join(tags),
                    row['DATABASE_NAME'],
                    row['SCHEMA_NAME'],
                    row['APP_NAME']
                ]).collect()
        except Exception as e:
            st.error(f"An error occurred while generating tags for app '{row['APP_NAME']}': {str(e)}")
            continue
    # Clear cache after processing
    clear_app_cache()
    progress_bar.progress(1.0)
    status_text.text(f"Completed! Generated tags for {total_apps} apps.")
    return total_apps

def vector_generation_page():
    """
    Displays the automatic vector data generation page.
    """
    st.title("Automatic Vector Data Generation")
    st.write("Generate vector data from app descriptions.")
    # Check the number of apps with descriptions
    apps_with_description = snowflake_session.sql("""
        SELECT COUNT(*) as count
        FROM APP_DESCRIPTIONS
        WHERE description IS NOT NULL
        AND TRIM(description) != ''
    """).collect()[0]['COUNT']
    if apps_with_description == 0:
        st.warning("To generate vector data, please set descriptions for the apps first.")
        return
    # Get apps without vector data
    apps_without_vector = snowflake_session.sql("""
        SELECT
            DATABASE_NAME,
            SCHEMA_NAME,
            APP_NAME,
            DESCRIPTION
        FROM APP_DESCRIPTIONS
        WHERE DESCRIPTION IS NOT NULL
        AND TRIM(DESCRIPTION) != ''
        AND EMBEDDING IS NULL  -- Check for NULL only
    """).collect()
    total_apps = len(apps_without_vector)
    if total_apps == 0:
        st.info("All apps with descriptions have vector data set.")
    else:
        st.write(f"Vector data can be generated for {total_apps} apps.")
        if st.button("Generate Vector Data"):
            progress_bar = st.progress(0)
            status_text = st.empty()
            generated_apps = 0  # Apps whose embedding update succeeded
            for i, row in enumerate(apps_without_vector):
                try:
                    progress = (i + 1) / total_apps
                    progress_bar.progress(progress)
                    status_text.text(f"Processing... {i + 1}/{total_apps} apps ({int(progress * 100)}%)")
                    # Generate and save vector data
                    snowflake_session.sql("""
                        UPDATE APP_DESCRIPTIONS
                        SET
                            EMBEDDING = SNOWFLAKE.CORTEX.EMBED_TEXT_1024('voyage-multilingual-2', ?),
                            last_updated = CURRENT_TIMESTAMP()
                        WHERE database_name = ?
                        AND schema_name = ?
                        AND app_name = ?
                        AND EMBEDDING IS NULL  -- Check for NULL only
                    """, params=[
                        row['DESCRIPTION'],
                        row['DATABASE_NAME'],
                        row['SCHEMA_NAME'],
                        row['APP_NAME']
                    ]).collect()
                    generated_apps += 1
                except Exception as e:
                    st.error(f"An error occurred while generating vector data for app '{row['APP_NAME']}': {e}")
                    continue
            # Clear cache after processing
            clear_app_cache()
            progress_bar.progress(1.0)
            status_text.text(f"Completed! Generated vector data for {generated_apps} of {total_apps} apps.")
            # Only claim full success when every update went through
            if generated_apps == total_apps:
                st.success("Vector data has been generated for all target apps!")
            else:
                st.warning(f"Vector data could not be generated for {total_apps - generated_apps} apps. See the errors above.")
    st.divider()
    # Bulk deletion of vector data
    st.write("### Bulk Vector Data Deletion")
    st.write("Delete vector data for all apps.")
    confirm_and_delete_vectors()

@st.fragment
def confirm_and_delete_vectors():
    """
    Confirms and executes bulk deletion of vector data.
    """
    if "show_vectors_confirm" not in st.session_state:
        st.session_state.show_vectors_confirm = False
    if "vectors_message" not in st.session_state:
        st.session_state.vectors_message = None
    container = st.container()
    with container:
        button_placeholder = st.empty()
        message_placeholder = st.empty()
        confirm_placeholder = st.empty()
        if button_placeholder.button("Delete all vector data", type="secondary") or st.session_state.show_vectors_confirm:
            st.session_state.show_vectors_confirm = True
            with confirm_placeholder.container():
                st.warning("Are you sure you want to delete all vector data?", icon="⚠️")
                col1, col2 = st.columns(2)
                with col1:
                    if st.button("Delete", key="confirm_delete_vectors", type="primary"):
                        with st.spinner("Deleting vector data..."):
                            clear_all_vectors()
                        st.session_state.vectors_message = "success"
                        st.session_state.show_vectors_confirm = False
                with col2:
                    if st.button("Cancel", key="cancel_delete_vectors"):
                        st.session_state.vectors_message = "cancel"
                        st.session_state.show_vectors_confirm = False
        if st.session_state.vectors_message == "success":
            message_placeholder.success("Deleted vector data for all apps.")
            confirm_placeholder.empty()
            st.session_state.vectors_message = None
        elif st.session_state.vectors_message == "cancel":
            message_placeholder.info("Deletion cancelled.")
            confirm_placeholder.empty()
            st.session_state.vectors_message = None

def clear_all_vectors():
    """
    Deletes vector data for all apps.
    """
    snowflake_session.sql("""
        UPDATE APP_DESCRIPTIONS
        SET embedding = NULL,
            last_updated = CURRENT_TIMESTAMP()
    """).collect()

def main():
    """
    Main function of the application.
    """
    # Ensure the required tables exist. Streamlit reruns the script on every
    # interaction, so this call runs on each rerun unless
    # create_app_descriptions_table() caches its work.
    create_app_descriptions_table()
    page = st.sidebar.radio(
        "Select Page",
        ["App Portal", "Admin Page", "Auto Description Generation", "Auto Tag Generation", "Auto Vector Data Generation"]
    )
    if page == "App Portal":
        main_page()
    elif page == "Admin Page":
        admin_page()
    elif page == "Auto Description Generation":
        description_generation_page()
    elif page == "Auto Tag Generation":
        tag_generation_page()
    elif page == "Auto Vector Data Generation":
        vector_generation_page()

if __name__ == "__main__":
    main()
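As a side note, the tag extraction step inside generate_tags_for_all_apps() can be pulled out into a small pure function, which makes it testable without a Snowflake session. The following is only a minimal sketch under that assumption: extract_labels() and tags_to_column() are hypothetical helper names that do not exist in the app above, and the accepted input shapes simply mirror the branches already handled in the listing (a JSON string, a single object with a label key, or a list of such objects).

```python
import json
from typing import Any, List


def extract_labels(result: Any) -> List[str]:
    """Normalize a CLASSIFY_TEXT-style result into a list of label strings.

    Hypothetical helper (not part of the original app). It accepts a JSON
    string, a single {"label": ...} object, or a list of such objects, and
    returns an empty list for anything else.
    """
    if isinstance(result, str):
        try:
            result = json.loads(result)
        except json.JSONDecodeError:
            return []
    if isinstance(result, dict):
        result = [result]
    if not isinstance(result, list):
        return []
    # Keep only entries that carry a non-empty label
    return [
        str(item["label"])
        for item in result
        if isinstance(item, dict) and item.get("label")
    ]


def tags_to_column(labels: List[str]) -> str:
    """Join labels into the comma-separated form stored in the tags column."""
    return ",".join(labels)


if __name__ == "__main__":
    print(extract_labels('{"label": "Sales"}'))
    print(tags_to_column(extract_labels([{"label": "Sales"}, {"label": "HR"}])))
```

With a helper like this, the loop body would only need to call extract_labels(result) and skip the UPDATE when the returned list is empty.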
Conclusion
What do you think? With an app catalog, users can search and filter applications by keyword, meaning, tags, execution count, and likes. This matters especially for Streamlit in Snowflake, since its apps are often used by business users: the easier an app is to find, the more people can put the underlying data to use.
Promotion
Snowflake What's New Updates on X
I share Snowflake's What's New updates on X. I'd be happy if you followed along:
English Version
Snowflake What's New Bot (English Version)
Japanese Version
Snowflake's What's New Bot (Japanese Version)
Change Log
(20250204) Initial post