DEV Community

Tsubasa Kanno

Streamlit in Snowflake App Catalog

Introduction

If you love Streamlit in Snowflake and build apps with it day after day, you may find yourself with a large number of apps on Snowflake before you know it. That's certainly true for me. As the number of apps grows, it becomes hard to keep track of what each app is for and how often it is used.

That's why I created an app catalog for Streamlit in Snowflake using Streamlit in Snowflake! Today, I'd like to introduce this self-made app. While it may not solve all problems, I hope it can serve as a useful method for managing apps in Snowflake. If you find it helpful or interesting, I'd be delighted to receive your feedback or a 👍!

Note: This article represents my personal views and not those of Snowflake.

Feature Overview

List of Features

App Portal Functionality

  • Search and Filtering
    • Hybrid search combining keyword and vector search
    • Sorting by app execution count and likes
    • Filtering by tags
  • Application List Display
    • Toggle between simple card view and detailed list view
    • Like feature for apps and display of like count
    • Display of app execution count for the past 30 days
    • Display of app tags
    • Display of app descriptions
    • Direct navigation to applications
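As a rough illustration of how the 30-day execution count can be derived, the sketch below counts app launches from raw query texts. It assumes that launching an app leaves a query starting with `execute streamlit` followed by a double-quoted, fully qualified name; the sample rows and the helper name `count_app_executions` are hypothetical, and in the app itself this aggregation runs in SQL against the query history.

```python
import re
from collections import Counter

# Pattern for a fully qualified, double-quoted name: "DB"."SCHEMA"."APP"
QUALIFIED_NAME = re.compile(r'"([^"]+)"\."([^"]+)"\."([^"]+)"')

def count_app_executions(query_texts):
    """Count executions per (database, schema, app) from raw query texts."""
    counts = Counter()
    for text in query_texts:
        # Only statements that launch a Streamlit app are counted
        if not text.startswith("execute streamlit"):
            continue
        match = QUALIFIED_NAME.search(text)
        if match:
            counts[match.groups()] += 1
    return counts

# Hypothetical sample rows; real query text may differ in detail
sample = [
    'execute streamlit "DB1"."PUBLIC"."APP_A"()',
    'execute streamlit "DB1"."PUBLIC"."APP_A"()',
    'execute streamlit "DB2"."TOOLS"."APP_B"()',
    'select 1',
]
executions = count_app_executions(sample)
```

Keying the counter on the (database, schema, app) tuple mirrors how the catalog identifies each app.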

Admin Panel

  • User-editable app descriptions
  • User-editable app tags (supports multiple tags)
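Multiple tags per app are kept in a single comma-separated text column. The sketch below shows one way to convert between that string and a Python list; the helper names `tags_to_string` and `string_to_tags` are illustrative and do not appear in the app code.

```python
def tags_to_string(tags):
    """Join a list of tags into one comma-separated string for storage."""
    return ",".join(tag.strip() for tag in tags if tag.strip())

def string_to_tags(tags_str):
    """Split a stored comma-separated string back into a clean list of tags."""
    if not tags_str:
        return []
    return [tag.strip() for tag in tags_str.split(",") if tag.strip()]

stored = tags_to_string(["BI", "Dashboard"])
restored = string_to_tags(stored)
```

One limitation of this representation is that a tag containing a comma cannot survive the round trip, so category names are best kept comma-free.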

Automatic Description Generation

  • AI-powered generation of app descriptions from source code
  • Bulk deletion of app descriptions

Automatic Tag Generation

  • Editing of categories for tags
  • AI-powered generation of tags from app descriptions
  • Bulk deletion of tags

Automatic Vector Data Generation

  • AI-powered generation of vector data from app descriptions
  • Bulk deletion of vector data
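The vector data generated here is what the portal's hybrid search compares the search query against. As a minimal sketch of the decision rule, an app is shown when the keyword appears in its title or description, or when the cosine similarity between the query and description vectors is at least 0.4. The function names and the toy 2-dimensional vectors are assumptions for illustration; the app itself uses 1024-dimensional embeddings and computes the similarity in SQL.

```python
import math

SIMILARITY_THRESHOLD = 0.4  # cut-off used by the app's search

def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors (0.0 for a zero vector)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

def hybrid_match(query, query_vec, title, description, doc_vec):
    """Return (show, similarity) for one app under the hybrid rule."""
    q = query.lower()
    keyword_hit = q in title.lower() or q in (description or "").lower()
    # Apps without an embedding can only match by keyword
    similarity = cosine_similarity(query_vec, doc_vec) if doc_vec else 0.0
    show = keyword_hit or similarity >= SIMILARITY_THRESHOLD
    # Similarities below the threshold are reported as 0.0
    return show, (similarity if similarity >= SIMILARITY_THRESHOLD else 0.0)

by_keyword = hybrid_match("sales", [1.0, 0.0], "Sales Dashboard", None, [0.0, 1.0])
by_vector = hybrid_match("revenue", [1.0, 0.0], "Sales Dashboard", "Monthly figures", [1.0, 0.0])
no_match = hybrid_match("revenue", [1.0, 0.0], "HR tool", "", [0.0, 1.0])
```

Combining the two signals with OR means an exact keyword still finds an app whose description has no embedding yet, while the vector side catches apps described in different words.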

Finished Product Images

App Portal

Admin Panel

Automatic Description Generation

Automatic Tag Generation

Automatic Vector Data Generation

Prerequisites

Snowflake account

  • A Snowflake account with access to Cortex LLM (now available in most clouds and regions due to cross-region inference)

Streamlit in Snowflake installation package

  • Python 3.11 or later (no additional packages required)

Steps

Create a New Streamlit in Snowflake App

Click on "Streamlit" in the left pane of Snowsight, then click the "+ Streamlit" button to create a SiS app.

Run the Streamlit in Snowflake App

In the Streamlit in Snowflake app editing screen, copy and paste the following code to complete the setup. The code is quite long.

import streamlit as st
from snowflake.snowpark.context import get_active_session
import pandas as pd
import json

# Page configuration
st.set_page_config(page_title="Streamlit in Snowflake App Catalog", layout='wide')

# Snowflake account information (please adjust according to your environment)
orgname = "<your_org_name>"
accountname = "<your_account_name>"

# Get Snowflake session
snowflake_session = get_active_session()

@st.cache_data
def get_streamlit_apps_info():
    """
    Retrieves Streamlit in Snowflake app information for the entire account.
    Native Apps are excluded.
    """
    query = "SHOW STREAMLITS IN ACCOUNT"
    result = snowflake_session.sql(query).collect()
    df = pd.DataFrame(result)
    return df[df['owner_role_type'] == 'ROLE']

@st.cache_data
def create_app_descriptions_table():
    """
    Creates APP_DESCRIPTIONS table and APP_CATEGORIES table if they don't exist.
    Also, bulk inserts initial data for apps not present in APP_DESCRIPTIONS.
    """
    # Table for app descriptions
    snowflake_session.sql("""
    CREATE TABLE IF NOT EXISTS APP_DESCRIPTIONS (
        database_name VARCHAR,
        schema_name VARCHAR,
        app_name VARCHAR,
        description VARCHAR,
        tags VARCHAR,
        embedding VECTOR(FLOAT, 1024),
        views INTEGER DEFAULT 0,
        likes INTEGER DEFAULT 0,
        last_updated TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
    )
    """).collect()

    # Table for category management
    snowflake_session.sql("""
    CREATE TABLE IF NOT EXISTS APP_CATEGORIES (
        category STRING PRIMARY KEY,
        created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
    )
    """).collect()

    # Add default categories (bulk insert)
    default_categories = [
        'Data Analysis', 'Data Engineering', 'Dashboard', 'BI',
        'Report', 'Management Tool', 'Generative AI', 'Machine Learning'
    ]
    categories_values = ", ".join([f"('{category}')" for category in default_categories])
    snowflake_session.sql(f"""
    INSERT INTO APP_CATEGORIES (category)
    SELECT category FROM (VALUES {categories_values}) AS v(category)
    WHERE category NOT IN (SELECT category FROM APP_CATEGORIES)
    """).collect()

    # Get current Streamlit app list and update in bulk
    apps_df = get_streamlit_apps_info()
    if not apps_df.empty:
        # Create a temporary table with app list
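        # Escape single quotes so unusual names don't break the VALUES clause
        apps_values = ", ".join([
            "('{}', '{}', '{}')".format(*[
                str(app[col]).replace("'", "''")
                for col in ('database_name', 'schema_name', 'name')
            ])
            for _, app in apps_df.iterrows()
        ])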

        # Execute SQL for bulk update
        snowflake_session.sql(f"""
        INSERT INTO APP_DESCRIPTIONS (database_name, schema_name, app_name)
        SELECT src.* FROM (
            VALUES {apps_values}
        ) AS src(database_name, schema_name, app_name)
        WHERE NOT EXISTS (
            SELECT 1 
            FROM APP_DESCRIPTIONS dest 
            WHERE dest.database_name = src.database_name
            AND dest.schema_name = src.schema_name
            AND dest.app_name = src.app_name
        )
        """).collect()

def update_app_views():
    """
    Aggregates app execution count from QUERY_HISTORY and updates APP_DESCRIPTIONS table.
    """
    snowflake_session.sql("""
        MERGE INTO APP_DESCRIPTIONS target
        USING (
            SELECT 
                REGEXP_SUBSTR(QUERY_TEXT, '"([^"]+)"\\."([^"]+)"\\."([^"]+)"', 1, 1, 'e', 1) as database_name,
                REGEXP_SUBSTR(QUERY_TEXT, '"([^"]+)"\\."([^"]+)"\\."([^"]+)"', 1, 1, 'e', 2) as schema_name,
                REGEXP_SUBSTR(QUERY_TEXT, '"([^"]+)"\\."([^"]+)"\\."([^"]+)"', 1, 1, 'e', 3) as app_name,
                COUNT(*) as view_count
            FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
            WHERE QUERY_TEXT LIKE 'execute streamlit%'
            AND START_TIME >= DATEADD(day, -30, CURRENT_TIMESTAMP())
            GROUP BY 1, 2, 3
        ) as source
        ON target.database_name = source.database_name
        AND target.schema_name = source.schema_name
        AND target.app_name = source.app_name
        WHEN MATCHED THEN
            UPDATE SET views = source.view_count
    """).collect()

def get_app_data():
    """
    Retrieves app information and metadata.
    Fetches from database only if cache doesn't exist.
    """
    if 'apps_df' not in st.session_state or 'app_descriptions' not in st.session_state:
        # Update app execution count
        update_app_views()

        # Retrieve app information
        apps_df = get_streamlit_apps_info()
        app_data = snowflake_session.sql("""
            SELECT 
                DATABASE_NAME,
                SCHEMA_NAME,
                APP_NAME,
                DESCRIPTION,
                TAGS,
                EMBEDDING,
                VIEWS,
                LIKES,
                LAST_UPDATED
            FROM APP_DESCRIPTIONS
        """).collect()

        # Organize app metadata
        app_descriptions = {}
        for row in app_data:
            # Convert tags to array
            current_tags = []
            if row['TAGS']:
                current_tags = [tag.strip() for tag in row['TAGS'].split(',') if tag.strip()]

            app_descriptions[f"{row['DATABASE_NAME']}.{row['SCHEMA_NAME']}.{row['APP_NAME']}"] = {
                'description': row['DESCRIPTION'],
                'tags': current_tags,  # Always store as array
                'views': row['VIEWS'] if row['VIEWS'] is not None else 0,
                'likes': row['LIKES'] if row['LIKES'] is not None else 0,
                'last_updated': row['LAST_UPDATED']
            }

        # Save data to session_state
        st.session_state['apps_df'] = apps_df
        st.session_state['app_descriptions'] = app_descriptions

    return st.session_state['apps_df'], st.session_state['app_descriptions']

def clear_app_cache():
    """
    Clears the cache of app data.
    """
    if 'apps_df' in st.session_state:
        del st.session_state['apps_df']
    if 'app_descriptions' in st.session_state:
        del st.session_state['app_descriptions']

def update_app_description(database, schema, app_name, description, tags):
    """
    Updates or inserts app description and tags.
    """
    # Convert tags to comma-separated string
    tags_str = ','.join([tag['label'] if isinstance(tag, dict) else tag for tag in tags])

    snowflake_session.sql("""
        MERGE INTO APP_DESCRIPTIONS target
        USING (
            SELECT 
                ? AS database_name,
                ? AS schema_name,
                ? AS app_name,
                ? AS description,
                ? AS tags
        ) AS source
        ON target.database_name = source.database_name
        AND target.schema_name = source.schema_name
        AND target.app_name = source.app_name
        WHEN MATCHED THEN
            UPDATE SET 
                description = source.description,
                tags = source.tags,
                last_updated = CURRENT_TIMESTAMP()
        WHEN NOT MATCHED THEN
            INSERT (database_name, schema_name, app_name, description, tags)
            VALUES (
                source.database_name,
                source.schema_name,
                source.app_name,
                source.description,
                source.tags
            )
    """, params=[
        database,
        schema,
        app_name,
        description,
        tags_str
    ]).collect()

    # Clear cache to allow fetching of latest data
    clear_app_cache()

@st.fragment
def render_like_button(app, likes_key):
    """
    Function to render the like button
    """
    # Manage like state
    if "like_states" not in st.session_state:
        st.session_state.like_states = {}

    current_likes = st.session_state[likes_key]

    # Disable form style
    st.markdown("""
        <style>
        [data-testid="stForm"] {
            border: none;
            padding: 0;
            margin: 0;
        }
        </style>
    """, unsafe_allow_html=True)

    # Handle button click
    def on_like_click():
        # Update likes
        snowflake_session.sql("""
            UPDATE APP_DESCRIPTIONS
            SET likes = likes + 1,
                last_updated = CURRENT_TIMESTAMP()
            WHERE database_name = ?
            AND schema_name = ?
            AND app_name = ?
        """, params=[
            app['database_name'],
            app['schema_name'],
            app['name']
        ]).collect()
        # Update like count in session_state
        st.session_state[likes_key] = current_likes + 1
        # Clear cache in background
        clear_app_cache()
        # Update like state for this button
        st.session_state.like_states[likes_key] = True

    # Display like button (the click is handled in the on_click callback)
    st.button(
        f"👍 {current_likes}",
        key=f"like_button_{likes_key}",
        on_click=on_like_click,
        disabled=st.session_state.like_states.get(likes_key, False)
    )

@st.fragment
def render_app_card_view(app, metadata, app_key):
    """
    Function to render app information in card format
    """
    app_title = app['title'] if app['title'] is not None else app['name']
    app_description = metadata['description'] if metadata['description'] is not None else "No description"
    app_url = f"https://app.snowflake.com/{orgname}/{accountname}/#/streamlit-apps/{app['database_name']}.{app['schema_name']}.{app['name']}"

    # Key for managing like state
    likes_key = f"likes_{app_key}"
    if likes_key not in st.session_state:
        st.session_state[likes_key] = metadata.get('likes', 0)

    # Card container
    tile = st.container(height=160, border=True)

    # Title and basic information
    col1, col2 = tile.columns([3, 1])
    with col1:
        # Display title as a link, using a small heading level
        st.markdown(f"###### [{app_title}]({app_url})")
        # Display metrics and tags side by side
        st.caption(f"👁 {metadata.get('views', 0)}")
        if metadata.get('tags'):
            tags = metadata['tags']
            if isinstance(tags, str):
                tags = [tag.strip() for tag in tags.split(',') if tag.strip()]
            st.caption(" ".join([f"#{tag}" for tag in tags]))

    with col2:
        # Like button
        render_like_button(app, likes_key)

    # Display description at the end (full text)
    tile.caption(app_description)

@st.fragment
def generate_description_with_progress(model_name):
    """
    Performs description generation process and displays progress.
    """
    with st.spinner("Generating descriptions..."):
        apps_processed = generate_description_for_all_apps(model_name)
        if apps_processed > 0:
            st.success("Generated descriptions for all apps.")

@st.fragment
def description_generation_form():
    """
    Displays the form for description generation.
    """
    with st.form("description_generation_form"):
        models = [
            'claude-3-5-sonnet',
            'mistral-large2',
            'llama3.3-70b',
            'snowflake-llama-3.1-405b', 'snowflake-llama-3.3-70b',
            'llama3.2-3b', 'llama3.2-1b',
            'llama3.1-405b', 'llama3.1-8b',
            'reka-core', 'reka-flash',
            'snowflake-arctic',
            'jamba-1.5-large', 'jamba-1.5-mini'
        ]

        selected_model = st.selectbox("Select model to use", models)
        submitted = st.form_submit_button("Generate descriptions in bulk")

        if submitted:
            generate_description_with_progress(selected_model)

@st.fragment
def tag_generation_with_progress():
    """
    Performs tag generation process and displays progress.
    """
    with st.spinner("Generating tags..."):
        generate_tags_for_all_apps()
    st.success("Generated tags for all apps.")

@st.fragment
def category_list():
    """
    Displays the category list and allows deletion of selected categories.
    """
    if "category_message" not in st.session_state:
        st.session_state.category_message = None

    # Get category list
    categories = [
        row['CATEGORY'] for row in 
        snowflake_session.sql("SELECT category FROM APP_CATEGORIES ORDER BY category").collect()
    ]

    with st.form("category_list_form", clear_on_submit=True):
        selected = st.selectbox(
            "Select category",
            categories,
            key=f"category_selector_{len(categories)}"
        )

        submitted = st.form_submit_button("Delete selected category")
        if submitted and selected:
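            snowflake_session.sql(
                "DELETE FROM APP_CATEGORIES WHERE category = ?",
                params=[selected]
            ).collect()
            # Invalidate the cached category list so the admin page sees the change
            get_cached_categories.clear()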
            st.session_state.category_message = f"Deleted category '{selected}'."
            st.rerun()

    # Placeholder for message display (moved below the form)
    if st.session_state.category_message:
        st.success(st.session_state.category_message)
        st.session_state.category_message = None

@st.fragment
def add_category_form():
    """
    Displays a form to add a new category.
    """
    if "add_category_message" not in st.session_state:
        st.session_state.add_category_message = None

    with st.form("add_category_form", clear_on_submit=True):
        new_category = st.text_input("New category name")
        submitted = st.form_submit_button("Add")
        if submitted and new_category:
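            snowflake_session.sql(
                "INSERT INTO APP_CATEGORIES (category) SELECT ? WHERE NOT EXISTS (SELECT 1 FROM APP_CATEGORIES WHERE category = ?)",
                params=[new_category, new_category]
            ).collect()
            # Invalidate the cached category list so the admin page sees the change
            get_cached_categories.clear()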
            st.session_state.add_category_message = f"Added category '{new_category}'."
            st.rerun()

    # Placeholder for message display (moved below the form)
    if st.session_state.add_category_message:
        st.success(st.session_state.add_category_message)
        st.session_state.add_category_message = None

@st.fragment
def category_management():
    """
    Provides category management functionality.
    """
    st.write("### Category Management")
    st.write("Manage categories used for tag generation.")

    col1, col2 = st.columns(2)

    with col1:
        st.write("#### Current Category List")
        category_list()

    with col2:
        st.write("#### Add New Category")
        add_category_form()

def description_generation_page():
    """
    Displays the automatic description generation page.
    """
    st.title("Automatic Description Generation")

    # Bulk description generation
    st.write("### Bulk Description Generation")
    st.write("Automatically generate descriptions for all apps.")
    description_generation_form()

    st.divider()

    # Bulk description deletion
    st.write("### Bulk Description Deletion")
    st.write("Delete descriptions for all apps.")
    confirm_and_delete_descriptions()

@st.fragment
def tag_generation_form():
    """
    Displays the form for tag generation.
    """
    if "tag_generation_message" not in st.session_state:
        st.session_state.tag_generation_message = None

    with st.form("tag_generation_form"):
        st.write("Automatically generate tags for all apps using their descriptions and the set categories.")
        submitted = st.form_submit_button("Generate tags in bulk")

        if submitted:
            with st.spinner("Generating tags..."):
                apps_processed = generate_tags_for_all_apps()
                if apps_processed > 0:
                    st.success("Generated tags for all apps.")

def tag_generation_page():
    """
    Displays the automatic tag generation page.
    """
    st.title("Automatic Tag Generation")

    # Category management
    category_management()

    st.divider()

    # Bulk tag generation
    st.write("### Bulk Tag Generation")
    tag_generation_form()

    st.divider()

    # Bulk tag deletion
    st.write("### Bulk Tag Deletion")
    st.write("Delete tags for all apps.")
    confirm_and_delete_tags()

def main_page():
    """
    Displays the main page.
    """
    st.title("Streamlit in Snowflake App Catalog")

    # Get cached data
    apps_df, app_descriptions = get_app_data()

    # Select display mode
    display_mode = st.radio("Display mode", ["Card view", "List view"], horizontal=True)

    # Search and filter controls
    search_query, sort_by, selected_tags, _ = admin_search_filter()

    # Display filtered apps
    filtered_apps = filter_apps(apps_df, app_descriptions, search_query, selected_tags)

    # Sort
    if sort_by == "Popularity":
        filtered_apps.sort(key=lambda x: x[1].get('views', 0), reverse=True)
    elif sort_by == "Rating":
        filtered_apps.sort(key=lambda x: x[1].get('likes', 0), reverse=True)

    # Display app list
    if display_mode == "Card view":
        # Create 3-column grid layout for card view
        cols = st.columns(3)
        for i, (app, metadata) in enumerate(filtered_apps):
            with cols[i % 3]:
                render_app_card_view(app, metadata, f"{app['database_name']}.{app['schema_name']}.{app['name']}")
                if search_query.strip() and metadata.get('similarity', 0.0) > 0:
                    st.caption(f"Match rate: {metadata['similarity']:.1%}")
    else:
        # List view
        for app, metadata in filtered_apps:
            app_title = app['title'] if app['title'] is not None else app['name']
            app_url = f"https://app.snowflake.com/{orgname}/{accountname}/#/streamlit-apps/{app['database_name']}.{app['schema_name']}.{app['name']}"
            app_key = f"{app['database_name']}.{app['schema_name']}.{app['name']}"

            # Key for managing like state
            likes_key = f"likes_{app_key}"
            if likes_key not in st.session_state:
                st.session_state[likes_key] = metadata.get('likes', 0)

            # App information container
            with st.container():
                # Title and basic information
                col1, col2 = st.columns([3, 1])
                with col1:
                    st.markdown(f"### [{app_title}]({app_url})")
                    if search_query.strip() and metadata.get('similarity', 0.0) > 0:
                        st.caption(f"Match rate: {metadata['similarity']:.1%}")

                with col2:
                    # Metrics
                    st.caption(f"👁 {metadata.get('views', 0)}")
                    # Like button
                    render_like_button(app, likes_key)

                # Display tags
                if metadata.get('tags'):
                    tags = metadata['tags']
                    if isinstance(tags, str):
                        tags = [tag.strip() for tag in tags.split(',') if tag.strip()]
                    st.caption(" ".join([f"#{tag}" for tag in tags]))

                # Display detailed information in an expander
                with st.expander("Detailed information"):
                    # Description
                    if metadata.get('description'):
                        st.markdown(metadata['description'])
                    else:
                        st.markdown("*No description*")

                    # Detailed information table
                    details = {
                        "App name": app['name'],
                        "Database": app['database_name'],
                        "Schema": app['schema_name'],
                        "Owner": app.get('owner', 'Unknown'),
                        "Created on": str(app.get('created_on', 'Unknown'))[:19] if app.get('created_on') else 'Unknown',
                        "URL": app_url
                    }
                    st.table(pd.DataFrame([details]).T.rename(columns={0: "Value"}))

                st.divider()

@st.fragment
def render_app_metadata_editor(app, metadata, categories):
    """
    Displays the metadata editing form for an app.
    """
    app_key = f"{app['database_name']}.{app['schema_name']}.{app['name']}"
    app_title = app['title'] if app['title'] is not None else app['name']

    st.subheader(f"{app_title} ({app['name']})")

    with st.form(key=f"metadata_form_{app_key}"):
        col1, col2 = st.columns([3, 1])
        with col1:
            new_desc = st.text_area("Description", metadata['description'] or "", key=f"desc_{app_key}")
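            # Include the app's current tags in the options; st.multiselect
            # raises an error if a default value is not among the options
            tag_options = list(dict.fromkeys(list(categories) + list(metadata['tags'])))
            selected_app_tags = st.multiselect(
                "Tags",
                tag_options,
                default=metadata['tags'],
                key=f"tags_{app_key}"
            )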
            if metadata['last_updated']:
                st.caption(f"Last updated: {metadata['last_updated']}")

        with col2:
            submitted = st.form_submit_button("Update")
            if submitted:
                update_app_description(app['database_name'], app['schema_name'], app['name'], new_desc, selected_app_tags)
                st.success(f"Metadata for {app_title} has been updated.")

    st.divider()

def admin_search_filter():
    """
    Displays search and filter controls (shared by the main page and the admin page).
    """
    with st.form("search_form"):
        col1, col2 = st.columns([2, 1])
        with col1:
            search_query = st.text_input("Search apps", "")
        with col2:
            sort_by = st.selectbox("Sort by", ["Popularity", "Rating"])

        # Get tag list
        all_tags = set()
        app_data = snowflake_session.sql("""
            SELECT DISTINCT TRIM(value) as tag
            FROM APP_DESCRIPTIONS,
            LATERAL FLATTEN(input => SPLIT(tags, ',')) as t
            WHERE tags IS NOT NULL
            ORDER BY tag
        """).collect()

        for row in app_data:
            if row['TAG']:
                all_tags.add(row['TAG'])

        selected_tags = st.multiselect("Filter by tags", sorted(list(all_tags)))

        search_button = st.form_submit_button("Search")

    return search_query, sort_by, selected_tags, search_button

@st.cache_data
def get_cached_categories():
    """
    Retrieves and caches the category list.
    """
    return [row['CATEGORY'] for row in snowflake_session.sql("SELECT category FROM APP_CATEGORIES ORDER BY category").collect()]

def filter_apps(apps_df, app_descriptions, search_query, selected_tags):
    """
    Filters the app list.
    Performs a hybrid search combining keyword search and vector search.
    """
    filtered_apps = []

    if search_query.strip():
        # Get vector search results (the query is passed as a bind parameter,
        # so no manual escaping is needed)
        vector_results = snowflake_session.sql("""
        WITH search_vector AS (
            SELECT SNOWFLAKE.CORTEX.EMBED_TEXT_1024('voyage-multilingual-2', ?) as query_embedding
        )
        SELECT 
            a.database_name,
            a.schema_name,
            a.app_name,
            VECTOR_COSINE_SIMILARITY(a.embedding, s.query_embedding) as similarity
        FROM 
            APP_DESCRIPTIONS a,
            search_vector s
        WHERE 
            a.embedding IS NOT NULL
        ORDER BY 
            similarity DESC
        """, params=[search_query]).collect()

        # Convert vector search results to dictionary
        similarity_dict = {
            f"{row['DATABASE_NAME']}.{row['SCHEMA_NAME']}.{row['APP_NAME']}": row['SIMILARITY']
            for row in vector_results
        }

    # Filter apps
    for _, app in apps_df.iterrows():
        app_key = f"{app['database_name']}.{app['schema_name']}.{app['name']}"
        metadata = app_descriptions.get(app_key, {'description': "", 'tags': [], 'last_updated': None})
        app_title = app['title'] if app['title'] is not None else app['name']

        # Search and tag filtering conditions
        show_app = True
        similarity = 0.0

        if search_query.strip():
            # Keyword search
            keyword_match = (
                search_query.lower() in app_title.lower() or 
                search_query.lower() in (metadata['description'] or "").lower()
            )
            # Get vector search similarity
            vector_similarity = similarity_dict.get(app_key, 0.0)

            # Show if keyword matches or similarity is 0.4 or higher
            show_app = keyword_match or vector_similarity >= 0.4
            similarity = vector_similarity if vector_similarity >= 0.4 else 0.0

        if selected_tags:
            # Convert tags to array if it's a string
            app_tags = metadata['tags']
            if isinstance(app_tags, str):
                app_tags = [tag.strip() for tag in app_tags.split(',') if tag.strip()]
            # Check if all selected tags are included (AND condition)
            show_app = show_app and all(tag in app_tags for tag in selected_tags)

        if show_app:
            metadata['similarity'] = similarity
            filtered_apps.append((app, metadata))

    return filtered_apps

def admin_page():
    """
    Admin page: Manually edit app metadata.
    """
    st.title("Admin Page: Edit App Metadata")

    # Get cached data
    apps_df, app_descriptions = get_app_data()
    categories = get_cached_categories()

    # Search and filter controls
    search_query, sort_by, selected_tags, _ = admin_search_filter()

    # Display filtered apps
    filtered_apps = filter_apps(apps_df, app_descriptions, search_query, selected_tags)

    # Sort
    if sort_by == "Popularity":
        filtered_apps.sort(key=lambda x: x[1].get('views', 0), reverse=True)
    elif sort_by == "Rating":
        filtered_apps.sort(key=lambda x: x[1].get('likes', 0), reverse=True)

    # Display app list
    for app, metadata in filtered_apps:
        render_app_metadata_editor(app, metadata, categories)

def get_app_source_code(database, schema, app_name):
    """
    Retrieves the source code of an app.
    """
    try:
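        # Get app details (identifiers are double-quoted so that case-sensitive
        # or otherwise non-standard names resolve correctly)
        app_details = snowflake_session.sql(f"""
            DESCRIBE STREAMLIT "{database}"."{schema}"."{app_name}"
        """).collect()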
        app_details_df = pd.DataFrame(app_details)

        # Get root_location
        root_location = app_details_df['root_location'].iloc[0]

        if not root_location:
            return None

        # Get stage file list
        files_df = pd.DataFrame(snowflake_session.sql(f"LIST '{root_location}'").collect())

        # Find .py files
        py_files = files_df[files_df['name'].str.endswith('.py')]['name'].tolist()
        if not py_files:
            return None

        # Get contents of the first .py file
        file_path = py_files[0]
        stage_parts = root_location.lstrip('@').split('.')
        if len(stage_parts) >= 3:
            database = stage_parts[0]
            schema = stage_parts[1]
            stage = '.'.join(stage_parts[2:])

            # Remove stage name if included
            stage_suffix = stage.strip('"').lower()
            clean_file_path = file_path
            if file_path.lower().startswith(stage_suffix + '/'):
                clean_file_path = file_path[len(stage_suffix + '/'):]

            # Construct fully qualified stage path
            stage_path = f"@{database}.{schema}.{stage}/{clean_file_path}"

            # Get file contents
            with snowflake_session.file.get_stream(stage_path) as file_stream:
                content = file_stream.read()

                # Convert binary data to string
                try:
                    return content.decode('utf-8')
                except UnicodeDecodeError:
                    try:
                        return content.decode('shift-jis')
                    except UnicodeDecodeError:
                        st.error("Unable to determine file encoding")
                        return None

        return None

    except Exception as e:
        st.error(f"An error occurred while retrieving source code: {str(e)}")
        return None

def generate_description_for_all_apps(model_name):
    """
    Automatically generates descriptions for apps without descriptions.
    Returns: Number of processed apps
    """
    # Get all apps (those that already have descriptions are filtered out below)
    apps_df = get_streamlit_apps_info()

    # Get apps with existing descriptions
    existing_descriptions = snowflake_session.sql("""
        SELECT 
            database_name || '.' || schema_name || '.' || app_name as app_key
        FROM APP_DESCRIPTIONS 
        WHERE description IS NOT NULL 
        AND TRIM(description) != ''
    """).collect()

    # Convert existing app keys to a set
    existing_app_keys = {row['APP_KEY'] for row in existing_descriptions}

    # Filter apps without descriptions
    apps_to_process = []
    for _, app in apps_df.iterrows():
        app_key = f"{app['database_name']}.{app['schema_name']}.{app['name']}"
        if app_key not in existing_app_keys:
            apps_to_process.append(app)

    total_apps = len(apps_to_process)
    if total_apps == 0:
        st.info("All apps have descriptions set.")
        return 0

    progress_bar = st.progress(0)
    status_text = st.empty()

    for i, app in enumerate(apps_to_process):
        progress = (i + 1) / total_apps
        progress_bar.progress(progress)
        status_text.text(f"Processing... {i + 1}/{total_apps} apps ({int(progress * 100)}%)")

        app_title = app['title'] if app['title'] is not None else app['name']

        # Get app source code
        source_code = get_app_source_code(app['database_name'], app['schema_name'], app['name'])

        # Create prompt (pass source code as a separate parameter)
        prompt_template = """Generate a description for the following Streamlit app.
- Output only the description in your response.
- Do not include phrases like "Certainly, I understand" in your response.
- Output in plain text, not markdown format.
- Summarize the description concisely in about 100 characters.

App name: {app_title} ({app_name})"""

        try:
            # Fall back to a placeholder when the source code is unavailable
            source_text = source_code if source_code else 'Source code could not be retrieved.'

            # Use bind parameters to prevent SQL injection
            description = snowflake_session.sql("""
                SELECT SNOWFLAKE.CORTEX.COMPLETE(?, 
                    CONCAT(
                        ?, 
                        '\n\nSource code:\n', 
                        ?
                    )
                )
            """, params=[
                model_name,
                prompt_template.format(app_title=app_title, app_name=app['name']),
                source_text
            ]).collect()[0][0]

            # Update description
            snowflake_session.sql("""
                MERGE INTO APP_DESCRIPTIONS
                USING (SELECT 
                    ? AS db, 
                    ? AS sc, 
                    ? AS app, 
                    ? AS desc
                ) AS src
                ON APP_DESCRIPTIONS.database_name = src.db 
                AND APP_DESCRIPTIONS.schema_name = src.sc 
                AND APP_DESCRIPTIONS.app_name = src.app
                WHEN MATCHED THEN
                    UPDATE SET 
                        description = src.desc,
                        last_updated = CURRENT_TIMESTAMP()
                WHEN NOT MATCHED THEN
                    INSERT (database_name, schema_name, app_name, description)
                    VALUES (src.db, src.sc, src.app, src.desc)
            """, params=[
                app['database_name'],
                app['schema_name'],
                app['name'],
                description
            ]).collect()

        except Exception as e:
            st.error(f"An error occurred while processing app '{app_title}': {str(e)}")
            continue

    # Clear cache after processing
    clear_app_cache()

    progress_bar.progress(1.0)
    status_text.text(f"Completed! Generated descriptions for {total_apps} apps.")
    return total_apps

@st.fragment
def confirm_and_delete_descriptions():
    """
    Confirms and executes bulk deletion of descriptions.
    """
    if "show_desc_confirm" not in st.session_state:
        st.session_state.show_desc_confirm = False
    if "desc_message" not in st.session_state:
        st.session_state.desc_message = None

    container = st.container()
    with container:
        button_placeholder = st.empty()
        message_placeholder = st.empty()
        confirm_placeholder = st.empty()

        if button_placeholder.button("Delete all descriptions", type="secondary") or st.session_state.show_desc_confirm:
            st.session_state.show_desc_confirm = True

            with confirm_placeholder.container():
                st.warning("Are you sure you want to delete all descriptions?", icon="⚠️")
                col1, col2 = st.columns(2)
                with col1:
                    if st.button("Delete", key="confirm_delete_desc", type="primary"):
                        with st.spinner("Deleting descriptions..."):
                            clear_all_descriptions()
                        st.session_state.desc_message = "success"
                        st.session_state.show_desc_confirm = False
                with col2:
                    if st.button("Cancel", key="cancel_delete_desc"):
                        st.session_state.desc_message = "cancel"
                        st.session_state.show_desc_confirm = False

        if st.session_state.desc_message == "success":
            message_placeholder.success("Deleted descriptions for all apps.")
            confirm_placeholder.empty()
            st.session_state.desc_message = None
        elif st.session_state.desc_message == "cancel":
            message_placeholder.info("Deletion cancelled.")
            confirm_placeholder.empty()
            st.session_state.desc_message = None

def clear_all_descriptions():
    """
    Deletes descriptions for all apps.
    """
    snowflake_session.sql("""
    UPDATE APP_DESCRIPTIONS
    SET description = NULL,
        last_updated = CURRENT_TIMESTAMP()
    """).collect()
    # Clear cache
    clear_app_cache()

def clear_all_tags():
    """
    Deletes tags for all apps.
    """
    snowflake_session.sql("""
    UPDATE APP_DESCRIPTIONS
    SET tags = NULL,
        last_updated = CURRENT_TIMESTAMP()
    """).collect()
    # Clear cache
    clear_app_cache()

@st.fragment
def confirm_and_delete_tags():
    """
    Confirms and executes bulk deletion of tags.
    """
    if "show_tags_confirm" not in st.session_state:
        st.session_state.show_tags_confirm = False
    if "tags_message" not in st.session_state:
        st.session_state.tags_message = None

    container = st.container()
    with container:
        button_placeholder = st.empty()
        message_placeholder = st.empty()
        confirm_placeholder = st.empty()

        if button_placeholder.button("Delete all tags", type="secondary") or st.session_state.show_tags_confirm:
            st.session_state.show_tags_confirm = True

            with confirm_placeholder.container():
                st.warning("Are you sure you want to delete all tags?", icon="⚠️")
                col1, col2 = st.columns(2)
                with col1:
                    if st.button("Delete", key="confirm_delete_tags", type="primary"):
                        with st.spinner("Deleting tags..."):
                            clear_all_tags()
                        st.session_state.tags_message = "success"
                        st.session_state.show_tags_confirm = False
                with col2:
                    if st.button("Cancel", key="cancel_delete_tags"):
                        st.session_state.tags_message = "cancel"
                        st.session_state.show_tags_confirm = False

        if st.session_state.tags_message == "success":
            message_placeholder.success("Deleted tags for all apps.")
            confirm_placeholder.empty()
            st.session_state.tags_message = None
        elif st.session_state.tags_message == "cancel":
            message_placeholder.info("Deletion cancelled.")
            confirm_placeholder.empty()
            st.session_state.tags_message = None

def generate_tags_for_all_apps():
    """
    Automatically generates tags for apps without tags.
    Returns: Number of processed apps
    """
    # Check the number of apps with descriptions
    apps_with_description = snowflake_session.sql("""
        SELECT COUNT(*) as count
        FROM APP_DESCRIPTIONS 
        WHERE description IS NOT NULL 
        AND TRIM(description) != ''
    """).collect()[0]['COUNT']

    if apps_with_description == 0:
        st.warning("To generate tags, please set descriptions for the apps first.")
        return 0

    # Get apps without tags
    app_data = snowflake_session.sql("""
        SELECT 
            database_name,
            schema_name,
            app_name,
            description
        FROM APP_DESCRIPTIONS 
        WHERE (tags IS NULL OR TRIM(tags) = '')  -- Treat NULL or blank tags as missing
        AND description IS NOT NULL
        AND TRIM(description) != ''
    """).collect()

    total_apps = len(app_data)
    if total_apps == 0:
        st.info("All apps with descriptions have tags set.")
        return 0

    # Get category list
    categories = [row['CATEGORY'] for row in snowflake_session.sql("SELECT category FROM APP_CATEGORIES ORDER BY category").collect()]
    categories_json = json.dumps(categories, ensure_ascii=False)

    progress_bar = st.progress(0)
    status_text = st.empty()

    for i, row in enumerate(app_data):
        try:
            progress = (i + 1) / total_apps
            progress_bar.progress(progress)
            status_text.text(f"Processing... {i + 1}/{total_apps} apps ({int(progress * 100)}%)")

            # Get CLASSIFY_TEXT function result
            result = snowflake_session.sql("""
                SELECT SNOWFLAKE.CORTEX.CLASSIFY_TEXT(?, PARSE_JSON(?))
            """, params=[row['DESCRIPTION'], categories_json]).collect()[0][0]

            # Parse result if it's a JSON string
            if isinstance(result, str):
                result = json.loads(result)

            # Extract tags (handle both single tag and list of tags)
            if isinstance(result, list):
                tags = [item['label'] for item in result if isinstance(item, dict) and 'label' in item]
            elif isinstance(result, dict) and 'label' in result:
                tags = [result['label']]
            else:
                tags = []

            # Update tags
            if tags:  # Only update if tags are not empty
                snowflake_session.sql("""
                    UPDATE APP_DESCRIPTIONS
                    SET 
                        tags = ?,
                        last_updated = CURRENT_TIMESTAMP()
                    WHERE database_name = ?
                    AND schema_name = ?
                    AND app_name = ?
                """, params=[
                    ','.join(tags),
                    row['DATABASE_NAME'],
                    row['SCHEMA_NAME'],
                    row['APP_NAME']
                ]).collect()

        except Exception as e:
            st.error(f"An error occurred while generating tags for app '{row['APP_NAME']}': {str(e)}")
            continue

    # Clear cache after processing
    clear_app_cache()

    progress_bar.progress(1.0)
    status_text.text(f"Completed! Generated tags for {total_apps} apps.")
    return total_apps

def vector_generation_page():
    """
    Displays the automatic vector data generation page.
    """
    st.title("Automatic Vector Data Generation")
    st.write("Generate vector data from app descriptions.")

    # Check the number of apps with descriptions
    apps_with_description = snowflake_session.sql("""
        SELECT COUNT(*) as count
        FROM APP_DESCRIPTIONS 
        WHERE description IS NOT NULL 
        AND TRIM(description) != ''
    """).collect()[0]['COUNT']

    if apps_with_description == 0:
        st.warning("To generate vector data, please set descriptions for the apps first.")
        return

    # Get apps without vector data
    apps_without_vector = snowflake_session.sql("""
        SELECT 
            DATABASE_NAME,
            SCHEMA_NAME,
            APP_NAME,
            DESCRIPTION
        FROM APP_DESCRIPTIONS
        WHERE DESCRIPTION IS NOT NULL 
        AND TRIM(DESCRIPTION) != ''
        AND EMBEDDING IS NULL  -- Check for NULL only
    """).collect()

    total_apps = len(apps_without_vector)
    if total_apps == 0:
        st.info("All apps with descriptions have vector data set.")
    else:
        st.write(f"Vector data can be generated for {total_apps} apps.")

        if st.button("Generate Vector Data"):
            progress_bar = st.progress(0)
            status_text = st.empty()

            for i, row in enumerate(apps_without_vector):
                try:
                    progress = (i + 1) / total_apps
                    progress_bar.progress(progress)
                    status_text.text(f"Processing... {i + 1}/{total_apps} apps ({int(progress * 100)}%)")

                    # Generate and save vector data
                    snowflake_session.sql("""
                        UPDATE APP_DESCRIPTIONS
                        SET 
                            EMBEDDING = SNOWFLAKE.CORTEX.EMBED_TEXT_1024('voyage-multilingual-2', ?),
                            last_updated = CURRENT_TIMESTAMP()
                        WHERE database_name = ?
                        AND schema_name = ?
                        AND app_name = ?
                        AND EMBEDDING IS NULL  -- Check for NULL only
                    """, params=[
                        row['DESCRIPTION'],
                        row['DATABASE_NAME'],
                        row['SCHEMA_NAME'],
                        row['APP_NAME']
                    ]).collect()

                except Exception as e:
                    st.error(f"An error occurred while generating vector data for app '{row['APP_NAME']}': {str(e)}")
                    continue

            # Clear cache after processing
            clear_app_cache()

            progress_bar.progress(1.0)
            status_text.text(f"Completed! Generated vector data for {total_apps} apps.")
            st.success("Vector data has been generated for all target apps!")

    st.divider()

    # Bulk deletion of vector data
    st.write("### Bulk Vector Data Deletion")
    st.write("Delete vector data for all apps.")
    confirm_and_delete_vectors()

@st.fragment
def confirm_and_delete_vectors():
    """
    Confirms and executes bulk deletion of vector data.
    """
    if "show_vectors_confirm" not in st.session_state:
        st.session_state.show_vectors_confirm = False
    if "vectors_message" not in st.session_state:
        st.session_state.vectors_message = None

    container = st.container()
    with container:
        button_placeholder = st.empty()
        message_placeholder = st.empty()
        confirm_placeholder = st.empty()

        if button_placeholder.button("Delete all vector data", type="secondary") or st.session_state.show_vectors_confirm:
            st.session_state.show_vectors_confirm = True

            with confirm_placeholder.container():
                st.warning("Are you sure you want to delete all vector data?", icon="⚠️")
                col1, col2 = st.columns(2)
                with col1:
                    if st.button("Delete", key="confirm_delete_vectors", type="primary"):
                        with st.spinner("Deleting vector data..."):
                            clear_all_vectors()
                        st.session_state.vectors_message = "success"
                        st.session_state.show_vectors_confirm = False
                with col2:
                    if st.button("Cancel", key="cancel_delete_vectors"):
                        st.session_state.vectors_message = "cancel"
                        st.session_state.show_vectors_confirm = False

        if st.session_state.vectors_message == "success":
            message_placeholder.success("Deleted vector data for all apps.")
            confirm_placeholder.empty()
            st.session_state.vectors_message = None
        elif st.session_state.vectors_message == "cancel":
            message_placeholder.info("Deletion cancelled.")
            confirm_placeholder.empty()
            st.session_state.vectors_message = None

def clear_all_vectors():
    """
    Deletes vector data for all apps.
    """
    snowflake_session.sql("""
    UPDATE APP_DESCRIPTIONS
    SET embedding = NULL,
        last_updated = CURRENT_TIMESTAMP()
    """).collect()
    # Clear cache, as the description and tag deletions do
    clear_app_cache()

def main():
    """
    Main function of the application.
    """
    # Initialize and update tables only once at app startup
    create_app_descriptions_table()

    page = st.sidebar.radio(
        "Select Page", 
        ["App Portal", "Admin Page", "Auto Description Generation", "Auto Tag Generation", "Auto Vector Data Generation"]
    )

    if page == "App Portal":
        main_page()
    elif page == "Admin Page":
        admin_page()
    elif page == "Auto Description Generation":
        description_generation_page()
    elif page == "Auto Tag Generation":
        tag_generation_page()
    elif page == "Auto Vector Data Generation":
        vector_generation_page()

if __name__ == "__main__":
    main()
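
The tag-generation loop in the code above normalizes the CLASSIFY_TEXT result (a JSON string, a single object, or a list of objects) before saving the tags. As a minimal sketch, that normalization could be pulled out into a standalone helper so it can be checked without a Snowflake session. The function name `extract_labels` is hypothetical and not part of the app, and the extra guard against malformed JSON is my assumption rather than something the original code does:

```python
import json


def extract_labels(result):
    """Normalize a CLASSIFY_TEXT-style result into a list of label strings.

    Hypothetical helper: it mirrors the branching in
    generate_tags_for_all_apps(), but is not part of the original app.
    """
    # The result may arrive as a JSON string, so parse it first.
    if isinstance(result, str):
        try:
            result = json.loads(result)
        except json.JSONDecodeError:
            # Assumption: treat unparseable output as "no tags".
            return []

    # A list of {"label": ...} objects yields several tags.
    if isinstance(result, list):
        return [
            item["label"]
            for item in result
            if isinstance(item, dict) and "label" in item
        ]

    # A single {"label": ...} object yields exactly one tag.
    if isinstance(result, dict) and "label" in result:
        return [result["label"]]

    # Anything else (None, numbers, unexpected shapes) yields no tags.
    return []


# Same comma-joined format the app stores in the tags column.
tags_value = ",".join(extract_labels('{"label": "Dashboard"}'))
```

Keeping this logic in a plain function means the loop only has to decide whether the returned list is empty before running the UPDATE statement.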

Conclusion

What do you think? With an app catalog, you can search and filter applications from several angles: by keyword and vector search, by tag, and by execution count or likes. This matters particularly for Streamlit in Snowflake, which is often used by business users. The easier it is to find the right application, the more accessible data becomes to a wider audience.

Promotion

Snowflake What's New Updates on X

I share Snowflake's What's New updates on X. I'd be happy if you followed:

English Version

Snowflake What's New Bot (English Version)

Japanese Version

Snowflake's What's New Bot (Japanese Version)

Change Log

(20250204) Initial post

Original Japanese Article

https://zenn.dev/tsubasa_tech/articles/ab10c634612e0f
