import streamlit as st
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os
from io import BytesIO
from typing import Optional, Union, List, Tuple, Dict
def read_excel_file(file_path: str) -> List[pd.DataFrame]:
    """Read every non-empty sheet of an Excel workbook as string-typed DataFrames.

    The workbook is opened once and closed deterministically. Each sheet is
    read with ``dtype=str``, string cells are stripped, the placeholder texts
    ``'nan'``, ``'NaN'``, ``'NaT'`` and ``''`` are replaced with ``None``, and
    two metadata columns are appended: ``_file_name`` (base name of
    *file_path*) and ``_sheet_name``.

    Args:
        file_path: Path to the workbook.

    Returns:
        One DataFrame per non-empty sheet. If the workbook cannot be opened or
        no sheet yields data, a list holding a single empty DataFrame is
        returned and the problem is reported through Streamlit.
    """
    # openpyxl cannot open legacy .xls workbooks, so let pandas choose the
    # engine for those; every other extension keeps the explicit openpyxl.
    engine = None if file_path.lower().endswith('.xls') else 'openpyxl'
    try:
        # The context manager releases the file handle even if a sheet fails.
        with pd.ExcelFile(file_path, engine=engine) as workbook:
            dfs = []
            for sheet_name in workbook.sheet_names:
                try:
                    # Reuse the already-open workbook instead of re-opening
                    # the file from disk for every sheet.
                    df = pd.read_excel(workbook, sheet_name=sheet_name, dtype=str)
                    if df.empty:
                        st.warning(f"Sheet {sheet_name} in {file_path} is empty")
                        continue
                    # Strip text columns whether pandas reports them as
                    # object dtype or as a dedicated string dtype.
                    df = df.apply(
                        lambda col: col.str.strip()
                        if (col.dtype == "object" or pd.api.types.is_string_dtype(col))
                        else col
                    )
                    df = df.replace(['nan', 'NaN', 'NaT', ''], None)
                    # Add metadata columns
                    df['_file_name'] = os.path.basename(file_path)
                    df['_sheet_name'] = sheet_name
                    dfs.append(df)
                except Exception as e:
                    # Best effort: one unreadable sheet must not hide the others.
                    st.warning(f"Error reading sheet {sheet_name} from {file_path}: {str(e)}")
                    continue
        if not dfs:
            st.error(f"No valid data found in {file_path}")
            return [pd.DataFrame()]  # Keep the "list of frames" shape for callers
        return dfs
    except Exception as e:
        st.error(f"Failed to open Excel file {file_path}: {str(e)}")
        return [pd.DataFrame()]  # Keep the "list of frames" shape for callers
def read_csv_file(file_path: str) -> pd.DataFrame:
    """Read a CSV file as strings, trying several text encodings in turn.

    Encodings are tried in the order UTF-8, Windows-1252, Latin-1. Latin-1
    accepts every byte sequence, so it must come last: placing it earlier
    makes every later fallback unreachable and turns Windows-1252 punctuation
    (curly quotes, euro sign, ...) into control characters.

    After a successful read, string cells are stripped, the placeholder texts
    ``'nan'``, ``'NaN'``, ``'NaT'`` and ``''`` are replaced with ``None``, and
    the metadata columns ``_file_name`` (base name of *file_path*) and
    ``_sheet_name`` (always ``'csv'``) are added.

    Args:
        file_path: Path to the CSV file.

    Returns:
        The cleaned DataFrame, or an empty DataFrame if no encoding produced a
        readable result (the failure is reported through Streamlit).
    """
    encodings = ['utf-8', 'cp1252', 'latin1']
    for encoding in encodings:
        try:
            # Read CSV with specified encoding and all columns as strings initially
            df = pd.read_csv(file_path, dtype=str, encoding=encoding)
            # Strip text columns whether pandas reports them as object dtype
            # or as a dedicated string dtype.
            df = df.apply(
                lambda col: col.str.strip()
                if (col.dtype == "object" or pd.api.types.is_string_dtype(col))
                else col
            )
            df = df.replace(['nan', 'NaN', 'NaT', ''], None)
            # Add metadata columns
            df['_file_name'] = os.path.basename(file_path)
            df['_sheet_name'] = 'csv'
            return df
        except UnicodeDecodeError:
            continue  # Wrong encoding guess; try the next one
        except Exception as e:
            st.warning(f"Error reading CSV file {file_path} with {encoding} encoding: {str(e)}")
            continue
    st.error(f"Failed to read CSV file {file_path} with any supported encoding")
    return pd.DataFrame()  # Return empty DataFrame if all encodings fail
def _metadata_columns_first(df: pd.DataFrame) -> pd.DataFrame:
    """Return *df* with ``_file_name`` and ``_sheet_name`` as the leading columns."""
    lead = ['_file_name', '_sheet_name']
    return df[lead + [col for col in df.columns if col not in lead]]


def _write_parquet(df: pd.DataFrame, parquet_path: str) -> None:
    """Write *df* to *parquet_path* through pyarrow."""
    pq.write_table(pa.Table.from_pandas(df), parquet_path)


def process_folder(folder_path: str) -> Tuple[Optional[str], Dict[str, str]]:
    """Convert every CSV/Excel file in a folder to parquet.

    For each ``.csv``, ``.xlsx`` or ``.xls`` file directly inside
    *folder_path* an individual ``<stem>.parquet`` is written next to it (all
    sheets of a workbook are concatenated), and all data is additionally
    written to ``combined.parquet`` in the same folder. Files are handled in
    sorted name order so the combined output is reproducible, and the progress
    bar advances for every file, including ones that fail.

    NOTE: two source files sharing a stem (``data.csv`` and ``data.xlsx``)
    map to the same individual parquet path; the later one overwrites the
    earlier one.

    Args:
        folder_path: Directory to scan (not recursive).

    Returns:
        ``(combined_parquet_path, {source_file_name: individual_parquet_path})``,
        or ``(None, {})`` when no file yielded any data.
    """
    combined_dfs = []
    individual_parquets = {}
    required = {'_file_name', '_sheet_name'}
    files = sorted(
        f for f in os.listdir(folder_path)
        if f.lower().endswith(('.csv', '.xlsx', '.xls'))
    )
    progress_bar = st.progress(0)
    for idx, file_name in enumerate(files):
        file_path = os.path.join(folder_path, file_name)
        base_name = os.path.splitext(file_name)[0]
        is_excel = file_name.lower().endswith(('.xlsx', '.xls'))
        try:
            if is_excel:
                candidates = read_excel_file(file_path)
            else:
                candidates = [read_csv_file(file_path)]
            # Keep only frames that carry data and the metadata columns the
            # readers are expected to add.
            frames = [
                df for df in candidates
                if df is not None and not df.empty and required.issubset(df.columns)
            ]
            if frames:
                file_df = _metadata_columns_first(pd.concat(frames, ignore_index=True))
                parquet_path = os.path.join(folder_path, f"{base_name}.parquet")
                _write_parquet(file_df, parquet_path)
                individual_parquets[file_name] = parquet_path
                combined_dfs.extend(frames)
            elif is_excel:
                st.warning(f"No valid data found in Excel file: {file_name}")
            else:
                st.warning(f"No valid data found in CSV file: {file_name}")
        except Exception as e:
            # Best effort: report the failing file and carry on with the rest.
            st.error(f"Error processing file {file_name}: {str(e)}")
        finally:
            # Advance even when a file failed so the bar always reaches 100%.
            progress_bar.progress((idx + 1) / len(files))
    if not combined_dfs:
        return None, {}
    # Combine all dataframes, metadata columns first
    combined_df = _metadata_columns_first(pd.concat(combined_dfs, ignore_index=True))
    # Save combined parquet
    output_path = os.path.join(folder_path, 'combined.parquet')
    _write_parquet(combined_df, output_path)
    return output_path, individual_parquets
def load_combined_data(file_path: str) -> Optional[pd.DataFrame]:
    """Read the combined parquet file at *file_path*.

    Returns the loaded DataFrame, or ``None`` after reporting the failure
    through Streamlit when the file cannot be read.
    """
    try:
        frame = pd.read_parquet(file_path)
    except Exception as exc:
        st.error(f"Error loading combined data: {str(exc)}")
        return None
    return frame
def load_individual_data(file_path: str) -> Optional[pd.DataFrame]:
    """Read one per-file parquet at *file_path*.

    Returns the loaded DataFrame, or ``None`` after reporting the failure
    through Streamlit when the file cannot be read.
    """
    try:
        frame = pd.read_parquet(file_path)
    except Exception as exc:
        st.error(f"Error loading individual data: {str(exc)}")
        return None
    return frame
def setup_page_config():
    """Set the Streamlit page title, icon, wide layout and expanded sidebar."""
    page_options = {
        "page_title": "Data Explorer",
        "page_icon": "📊",
        "layout": "wide",
        "initial_sidebar_state": "expanded",
    }
    st.set_page_config(**page_options)
def apply_custom_css():
    """Inject the app's stylesheet (select boxes, source-file and sheet-name labels)."""
    stylesheet = """
<style>
.stSelectbox {
margin-bottom: 1rem;
}
.st-emotion-cache-1y4p8pa {
max-width: 100%;
}
.source-file {
color: #FF4B4B;
font-weight: 600;
}
.sheet-name {
color: #0068C9;
font-weight: 600;
}
</style>
"""
    # Raw HTML is required for the <style> element to reach the page.
    st.markdown(stylesheet, unsafe_allow_html=True)
def render_sidebar(folders: List[str], available_files: Optional[Dict[str, str]] = None) -> Tuple[str, bool, str, Optional[str], Optional[str]]:
    """Draw the sidebar controls and report what the user picked.

    Args:
        folders: Folder names offered in the folder select box.
        available_files: Mapping whose keys are the file names offered in
            Single Mode and Compare Mode; no file pickers are drawn when it
            is empty or ``None``.

    Returns:
        ``(selected_folder, recreate_data, view_mode, selected_file,
        selected_file_2)``. The two file entries stay ``None`` unless the
        matching picker was drawn.
    """
    def as_title(raw_name):
        # Display helper shared by every select box: "my_file" -> "My File".
        return raw_name.replace('_', ' ').title()

    combined_modes = ("Combined Mode", "Enhanced Combined Mode")
    selected_file = None
    selected_file_2 = None

    with st.sidebar:
        st.header("📁 Folder Selection")
        selected_folder = st.selectbox(
            "Choose a folder to process",
            options=folders,
            format_func=as_title,
            key="folder_select",
        )

        st.markdown("### View Mode")
        view_mode = st.radio(
            "Select View Mode",
            ["Combined Mode", "Single Mode", "Compare Mode", "Enhanced Combined Mode"],
            key="view_mode",
        )

        # File pickers only make sense for the per-file modes.
        if view_mode not in combined_modes and available_files:
            if view_mode == "Single Mode":
                selected_file = st.selectbox(
                    "Select File to View",
                    options=list(available_files.keys()),
                    format_func=as_title,
                    key="file_select_1",
                )
            elif view_mode == "Compare Mode":
                first_col, second_col = st.columns(2)
                with first_col:
                    selected_file = st.selectbox(
                        "Select First File",
                        options=list(available_files.keys()),
                        format_func=as_title,
                        key="file_select_compare_1",
                    )
                with second_col:
                    # The second picker never offers the file chosen first.
                    remaining_files = [
                        name for name in available_files.keys() if name != selected_file
                    ]
                    selected_file_2 = st.selectbox(
                        "Select Second File",
                        options=remaining_files,
                        format_func=as_title,
                        key="file_select_compare_2",
                    )

        recreate_data = st.button(
            "🔄 Recreate Data",
            help="Click to reprocess all files and update the parquet files",
            key="recreate_button",
        )
        st.markdown("---")

    return selected_folder, recreate_data, view_mode, selected_file, selected_file_2
def filter_empty_rows_and_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows and columns that contain no meaningful value.

    A cell is meaningful when it is not missing (``None``/NaN/NA) and its
    stripped string form is not one of ``''``, ``'None'``, ``'nan'``,
    ``'NaN'`` or ``'NaT'``. Rows and columns are judged by this same rule, so
    a row such as ``[None, '  ']`` is removed rather than being kept because
    ``str(None)`` happens to be non-empty.

    Args:
        df: Frame to filter. An empty frame is returned unchanged.

    Returns:
        The rows and columns of *df* that hold at least one meaningful cell,
        with the original index labels and column order preserved.
    """
    if df.empty:
        return df
    placeholders = ['', 'None', 'nan', 'NaN', 'NaT']
    # One vectorised pass over the whole frame instead of a Python call per cell.
    text = df.astype(str).apply(lambda col: col.str.strip())
    # Work on plain arrays so duplicate index/column labels cannot break alignment.
    meaningful = df.notna().to_numpy() & ~text.isin(placeholders).to_numpy()
    keep_rows = meaningful.any(axis=1)
    # Dropped rows hold no meaningful cell, so per-column results are the same
    # whether computed before or after the row filter.
    keep_cols = meaningful.any(axis=0)
    return df.loc[keep_rows, keep_cols]
def _row_matches(frame: pd.DataFrame, needle: str, column: Optional[str] = None) -> pd.Series:
    """Return a boolean row mask for a literal, case-insensitive substring match.

    The match is restricted to *column* when given, otherwise any column may
    match. *needle* is treated as plain text, not as a regular expression.
    """
    target = frame if column is None else frame[[column]]
    # Blank out missing cells so they cannot match through their 'nan'/'None'
    # string representation.
    text = target.astype(str).where(target.notna(), '')
    hits = text.apply(lambda col: col.str.contains(needle, case=False, regex=False))
    return hits.any(axis=1)


def _drop_blank_rows(frame: pd.DataFrame) -> pd.DataFrame:
    """Drop all-missing rows, renumber the index, and drop rows whose string form is blank."""
    frame = frame.dropna(how='all').reset_index(drop=True)
    has_text = frame.astype(str).apply(lambda col: col.str.strip().str.len() > 0).any(axis=1)
    return frame[has_text]


def _render_source_header(file_name, sheet_name) -> None:
    """Render the coloured "<file> - <sheet>" heading with both names HTML-escaped."""
    import html  # local import: only this helper needs it

    st.markdown(
        '<h3><span class="source-file">{}</span> - '
        '<span class="sheet-name">{}</span></h3>'.format(
            html.escape(str(file_name)), html.escape(str(sheet_name))
        ),
        unsafe_allow_html=True,
    )


def _render_download_buttons(frame: pd.DataFrame, base_name: str, key_prefix: str) -> None:
    """Render side-by-side Excel and CSV download buttons for *frame*.

    *base_name* is the download file name without extension. *key_prefix*
    must be unique per call so repeated button pairs do not share a widget
    identity.
    """
    excel_col, csv_col = st.columns(2)
    with excel_col:
        buffer = BytesIO()
        frame.to_excel(buffer, index=False, engine='openpyxl')
        st.download_button(
            label="📥 Download Excel",
            data=buffer.getvalue(),
            file_name=f"{base_name}.xlsx",
            # Official media type for .xlsx; "vnd.ms-excel" denotes legacy .xls.
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            key=f"{key_prefix}_xlsx",
        )
    with csv_col:
        st.download_button(
            label="📥 Download CSV",
            data=frame.to_csv(index=False).encode('utf-8'),
            file_name=f"{base_name}.csv",
            mime="text/csv",
            key=f"{key_prefix}_csv",
        )


def _render_compare_panel(original: pd.DataFrame, filtered: pd.DataFrame,
                          chosen_file: Optional[str], key_prefix: str) -> None:
    """Render one half of the side-by-side comparison: heading, table, downloads."""
    file_name = chosen_file if chosen_file else "No file selected"
    # The heading uses the unfiltered frame so it stays stable while searching.
    sheet_name = original['_sheet_name'].iloc[0] if not original.empty else "No sheet"
    _render_source_header(file_name, sheet_name)
    if not filtered.empty:
        filtered = _drop_blank_rows(filtered)
    st.dataframe(filtered, height=400, use_container_width=True, hide_index=True)
    if not filtered.empty:
        _render_download_buttons(
            filtered, f"{filtered['_file_name'].iloc[0]}_filtered", key_prefix
        )


def render_main_content(df: pd.DataFrame, mode: str = "Combined Mode", df2: Optional[pd.DataFrame] = None,
                        selected_file: Optional[str] = None, selected_file_2: Optional[str] = None):
    """Render the search boxes, row/column metrics and data table(s) for one view mode.

    Args:
        df: Primary data; must contain ``_file_name`` and ``_sheet_name``.
        mode: ``"Compare Mode"`` and ``"Enhanced Combined Mode"`` have
            dedicated layouts; any other value is displayed as a single table.
        df2: Second frame, required for Compare Mode.
        selected_file: Label for the first Compare Mode panel heading.
        selected_file_2: Label for the second Compare Mode panel heading.

    Search input is matched as literal, case-insensitive text; every supplied
    filter must match for a row to be kept.
    """
    compare = mode == "Compare Mode"
    if compare and df2 is None:
        st.warning("Compare Mode needs a second file to display.")
        return

    # Search functionality with multiple keywords
    st.write("🔍 Search in data")
    file_filter = ""
    sheet_filter = ""
    if compare:
        # Compare Mode offers keyword search only, across the full width.
        keyword_cols = st.columns(4)
    else:
        search_cols = st.columns(6)
        with search_cols[0]:
            file_filter = st.text_input("File Name", key="file_filter", placeholder="Filter by file name")
        with search_cols[1]:
            sheet_filter = st.text_input("Sheet Name", key="sheet_filter", placeholder="Filter by sheet name")
        keyword_cols = search_cols[2:]

    search_terms = []
    for i, slot in enumerate(keyword_cols):
        with slot:
            term = st.text_input(f"Keyword {i+1}", key=f"search_{i}", placeholder=f"Search term {i+1}")
        if term.strip():
            search_terms.append(term.strip())

    # (needle, column) pairs; a column of None means "any column".
    criteria = []
    if file_filter:
        criteria.append((file_filter, '_file_name'))
    if sheet_filter:
        criteria.append((sheet_filter, '_sheet_name'))
    criteria.extend((term, None) for term in search_terms)

    def narrow(frame):
        # Apply every criterion in turn, then drop empty rows and columns.
        for needle, column in criteria:
            frame = frame[_row_matches(frame, needle, column)]
        return filter_empty_rows_and_columns(frame)

    filtered_df = narrow(df)
    filtered_df2 = narrow(df2) if df2 is not None else None

    # Display data info
    if compare:
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("File 1 Total Rows", len(df))
        with col2:
            st.metric("File 1 Filtered Rows", len(filtered_df))
        with col3:
            st.metric("File 2 Total Rows", len(df2))
        with col4:
            st.metric("File 2 Filtered Rows", len(filtered_df2))
    else:
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Rows", len(df))
        with col2:
            st.metric("Filtered Rows", len(filtered_df))
        with col3:
            st.metric("Columns", len(filtered_df.columns))

    # Display the dataframe(s) based on mode
    if compare:
        display_side_by_side = st.checkbox("Display Side by Side", value=True,
                                           help="Toggle between side-by-side and stacked view")
        if display_side_by_side:
            left, right = st.columns(2)
            with left:
                _render_compare_panel(df, filtered_df, selected_file, "download_file1")
            with right:
                _render_compare_panel(df2, filtered_df2, selected_file_2, "download_file2")
        else:
            # Stacked view: one table under the other, skipping empty results.
            for frame in (filtered_df, filtered_df2):
                if frame.empty:
                    continue
                _render_source_header(frame['_file_name'].iloc[0], frame['_sheet_name'].iloc[0])
                st.dataframe(_drop_blank_rows(frame), height=300,
                             use_container_width=True, hide_index=True)
    elif mode == "Enhanced Combined Mode":
        # One section per (file, sheet) pair.
        grouped = filtered_df.groupby(['_file_name', '_sheet_name'])
        if len(grouped) == 0:
            st.warning("No data available in the selected files")
        for position, ((file_name, sheet_name), group_df) in enumerate(grouped):
            _render_source_header(file_name, sheet_name)
            # Columns empty within this group are hidden even if other groups use them.
            group_df_filtered = filter_empty_rows_and_columns(group_df)
            if not group_df_filtered.empty:
                group_df_filtered = _drop_blank_rows(group_df_filtered)
            st.dataframe(group_df_filtered, height=300,
                         use_container_width=True, hide_index=True)
            if not group_df_filtered.empty:
                # Position-based key: sheets of one file share a download name.
                _render_download_buttons(
                    group_df_filtered, f"{file_name}_filtered", f"download_group_{position}"
                )
            st.markdown("---")  # Add a separator between groups
    else:
        # Combined Mode - show data without file/sheet name header
        if not filtered_df.empty:
            filtered_df = _drop_blank_rows(filtered_df)
        st.dataframe(filtered_df, height=400, use_container_width=True, hide_index=True)
        if not filtered_df.empty:
            _render_download_buttons(filtered_df, "filtered_data", "download_combined")
def show_error_message(error: str):
    """Show *error* in a Streamlit error box, prefixed with "An error occurred: "."""
    message = "An error occurred: " + str(error)
    st.error(message)
def _scan_available_files(folder: str) -> Dict[str, str]:
    """Map each CSV/Excel source file in *folder* to its existing individual parquet.

    A source file is listed only when ``<stem>.parquet`` already exists beside
    it. The shared ``combined.parquet`` is never treated as an individual file.
    """
    found = {}
    for entry in sorted(os.listdir(folder)):
        stem, ext = os.path.splitext(entry)
        if ext.lower() not in ('.csv', '.xlsx', '.xls'):
            continue
        parquet_name = f"{stem}.parquet"
        if parquet_name == 'combined.parquet':
            continue
        parquet_path = os.path.join(folder, parquet_name)
        if os.path.exists(parquet_path):
            found[entry] = parquet_path
    return found


def main():
    """Run the Data Explorer app: pick a folder, build parquet files if needed, show data.

    Sub-directories of the current working directory (excluding names that
    start with ``.`` or ``__``) are offered as data folders. The selected
    folder is processed when it has no ``combined.parquet`` yet or when the
    user asks for a rebuild; the result is then shown in the chosen view mode.
    Any exception is reported through ``show_error_message``.
    """
    # Setup page configuration
    setup_page_config()
    # Apply custom CSS
    apply_custom_css()
    # Set up the main title
    st.title("Data Explorer")
    try:
        # Sorted so the default folder does not depend on directory order.
        folders = sorted(
            d for d in os.listdir('.')
            if os.path.isdir(d) and not d.startswith(('.', '__'))
        )
        if not folders:
            st.info("No folders found in the current directory.")
            return

        # The folder select box is keyed "folder_select", so on a rerun its new
        # value is already in session state before the sidebar is drawn.
        # Reading it here keeps the file pickers in step with the folder that
        # is about to be shown, instead of lagging one rerun behind.
        active_folder = st.session_state.get('folder_select')
        if active_folder not in folders:
            active_folder = st.session_state.get('selected_folder')
        if active_folder not in folders:
            active_folder = folders[0]
        available_files = _scan_available_files(active_folder)

        # Render sidebar with available files
        selected_folder, recreate_data, view_mode, selected_file, selected_file_2 = render_sidebar(folders, available_files)
        st.session_state.selected_folder = selected_folder
        if not selected_folder:
            st.info("Please select a folder from the sidebar to begin.")
            return
        if selected_folder != active_folder:
            # Safety net: never pair one folder's files with another's data.
            available_files = _scan_available_files(selected_folder)

        # Always derived from the folder being displayed.
        existing_parquet = os.path.join(selected_folder, 'combined.parquet')
        if not os.path.exists(existing_parquet) or recreate_data:
            with st.spinner('Processing files...'):
                combined_file_path, available_files = process_folder(selected_folder)
        else:
            combined_file_path = existing_parquet

        # Display data based on mode
        if view_mode in ["Combined Mode", "Enhanced Combined Mode"]:
            if combined_file_path and os.path.exists(combined_file_path):
                df = load_combined_data(combined_file_path)
                if df is not None:
                    render_main_content(df, mode=view_mode, selected_file="combined_data")
                else:
                    st.warning("No data found in the selected folder.")
            else:
                st.warning("Combined data not found. Please recreate the data.")
        elif view_mode == "Single Mode":
            if selected_file and selected_file in available_files:
                df = load_individual_data(available_files[selected_file])
                if df is not None:
                    render_main_content(df, mode=view_mode, selected_file=selected_file)
                else:
                    st.warning(f"No data found in {selected_file}")
            else:
                st.info("Please select a file to view from the sidebar.")
        elif view_mode == "Compare Mode":
            if selected_file and selected_file_2 and \
                    selected_file in available_files and selected_file_2 in available_files:
                df1 = load_individual_data(available_files[selected_file])
                df2 = load_individual_data(available_files[selected_file_2])
                if df1 is not None and df2 is not None:
                    render_main_content(df1, mode=view_mode, df2=df2,
                                        selected_file=selected_file, selected_file_2=selected_file_2)
                else:
                    st.warning("Error loading one or both files for comparison.")
            else:
                st.info("Please select two files to compare from the sidebar.")
    except Exception as e:
        show_error_message(str(e))
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
For further actions, you may consider blocking this person and/or reporting abuse
Top comments (0)