DEV Community

RUDRA SHARMA
RUDRA SHARMA

Posted on

TEST

import streamlit as st
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os
from io import BytesIO
from typing import Optional, Union, List, Tuple, Dict

def read_excel_file(file_path: str) -> List[pd.DataFrame]:
    """Read all sheets from an Excel file."""
    try:
        excel_file = pd.ExcelFile(file_path, engine='openpyxl')
        dfs = []

        for sheet_name in excel_file.sheet_names:
            try:
                # Read the Excel sheet with all columns as strings initially
                df = pd.read_excel(
                    file_path,
                    sheet_name=sheet_name,
                    engine='openpyxl',
                    dtype=str  # Read all columns as strings first
                )

                if df is not None and not df.empty:
                    # Clean up the data
                    df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
                    df = df.replace(['nan', 'NaN', 'NaT', ''], None)

                    # Add metadata columns
                    df['_file_name'] = os.path.basename(file_path)
                    df['_sheet_name'] = sheet_name
                    dfs.append(df)
                else:
                    st.warning(f"Sheet {sheet_name} in {file_path} is empty")

            except Exception as e:
                st.warning(f"Error reading sheet {sheet_name} from {file_path}: {str(e)}")
                continue

        if not dfs:
            st.error(f"No valid data found in {file_path}")
            return [pd.DataFrame()]  # Return list with empty DataFrame if no valid sheets

        return dfs

    except Exception as e:
        st.error(f"Failed to open Excel file {file_path}: {str(e)}")
        return [pd.DataFrame()]  # Return list with empty DataFrame on error

def read_csv_file(file_path: str) -> pd.DataFrame:
    """Read a CSV file with support for different encodings."""
    encodings = ['utf-8', 'latin1', 'iso-8859-1', 'cp1252']

    for encoding in encodings:
        try:
            # Read CSV with specified encoding and all columns as strings initially
            df = pd.read_csv(file_path, dtype=str, encoding=encoding)

            # Clean up the data
            df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
            df = df.replace(['nan', 'NaN', 'NaT', ''], None)

            # Add metadata columns
            df['_file_name'] = os.path.basename(file_path)
            df['_sheet_name'] = 'csv'
            return df

        except UnicodeDecodeError:
            continue  # Try next encoding
        except Exception as e:
            st.warning(f"Error reading CSV file {file_path} with {encoding} encoding: {str(e)}")
            continue

    st.error(f"Failed to read CSV file {file_path} with any supported encoding")
    return pd.DataFrame()  # Return empty DataFrame if all encodings fail

def process_folder(folder_path: str) -> Tuple[Optional[str], Dict[str, str]]:
    """Process all files in the folder and create both combined and individual parquet files."""
    combined_dfs = []
    individual_parquets = {}

    # Create progress bar
    files = [f for f in os.listdir(folder_path) if f.lower().endswith(('.csv', '.xlsx', '.xls'))]
    progress_bar = st.progress(0)

    for idx, file_name in enumerate(files):
        file_path = os.path.join(folder_path, file_name)
        base_name = os.path.splitext(file_name)[0]

        try:
            try:
                if file_name.lower().endswith(('.xlsx', '.xls')):
                    dfs = read_excel_file(file_path)
                    if dfs and not all(df.empty for df in dfs):
                        # Create individual parquet for Excel file (all sheets)
                        excel_df = pd.concat([df for df in dfs if not df.empty], ignore_index=True)
                        if not excel_df.empty and '_file_name' in excel_df.columns and '_sheet_name' in excel_df.columns:
                            excel_df = excel_df[['_file_name', '_sheet_name'] + 
                                              [col for col in excel_df.columns 
                                               if col not in ['_file_name', '_sheet_name']]]
                            parquet_path = os.path.join(folder_path, f"{base_name}.parquet")
                            table = pa.Table.from_pandas(excel_df)
                            pq.write_table(table, parquet_path)
                            individual_parquets[file_name] = parquet_path
                            combined_dfs.extend([df for df in dfs if not df.empty])
                    else:
                        st.warning(f"No valid data found in Excel file: {file_name}")

                elif file_name.lower().endswith('.csv'):
                    df = read_csv_file(file_path)
                    if df is not None and not df.empty and '_file_name' in df.columns and '_sheet_name' in df.columns:
                        # Create individual parquet for CSV file
                        df = df[['_file_name', '_sheet_name'] + 
                               [col for col in df.columns 
                                if col not in ['_file_name', '_sheet_name']]]
                        parquet_path = os.path.join(folder_path, f"{base_name}.parquet")
                        table = pa.Table.from_pandas(df)
                        pq.write_table(table, parquet_path)
                        individual_parquets[file_name] = parquet_path
                        combined_dfs.append(df)
                    else:
                        st.warning(f"No valid data found in CSV file: {file_name}")
            except Exception as e:
                st.error(f"Error processing file {file_name}: {str(e)}")
                continue

            # Update progress
            progress_bar.progress((idx + 1) / len(files))

        except Exception as e:
            st.warning(f"Error processing {file_name}: {str(e)}")

    if not combined_dfs:
        return None, {}

    # Combine all dataframes
    combined_df = pd.concat(combined_dfs, ignore_index=True)

    # Reorder columns to put file_name and sheet_name first
    cols = ['_file_name', '_sheet_name'] + [col for col in combined_df.columns 
                                           if col not in ['_file_name', '_sheet_name']]
    combined_df = combined_df[cols]

    # Save combined parquet
    output_path = os.path.join(folder_path, 'combined.parquet')
    table = pa.Table.from_pandas(combined_df)
    pq.write_table(table, output_path)

    return output_path, individual_parquets

def load_combined_data(file_path: str) -> Optional[pd.DataFrame]:
    """Load the combined parquet file."""
    try:
        return pd.read_parquet(file_path)
    except Exception as e:
        st.error(f"Error loading combined data: {str(e)}")
        return None

def load_individual_data(file_path: str) -> Optional[pd.DataFrame]:
    """Load individual parquet file."""
    try:
        return pd.read_parquet(file_path)
    except Exception as e:
        st.error(f"Error loading individual data: {str(e)}")
        return None

def setup_page_config():
    """Configure the Streamlit page."""
    st.set_page_config(
        page_title="Data Explorer",
        page_icon="📊",
        layout="wide",
        initial_sidebar_state="expanded"
    )

def apply_custom_css():
    """Apply custom CSS styles."""
    st.markdown("""
        <style>
        .stSelectbox {
            margin-bottom: 1rem;
        }
        .st-emotion-cache-1y4p8pa {
            max-width: 100%;
        }
        .source-file {
            color: #FF4B4B;
            font-weight: 600;
        }
        .sheet-name {
            color: #0068C9;
            font-weight: 600;
        }
        </style>
    """, unsafe_allow_html=True)

def render_sidebar(folders: List[str], available_files: Optional[Dict[str, str]] = None) -> Tuple[str, bool, str, Optional[str], Optional[str]]:
    """Render the sidebar with folder selection and viewing options."""
    with st.sidebar:
        st.header("📁 Folder Selection")
        selected_folder = st.selectbox(
            "Choose a folder to process",
            options=folders,
            format_func=lambda x: x.replace('_', ' ').title(),
            key="folder_select"
        )

        # View mode selection
        st.markdown("### View Mode")
        view_mode = st.radio(
            "Select View Mode",
            ["Combined Mode", "Single Mode", "Compare Mode", "Enhanced Combined Mode"],
            key="view_mode"
        )

        # File selection based on mode
        selected_file = None
        selected_file_2 = None
        if view_mode != "Combined Mode" and view_mode != "Enhanced Combined Mode" and available_files:
            if view_mode == "Single Mode":
                selected_file = st.selectbox(
                    "Select File to View",
                    options=list(available_files.keys()),
                    format_func=lambda x: x.replace('_', ' ').title(),
                    key="file_select_1"
                )
            elif view_mode == "Compare Mode":
                col1, col2 = st.columns(2)
                with col1:
                    selected_file = st.selectbox(
                        "Select First File",
                        options=list(available_files.keys()),
                        format_func=lambda x: x.replace('_', ' ').title(),
                        key="file_select_compare_1"
                    )
                with col2:
                    # Filter out the first selected file from options
                    remaining_files = [f for f in available_files.keys() if f != selected_file]
                    selected_file_2 = st.selectbox(
                        "Select Second File",
                        options=remaining_files,
                        format_func=lambda x: x.replace('_', ' ').title(),
                        key="file_select_compare_2"
                    )

        # Add option to recreate data
        recreate_data = st.button("🔄 Recreate Data", 
            help="Click to reprocess all files and update the parquet files",
            key="recreate_button"
        )

        st.markdown("---")

    return selected_folder, recreate_data, view_mode, selected_file, selected_file_2

def filter_empty_rows_and_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Remove empty rows and columns from the DataFrame."""
    if df.empty:
        return df

    def has_value(x):
        # Convert to string and check if it's not empty/None
        str_val = str(x).strip()
        return str_val not in ['', 'None', 'nan', 'NaN', 'NaT'] and len(str_val) > 0

    # Remove rows where all values are empty/None
    df = df.dropna(how='all')
    df = df[df.astype(str).apply(lambda x: x.str.strip().str.len() > 0).any(axis=1)]

    # Check each column for meaningful values
    non_empty_cols = []
    for col in df.columns:
        if df[col].apply(has_value).any():
            non_empty_cols.append(col)

    # Return DataFrame with only non-empty columns and rows
    return df[non_empty_cols]

def render_main_content(df: pd.DataFrame, mode: str = "Combined Mode", df2: Optional[pd.DataFrame] = None,
                       selected_file: Optional[str] = None, selected_file_2: Optional[str] = None):
    """Render the main content area with data display and search."""
    # Search functionality with multiple keywords
    st.write("🔍 Search in data")

    filtered_df = df
    filtered_df2 = df2
    search_terms = []

    if mode != "Compare Mode":
        # Create 6 columns for all search boxes including file and sheet filters
        search_cols = st.columns(6)

        # File and Sheet name filters first
        with search_cols[0]:
            file_filter = st.text_input("File Name", key="file_filter", placeholder="Filter by file name")
        with search_cols[1]:
            sheet_filter = st.text_input("Sheet Name", key="sheet_filter", placeholder="Filter by sheet name")

        # General search boxes in remaining columns
        for i in range(4):
            with search_cols[i + 2]:
                term = st.text_input(f"Keyword {i+1}", key=f"search_{i}", placeholder=f"Search term {i+1}")
                if term.strip():
                    search_terms.append(term.strip())
    else:
        # For Compare Mode, use 4 evenly-spaced columns for search terms only
        search_cols = st.columns(4)
        file_filter = ""
        sheet_filter = ""

        # Search boxes take full width
        for i in range(4):
            with search_cols[i]:
                term = st.text_input(f"Keyword {i+1}", key=f"search_{i}", placeholder=f"Search term {i+1}")
                if term.strip():
                    search_terms.append(term.strip())

        # Apply file and sheet filters (for all modes)
    if file_filter:
        mask = filtered_df['_file_name'].astype(str).str.contains(file_filter, case=False, na=False)
        filtered_df = filtered_df[mask]
        if filtered_df2 is not None:
            mask2 = filtered_df2['_file_name'].astype(str).str.contains(file_filter, case=False, na=False)
            filtered_df2 = filtered_df2[mask2]

    if sheet_filter:
        mask = filtered_df['_sheet_name'].astype(str).str.contains(sheet_filter, case=False, na=False)
        filtered_df = filtered_df[mask]
        if filtered_df2 is not None:
            mask2 = filtered_df2['_sheet_name'].astype(str).str.contains(sheet_filter, case=False, na=False)
            filtered_df2 = filtered_df2[mask2]

    # Apply general search terms
    if search_terms:
        for term in search_terms:
            # Apply to first dataframe
            mask = filtered_df.astype(str).apply(lambda x: x.str.contains(term, case=False)).any(axis=1)
            filtered_df = filtered_df[mask]

            # Apply to second dataframe if in compare mode
            if filtered_df2 is not None:
                mask2 = filtered_df2.astype(str).apply(lambda x: x.str.contains(term, case=False)).any(axis=1)
                filtered_df2 = filtered_df2[mask2]

    # Remove empty rows and columns
    filtered_df = filter_empty_rows_and_columns(filtered_df)
    if filtered_df2 is not None:
        filtered_df2 = filter_empty_rows_and_columns(filtered_df2)

    # Display data info
    if mode == "Compare Mode":
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("File 1 Total Rows", len(df))
        with col2:
            st.metric("File 1 Filtered Rows", len(filtered_df))
        with col3:
            st.metric("File 2 Total Rows", len(df2))
        with col4:
            st.metric("File 2 Filtered Rows", len(filtered_df2))
    else:
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Rows", len(df))
        with col2:
            st.metric("Filtered Rows", len(filtered_df))
        with col3:
            st.metric("Columns", len(filtered_df.columns))

    # Display the dataframe(s) based on mode
    if mode == "Compare Mode":
        display_side_by_side = st.checkbox("Display Side by Side", value=True, 
                                       help="Toggle between side-by-side and stacked view")

        if display_side_by_side:
            col1, col2 = st.columns(2)
            with col1:
                # Always show file metadata - get from original df before filtering
                file_name = selected_file if selected_file else "No file selected"
                sheet_name = df['_sheet_name'].iloc[0] if not df.empty else "No sheet"
                st.markdown("""
                    <h3>
                        <span class="source-file">{}</span> - 
                        <span class="sheet-name">{}</span>
                    </h3>
                """.format(file_name, sheet_name), 
                unsafe_allow_html=True)

                # Remove empty rows before display
                if not filtered_df.empty:
                    filtered_df = filtered_df.dropna(how='all').reset_index(drop=True)
                    filtered_df = filtered_df[filtered_df.astype(str).apply(lambda x: x.str.strip().str.len() > 0).any(axis=1)]
                st.dataframe(filtered_df, height=400, use_container_width=True, hide_index=True)

                # Add download buttons for first dataframe
                if not filtered_df.empty:
                    col1a, col1b = st.columns(2)
                    with col1a:
                        buffer = BytesIO()
                        filtered_df.to_excel(buffer, index=False, engine='openpyxl')
                        buffer.seek(0)
                        st.download_button(
                            label="📥 Download Excel",
                            data=buffer,
                            file_name=f"{filtered_df['_file_name'].iloc[0]}_filtered.xlsx",
                            mime="application/vnd.ms-excel",
                        )
                    with col1b:
                        csv_data = filtered_df.to_csv(index=False).encode('utf-8')
                        st.download_button(
                            label="📥 Download CSV",
                            data=csv_data,
                            file_name=f"{filtered_df['_file_name'].iloc[0]}_filtered.csv",
                            mime="text/csv",
                        )

            with col2:
                # Always show file metadata for second file
                file_name2 = selected_file_2 if selected_file_2 else "No file selected"
                sheet_name2 = df2['_sheet_name'].iloc[0] if not df2.empty else "No sheet"
                st.markdown("""
                    <h3>
                        <span class="source-file">{}</span> - 
                        <span class="sheet-name">{}</span>
                    </h3>
                """.format(file_name2, sheet_name2), 
                unsafe_allow_html=True)

                # Remove empty rows before display
                if not filtered_df2.empty:
                    filtered_df2 = filtered_df2.dropna(how='all').reset_index(drop=True)
                    filtered_df2 = filtered_df2[filtered_df2.astype(str).apply(lambda x: x.str.strip().str.len() > 0).any(axis=1)]
                st.dataframe(filtered_df2, height=400, use_container_width=True, hide_index=True)

                # Add download buttons for second dataframe
                if not filtered_df2.empty:
                    col2a, col2b = st.columns(2)
                    with col2a:
                        buffer = BytesIO()
                        filtered_df2.to_excel(buffer, index=False, engine='openpyxl')
                        buffer.seek(0)
                        st.download_button(
                            label="📥 Download Excel",
                            data=buffer,
                            file_name=f"{filtered_df2['_file_name'].iloc[0]}_filtered.xlsx",
                            mime="application/vnd.ms-excel",
                        )
                    with col2b:
                        csv_data = filtered_df2.to_csv(index=False).encode('utf-8')
                        st.download_button(
                            label="📥 Download CSV",
                            data=csv_data,
                            file_name=f"{filtered_df2['_file_name'].iloc[0]}_filtered.csv",
                            mime="text/csv",
                        )
        else:
            if not filtered_df.empty:
                st.markdown("""
                    <h3>
                        <span class="source-file">{}</span> - 
                        <span class="sheet_name">{}</span>
                    </h3>
                """.format(filtered_df['_file_name'].iloc[0], filtered_df['_sheet_name'].iloc[0]), 
                unsafe_allow_html=True)
                # Remove empty rows
                filtered_df = filtered_df.dropna(how='all').reset_index(drop=True)
                filtered_df = filtered_df[filtered_df.astype(str).apply(lambda x: x.str.strip().str.len() > 0).any(axis=1)]
                st.dataframe(filtered_df, height=300, use_container_width=True, hide_index=True)

            if not filtered_df2.empty:
                st.markdown("""
                    <h3>
                        <span class="source-file">{}</span> - 
                        <span class="sheet_name">{}</span>
                    </h3>
                """.format(filtered_df2['_file_name'].iloc[0], filtered_df2['_sheet_name'].iloc[0]), 
                unsafe_allow_html=True)
                # Remove empty rows
                filtered_df2 = filtered_df2.dropna(how='all').reset_index(drop=True)
                filtered_df2 = filtered_df2[filtered_df2.astype(str).apply(lambda x: x.str.strip().str.len() > 0).any(axis=1)]
                st.dataframe(filtered_df2, height=300, use_container_width=True, hide_index=True)
    elif mode == "Enhanced Combined Mode":
        # Group data by file and sheet name
        grouped = filtered_df.groupby(['_file_name', '_sheet_name'])

        # If no groups exist, show a message
        if len(grouped) == 0:
            st.warning("No data available in the selected files")

        for (file_name, sheet_name), group_df in grouped:
            # Always create a header for each group
            st.markdown(f"""
                <h3>
                    <span class="source-file">{file_name}</span> - 
                    <span class="sheet-name">{sheet_name}</span>
                </h3>
            """, unsafe_allow_html=True)

            # Filter empty rows and columns for this specific group
            group_df_filtered = filter_empty_rows_and_columns(group_df)

            # Always show the dataframe, even if empty
            if not group_df_filtered.empty:
                # Drop rows where all values are None/NaN/empty strings
                group_df_filtered = group_df_filtered.dropna(how='all').reset_index(drop=True)
                group_df_filtered = group_df_filtered[group_df_filtered.astype(str).apply(lambda x: x.str.strip().str.len() > 0).any(axis=1)]

            # Display the filtered group data
            st.dataframe(
                group_df_filtered,
                height=300,
                use_container_width=True,
                hide_index=True
            )

            # Add download buttons for each group
            if not group_df_filtered.empty:
                col1, col2 = st.columns(2)
                with col1:
                    buffer = BytesIO()
                    group_df_filtered.to_excel(buffer, index=False, engine='openpyxl')
                    buffer.seek(0)
                    st.download_button(
                        label="📥 Download Excel",
                        data=buffer,
                        file_name=f"{file_name}_filtered.xlsx",
                        mime="application/vnd.ms-excel",
                    )
                with col2:
                    csv_data = group_df_filtered.to_csv(index=False).encode('utf-8')
                    st.download_button(
                        label="📥 Download CSV",
                        data=csv_data,
                        file_name=f"{file_name}_filtered.csv",
                        mime="text/csv",
                    )
            st.markdown("---")  # Add a separator between groups
    else:
        # Combined Mode - show data without file/sheet name header
        # Remove completely empty rows and display the dataframe
        if not filtered_df.empty:
            # Drop rows where all values are None/NaN/empty strings
            filtered_df = filtered_df.dropna(how='all').reset_index(drop=True)
            filtered_df = filtered_df[filtered_df.astype(str).apply(lambda x: x.str.strip().str.len() > 0).any(axis=1)]

        st.dataframe(filtered_df, height=400, use_container_width=True, hide_index=True)

        # Add download buttons for filtered data
        if not filtered_df.empty:
            col1, col2 = st.columns(2)
            with col1:
                buffer = BytesIO()
                filtered_df.to_excel(buffer, index=False, engine='openpyxl')
                buffer.seek(0)
                st.download_button(
                    label="📥 Download Excel",
                    data=buffer,
                    file_name="filtered_data.xlsx",
                    mime="application/vnd.ms-excel",
                )
            with col2:
                csv_data = filtered_df.to_csv(index=False).encode('utf-8')
                st.download_button(
                    label="📥 Download CSV",
                    data=csv_data,
                    file_name="filtered_data.csv",
                    mime="text/csv",
                )

def show_error_message(error: str):
    """Display error message."""
    st.error(f"An error occurred: {str(error)}")

def main():
    # Setup page configuration
    setup_page_config()

    # Apply custom CSS
    apply_custom_css()

    # Set up the main title
    st.title("Data Explorer")

    try:
        # Scan for folders in the current directory
        folders = [d for d in os.listdir('.') if os.path.isdir(d) and not d.startswith(('.', '__'))]

        if folders:
            # Initialize variables
            available_files = {}

            # Get initial folder selection from session state
            if 'selected_folder' not in st.session_state:
                st.session_state.selected_folder = folders[0]

            # Scan for existing parquet files if we have a selected folder
            if st.session_state.selected_folder:
                existing_parquet = os.path.join(st.session_state.selected_folder, 'combined.parquet')

                # Scan for existing parquet files
                for file in os.listdir(st.session_state.selected_folder):
                    if file.endswith('.parquet') and file != 'combined.parquet':
                        source_file = file.replace('.parquet', '')
                        source_xlsx = os.path.join(st.session_state.selected_folder, f"{source_file}.xlsx")
                        source_csv = os.path.join(st.session_state.selected_folder, f"{source_file}.csv")

                        if os.path.exists(source_xlsx):
                            available_files[f"{source_file}.xlsx"] = os.path.join(st.session_state.selected_folder, file)
                        elif os.path.exists(source_csv):
                            available_files[f"{source_file}.csv"] = os.path.join(st.session_state.selected_folder, file)

            # Render sidebar with available files
            selected_folder, recreate_data, view_mode, selected_file, selected_file_2 = render_sidebar(folders, available_files)
            st.session_state.selected_folder = selected_folder

            if selected_folder:
                # Process files if needed
                if not os.path.exists(existing_parquet) or recreate_data:
                    with st.spinner('Processing files...'):
                        combined_file_path, new_available_files = process_folder(selected_folder)
                        available_files = new_available_files  # Update available files after processing
                else:
                    combined_file_path = existing_parquet

                # Display data based on mode
                if view_mode in ["Combined Mode", "Enhanced Combined Mode"]:
                    if combined_file_path and os.path.exists(combined_file_path):
                        df = load_combined_data(combined_file_path)
                        if df is not None:
                            render_main_content(df, mode=view_mode, selected_file="combined_data")
                        else:
                            st.warning("No data found in the selected folder.")
                    else:
                        st.warning("Combined data not found. Please recreate the data.")

                elif view_mode == "Single Mode":
                    if selected_file and selected_file in available_files:
                        df = load_individual_data(available_files[selected_file])
                        if df is not None:
                            render_main_content(df, mode=view_mode, selected_file=selected_file)
                        else:
                            st.warning(f"No data found in {selected_file}")
                    else:
                        st.info("Please select a file to view from the sidebar.")

                elif view_mode == "Compare Mode":
                    if selected_file and selected_file_2 and \
                       selected_file in available_files and selected_file_2 in available_files:
                        df1 = load_individual_data(available_files[selected_file])
                        df2 = load_individual_data(available_files[selected_file_2])
                        if df1 is not None and df2 is not None:
                            render_main_content(df1, mode=view_mode, df2=df2, 
                                           selected_file=selected_file, selected_file_2=selected_file_2)
                        else:
                            st.warning("Error loading one or both files for comparison.")
                    else:
                        st.info("Please select two files to compare from the sidebar.")
            else:
                st.info("Please select a folder from the sidebar to begin.")
        else:
            st.info("No folders found in the current directory.")

    except Exception as e:
        show_error_message(str(e))

if __name__ == "__main__":
    main()
Enter fullscreen mode Exit fullscreen mode

Top comments (0)