Wk 3 Orchestration: MLOps with DataTalks

We've been introduced to Machine Learning Operations (MLOps) and got familiar with experiment tracking using MLflow in the past weeks. Now we work with orchestration tools, which let us manage all our processes all the way to deployment of the models - think CI/CD for data and ML pipelines.

Here, we use the free Mage AI orchestration tool.

Mage Setup

This is going to be a bit technical, but please follow through.
There are a few ways to set up Mage; the safest is the containerized route, through Docker. Remember to create a separate folder for this week's assignment, just to stay organized.

0.1 First, launch the Docker Engine, then from your terminal (or preferably Git Bash) run:
docker pull mageai/mageai:latest
We would start to see some download logs; wait for the pull to complete, then move on.

0.2 Clone quickstart files

git clone https://github.com/mage-ai/compose-quickstart.git mage-quickstart
cd mage-quickstart
cp dev.env .env
rm dev.env

Update your requirements.txt file with these packages:

boto3
fastparquet
graphviz
hyperopt
jupyter
mlflow==2.12.1
pandas
scikit-learn
seaborn
shap
xgboost

This sequence of commands is used to set up a local development environment for a new Mage project:

  • git clone https://github.com/mage-ai/compose-quickstart.git: This command clones the repository from the provided URL into the mage-quickstart directory on your local machine.
  • cd mage-quickstart navigates into the directory.
  • cp dev.env .env: Copies the dev.env file to a new file named .env, which is used for environment configuration.
  • rm dev.env: Removes the dev.env file, which is no longer needed after copying its contents to .env.

Now, we are ready to tackle the questions.

Question 1. Run Mage
1.1 Run Mage with Docker Compose.
To do this all we need to do is run docker compose up - but not yet.
1.2 What's the version of Mage we run?
We will get this from the page's UI; hold on for now.

Question 2. Creating a project
2.1 Create Project Name
Create a new project called "homework_03" - How many lines are in the created metadata.yaml file?

There are many ways to create a project, but since we are yet to launch the server, let's do it by simply tweaking the PROJECT_NAME variable in the .env file - like so:
PROJECT_NAME=homework_03
2.2 Launch Server
Now run docker compose up: this starts all the services defined in the docker-compose.yml file using Docker Compose, building and running the containers necessary for the project.

We should start to see logs indicating that the server is up on http://localhost:6789. Visit the page and you'll see the version, v0.9.71:

Mage Server

And two new folders created by these actions:

Project Folders
2.3 Lines in metadata
To get the number of lines in the metadata.yaml file simply run:
wc -l homework_03/metadata.yaml => 55 homework_03/metadata.yaml

Question 3. Creating a pipeline
3.1 Create Standard Pipeline
Click on the Pipeline Icon, start a new Standard (Batch) Pipeline and fill out the details.

Standard Pipeline

3.2 Load/Ingest Data
3.1 takes us to this page:
Select Data loader > Python > API, fill out the required details.

Data Loader Block

We've created our first block! It gets fun from here. We can see some default ingestion code in our block, which we modify into this:

import io
import pandas as pd
import requests
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from API
    """
    taxi_type = "yellow_tripdata"
    year = "2023"
    month = "03"

    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi_type}_{year}-{month}.parquet'
    response = requests.get(url)

    ## Data Types
    taxi_dtypes = {
        'VendorID': 'Int64',
        'store_and_fwd_flag': 'str',
        'RatecodeID': 'Int64',
        'PULocationID': 'Int64',
        'DOLocationID': 'Int64',
        'passenger_count': 'Int64',
        'trip_distance': 'float64',
        'fare_amount': 'float64',
        'extra': 'float64',
        'mta_tax': 'float64',
        'tip_amount': 'float64',
        'tolls_amount': 'float64',
        'ehail_fee': 'float64',
        'improvement_surcharge': 'float64',
        'total_amount': 'float64',
        'payment_type': 'float64',
        'trip_type': 'float64',
        'congestion_surcharge': 'float64'
    }

    parse_dates_taxi = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']  # yellow taxi files use tpep_* timestamps

    df = pd.read_parquet(io.BytesIO(response.content))

    ## Convert data types
    for col, dtype in taxi_dtypes.items():
        if col in df.columns:
            df[col] = df[col].astype(dtype)

    ## Parse Dates
    for col in parse_dates_taxi:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col])

    row_count = df.shape[0]
    print(f'Total Number of rows retrieved: {row_count}')

    return df


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

The Python script is designed to load and process data from an API. Here's a breakdown of what it does:

  1. Import necessary libraries: It imports io, pandas, requests, and some decorators (data_loader and test) from mage_ai.data_preparation.decorators.

  2. Define the load_data_from_api function: This function is decorated with @data_loader, which suggests that it's used for loading data. The function does the following:

    • Constructs a URL to fetch a specific .parquet file from cloud storage. The file name is built from taxi_type, year, and month.
    • Sends a GET request to the constructed URL and receives the response.
    • Defines the data types (taxi_dtypes) for each column in the dataset.
    • Defines the columns (parse_dates_taxi) that need to be parsed as dates.
    • Reads the .parquet file from the response content into a pandas DataFrame (df).
    • Converts the data types of the columns in the DataFrame as per taxi_dtypes.
    • Parses the date columns in the DataFrame as per parse_dates_taxi.
    • Prints the total number of rows retrieved.
    • Returns the DataFrame.
  3. Define the test_output function: This function is decorated with @test, which is used for testing. The function checks if the output of the block (load_data_from_api function) is not None.

Now, click on the play button and wait for the check mark on your block run.

Ingestion Completed

We added the line print(f'Total Number of rows retrieved: {row_count}') to print out the number of rows in the data: => 3403766
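
If you want to double-check that count outside Mage, here's a minimal standalone sketch (assuming pandas, requests, and a parquet engine such as fastparquet are installed, as in requirements.txt):

import io
import pandas as pd
import requests

## Same file the data loader block fetches
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet'
df = pd.read_parquet(io.BytesIO(requests.get(url).content))
print(len(df))  # should match the 3403766 rows reported by the block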

Question 4. Data preparation
Here, just like before, we add a transformer block, then modify the script like this:

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """
    Template code for a transformer block.

    Add more parameters to this function if this block has multiple parent blocks.
    There should be one parameter for each output variable from each parent block.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    # Specify your transformation logic here

    data['duration'] = data.tpep_dropoff_datetime - data.tpep_pickup_datetime
    data.duration = data.duration.dt.total_seconds() / 60

    data = data[(data.duration >= 1) & (data.duration <= 60)]

    categorical = ['PULocationID', 'DOLocationID']
    data[categorical] = data[categorical].astype(str)

    row_count = data.shape[0]
    print(f'Total Number of rows in transformed data: {row_count}')

    return data


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

This script is designed to transform data, specifically the data loaded from the previous block. Here's a breakdown of what it does:

  1. Import necessary decorators: It imports transformer and test decorators from mage_ai.data_preparation.decorators if they are not already in the global scope.

  2. Define the transform function: This function is decorated with @transformer, which suggests that it's used for transforming data. The function does the following:

    • Adds a new column duration to the DataFrame data. This column is calculated as the difference between tpep_dropoff_datetime and tpep_pickup_datetime, converted to minutes.
    • Filters the DataFrame to only include rows where duration is between 1 and 60 minutes.
    • Converts the data types of the columns PULocationID and DOLocationID to string.
    • Prints the total number of rows in the transformed data.
    • Returns the transformed DataFrame.
  3. Define the test_output function: This function is decorated with @test, which suggests that it's used for testing. The function checks if the output of the block (presumably the transform function) is not None.

The size of the result: 3316216
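
To sanity-check this outside Mage, the same filter can be applied to the DataFrame from the earlier sketch (a sketch, not part of the pipeline; df is the yellow taxi frame loaded above):

df['duration'] = (df.tpep_dropoff_datetime - df.tpep_pickup_datetime).dt.total_seconds() / 60
df = df[(df.duration >= 1) & (df.duration <= 60)]
print(len(df))  # should come out to 3316216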

Question 5. Train a model
Yet again, we build another transformer block, this time to train a linear regression model:

## Linear Regression Model
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """
    Template code for a transformer block.

    Add more parameters to this function if this block has multiple parent blocks.
    There should be one parameter for each output variable from each parent block.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    # Specify your transformation logic here

    ## Convert the categorical features to a list of dicts and one-hot encode them
    dicts_train = data[['PULocationID', 'DOLocationID']].to_dict(orient='records')
    vec = DictVectorizer(sparse=True)
    feature_matrix = vec.fit_transform(dicts_train)
    print(f"Dimension of feature_matrix: {feature_matrix.shape} \n")

    ## Target Var
    y = data['duration']

    ## Fit Linear Regression Model
    model = LinearRegression()
    model.fit(feature_matrix, y)

    ## Print Model's intercept
    intercept = model.intercept_
    print(f'Linear Regression Model Intercept: {intercept}')

    return model, vec


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

This Python script is designed to transform data and fit a Linear Regression model:

  1. Import necessary libraries and decorators: It imports DictVectorizer from sklearn.feature_extraction, LinearRegression from sklearn.linear_model, mean_squared_error from sklearn.metrics, and transformer and test decorators from mage_ai.data_preparation.decorators if they are not already in the global scope.

  2. Define the transform function: This function is decorated with @transformer, which suggests that it's used for transforming data. The function does the following:

    • Converts the PULocationID and DOLocationID columns of the DataFrame data to a list of dictionaries (dicts_train).
    • Initializes a DictVectorizer (vec), fits and transforms dicts_train into a feature matrix, and prints its dimensions. (If you later score a separate validation set, call transform, not fit_transform, on it so it reuses the feature space learned from the training data.)
    • Sets y as the duration column of data.
    • Fits a LinearRegression model (model) on the feature matrix and y.
    • Prints the intercept of the model.
    • Returns the model and the vectorizer.

Linear Regression Model Intercept: 24.774803905297286
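
For intuition on what DictVectorizer does with the two categorical columns, here is a tiny standalone sketch with made-up location IDs (not part of the pipeline):

from sklearn.feature_extraction import DictVectorizer

records = [
    {'PULocationID': '43', 'DOLocationID': '151'},
    {'PULocationID': '166', 'DOLocationID': '151'},
]

vec = DictVectorizer(sparse=True)
X = vec.fit_transform(records)

print(vec.get_feature_names_out())  # one column per (feature, value) pair, e.g. 'DOLocationID=151'
print(X.toarray())                  # one-hot style rows, one per record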

Question 6. Register the model
Here, we have to make some changes to the docker-compose.yml file to let us connect to an MLflow server - follow the next steps:

6.1 Stop server
Ctrl + C or docker compose down:

Container mage-quickstart-magic-1 Stopping

6.2 Create Dockerfile for MLflow
mlflow.dockerfile:

FROM python:3.10-slim

RUN pip install mlflow==2.12.1

EXPOSE 5000

CMD [ \
    "mlflow", "server", \
    "--backend-store-uri", "sqlite:///home/mlflow/mlflow.db", \
    "--default-artifact-root, ./artifacts", \
    "--host", "0.0.0.0", \
    "--port", "5000" \
]

6.2.2 Modify docker-compose.yml
As described in the homework, open the docker-compose.yml file and make the changes; the file should now look like this:

version: '3'
services:
  magic:
    image: mageai/mageai:latest
    command: mage start ${PROJECT_NAME}
    env_file:
      - .env
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      USER_CODE_PATH: /home/src/${PROJECT_NAME}
      ENV: ${ENV}
    ports:
      - 6789:6789
    volumes:
      - .:/home/src/
    restart: on-failure:5
  mlflow:
    build:
      context: .
      dockerfile: mlflow.dockerfile
    ports:
      - "5000:5000"
    volumes:
      - "${PWD}/mlflow:/home/mlflow/"
    networks:
      - app-network
networks:
  app-network:
    driver: bridge

6.2.3 Run docker compose up
This will start up the servers and install your new requirements - it might take a while to complete. Check that both MLflow and Mage are running.

NB: For the author, MLflow was reachable only at http://<IP/Proxy>:5000 and not at http://mlflow:5000 - likely because the magic service above is not attached to app-network, so the two containers cannot resolve each other by service name.
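
Before wiring the tracking server into the exporter, you can confirm it is reachable from your machine with a quick check (a sketch - swap in whichever URI works for you):

import mlflow

mlflow.set_tracking_uri('http://localhost:5000')
print(mlflow.search_experiments())  # should list at least the Default experiment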

6.3 Data Exporter
Create a generic Data exporter block, pull the trained model and vectorizer from the previous transformer, and, as the assignment requires, push the results to MLflow.

Paste the following in your exporter block:

import os
import pickle

import mlflow

EXPERIMENT_NAME = "sklearn-track-models"

mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.sklearn.autolog()

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(data, *args, **kwargs):
    """
    Exports data to some source.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Output (optional):
        Optionally return any object and it'll be logged and
        displayed when inspecting the block run.
    """
    # The upstream transformer block returns (model, vec)
    model, vec = data

    # Start an MLflow run
    with mlflow.start_run():

        # Log the model
        mlflow.sklearn.log_model(model, "model")

        # Save the DictVectorizer as a pickled artifact
        artifact_path = "dict_vectorizer"
        os.makedirs(artifact_path, exist_ok=True)
        vec_path = os.path.join(artifact_path, "vec.pkl")
        with open(vec_path, "wb") as f_out:
            pickle.dump(vec, f_out)
        mlflow.log_artifact(vec_path, artifact_path)

Now you are on track:
Exporting to MLflow
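
As an optional sanity check, you can load the logged model back from the tracking server (a sketch; <run_id> is a placeholder for the run ID shown in the MLflow UI):

import mlflow

mlflow.set_tracking_uri('http://localhost:5000')
model = mlflow.sklearn.load_model('runs:/<run_id>/model')
print(model.intercept_)  # should match the ~24.77 intercept printed during training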

That's it!
Visit wk3_submission to review the code. Cheers!
Comment below if there are any issues.
