Analyse Customer Sentiment using Azure Data Factory pipeline

This guide demonstrates how to:

  1. Copy Data from Azure Blob Storage to a sink.
  2. Transform (preprocess) the data via an Azure Data Factory Data Flow.
  3. Invoke an Azure ML pipeline to run an inference script (using the AzureMLExecutePipeline activity).

Prerequisites

  1. Azure Subscription
    • You need an active Azure subscription.
  2. Azure Data Factory (V2) instance
  3. Azure Blob Storage (with data to be processed)
  4. Azure Machine Learning Workspace
    • A published ML pipeline that can be triggered
  5. Sufficient Permissions
    • Admin or Contributor on the above resources

High-Level Flow

  1. Linked Service in ADF to Azure Blob Storage
  2. Datasets (input and output)
  3. Copy Data Activity (Blob -> Blob or other sink)
  4. Mapping Data Flow (for data preprocessing)
  5. AzureMLExecutePipeline (to run inference in Azure ML)
  6. Inference Script in Azure ML pipeline (e.g., scoring with a trained model)

1. Create a Linked Service (Blob Storage)

In Azure Data Factory, navigate to Manage > Linked services and create a new linked service for your Blob Storage. Below is an example JSON (if you prefer the code view). Store your connection string or SAS token securely (e.g., in Key Vault).

{
  "name": "LS_AzureBlobStorage",
  "properties": {
    "type": "AzureBlobStorage",
    "typeProperties": {
      "connectionString": "@pipeline().parameters.blobConnectionString"
    }
  }
}

Note:

  • In production, reference your connectionString from Azure Key Vault rather than hardcoding credentials in ADF.
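
If you deploy ADF artifacts programmatically and need to pull that secret yourself, a minimal Python sketch using the azure-identity and azure-keyvault-secrets packages might look like the following; the vault URL and secret name are placeholders:

# fetch_blob_conn_str.py -- hypothetical helper; vault URL and secret name are placeholders
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Authenticate with whatever identity is available (CLI login, managed identity, etc.)
credential = DefaultAzureCredential()

# Point the client at your Key Vault instance
secret_client = SecretClient(
    vault_url="https://<your-key-vault>.vault.azure.net",
    credential=credential,
)

# Retrieve the storage connection string stored as a secret
blob_connection_string = secret_client.get_secret("blob-connection-string").value
print("Retrieved connection string of length:", len(blob_connection_string))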

2. Create Datasets

2.1 Dataset for Input (Blob)

{
  "name": "DS_BlobInput",
  "properties": {
    "linkedServiceName": {
      "referenceName": "LS_AzureBlobStorage",
      "type": "LinkedServiceReference"
    },
    "type": "Blob",
    "typeProperties": {
      "folderPath": "input-data",
      "fileName": "mydata.csv"
    },
    "schema": []
  }
}

If you need to handle dynamic folder paths or filenames, parameterize folderPath and fileName.

2.2 Dataset for Output (Blob)

{
  "name": "DS_BlobOutput",
  "properties": {
    "linkedServiceName": {
      "referenceName": "LS_AzureBlobStorage",
      "type": "LinkedServiceReference"
    },
    "type": "Blob",
    "typeProperties": {
      "folderPath": "output-data",
      "fileName": "mytransformeddata.csv"
    },
    "schema": []
  }
}

3. Create the ADF Pipeline

We will define a pipeline that contains three activities:

  1. Copy Data from Blob to Blob (or another sink)
  2. Data Flow to transform the data
  3. Azure ML Execute Pipeline to perform inference

Below is an example pipeline JSON named PL_CopyTransformAML.json:

{
  "name": "PL_CopyTransformAML",
  "properties": {
    "activities": [
      {
        "name": "CopyFromBlob",
        "type": "Copy",
        "dependsOn": [],
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          },
          "enableStaging": false,
          "dataIntegrationUnits": 4
        },
        "inputs": [
          {
            "referenceName": "DS_BlobInput",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "DS_BlobOutput",
            "type": "DatasetReference"
          }
        ]
      },
      {
        "name": "TransformDataFlow",
        "type": "DataFlow",
        "dependsOn": [
          {
            "activity": "CopyFromBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "typeProperties": {
          "dataflow": {
            "referenceName": "DF_PreprocessingFlow",
            "type": "DataFlowReference"
          },
          "staging": {
            "linkedServiceName": {
              "referenceName": "LS_AzureBlobStorage",
              "type": "LinkedServiceReference"
            },
            "folderPath": "temp/staging"
          }
        }
      },
      {
        "name": "RunAzureMLPipeline",
        "type": "AzureMLExecutePipeline",
        "dependsOn": [
          {
            "activity": "TransformDataFlow",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "typeProperties": {
          "mlPipelineId": "@pipeline().parameters.mlPipelineId",
          "mlWorkspaceLinkedServiceName": {
            "referenceName": "LS_AzureMLWorkspace",
            "type": "LinkedServiceReference"
          },
          "experimentName": "InferenceExperiment",
          "pipelineParameters": {
            "input_data_path": "@concat('https://<yourstorageaccount>.blob.core.windows.net/output-data/mytransformeddata.csv')",
            "output_data_path": "@concat('https://<yourstorageaccount>.blob.core.windows.net/output-data/predictions.csv')"
          }
        }
      }
    ],
    "parameters": {
      "mlPipelineId": {
        "type": "String",
        "defaultValue": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/resourceGroups/MyRG/providers/Microsoft.MachineLearningServices/workspaces/MyAMLWorkspace/Pipelines/my-ml-pipeline-id"
      }
    }
  }
}

Key Points

  • CopyFromBlob copies data from the DS_BlobInput dataset to DS_BlobOutput.
  • TransformDataFlow references a data flow named DF_PreprocessingFlow.
  • RunAzureMLPipeline uses the AzureMLExecutePipeline activity to trigger a published pipeline in Azure ML.
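
If you want to trigger this pipeline from code rather than the portal, a minimal sketch with the azure-identity and azure-mgmt-datafactory packages could look like this; the subscription ID, resource group, and factory name are placeholders, and the call assumes the pipeline has already been published to the factory:

# trigger_adf_pipeline.py -- hypothetical trigger script; subscription, resource group, and factory names are placeholders
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

adf_client = DataFactoryManagementClient(
    credential=DefaultAzureCredential(),
    subscription_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)

# Kick off a run of PL_CopyTransformAML, overriding the mlPipelineId parameter
run_response = adf_client.pipelines.create_run(
    resource_group_name="MyRG",
    factory_name="MyDataFactory",
    pipeline_name="PL_CopyTransformAML",
    parameters={"mlPipelineId": "<your-published-aml-pipeline-id>"},
)
print("Started pipeline run:", run_response.run_id)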

4. Define the Data Flow (Preprocessing)

Below is an example Mapping Data Flow definition (DF_PreprocessingFlow.json) that filters out rows with null values in certain columns. It reads from DS_BlobOutput and writes the result back to DS_BlobOutput; in practice you would usually point the sink at a separate dataset or folder so the source file is not overwritten.

{
  "name": "DF_PreprocessingFlow",
  "properties": {
    "type": "MappingDataFlow",
    "typeProperties": {
      "sources": [
        {
          "name": "source1",
          "dataset": {
            "referenceName": "DS_BlobOutput",
            "type": "DatasetReference"
          },
          "streamName": "inputStream"
        }
      ],
      "sinks": [
        {
          "name": "sink1",
          "dataset": {
            "referenceName": "DS_BlobOutput",
            "type": "DatasetReference"
          },
          "streamName": "outputStream"
        }
      ],
      "transformations": [
        {
          "name": "FilterNulls",
          "type": "Filter",
          "streamName": "inputStream",
          "conditions": {
            "condition": "isNull(columnA) == false && isNull(columnB) == false"
          }
        }
      ]
    }
  }
}

In a real-world scenario, your transformations could be more complex: derived columns, joins, aggregations, lookups, and so on.
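
As a quick local sanity check of the filter logic (outside ADF), the same null-filtering can be reproduced with pandas; columnA and columnB are the example column names from the data flow above, and the file paths are placeholders:

# preprocess_check.py -- local approximation of the FilterNulls transformation
import pandas as pd

# Load the copied data (placeholder path)
df = pd.read_csv("mytransformeddata.csv")

# Keep only rows where both columnA and columnB are non-null,
# mirroring !isNull(columnA) && !isNull(columnB) in the data flow
filtered = df.dropna(subset=["columnA", "columnB"])

filtered.to_csv("preprocessed_local.csv", index=False)
print(f"Kept {len(filtered)} of {len(df)} rows")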


5. Azure ML Linked Service in ADF

Create a Linked Service to your Azure Machine Learning workspace:

{
  "name": "LS_AzureMLWorkspace",
  "properties": {
    "type": "AzureMLService",
    "typeProperties": {
      "subscriptionId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      "resourceGroupName": "MyRG",
      "mlWorkspaceName": "MyAMLWorkspace",
      "servicePrincipalId": "@pipeline().parameters.servicePrincipalId",
      "servicePrincipalKey": "@pipeline().parameters.servicePrincipalKey",
      "tenantId": "@pipeline().parameters.tenantId"
    }
  }
}

Authentication: this example uses service principal credentials (store the key in Key Vault rather than inline). If the data factory's managed identity has been granted access to the workspace, you can use that instead.
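
To confirm that the service principal actually has access to the workspace before wiring it into ADF, a short check with the Azure ML SDK (v1) might look like this; all IDs below are placeholders:

# check_sp_access.py -- hypothetical access check; all IDs are placeholders
from azureml.core import Workspace
from azureml.core.authentication import ServicePrincipalAuthentication

sp_auth = ServicePrincipalAuthentication(
    tenant_id="<tenant-id>",
    service_principal_id="<service-principal-application-id>",
    service_principal_password="<service-principal-key>",
)

# Fails with an authentication/authorization error if the SP lacks access
ws = Workspace.get(
    name="MyAMLWorkspace",
    auth=sp_auth,
    subscription_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    resource_group="MyRG",
)
print("Connected to workspace:", ws.name)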


6. Azure ML Pipeline & Inference Script

6.1 Creating/Publishing an Azure ML Pipeline

Use the Azure ML Python SDK (the v1 SDK is shown below; CLI v2 is an alternative) in a notebook or script to define, validate, and publish the pipeline:

# azure_ml_pipeline.py
from azureml.core import Workspace, Environment, Datastore
from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import PipelineParameter

# Connect to AML Workspace
ws = Workspace.from_config()

# Create environment
env = Environment(name="inference-env")
env.python.conda_dependencies.add_pip_package("pandas")
env.python.conda_dependencies.add_pip_package("scikit-learn")

run_config = RunConfiguration()
run_config.environment = env

# Pipeline parameters
input_data_path_param = PipelineParameter(name="input_data_path", default_value="input_path_placeholder")
output_data_path_param = PipelineParameter(name="output_data_path", default_value="output_path_placeholder")

# Create a PythonScriptStep
inference_step = PythonScriptStep(
    name="InferenceStep",
    script_name="score.py",
    arguments=[
        "--input_data", input_data_path_param,
        "--output_data", output_data_path_param
    ],
    compute_target="cpu-cluster",  # AML compute name
    source_directory="./scripts",  # folder containing score.py
    runconfig=run_config
)

# Build pipeline
pipeline = Pipeline(workspace=ws, steps=[inference_step])
pipeline.validate()

# Publish pipeline
published_pipeline = pipeline.publish(name="InferencePipeline")
print("Published pipeline ID: ", published_pipeline.id)

Once published, you’ll receive a Pipeline ID (used in the ADF mlPipelineId parameter).
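
Before wiring the published pipeline into ADF, you can submit it directly from the SDK as a smoke test. A minimal sketch, assuming the pipeline ID printed above and placeholder blob URLs:

# submit_published_pipeline.py -- smoke test for the published pipeline; ID and URLs are placeholders
from azureml.core import Experiment, Workspace
from azureml.pipeline.core import PublishedPipeline

ws = Workspace.from_config()

published = PublishedPipeline.get(workspace=ws, id="<your-published-aml-pipeline-id>")

# Submit under the same experiment name used by the ADF activity
run = Experiment(ws, "InferenceExperiment").submit(
    published,
    pipeline_parameters={
        "input_data_path": "https://<yourstorageaccount>.blob.core.windows.net/output-data/mytransformeddata.csv",
        "output_data_path": "https://<yourstorageaccount>.blob.core.windows.net/output-data/predictions.csv",
    },
)
run.wait_for_completion(show_output=True)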

6.2 Example Inference Script (score.py)

A simple inference script using a scikit-learn model (model.pkl):

# score.py
import argparse
import pandas as pd
import joblib

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_data", type=str, required=True)
    parser.add_argument("--output_data", type=str, required=True)
    args = parser.parse_args()

    # Read data (assuming CSV from a publicly accessible URL or local path)
    df = pd.read_csv(args.input_data)

    # Load model
    model = joblib.load("model.pkl")

    # Predict
    predictions = model.predict(df)

    # Convert predictions to DataFrame
    pred_df = pd.DataFrame(predictions, columns=["prediction"])

    # Write predictions to CSV
    pred_df.to_csv(args.output_data, index=False)
    print("Inference complete. Predictions saved.")

if __name__ == "__main__":
    main()
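
To test score.py locally before publishing, you need a model.pkl next to the script and a small input CSV. Here is a hedged sketch that fabricates both with scikit-learn; the column names and sample values are made up for illustration and should match whatever features your real model expects:

# make_test_assets.py -- hypothetical local test setup for score.py; columns and values are illustrative
import joblib
import pandas as pd
from sklearn.linear_model import LogisticRegression

# Train a tiny throwaway model on made-up features
X = pd.DataFrame({"columnA": [0.1, 0.9, 0.4, 0.8], "columnB": [1.2, 0.3, 0.7, 0.2]})
y = [0, 1, 0, 1]
joblib.dump(LogisticRegression().fit(X, y), "model.pkl")

# Write a small input file with the same columns the model expects
X.to_csv("sample_input.csv", index=False)

# Then run: python score.py --input_data sample_input.csv --output_data sample_predictions.csv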

7. Putting It All Together

  1. Upload/Deploy your pipeline JSON, data flow JSON, dataset JSON, and linked service JSON to your ADF.
  2. Publish your ADF changes.
  3. Trigger the pipeline (PL_CopyTransformAML).

Pipeline Flow Recap

  1. CopyFromBlob pulls data from DS_BlobInput to DS_BlobOutput.
  2. TransformDataFlow applies filtering (or other transformations) via DF_PreprocessingFlow.
  3. RunAzureMLPipeline invokes the published Azure ML pipeline, passing the processed data path as input for inference.

7.1 Run & Monitor

  • In the ADF Author pane, select your pipeline.
  • Debug or Trigger Now.
  • Track the status in the Monitor pane.
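
If you prefer to monitor runs from code instead of the Monitor pane, a small polling sketch with azure-mgmt-datafactory might look like this; the resource group and factory names are placeholders, and run_id comes from the trigger call shown earlier:

# monitor_adf_run.py -- hypothetical polling loop; names and run_id are placeholders
import time

from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

adf_client = DataFactoryManagementClient(
    credential=DefaultAzureCredential(),
    subscription_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)

run_id = "<run-id-from-create_run>"

# Poll until the pipeline run reaches a terminal state
while True:
    run = adf_client.pipeline_runs.get("MyRG", "MyDataFactory", run_id)
    print("Status:", run.status)
    if run.status in ("Succeeded", "Failed", "Cancelled"):
        break
    time.sleep(30)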

Conclusion

With these steps, you've built a fully orchestrated solution:

  1. Data ingestion from Blob Storage
  2. Data preprocessing in ADF Data Flows
  3. Inference using an Azure ML pipeline and script

You can extend or modify these steps for more complex transformations, multiple models, or advanced parameterization. Always remember to keep secrets secure (via Key Vault) and optimize your compute usage in both ADF and AML.

Happy Building!
