DEV Community

camilo cabrales
camilo cabrales

Posted on

Limpiar y preparar grandes cantidades de datos de manera fácil con Glue DataBrew

Usualmente cuando queremos realizar un proceso de ETL (Extraer, Transformar, Cargar) utilizamos algunas herramientas o lenguajes de programación nos ayudan en este proceso. El desarrollo de ETL requiere de ciertos conocimientos técnicos, que nos tomaran un tiempo en adquirir. Para simplificar este proceso AWS tiene un servicio llamado Glue DataBrew que nos va a simplificar y agilizar la tarea de realizar las ETL.

En este post vamos a crear la siguiente arquitectura:

Arquitectura

EL objetivo de esta arquitectura es: Cuando se cargue un archivo al Bucket este llame a una función Lambda que a su vez va a ejecutar una Step Fucntion que va realizar el procesamiento que hallamos definido para nuestro data set y enviara un mensaje de texto si se ejecuto correctamente o si hubo un error durante el procesamiento de la información.

Iniciemos creando el Bucket que va almacenar nuestro conjunto de datos y el resultado del proceso de ETL. Para esto buscamos el servicio de S3 y damos click en el botón Create Bucket, una vez en la pagina de creación del Bucket le damos un nombre (el nombre debe ser úrico en todo AWS), dejamos las opciones por defecto y damos click en el botón en Create Bucket que se encuentra en la parte inferior de la pantalla.

Crear Bucket

Vamos a crear dos carpetas en nuestro Bucket una para los archivos que vamos a procesar y otra de resultados. Damos click en el botón Create Folder.

Create Folder

La carpeta que contiene el archivo a procesar le damos el nombre de procesardata y la que almacena los resultados le damos el nombre de resultados.
Creamos la carpeta procesardata y realizamos el mismo procedimiento para la carpeta de resultados.

Create Procesardata

Creadas las carpetas deberíamos verlas en nuestro Bucket.

Folders

Ahora necesitamos subir a la carpeta de procesardata el archivo con el cual vamos a trabajar damos click en la carpeta y damos click en el botón Upload.

Folders

Seleccionamos el archivo y damos click en el botón Upload.

Ya que tenemos el archivo con el cual vamos a trabajar, buscamos el servicio Glue DataBrew una vez en la pagina principal damos click en DataSets.

Page DataSet

En la siguiente pantalla seleccionamos el Bucket, el folder procesardata y el archivo precios.csv.

DataSet

Para terminar con la creación del DataSet seleccionamos el tipo de archivo el cual estamos utilizando (.CSV separado por comas) e indicamos que la primera fila del archivo es la cabecera y damos click en el botón Create dataset.

Type File

Creado nuestro DataSet el siguiente paso es crear un proyecto en el cual vamos a realizar las modificaciones de nuestros datos. para esto damos click en Projects y luego en el botón Create project.

Page Project

Para crear el proyecto necesitamos la siguiente información:

  • Nombre del proyecto

  • Receta o nombre de la receta si vamos a crear una nueva. La receta son las modificaciones que vamos a realizar al conjunto de datos.

  • Debemos seleccionar el Dataset con el cual vamos a trabajar.

  • Debemos utilizar un rol existente o crear uno nuevo definiendo un prefijo para este.

Create Project

Creado nuestro proyectos veremos una pagina como la siguiente.

  1. En la parte superior vamos a ver las opciones de uso frecuente de la receta que podemos aplicar a nuestros datos.
  2. Abajo de la cabecera de nuestros datos vamos a poder observar las estadísticas de cada columna, lo que nos ayuda a tener una idea del comportamiento de los datos que están almacenados dentro de cada una de ellas.
  3. En la parte derecha de la pantalla tenemos la sección de la receta, donde vamos a poder agregar diferentes tipos de modificaciones a nuestro conjunto de datos.

Page Project

Ya que esta listo nuestro proyecto para trabajar vamos a crear nuestra receta, para esto damos click en el botón Add step que se encuentra en la sección de la receta.

Add step

Una vez damos click en el botón Add step se despliegan las diferentes opciones que podemos utilizar para modificar los datos. Vamos a seleccionar la opción Change to capital case de la sección de FORMAT.

Capital Case 1

Como la columna PRODUCTO tiene los nombres en mayusculas los vamos a cambiar para que cada palabra empieza por una letra en mayúscula.

Capital Case 2

Al dar click en el botón Apply veremos los resultados.

Capital Case 3

Vemos que las columnas de los días tienen los precios de cada producto, sin embargo si quisiéramos hacer alguna operación matemática sobre ellos no será posible debido a que tienen el signo $. Veamos como podemos eliminar el signo $ de las columnas.

Vamos al menú superior seleccionamos Clean y en la sección Remove seleccionamos Special characters.

Remove Special Characters

Al realizar la selección en sección de Recipies se nos despliegan las opciones para realizar la limpieza de la columna. Seleccionamos la columna LUNES,Special characters y Custom value para remover el signo - que aparece delante de los ceros.

Remove Special Characters 2

Al dar click en el botón Apply veremos que de la columna LUNES se quitaron los caracteres: - $ y se agrego un nuevo paso a nuestra receta. Este proceso lo puedes realizar para el resto de columnas de los días de la semana.

Remove Special Characters 3

Al finalizar de remover los caracteres especiales de las columnas de los días de la semana, nuestra receta se vera de la siguiente forma.

Remove Special Characters 4

Ahora que nuestras columnas no tienen caracteres especiales vamos a cambiar el tipo de dato para poder realizar algunas operaciones. En la parte superior de cada columna podemos ver el tipo de datos de cada columna, en las columnas de los días de la semana identificamos que son de tipo string y necesitamos cambiarlas a un valor numérico en este caso double.

Change Type 1

Al dar click en Change type se nos presentan las opciones donde debemos seleccionar la columna y el tipo de dato al cual queremos cambiar.

Change Type 2

Damos click en el botón Apply dando como resultado el cambio de tipo de columna y un nuevo paso se agrego a nuestra receta.

Change Type 3

Debemos hacer este mismo procedimiento a las otras columnas, para que nuestra receta quede de la siguiente forma.

Change Type 4

Para terminar con nuestra receta vamos a crear un nueva columna llamada Total que va almacenar la suma de los días LUNES Y VIERNES.
Hacemos click en botón FUNCTIONS - Math functions - ADD.

Sum1

En la sección de la receta veremos las opciones donde seleccionamos el tipo de función (ADD), las columnas que vamos a sumar (LUNES Y VIERNES) y por ultimo damos el nombre de la columna que va almacenar el resultado (Total).

Sum2

Al dar click en el botón Apply vemos que se creo la columna total y contiene la suma de las columnas LUNES Y VIERNES.

Sum3

Con esto finalizamos la modificación de nuestro DataSet, ahora necesitamos definir como se van a ejecutar estas modificaciones por lo tanto necesitamos crear un Job.
Damos click en el botón Create job.

Select Job

En la pantalla de creación del Job le asignamos un nombre al Job, seleccionamos el Bucket y la carpeta en la cual queremos que se almacenen los resultados (Bucket y carpeta que creamos al inicio) y le asignamos un rol que puede ser el que creamos al momento de configurar el proyecto o uno nuevo.

Create Job

Al crearse el Job vamos a ver el listado con los Jobs que hayamos creado, seleccionamos el Job que acabamos de crear y damos click en el botón Run Job.

Run Job

Para ver los resultados podemos ir a la carpeta resultados del Bucket que creamos y ver el archivo que genero el Job.

Results Job

Hemos visto como crear, modificar un DataSet y obtener los resultados por medio de las recetas y los Jobs.
Ahora necesitamos implementar los siguientes servicios para completar nuestra arquitectura:

  • Un Topic y una Suscripción de SNS para enviar los mensajes de notificación.

  • Una Step Function que va a realizar todo el flujo para procesar el Dataset, modificando el Dataset para que tome el archivo que se suba a S3, ejecutar el Job y enviar una mensaje de texto para notificar que el proceso se realizo correctamente.

  • Una función Lambda que se va encargar de llamar a la Step Function cuando se suba un archivo al Bucket de S3.

Esta implementación la vamos a realizar con la siguiente plantilla de Terraform.

provider "aws" {
  region = "us-east-1"
}


resource "random_id" "random" {
  byte_length = 2
}

variable "job_name"{
    type= string
    default= "PreciosJob"
}

variable "bucket_name" {
    type = string
    default = "datosgluedatabrew"
}

variable "telefono"{
    type = string
    default = "+573005465765"
}

resource "aws_lambda_function" "lambda_function_call_step_function" {
  filename      = "lambda_function.py.zip"
  function_name = "LambdaStepFunction-${random_id.random.hex}"
  role          = aws_iam_role.lambda_execution_role.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.8"
  memory_size   = 128
  timeout       = 5
  environment {
    variables = {
      STEPFUNCTIONARN = aws_sfn_state_machine.function.arn
    }
  }
}

resource "aws_iam_role" "lambda_execution_role" {
  name = "lambda_execution_role-${random_id.random.hex}"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_execution_role_policy_attach" {
  policy_arn = "arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess"
  role       = aws_iam_role.lambda_execution_role.name
}

resource "aws_iam_role_policy_attachment" "lambda_execution_role_cloudwatch_attach" {
  policy_arn = "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
  role       = aws_iam_role.lambda_execution_role.name
}


resource "aws_lambda_permission" "s3_permission_to_trigger_lambda" {
  statement_id  = "AllowExecutionFromS3Bucket"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.lambda_function_call_step_function.arn
  principal     = "s3.amazonaws.com"
  source_arn    = "arn:aws:s3:::${var.bucket_name}"
}

resource "aws_s3_bucket_notification" "bucket_notification_put" {
  bucket = "${var.bucket_name}"

  lambda_function {
    lambda_function_arn = aws_lambda_function.lambda_function_call_step_function.arn
    events              = ["s3:ObjectCreated:Put"]
    filter_prefix = "procesardata/"
    filter_suffix = ".csv"
  }
  depends_on = [aws_lambda_permission.s3_permission_to_trigger_lambda]
}


resource "aws_sfn_state_machine" "function" {
  name     = "StateMachineGlueDataBrew-${random_id.random.hex}"
  role_arn = aws_iam_role.function_role.arn

  definition = <<DEFINITION
    {
  "Comment": "A description of my state machine",
  "StartAt": "Actualizar DataSet",
  "States": {
    "Actualizar DataSet": {
      "Type": "Task",
      "Parameters": {
        "Format": "CSV",
        "Input": {
          "S3InputDefinition": {
            "Bucket.$": "$.bucket",
            "BucketOwner.$": "$.cuenta",
            "Key.$": "$.carpeta"
          }
        },
        "Name": "Precios"
      },
      "Resource": "arn:aws:states:::aws-sdk:databrew:updateDataset",
      "Next": "Ejecutar Job"
    },
    "Ejecutar Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::databrew:startJobRun",
      "Parameters": {
        "Name": "${var.job_name}"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "DataBrew.ConflictException"
          ],
          "Next": "SNS Error"
        }
      ],
      "Next": "SNS Job Ejecutado"
    },
    "SNS Error": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "Message": {
          "Mensaje": "Se produjo un error"
        },
        "TopicArn": "${aws_sns_topic.topic_glue_databrew.arn}"
      },
      "End": true
    },
    "SNS Job Ejecutado": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "${aws_sns_topic.topic_glue_databrew.arn}",
        "Message": {
          "Mensaje": "Se ejecuto el job correctamente"
        }
      },
      "End": true
    }
  }
}
  DEFINITION
}

resource "aws_iam_role" "function_role" {
  name = "my-state-machine-role-${random_id.random.hex}"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "states.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "function_policy_role_sns" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonSNSFullAccess"
  role       = aws_iam_role.function_role.name
}

resource "aws_iam_role_policy_attachment" "lambda_policy_glue_data_brew" {
  policy_arn = "arn:aws:iam::aws:policy/AwsGlueDataBrewFullAccessPolicy"
  role       = aws_iam_role.function_role.name
}

resource "aws_sns_topic" "topic_glue_databrew" {
  name = "topic_glue_databrew-${random_id.random.hex}"
}

resource "aws_sns_topic_subscription" "subscription_glue_databrew" {
  topic_arn = aws_sns_topic.topic_glue_databrew.arn
  protocol  = "sms"
  endpoint  = "${var.telefono}"
}

Enter fullscreen mode Exit fullscreen mode

De la plantilla anterior debemos modificar las variables y la región en la cual se van desplegar los servicios.

La región.

provider "aws" {
   region = "us-east-1"
 }
Enter fullscreen mode Exit fullscreen mode

El nombre que le dimos la Job.

variable "job_name"{
    type= string
    default= "PreciosJob"
}
Enter fullscreen mode Exit fullscreen mode

El nombre que le dimos al Bucket.

variable "bucket_name" {
    type = string
    default = "datosgluedatabrew"
}
Enter fullscreen mode Exit fullscreen mode

El numero de teléfono al cual queremos que lleguen los mensajes cuando se ejecuta el Job.

variable "telefono"{
    type = string
    default = "+573005465765"
}
Enter fullscreen mode Exit fullscreen mode

Una vez que hemos modificado las variables de acuerdo a nuestras configuraciones podemos validar y ejecutar la plantilla.

  • Valida la plantilla
terraform validate
Enter fullscreen mode Exit fullscreen mode
  • Crea los servicios. Recuerda que la plantilla y el código de la función Lambda se encuentren en la misma carpeta.
terraform apply
Enter fullscreen mode Exit fullscreen mode

Una vez terminada la ejecución del comando, vamos a revisar que los servicios se hayan desplegado correctamente.

Buscamos SNS y damos click en Topics.

SNS

Buscamos la función Lambda.

Lambda

Buscamos la Step Function.

Step Function

Como todos los servicios fueron desplegados con éxito vamos a subir un nuevo archivo al Bucket para que se procese el archivo con la llamada a la función Lambda y la Step Function, cuando esta termina su proceso damos click en el nombre de la Step Function y damos click en la ultima ejecución para ver el gráfico del proceso.

 Exec Step Function

Con esto finalizamos la implementación y pruebas de la arquitectura planteada al inicio.Pueden realizar mas pruebas modificando la receta o agregando mas pasos a la Step Funcion.

Para no incurrir en ningún costo pueden borrar el proyecto de Glue DataBrew, el Bucket y para eliminar los servicios creados con Terraform pueden utilizar el siguiente comando.

terraform destroy
Enter fullscreen mode Exit fullscreen mode

Me pueden encontrar en

Camilo Cabrales

Referencias

API

Top comments (0)