One of the standout services within the AWS ecosystem is the Elastic Container Service (ECS) with the Fargate launch type. This service is often referred to as “serverless” because it requires minimal operational setup to launch. However, while Fargate offers ease of use, it lacks one of the crucial benefits of AWS Lambda: automatic scaling. Regardless, Matillion leverages Fargate to create a cost-effective way to integrate your AWS environment with its services.
The Scalability Challenge with Big Data
While Matillion provides a straightforward integration, challenges can arise when discussing scalability and the movement of big data. For instance, if your data pipelines require processing gigabytes of data, this can be managed in a relatively static way using Matillion components. However, if you need to implement Python logic using Pandas, the situation becomes more complex.
You might be wondering: how do you handle gigabytes of data with a Fargate runner that has only 2 vCPUs and 4GB of memory? Yes, this can be tricky, especially if you need to wrangle the data during the transfer, as it must be loaded into memory.
Leveraging Python Pandas on Big Data with Fargate
A key issue to understand is that Python Pandas transforms data in memory. This design choice allows for high-speed processing, but it also introduces the risk of an “OutOfMemoryError,” which can lead to the termination of your Fargate task. If this happens, Matillion can no longer track the state of your pipeline, since the executing task has disappeared.
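To see why this bites so quickly, consider how much a modest CSV inflates once loaded: Pandas stores object (string) columns as individual Python objects, so the in-memory DataFrame is typically several times larger than the file on disk. A small illustrative sketch (the data is synthetic, not from a real pipeline):

```python
import io

import pandas as pd

# Build a small CSV in memory (a stand-in for a real file on disk).
csv_bytes = ("id,value,label\n" + "\n".join(
    f"{i},{i * 1.1:.2f},row_{i}" for i in range(10_000)
)).encode("latin_1")

df = pd.read_csv(io.BytesIO(csv_bytes))

# deep=True also counts the Python string objects behind object columns,
# which is where most of the inflation usually comes from.
in_memory_bytes = int(df.memory_usage(deep=True).sum())
on_disk_bytes = len(csv_bytes)

print(f"CSV size:       {on_disk_bytes / 1024:.1f} KiB")
print(f"DataFrame size: {in_memory_bytes / 1024:.1f} KiB")
```

On a 4 GB Fargate task, a multi-gigabyte CSV multiplied by this inflation factor is exactly how the OutOfMemoryError appears.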
Your first reaction might be to increase the task size permanently. However, this could significantly increase your AWS bill. For instance, running production agents with Matillion’s suggested configuration (2 vCPUs, 4 GB RAM, 20 GB storage) costs about $144.16 when running 24 hours a day for 30 days. Scaling up to 16 vCPUs and 128 GB RAM raises that to roughly $1,724.28. Since not all pipelines require such extensive resources, static scaling is not advisable. Dynamic scaling would be a better fit, similar to how Snowflake resources can be adjusted using ALTER WAREHOUSE statements. However, dynamic scaling must be managed outside of your Matillion pipeline to avoid losing tracking information; triggering it from within a pipeline would be a valuable feature request for Matillion.
Implementing Chunking for Large Data Files
A simpler approach for your Pandas workloads is to use the capabilities of the Pandas package itself. Since Pandas operates in memory, why not explore whether it has an option for chunking large data files into smaller parts? If so, can we apply transformation logic to each chunk? Does reading bytes introduce resilience against encoding issues and allow for generic data type identification in each chunk? Can we upload each chunk to the target storage system and free memory afterward? Finally, if we encounter errors, are we able to track them chunk by chunk?
The question of how to chunk a large DataFrame into smaller ones, such as converting a large CSV into multiple smaller Parquet files, is not new. Fortunately, today we have a chunksize parameter in methods like:
pandas.read_csv()
pandas.read_stata()
pandas.read_json() (with lines=True)
pandas.read_sql()
pandas.read_hdf()
The authors of Pandas advise that chunking works well when the operation requires zero or minimal coordination between chunks. For more complicated workflows, other libraries might be a better choice.
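As a sketch of what “minimal coordination between chunks” looks like in practice, the only state carried across chunks below is a pair of running counters (the CSV content is made up for the example):

```python
import io

import pandas as pd

# Hypothetical sample data standing in for a large CSV file.
csv_data = "value\n" + "\n".join(str(i) for i in range(1, 101))

total = 0
row_count = 0

# Each chunk is an ordinary DataFrame; the only state shared between
# chunks is the pair of running counters -- "minimal coordination".
for chunk in pd.read_csv(io.StringIO(csv_data), chunksize=25):
    total += int(chunk["value"].sum())
    row_count += len(chunk)

print(total, row_count)  # 5050 100
```

Anything that needs a global view of the data (a sort, a deduplication across the whole file) falls outside this pattern, which is where the Pandas authors point you to other libraries.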
Example of Chunking Implementation
Here’s a simple implementation of chunking:
```python
import pandas as pd

chunk_size = 1000
chunk_number = 0

for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    chunk.to_parquet(f'chunk_{chunk_number}.parquet')
    chunk_number += 1
```
It’s important to note that chunk_size counts rows, so the memory a chunk occupies also depends on how many columns it has. Both dimensions drive the size of the DataFrame.
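A quick illustration of how both dimensions matter: two frames with the same row count but different widths occupy proportionally different amounts of memory (8 bytes per float64 cell):

```python
import numpy as np
import pandas as pd

# Two DataFrames with the same number of rows but different widths:
rows = 1_000
narrow = pd.DataFrame(np.ones((rows, 2)))   # 2 float64 columns
wide = pd.DataFrame(np.ones((rows, 20)))    # 20 float64 columns

narrow_bytes = int(narrow.memory_usage(index=False).sum())
wide_bytes = int(wide.memory_usage(index=False).sum())

# 1,000 rows x 8 bytes per float64 cell, per column:
print(narrow_bytes)  # 16000
print(wide_bytes)    # 160000
```

So when sizing chunk_size for a wide table, divide your memory budget by the full row width, not just the row count.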
Applying Transformation Logic to Each Chunk
The answer to whether we can apply transformation logic to each chunk is a resounding yes: while chunking the data, we can transform each segment individually. However, we might encounter slight discrepancies in how Pandas infers column data types from chunk to chunk, especially with CSV and JSON files, where quotation marks and delimiters are crucial for resilient pipelines.
Here’s an example of applying a transformation to each chunk:
```python
import pandas as pd

# Define a transformation function
def transform(df):
    df['numeric_column'] = df['numeric_column'].apply(lambda x: x * 1.1)
    return df

chunk_size = 1000
chunk_number = 0

for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    transformed_chunk = transform(chunk)
    transformed_chunk.to_parquet(f'chunk_{chunk_number}.parquet')
    chunk_number += 1
```
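One way to avoid the per-chunk data type discrepancies mentioned above is to pin the column types explicitly with the dtype parameter, so every chunk is parsed the same way. A minimal sketch with made-up column names:

```python
import io

import pandas as pd

# A column that looks numeric in early rows but contains a string later:
csv_data = "code,amount\n1,10.5\n2,11.0\nX7,12.25\n"

# Without an explicit dtype, 'code' could be inferred as int64 in one
# chunk and object in the next; pinning the dtype keeps chunks uniform.
dtypes = {"code": "string", "amount": "float64"}

chunk_dtypes = []
for chunk in pd.read_csv(io.StringIO(csv_data), chunksize=2, dtype=dtypes):
    chunk_dtypes.append(str(chunk["code"].dtype))

print(chunk_dtypes)  # the same dtype in every chunk
```

Uniform dtypes also matter downstream: Parquet files written from differently-typed chunks can end up with incompatible schemas.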
Handling Encoding Issues
To get robust handling of encoding errors, reading bytes is the better approach. However, it has its drawbacks: if we read the entire file at once, we risk hitting an OutOfMemory error:
```python
import io

import pandas as pd

# Define a transformation function
def transform(df):
    df['numeric_column'] = df['numeric_column'].apply(lambda x: x * 1.1)
    return df

file_path = 'large_file.csv'
chunk_size = 1000
chunk_number = 0

with open(file_path, mode="rb") as fd:
    # fd.read() pulls the whole file into memory -- chunksize cannot help here
    for chunk in pd.read_csv(io.StringIO(fd.read().decode("latin_1")), chunksize=chunk_size):
        # Apply the transformation to each chunk
        transformed_chunk = transform(chunk)
        # Write the transformed chunk to a Parquet file
        transformed_chunk.to_parquet(f'chunk_{chunk_number}.parquet')
        chunk_number += 1
```
As a result, our Fargate task will be killed by AWS ECS, or the process on the Matillion Hybrid Agent will exit with error code 137 (out of memory), because chunking never takes effect on the fully loaded file. Therefore, we need to adjust our code so that the file itself is read in chunks:
```python
import pandas as pd

# Define a transformation function
def transform(df):
    df['numeric_column'] = df['numeric_column'].apply(lambda x: x * 1.1)
    return df

file_path = 'large_file.csv'
chunk_size = 10000
chunk_number = 0

with open(file_path, mode="rb") as fd:
    for chunk in pd.read_csv(
        fd,
        low_memory=False,
        chunksize=chunk_size,
        encoding="latin_1"
    ):
        # Apply the transformation to each chunk
        transformed_chunk = transform(chunk)
        # Write the transformed chunk to a Parquet file
        transformed_chunk.to_parquet(f'chunk_{chunk_number}.parquet')
        chunk_number += 1
```
This way, decoding is applied chunk by chunk, preventing memory issues. The low_memory=False parameter makes Pandas infer each column's type from the whole chunk at once instead of from smaller internal blocks, which helps keep data types consistent.
Uploading Chunks to Target Storage and Freeing Memory
After a chunk has been transformed, we can upload it to a target storage system such as Amazon S3 and then free the memory it occupied. Here’s how you can do it:
```python
import gc
import logging

import pandas as pd

# Define a transformation function
def transform(df):
    df['numeric_column'] = df['numeric_column'].apply(lambda x: x * 1.1)
    return df

file_path = 'large_file.csv'
s3_bucket = 'your_s3_bucket'
prefix_name = 'my_prefix'
file_name = 'large_file.csv'
access_key = 'your_access_key'
secret_access_key = 'your_secret_access_key'
session_token = 'your_session_token'
chunk_size = 10000
chunk_number = 0

with open(file_path, mode="rb") as fd:
    # Instead of reading the entire file at once, we use an iterative approach
    for chunk in pd.read_csv(
        fd,
        low_memory=False,
        chunksize=chunk_size,
        encoding="latin_1"
    ):
        # Apply the transformation to each chunk
        transformed_chunk = transform(chunk)
        # Define the Parquet file path
        parquet_file = f"s3://{s3_bucket}/{prefix_name}/part_{chunk_number}_{file_name.replace('.csv', '.snappy.parquet')}"
        # Write the transformed chunk to a Parquet file
        transformed_chunk.to_parquet(
            parquet_file,
            engine="auto",
            compression="snappy",
            storage_options={
                "key": access_key,
                "secret": secret_access_key,
                "token": session_token,
            },
        )
        logging.info(f"Uploaded chunk of {file_name} to {parquet_file}")
        chunk_number += 1
        # transform() returns the same object, so drop both references
        del chunk, transformed_chunk
        gc.collect()
```
The storage_options parameter passes the AWS credentials through to fsspec/s3fs, the S3 file-system layer Pandas uses for s3:// paths (the s3fs package must be installed), while del and gc.collect() release the chunk’s memory before the next iteration.
Tracking Errors During Processing
Tracking errors on a chunk-by-chunk basis can be challenging. However, you can leverage the on_bad_lines parameter, introduced in Pandas v1.4, to receive warnings for lines that cannot be parsed. With its callback variant, you can write detailed log files to understand the errors. This feature allows you to handle errors more effectively, although it is not compatible with every configuration; in particular, the callable form requires the Python parser engine, which does not support low_memory=False. In our use case we use on_bad_lines='warn' and parse the resulting Matillion logs afterwards.
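For illustration, here is a minimal sketch of the callback variant (pandas >= 1.4): the callable receives each unparseable line as a list of fields and can record it before the row is dropped. Note that it requires engine="python"; the sample data here is invented:

```python
import io

import pandas as pd

bad_rows: list = []

def log_bad_line(line):
    """Record the offending fields; returning None drops the row."""
    bad_rows.append(line)
    return None

# The second data row has one field too many and cannot be parsed.
csv_data = "a,b\n1,2\n3,4,5\n6,7\n"

# A callable for on_bad_lines requires the Python parser engine.
df = pd.read_csv(io.StringIO(csv_data), on_bad_lines=log_bad_line, engine="python")

print(len(df))   # 2 good rows survive
print(bad_rows)  # [['3', '4', '5']]
```

Routing bad_rows into a logger instead of a list gives you exactly the chunk-level error trail the text above asks for.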
Final Thoughts
The biggest drawback highlighted in this blog post is the lack of robust chunk-level error logging. Despite these challenges, using Matillion with AWS Fargate allows for cost-effective solutions without the need for complex clusters. By implementing chunking and careful memory management, you can avoid memory overflow errors and streamline the processing of large datasets.
As AWS, Matillion, and Snowflake continue to evolve, leveraging these tools together can optimize your data workflows, reduce execution time, and enhance overall efficiency. Remember, in the cloud, effective resource utilization is key to maximizing your investment.
Happy Coding :-)!