DEV Community

SavvyShivam
SavvyShivam

Posted on

Configure python file in vscode

  1. Read data ` from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

def read_data(spark, customSchema):

Step 1: Define the S3 bucket and file location
bucket_name = " loan-data "
s3_input_path = f"s3://{bucket_name}/Inputfile/loan_data.csv"

Step 2: Read the CSV file into a DataFrame
df = spark.read.csv(s3_input_path, header=True, schema=customSchema)

Step 3: Return the DataFrame
return df
Enter fullscreen mode Exit fullscreen mode

`

  1. Clean Data

` def clean_data(input_df):

 Step 1: Drop rows with any null values
df_no_nulls = input_df.dropna()

Step 2: Remove duplicate rows
df_no_duplicates = df_no_nulls.dropDuplicates()

Step 3: Drop rows where the 'purpose' column contains the string 'null'
df_cleaned = df_no_duplicates.filter(df_no_duplicates["purpose"] != "null")

Return the cleaned DataFrame
return df_cleaned
Enter fullscreen mode Exit fullscreen mode

`

  1. S3_load_data
    `def s3_load_data(data, file_name):

    Saves the final DataFrame to an S3 bucket as a single CSV file with a header.

    Parameters:
    data (DataFrame): The output data of the result_1 and result_2 functions.
    file_name (str): The name of the output file to be stored inside the S3 bucket.

    Step 1: Mention the bucket name
    bucket_name = "loan-data" # Replace with the unique bucket name if applicable

    Step 2: Define the output path
    output_path = "s3://" + bucket_name + "/output/" + file_name

    Step 3: Check if the DataFrame has the expected row count
    if data.count() != 0:
    print("Loading the data to:", output_path)

    # Write the DataFrame to S3 as a single partition CSV file
    data.coalesce(1).write.csv(output_path, mode="overwrite", header=True)
    

    else:
    print("Empty DataFrame, hence cannot save the data to:", output_path)
    `

  2. Result 1
    `from pyspark.sql.functions import when, col

def result_1(input_df):

Performs the following operations:
1. Filters rows where 'purpose' is either 'educational' or 'small business'.
2. Creates a new column 'income_to_installment_ratio' as the ratio of 'log annual inc' to 'installment'.
3. Creates a new column 'int_rate_category' based on 'int_rate' categorization:
    - "low" if int_rate < 0.1
    - "medium" if 0.1 <= int_rate < 0.15
    - "high" if int_rate >= 0.15
4. Creates a new column 'high_risk_borrower' with value "1" if:
    - dti > 20
    - fico < 700
    - revol_util > 80
   Otherwise, sets it to "0".

Parameters:
    input_df (DataFrame): The cleaned data DataFrame.

Returns:
    DataFrame: The transformed DataFrame with the new columns.

Step 1: Filter rows where 'purpose' is 'educational' or 'small business'
filtered_df = input_df.filter((col("purpose") == "educational") | (col("purpose") == "small business"))

Step 2: Add 'income_to_installment_ratio' column
with_income_ratio = filtered_df.withColumn(
    "income_to_installment_ratio", col("log annual inc") / col("installment")
)

Step 3: Add 'int_rate_category' column
with_int_rate_category = with_income_ratio.withColumn(
    "int_rate_category",
    when(col("int_rate") < 0.1, "low")
    .when((col("int_rate") >= 0.1) & (col("int_rate") < 0.15), "medium")
    .otherwise("high")
)

Step 4: Add 'high_risk_borrower' column
final_df = with_int_rate_category.withColumn(
    "high_risk_borrower",
    when(
        (col("dti") > 20) & (col("fico") < 700) & (col("revol_util") > 80),
        1
    ).otherwise(0)
)

Return the final DataFrame
return final_df
Enter fullscreen mode Exit fullscreen mode

`

  1. Result 2

`from pyspark.sql import *

def result_2(input_df):

Calculates the default rate for each purpose. The default rate is defined as:
- The count of loans that are not fully paid (not_fully_paid == 1) divided by the total count of loans for each purpose.

Parameters:
    input_df (DataFrame): The cleaned data DataFrame.

Returns:
    DataFrame: The DataFrame with purpose and default_rate columns.

Step 1: Calculate the total number of loans and number of not fully paid loans for each purpose
total_loans = input_df.groupBy("purpose").count().withColumnRenamed("count", "total_loans")

not_fully_paid_loans = input_df.filter(col("not_fully_paid") == 1).groupBy("purpose").count().withColumnRenamed("count", "not_fully_paid")

Step 2: Join the two DataFrames to get total loans and not fully paid loans in the same table
result_df = total_loans.join(not_fully_paid_loans, on="purpose", how="left_outer").fillna(0)

#
Enter fullscreen mode Exit fullscreen mode

Step 3: Calculate the default rate as the ratio of not fully paid loans to total loans
result_df = result_df.withColumn(
"default_rate",
round(result_df["not_fully_paid"] / result_df["total_loans"], 2)
)

Step 4: Select the desired columns and return the DataFrame
final_df = result_df.select("purpose", "default_rate")

return final_df
Enter fullscreen mode Exit fullscreen mode

`

  1. Redshift load data

`def redshift_load_data(data):

Loads the final DataFrame to the Redshift table.

Parameters:
    data (DataFrame): The output of the result_2 function.

Step 1: Define the Redshift connection parameters
jdbcUrl = "jdbc:redshift://<your-cluster-endpoint>:5439/<your-database-name>"
username = "<your-username>"
password = "<your-password>"
table_name = "result_2_table"  # Specify the Redshift table where data will be loaded

Step 2: Load data into Redshift
data.write \
    .format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .mode("overwrite") \
    .save()

print(f"Data successfully loaded into the {table_name} table in Redshift.")
Enter fullscreen mode Exit fullscreen mode

`

Top comments (0)