Storing and Analyzing Financial Data with ArcticDB
ArcticDB is a high-performance DataFrame database designed for financial and time-series data, which makes it an excellent choice for storing and analyzing stock data. In this guide, we will use ArcticDB to efficiently store historical stock data for the symbols listed in CSV files and perform financial analysis with Python.
This solution integrates Streamlit for visualization, ArcticDB for data storage, and yFinance for stock data retrieval.
Key Features
- Efficient Data Storage: Store multi-year historical stock data in a local ArcticDB (LMDB) backend.
- Financial Data Analysis: Screen stocks, backtest strategies, and analyze historical data.
- User-Friendly Dashboard: A Streamlit interface for intuitive interaction.
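Before following along, install the dependencies (all four packages are published on PyPI under these names):

```bash
pip install arcticdb streamlit yfinance pandas
```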
Step-by-Step Guide
Step 1: Directory and Database Configuration
The project assumes a specific directory structure for storing CSV files and the database.
```python
# Configuration
DATA_DIR = "./Data/sources/Index"
DB_PATH = "lmdb://./stock_db"

CSV_MAPPING = {
    "US_DATA": ["SP500list.csv", "NASDAQ.csv", "NYSE01.csv", "NYSE02.csv"],
    "IND_DATA": ["NIFTY750.csv", "NSE_ALL.csv", "AllCash.csv"]
}
```
Ensure that the directory contains the required CSV files; each must have a `Symbol` column listing the ticker symbols to download.
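If the folder does not exist yet, you can create it from Python before dropping in the CSVs:

```python
import os

# Create the expected data directory if it is missing
os.makedirs("./Data/sources/Index", exist_ok=True)
```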
Step 2: ArcticDB Connection Setup
Create a function to establish a connection to ArcticDB.
```python
from arcticdb import Arctic

# Initialize ArcticDB connection
def get_arctic_connection():
    ac = Arctic(DB_PATH)
    return ac
```
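As a quick sanity check, open the connection and list the libraries that already exist (the list will be empty on a fresh database):

```python
# Open the LMDB-backed store and inspect it
ac = get_arctic_connection()
print(ac.list_libraries())  # e.g. ['US_DATA', 'IND_DATA'] once populated
```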
Step 3: CSV File Processing
Detect which CSV files are available and map each one to the appropriate ArcticDB library.
```python
import os

def get_library_name(csv_file):
    csv_lower = csv_file.lower()
    for lib_name, files in CSV_MAPPING.items():
        if any(f.lower() == csv_lower for f in files):
            return lib_name
    if "nse" in csv_lower or "nifty" in csv_lower:
        return "IND_DATA"
    return "US_DATA"

# Get list of available CSV files
def get_available_csvs():
    mapped_files = [f for files in CSV_MAPPING.values() for f in files]
    return [f for f in mapped_files if os.path.exists(os.path.join(DATA_DIR, f))]
```
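For example, explicitly mapped files resolve directly, while unmapped filenames fall back to the keyword heuristic (the third filename below is hypothetical):

```python
print(get_library_name("SP500list.csv"))   # -> US_DATA (explicit mapping)
print(get_library_name("NIFTY750.csv"))    # -> IND_DATA (explicit mapping)
print(get_library_name("nse_midcap.csv"))  # -> IND_DATA (keyword fallback)
```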
Step 4: Updating the Database
Download stock data for the symbols listed in the selected CSV file and store it in ArcticDB.
```python
import yfinance as yf
import pandas as pd
from datetime import datetime
import streamlit as st

def update_database(csv_file, lib_name):
    ac = get_arctic_connection()
    if lib_name not in ac.list_libraries():
        ac.create_library(lib_name)
    lib = ac[lib_name]
    symbols = pd.read_csv(os.path.join(DATA_DIR, csv_file))['Symbol'].tolist()
    arctic_symbol = os.path.splitext(csv_file)[0]

    existing_data = pd.DataFrame()
    if lib.has_symbol(arctic_symbol):
        existing_data = lib.read(arctic_symbol).data

    new_data_list = []
    progress_bar = st.progress(0)
    status_text = st.empty()

    for i, symbol in enumerate(symbols):
        try:
            # Download 2 years of data
            data = yf.download(
                symbol,
                start=(datetime.today() - pd.DateOffset(years=2)).strftime('%Y-%m-%d'),
                end=datetime.today().strftime('%Y-%m-%d'),
                auto_adjust=False
            )
            if not data.empty:
                # Recent yfinance versions return MultiIndex columns even
                # for a single ticker; flatten them before renaming
                if isinstance(data.columns, pd.MultiIndex):
                    data.columns = data.columns.get_level_values(0)
                data = data[['Open', 'High', 'Low', 'Close', 'Volume']]
                data.columns = [c.lower() for c in data.columns]
                data.index = data.index.tz_localize(None)
                data = data.reset_index().assign(symbol=symbol)
                new_data_list.append(data)
            progress_bar.progress((i + 1) / len(symbols))
            status_text.text(f"Processing {i + 1}/{len(symbols)}: {symbol}")
        except Exception as e:
            st.error(f"Error processing {symbol}: {str(e)}")

    if new_data_list:
        new_data = pd.concat(new_data_list)
        new_data.set_index(['symbol', 'Date'], inplace=True)
        new_data.sort_index(inplace=True)
        if not existing_data.empty:
            combined_data = pd.concat([existing_data, new_data])
            combined_data = combined_data[~combined_data.index.duplicated(keep='last')]
        else:
            combined_data = new_data
        lib.write(arctic_symbol, combined_data.sort_index())
        st.success(f"Database {lib_name} updated with {csv_file} data in symbol {arctic_symbol}!")
    else:
        st.warning("No new data was downloaded.")
```
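Once a CSV has been ingested, the stored frame can be read back and sliced per ticker via its `('symbol', 'Date')` MultiIndex. A minimal sketch, assuming SP500list.csv has been loaded and AAPL is among its symbols:

```python
ac = get_arctic_connection()
lib = ac["US_DATA"]

# Read the whole frame stored under the CSV-derived symbol name
df = lib.read("SP500list").data

# Slice out a single ticker from the ('symbol', 'Date') MultiIndex
aapl = df.xs("AAPL", level="symbol")
print(aapl.tail())  # most recent OHLCV rows for AAPL
```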
Step 5: Database Status Overview
Generate a status report for all available databases.
```python
from datetime import datetime

# Check database status
def get_database_status():
    status_data = []
    ac = get_arctic_connection()
    for csv_file in get_available_csvs():
        lib_name = get_library_name(csv_file)
        arctic_symbol = os.path.splitext(csv_file)[0]
        record_count = 0
        latest_date = "N/A"
        if lib_name in ac.list_libraries():
            lib = ac[lib_name]
            if lib.has_symbol(arctic_symbol):
                df = lib.read(arctic_symbol).data
                record_count = len(df)
                latest_date = df.index.get_level_values('Date').max().strftime("%Y-%m-%d") if not df.empty else "N/A"
        status_data.append({
            "CSV File": csv_file,
            "Database": lib_name,
            "Arctic Symbol": arctic_symbol,
            "Records": record_count,
            "Last Updated": latest_date
        })
    return pd.DataFrame(status_data)
```
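With the data stored, simple screens take only a few lines of pandas. Here is a sketch of a basic momentum filter (close above its 20-day moving average, computed per symbol), under the same assumptions as the read-back example above:

```python
ac = get_arctic_connection()
df = ac["US_DATA"].read("SP500list").data

# 20-day simple moving average of the close, computed per symbol
sma20 = df.groupby(level="symbol")["close"].transform(
    lambda s: s.rolling(20).mean()
)

# Keep the latest row per symbol and flag closes above their SMA
latest = df.assign(sma20=sma20).groupby(level="symbol").tail(1)
screened = latest[latest["close"] > latest["sma20"]]
print(screened[["close", "sma20"]])
```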
Step 6: Running the Streamlit App
Finally, wire everything together in a `main()` function that drives the Streamlit app. The complete script is shown below:
```python
import streamlit as st
import yfinance as yf
import pandas as pd
from arcticdb import Arctic
import os
from datetime import datetime

# Configuration
DATA_DIR = "./Data/sources/Index"
DB_PATH = "lmdb://./stock_db"

CSV_MAPPING = {
    "US_DATA": ["SP500list.csv", "NASDAQ.csv", "NYSE01.csv", "NYSE02.csv"],
    "IND_DATA": ["NIFTY750.csv", "NSE_ALL.csv", "AllCash.csv"]
}

def get_arctic_connection():
    """Initialize and return ArcticDB connection"""
    ac = Arctic(DB_PATH)
    return ac

def get_library_name(csv_file):
    """Determine library name based on CSV filename"""
    csv_lower = csv_file.lower()
    for lib_name, files in CSV_MAPPING.items():
        if any(f.lower() == csv_lower for f in files):
            return lib_name
    if "nse" in csv_lower or "nifty" in csv_lower:
        return "IND_DATA"
    return "US_DATA"

def get_available_csvs():
    """Get list of available CSV files from mapping that exist in data directory"""
    mapped_files = [f for files in CSV_MAPPING.values() for f in files]
    return [f for f in mapped_files if os.path.exists(os.path.join(DATA_DIR, f))]

def update_database(csv_file, lib_name):
    """Update database with symbols from CSV file using MultiIndex"""
    ac = get_arctic_connection()
    if lib_name not in ac.list_libraries():
        ac.create_library(lib_name)
    lib = ac[lib_name]
    symbols = pd.read_csv(os.path.join(DATA_DIR, csv_file))['Symbol'].tolist()
    arctic_symbol = os.path.splitext(csv_file)[0]  # Use CSV filename as symbol name

    # Load existing data if available
    existing_data = pd.DataFrame()
    if lib.has_symbol(arctic_symbol):
        existing_data = lib.read(arctic_symbol).data

    new_data_list = []
    progress_bar = st.progress(0)
    status_text = st.empty()

    for i, symbol in enumerate(symbols):
        try:
            # Download 2 years of data
            data = yf.download(
                symbol,
                start=(datetime.today() - pd.DateOffset(years=2)).strftime('%Y-%m-%d'),
                end=datetime.today().strftime('%Y-%m-%d'),
                auto_adjust=False
            )
            if not data.empty:
                # Prepare data with MultiIndex; recent yfinance versions
                # return MultiIndex columns, so flatten them first
                if isinstance(data.columns, pd.MultiIndex):
                    data.columns = data.columns.get_level_values(0)
                data = data[['Open', 'High', 'Low', 'Close', 'Volume']]
                data.columns = [c.lower() for c in data.columns]
                data.index = data.index.tz_localize(None)
                data = data.reset_index().assign(symbol=symbol)
                new_data_list.append(data)
            progress_bar.progress((i + 1) / len(symbols))
            status_text.text(f"Processing {i + 1}/{len(symbols)}: {symbol}")
        except Exception as e:
            st.error(f"Error processing {symbol}: {str(e)}")

    if new_data_list:
        # Combine all new data
        new_data = pd.concat(new_data_list)
        new_data.set_index(['symbol', 'Date'], inplace=True)
        new_data.sort_index(inplace=True)
        # Merge with existing data, removing duplicates and keeping latest
        if not existing_data.empty:
            combined_data = pd.concat([existing_data, new_data])
            combined_data = combined_data[~combined_data.index.duplicated(keep='last')]
        else:
            combined_data = new_data
        # Write to ArcticDB
        lib.write(arctic_symbol, combined_data.sort_index())
        st.success(f"Database {lib_name} updated with {csv_file} data in symbol {arctic_symbol}!")
    else:
        st.warning("No new data was downloaded.")

def get_database_status():
    """Generate status report for all databases"""
    status_data = []
    ac = get_arctic_connection()
    for csv_file in get_available_csvs():
        lib_name = get_library_name(csv_file)
        arctic_symbol = os.path.splitext(csv_file)[0]
        record_count = 0
        latest_date = "N/A"
        if lib_name in ac.list_libraries():
            lib = ac[lib_name]
            if lib.has_symbol(arctic_symbol):
                df = lib.read(arctic_symbol).data
                record_count = len(df)
                latest_date = df.index.get_level_values('Date').max().strftime("%Y-%m-%d") if not df.empty else "N/A"
        status_data.append({
            "CSV File": csv_file,
            "Database": lib_name,
            "Arctic Symbol": arctic_symbol,
            "Records": record_count,
            "Last Updated": latest_date
        })
    return pd.DataFrame(status_data)

def main():
    st.set_page_config(page_title="Stock Database Manager", layout="wide")
    st.title("📊 Stock Data Management System")

    # Sidebar controls
    st.sidebar.header("Database Operations")
    available_csvs = get_available_csvs()
    if not available_csvs:
        st.error(f"No CSV files found in {DATA_DIR}.")
        return
    selected_csv = st.sidebar.selectbox("Select CSV File", available_csvs)
    lib_name = get_library_name(selected_csv)

    if st.sidebar.button("📥 Download to DB"):
        update_database(selected_csv, lib_name)

    # Main display area
    st.header("Database Status Overview")
    status_df = get_database_status()
    st.dataframe(
        # Styler.applymap is renamed to Styler.map in pandas >= 2.1
        status_df.style.applymap(
            lambda x: "background-color: #e6ffe6" if x != "N/A" else "",
            subset=["Last Updated"]
        ),
        use_container_width=True
    )

    # Show raw data preview
    st.subheader("Selected CSV Preview")
    try:
        csv_path = os.path.join(DATA_DIR, selected_csv)
        df_preview = pd.read_csv(csv_path).head(10)
        st.dataframe(df_preview, use_container_width=True)
    except Exception as e:
        st.error(f"Error loading CSV: {str(e)}")

if __name__ == "__main__":
    main()
```
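Save the script (as app.py, say) and launch the dashboard with Streamlit's CLI:

```bash
streamlit run app.py
```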
Conclusion
By following this guide, you can efficiently manage and analyze stock data using ArcticDB. The code above sets up a robust pipeline for downloading, deduplicating, and storing historical data, along with a dashboard to monitor it, and it gives you a foundation to extend with stock screening, strategy backtesting, and deeper historical analysis.