We've been introduced to the subject of Machine Learning in Operations and got familiar with experiment tracking in MLflow last week. This week we work with orchestration tools, which let us manage all our processes all the way to deployment of the models - think CI/CD.
Here, we use the free Mage AI orchestration tool.
Mage Setup
This is going to be a bit technical, but please follow through.
There are a few ways to set up Mage; the safest is the containerized route, through Docker. Remember to create a separate folder for this week's assignment, just to stay organized.
0.1 First, launch the Docker Engine, then from your terminal (or preferably Git Bash) run:
docker pull mageai/mageai:latest
You will start to see some pull logs; wait for the download to complete, then move on.
0.2 Clone quickstart files
git clone https://github.com/mage-ai/compose-quickstart.git mage-quickstart
cd mage-quickstart
cp dev.env .env
rm dev.env
Update your requirements.txt file with these packages:
boto3
fastparquet
graphviz
hyperopt
jupyter
mlflow==2.12.1
pandas
scikit-learn
seaborn
shap
xgboost
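Once the image is rebuilt with these requirements, a quick way to confirm the pinned packages are importable inside the Mage environment is a scratch block or one-liner like this (a minimal sketch, not part of the homework):
# Minimal sanity check that the packages from requirements.txt are available
import mlflow
import sklearn
import xgboost

print(mlflow.__version__)   # expect 2.12.1, as pinned above
print(sklearn.__version__)
print(xgboost.__version__)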
This sequence of commands sets up a local development environment for a new Mage project:
- git clone https://github.com/mage-ai/compose-quickstart.git mage-quickstart: clones the quickstart repository into a mage-quickstart directory on your local machine.
- cd mage-quickstart: navigates into that directory.
- cp dev.env .env: copies the dev.env file to a new file named .env, which is used for environment configuration.
- rm dev.env: removes the dev.env file, which is no longer needed after copying its contents to .env.
Now, we are ready to face the questions.
Question 1. Run Mage
1.1 Run Mage with Docker Compose.
To do this, all we need to do is run docker compose up - but not yet.
1.2 What's the version of Mage we run?
We will get this from the page's UI; still, hold on.
Question 2. Creating a project
2.1 Create Project Name
Create a new project called "homework_03" - How many lines are in the created metadata.yaml file?
Yes, there are many ways to create a project, but since we have not launched the server yet, let's do it by simply tweaking the PROJECT_NAME variable in the .env file, like so:
PROJECT_NAME=homework_03
2.2 Launch Server
Now run docker compose up: this starts all the services defined in the docker-compose.yml file using Docker Compose, building and running the containers the project needs.
We should start to receive logs indicating that the server is up on http://localhost:6789. Visit the page and you will see the version, v0.9.71:
Two new folders are also created as a result of these actions:
2.3 Lines in metadata
To get the number of lines in the metadata.yaml file simply run:
wc -l homework_03/metadata.yaml
=> 55 homework_03/metadata.yaml
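If you prefer to check from Python instead (say, from a scratch notebook in the container), a tiny equivalent sketch:
# Count the lines in the generated project metadata, equivalent to `wc -l`
with open("homework_03/metadata.yaml") as f:
    print(sum(1 for _ in f))  # expect 55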
Question 3. Creating a pipeline
3.1 Create Standard Pipeline
Click on the Pipeline Icon, start a new Standard (Batch) Pipeline and fill out the details.
3.2 Load/Ingest Data
3.1 takes us to this page:
Select Data loader > Python > API, fill out the required details.
We've created our first block! It gets fun from here. We can see some default ingestion code in our block, which we will modify into this:
import io
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from API
    """
    taxi_type = "yellow_tripdata"
    year = "2023"
    month = "03"
    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi_type}_{year}-{month}.parquet'
    response = requests.get(url)

    ## Data types
    taxi_dtypes = {
        'VendorID': 'Int64',
        'store_and_fwd_flag': 'str',
        'RatecodeID': 'Int64',
        'PULocationID': 'Int64',
        'DOLocationID': 'Int64',
        'passenger_count': 'Int64',
        'trip_distance': 'float64',
        'fare_amount': 'float64',
        'extra': 'float64',
        'mta_tax': 'float64',
        'tip_amount': 'float64',
        'tolls_amount': 'float64',
        'ehail_fee': 'float64',
        'improvement_surcharge': 'float64',
        'total_amount': 'float64',
        'payment_type': 'float64',
        'trip_type': 'float64',
        'congestion_surcharge': 'float64'
    }

    ## Datetime columns to parse (yellow taxi data uses the tpep_ prefix)
    parse_dates_taxi = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

    df = pd.read_parquet(io.BytesIO(response.content))

    ## Convert data types
    for col, dtype in taxi_dtypes.items():
        if col in df.columns:
            df[col] = df[col].astype(dtype)

    ## Parse dates
    for col in parse_dates_taxi:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col])

    row_count = df.shape[0]
    print(f'Total Number of rows retrieved: {row_count}')

    return df


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
The Python script is designed to load and process data from an API. Here's a breakdown of what it does:
- Import necessary libraries: it imports io, pandas, requests, and the data_loader and test decorators from mage_ai.data_preparation.decorators.
- Define the load_data_from_api function: decorated with @data_loader, which marks it as the block's data-loading function. It:
  - Constructs a URL to fetch a specific .parquet file from cloud storage; the file name is built from taxi_type, year, and month.
  - Sends a GET request to the constructed URL and receives the response.
  - Defines the data types (taxi_dtypes) for each column in the dataset.
  - Defines the columns (parse_dates_taxi) that need to be parsed as dates.
  - Reads the .parquet file from the response content into a pandas DataFrame (df).
  - Converts the DataFrame's column data types as per taxi_dtypes.
  - Parses the date columns as per parse_dates_taxi.
  - Prints the total number of rows retrieved.
  - Returns the DataFrame.
- Define the test_output function: decorated with @test, it checks that the output of the block (the load_data_from_api function) is not None.
Now, click on the play button and wait for the check mark on the block.
We added the line print(f'Total Number of rows retrieved: {row_count}') to print out the number of rows in the data: => 3403766
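If you want to sanity-check that count outside Mage, a small standalone script (a minimal sketch using the same URL the block fetches) should report the same number of rows:
# Standalone check of the row count reported by the data loader block
import io

import pandas as pd
import requests

# Same March 2023 yellow taxi file the block downloads
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet"

response = requests.get(url)
response.raise_for_status()

df = pd.read_parquet(io.BytesIO(response.content))
print(df.shape[0])  # should match the 3,403,766 rows printed in the block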
Question 4. Data preparation
Here, just like before, we add a transformer block and modify the script like this:
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """
    Template code for a transformer block.

    Add more parameters to this function if this block has multiple parent blocks.
    There should be one parameter for each output variable from each parent block.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    # Specify your transformation logic here
    data['duration'] = data.tpep_dropoff_datetime - data.tpep_pickup_datetime
    data.duration = data.duration.dt.total_seconds() / 60

    data = data[(data.duration >= 1) & (data.duration <= 60)]

    categorical = ['PULocationID', 'DOLocationID']
    data[categorical] = data[categorical].astype(str)

    row_count = data.shape[0]
    print(f'Total Number of rows in transformed data: {row_count}')

    return data


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
This script is designed to transform data, specifically the data loaded from the previous block. Here's a breakdown of what it does:
- Import necessary decorators: it imports the transformer and test decorators from mage_ai.data_preparation.decorators if they are not already in the global scope.
- Define the transform function: decorated with @transformer, which marks it as the block's transformation function. It:
  - Adds a new duration column to the DataFrame data, calculated as the difference between tpep_dropoff_datetime and tpep_pickup_datetime, converted to minutes.
  - Filters the DataFrame to only include rows where duration is between 1 and 60 minutes.
  - Converts the PULocationID and DOLocationID columns to string.
  - Prints the total number of rows in the transformed data.
  - Returns the transformed DataFrame.
- Define the test_output function: decorated with @test, it checks that the output of the block (the transform function) is not None.
The size of the result: 3316216
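To see the duration logic in isolation, here is a tiny self-contained sketch on made-up data (the timestamps below are invented purely for illustration):
# Toy illustration of the 1-60 minute duration filter used in the transformer
import pandas as pd

toy = pd.DataFrame({
    'tpep_pickup_datetime': pd.to_datetime([
        '2023-03-01 10:00:00', '2023-03-01 11:00:00', '2023-03-01 12:00:00']),
    'tpep_dropoff_datetime': pd.to_datetime([
        '2023-03-01 10:12:00',   # 12 min   -> kept
        '2023-03-01 11:00:30',   # 0.5 min  -> dropped (< 1)
        '2023-03-01 13:30:00']), # 90 min   -> dropped (> 60)
})

toy['duration'] = (toy.tpep_dropoff_datetime - toy.tpep_pickup_datetime).dt.total_seconds() / 60
kept = toy[(toy.duration >= 1) & (toy.duration <= 60)]
print(len(kept))  # 1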
Question 5. Train a model
Yet again, we build another transformer block, this time to train a linear regression model:
## Linear Regression Model
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """
    Template code for a transformer block.

    Add more parameters to this function if this block has multiple parent blocks.
    There should be one parameter for each output variable from each parent block.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    # Specify your transformation logic here
    dicts_train = data[['PULocationID', 'DOLocationID']].to_dict(orient='records')

    vec = DictVectorizer(sparse=True)
    feature_matrix = vec.fit_transform(dicts_train)

    # Use `transform` (not `fit_transform`) so data is mapped onto the feature
    # space already learned from the training data
    feature_matrix_val = vec.transform(dicts_train)
    print(f"Dimension of feature_matrix: {feature_matrix_val.shape} \n")

    ## Target variable
    y = data['duration']

    ## Fit the linear regression model
    model = LinearRegression()
    model.fit(feature_matrix, y)

    ## Print the model's intercept
    intercept = model.intercept_
    print(f'Linear Regression Model Intercept: {intercept}')

    return model, vec


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
This Python script is designed to transform data and fit a Linear Regression model:
- Import necessary libraries and decorators: it imports DictVectorizer from sklearn.feature_extraction, LinearRegression from sklearn.linear_model, mean_squared_error from sklearn.metrics, and the transformer and test decorators from mage_ai.data_preparation.decorators if they are not already in the global scope.
- Define the transform function: decorated with @transformer, which marks it as the block's transformation function. It:
  - Converts the PULocationID and DOLocationID columns of the DataFrame data to a list of dictionaries (dicts_train).
  - Initializes a DictVectorizer (vec) and fits and transforms dicts_train into a feature matrix.
  - Transforms dicts_train again (with transform, not fit_transform) into feature_matrix_val and prints its dimensions. Note: there is no separate validation set in this block, so the training dictionaries are simply reused here; with real validation data you would pass that data to transform instead.
  - Sets y as the duration column of data.
  - Fits a LinearRegression model (model) on the feature matrix and y.
  - Prints the intercept of the model.
  - Returns the model and the vectorizer.
Linear Regression Model Intercept: 24.774803905297286
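As a side note on what DictVectorizer is doing in that block, the following tiny self-contained sketch (with invented location IDs and durations) shows the same one-hot-encode-then-fit pattern on toy data:
# Toy DictVectorizer + LinearRegression example on made-up records
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression

# Invented records mimicking the pickup/dropoff location features
dicts_train = [
    {'PULocationID': '43', 'DOLocationID': '151'},
    {'PULocationID': '43', 'DOLocationID': '236'},
    {'PULocationID': '166', 'DOLocationID': '151'},
]
y = [12.0, 19.5, 8.2]  # made-up trip durations in minutes

vec = DictVectorizer(sparse=True)
X = vec.fit_transform(dicts_train)  # one column per (feature, value) pair
print(X.shape)                      # (3, 4): 2 distinct PU values + 2 distinct DO values

model = LinearRegression()
model.fit(X, y)
print(model.intercept_)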
Question 6. Register the model
Here, we have to make some changes to the docker-compose.yaml file to enable us to connect to an MLflow server - follow the next steps:
6.1 Stop server
Ctrl + C or docker compose down:
Container mage-quickstart-magic-1 Stopping
6.2 Create Dockerfile for MLflow
mlflow.dockerfile:
FROM python:3.10-slim
RUN pip install mlflow==2.12.1
EXPOSE 5000
CMD [ \
    "mlflow", "server", \
    "--backend-store-uri", "sqlite:///home/mlflow/mlflow.db", \
    "--default-artifact-root", "./artifacts", \
    "--host", "0.0.0.0", \
    "--port", "5000" \
]
6.2.2 Modify docker-compose yaml
As described in the homework, open the docker-compose.yaml file and make the changes; the file should now look like this:
version: '3'
services:
  magic:
    image: mageai/mageai:latest
    command: mage start ${PROJECT_NAME}
    env_file:
      - .env
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      USER_CODE_PATH: /home/src/${PROJECT_NAME}
      ENV: ${ENV}
    ports:
      - 6789:6789
    volumes:
      - .:/home/src/
    restart: on-failure:5
  mlflow:
    build:
      context: .
      dockerfile: mlflow.dockerfile
    ports:
      - "5000:5000"
    volumes:
      - "${PWD}/mlflow:/home/mlflow/"
    networks:
      - app-network
networks:
  app-network:
    driver: bridge
6.2.3 Run docker compose up
This will start up the server and install your new requirements - it might take a while to complete. Check that both MLflow and Mage are running.
NB: For the author, MLflow worked only on http://<IP/Proxy>:5000 and not on http://mlflow:5000.
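A quick way to confirm the tracking server is reachable from wherever your blocks run is a scratch snippet like this (the URI below is a placeholder; use whichever of the two addresses works in your setup):
# Check that the MLflow tracking server responds
import mlflow

# Placeholder tracking URI; swap in http://mlflow:5000 or http://<IP/Proxy>:5000
# depending on which one resolves from your environment
mlflow.set_tracking_uri("http://127.0.0.1:5000")

# If the server is up, this returns at least the Default experiment
print(mlflow.search_experiments())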
6.3 Data Exporter
Create a generic Data Exporter block; in it we pull the generated model and vectorizer from the previous transformer and, as the assignment requires, push the results to MLflow.
Paste the following in your exporter block:
import os
import pickle

import click
import mlflow
from mlflow.entities import ViewType
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

HPO_EXPERIMENT_NAME = "Linear-Regression"
EXPERIMENT_NAME = "sklearn-track-models"

mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.sklearn.autolog()

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(data, *args, **kwargs):
    """
    Exports data to some source.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Output (optional):
        Optionally return any object and it'll be logged and
        displayed when inspecting the block run.
    """
    # Start an MLflow run
    with mlflow.start_run():
        # Log the model
        mlflow.sklearn.log_model(data['model'], "model")

        # Save the DictVectorizer as an artifact
        vec = data['vec']
        artifact_path = "dict_vectorizer"
        os.makedirs(artifact_path, exist_ok=True)
        vec_path = os.path.join(artifact_path, "vec.pkl")
        with open(vec_path, "wb") as f_out:
            pickle.dump(vec, f_out)
        mlflow.log_artifact(vec_path, artifact_path)
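Once the block has run, a small check like the one below (a sketch assuming the same tracking URI and experiment name as above) can confirm that the run and its artifacts were logged:
# Verify the latest run and its artifacts in the tracking server
import mlflow
from mlflow.tracking import MlflowClient

mlflow.set_tracking_uri("http://127.0.0.1:5000")
client = MlflowClient()

experiment = client.get_experiment_by_name("sklearn-track-models")
runs = client.search_runs(experiment_ids=[experiment.experiment_id],
                          order_by=["attributes.start_time DESC"],
                          max_results=1)

for run in runs:
    print(run.info.run_id, run.info.status)
    # List everything logged under this run (the model and the dict_vectorizer folder)
    for artifact in client.list_artifacts(run.info.run_id):
        print(artifact.path)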
That's it!
Visit wk3_submission to review the code. Cheers!
Comment below if there are any issues.