DEV Community

Sidra Saleem for SUDO Consultants

Originally published at sudoconsultants.com

How To Process IoT Sensor Data with Windowed Aggregations and ML Inference

The Internet of Things (IoT) has become a cornerstone of modern technology, generating vast amounts of data from sensors and devices. Processed in real time, this data can provide actionable insights, enable predictive maintenance, and improve operational efficiency. Doing that at scale, however, requires robust infrastructure. This article shows how to build a real-time analytics pipeline with Amazon Kinesis Data Streams, Apache Flink, and Amazon EMR that ingests IoT sensor data, performs windowed aggregations, and integrates machine learning (ML) inference.

We will walk through the entire process, from setting up the infrastructure to implementing the pipeline, with steps for both the AWS CLI and the AWS Management Console. By the end of this article, you will have a working prototype that ingests simulated sensor data, aggregates it over time windows, and can call a deployed model for predictions.

Overview of the Architecture

The architecture for this solution consists of the following components:

  1. IoT Sensors: Devices generating real-time data (e.g., temperature, humidity, pressure).
  2. Amazon Kinesis Data Streams: A scalable and durable real-time data streaming service to ingest sensor data.
  3. Amazon EMR with Apache Flink: A managed big data platform to process and analyze the streaming data using Apache Flink.
  4. Windowed Aggregations: Real-time aggregation of sensor data over time windows (e.g., average temperature over 5 minutes).
  5. Machine Learning Inference: Integration with a pre-trained ML model to make predictions on the streaming data.
  6. Data Storage and Visualization: Store processed data in Amazon S3 or DynamoDB and visualize it using Amazon QuickSight or other tools.

Step 1: Setting Up Amazon Kinesis Data Streams

AWS Console Steps

  1. Log in to the AWS Management Console.
  2. Navigate to the Kinesis service.
  3. Click on Create data stream.
  4. Provide a name for your stream, e.g., iot-sensor-stream.
  5. Under capacity mode, choose Provisioned and set the number of shards based on your expected write throughput. For testing, 1 shard is sufficient.
  6. Click Create data stream.
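In provisioned mode, each shard accepts up to 1 MB/s or 1,000 records/s of writes, so the shard count is set by whichever limit your load reaches first. The helper below sketches that arithmetic; its name and the optional headroom factor are illustrative, and it counts 1 MB as 10^6 bytes, which errs on the conservative side.

```python
import math

# Per-shard write limits for a provisioned-mode Kinesis data stream.
MAX_WRITE_BYTES_PER_SEC = 1_000_000  # 1 MB/s, counted conservatively as 10**6 bytes
MAX_WRITE_RECORDS_PER_SEC = 1_000


def shards_needed(records_per_sec: float, avg_record_bytes: int, headroom: float = 1.0) -> int:
    """Minimum shard count for a write load (hypothetical helper).

    headroom > 1.0 reserves spare capacity, e.g. 1.25 for 25% extra.
    """
    by_records = records_per_sec * headroom / MAX_WRITE_RECORDS_PER_SEC
    by_bytes = records_per_sec * avg_record_bytes * headroom / MAX_WRITE_BYTES_PER_SEC
    # Whichever limit is hit first decides; a stream always has at least one shard.
    return max(1, math.ceil(max(by_records, by_bytes)))
```

For example, 5,000 sensors each sending one ~120-byte reading per second produce 5,000 records/s but only about 0.6 MB/s, so the record limit dominates and shards_needed(5000, 120) returns 5. The Step 2 simulator, which sends one small record per second, fits in a single shard.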

CLI Steps

To create a Kinesis Data Stream using the AWS CLI, run the following command:

aws kinesis create-stream --stream-name iot-sensor-stream --shard-count 1

Verify that the stream was created; its StreamStatus should read ACTIVE before you write to it:

aws kinesis describe-stream --stream-name iot-sensor-stream

Step 2: Simulating IoT Sensor Data

To simulate IoT sensor data, we will use a Python script to generate random sensor readings and publish them to the Kinesis stream.

Python Script for Data Generation

import boto3
import json
import random
import time

# Initialize Kinesis client
kinesis = boto3.client('kinesis', region_name='us-east-1')

def generate_sensor_data():
    # One simulated reading; timestamp is Unix epoch seconds
    return {
        'sensor_id': random.randint(1, 100),
        'temperature': round(random.uniform(20.0, 40.0), 2),
        'humidity': round(random.uniform(30.0, 70.0), 2),
        'timestamp': int(time.time())
    }

def publish_to_kinesis(stream_name, data):
    # Using sensor_id as the partition key routes each sensor's readings to the same shard
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(data).encode('utf-8'),
        PartitionKey=str(data['sensor_id'])
    )
    return response

if __name__ == '__main__':
    stream_name = 'iot-sensor-stream'
    # Publish one reading per second until interrupted
    while True:
        data = generate_sensor_data()
        response = publish_to_kinesis(stream_name, data)
        print(f"Published {data} to {response['ShardId']} (seq {response['SequenceNumber']})")
        time.sleep(1)

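Run this script with AWS credentials that are allowed to call kinesis:PutRecord; it publishes one reading per second to your Kinesis stream until you stop it with Ctrl+C.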
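Kinesis routes each record by hashing its partition key with MD5 into a 128-bit integer and sending the record to the shard whose hash key range contains that value. The sketch below reproduces that mapping under a simplifying assumption: the hash space is split into equal, contiguous ranges, which is roughly what create-stream --shard-count produces (split or merged shards will differ; for an exact answer, compare against the HashKeyRange values that describe-stream reports). The helper names are illustrative. It shows why using sensor_id as the partition key keeps all of one sensor's readings on one shard, in order.

```python
import hashlib

HASH_SPACE = 2 ** 128  # Kinesis hash keys are unsigned 128-bit integers


def hash_key(partition_key: str) -> int:
    """MD5 of the partition key, read as a big-endian 128-bit integer."""
    return int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Shard that owns this key, ASSUMING `shard_count` equal, contiguous hash ranges."""
    range_size = HASH_SPACE // shard_count
    # min() folds any leftover top slice of the hash space into the last shard
    return min(hash_key(partition_key) // range_size, shard_count - 1)
```

With the generator above, shard_index(str(sensor_id), n) is stable for a given sensor, so a Flink keyBy on sensor_id later sees each sensor's readings in the order they were written.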


Step 3: Setting Up Amazon EMR with Apache Flink

AWS Console Steps

  1. Navigate to the EMR service in the AWS Management Console.
  2. Click Create cluster.
  3. Choose Advanced options.
  4. Under Software configuration, select Flink as an application.
  5. Configure the hardware and networking settings for your workload, and select an EC2 key pair so you can connect to the master node over SSH later.
  6. Click Create cluster.

CLI Steps

To create an EMR cluster with Apache Flink using the AWS CLI:

aws emr create-cluster \
  --name "Flink-EMR-Cluster" \
  --release-label emr-6.5.0 \
  --applications Name=Flink \
  --ec2-attributes KeyName=your-key-pair \
  --instance-type m5.xlarge \
  --instance-count 3 \
  --use-default-roles

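Wait for the cluster to reach the WAITING state before proceeding. From the CLI, aws emr wait cluster-running --cluster-id <cluster-id> blocks until the cluster is ready (create-cluster prints the cluster ID).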

Step 4: Writing the Apache Flink Application

We will write a Flink application that consumes data from the Kinesis stream and computes the average temperature per sensor over 5-minute tumbling windows. ML inference is added in Step 5.

Flink Application Code

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class IoTStreamProcessor {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kinesis consumer configuration: region and starting position in the stream
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        // Create the Kinesis source; each record arrives as a JSON string
        DataStream<String> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
                "iot-sensor-stream",
                new SimpleStringSchema(),
                consumerConfig
        ));

        // Parse JSON, key by sensor ID, and average the temperature per 5-minute window
        DataStream<Tuple2<Integer, Double>> aggregatedStream = kinesisStream
                .map(json -> {
                    // SensorData is your own parsing class (not shown) for the Step 2 JSON records
                    SensorData sensorData = SensorData.fromJson(json);
                    return Tuple2.of(sensorData.getSensorId(), sensorData.getTemperature());
                })
                // Java lambdas erase generic types, so declare the tuple type explicitly
                .returns(Types.TUPLE(Types.INT, Types.DOUBLE))
                // A key selector (instead of keyBy(0)) makes the key type Integer
                .keyBy(reading -> reading.f0)
                // Processing-time tumbling windows need no watermarks; for event time,
                // assign timestamps and watermarks and use TumblingEventTimeWindows
                .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                .apply(new WindowFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Tuple2<Integer, Double>> input, Collector<Tuple2<Integer, Double>> out) {
                        double sum = 0;
                        int count = 0;
                        for (Tuple2<Integer, Double> record : input) {
                            sum += record.f1;
                            count++;
                        }
                        // Emit (sensorId, average temperature) for this window
                        out.collect(Tuple2.of(key, sum / count));
                    }
                });

        // Print results (on a cluster, this goes to the TaskManager stdout logs)
        aggregatedStream.print();

        // Execute the Flink job
        env.execute("IoT Sensor Data Processing");
    }
}
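To sanity-check what the job should emit, the plain-Python sketch below models the same computation outside Flink. It parses records shaped like the Step 2 generator output, assigns each to a 5-minute tumbling window aligned to the Unix epoch (as Flink's tumbling windows are by default), and averages temperature per sensor and window. It windows on each record's own timestamp field, i.e. event time; the function name is illustrative.

```python
import json
from collections import defaultdict

WINDOW_SECONDS = 5 * 60  # 5-minute tumbling windows


def tumbling_window_averages(json_records, window_seconds=WINDOW_SECONDS):
    """Average temperature keyed by (sensor_id, window_start).

    Each record is a JSON string like the Step 2 generator output; windows
    start at multiples of window_seconds since the epoch.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for raw in json_records:
        reading = json.loads(raw)
        ts = reading["timestamp"]
        window_start = ts - (ts % window_seconds)  # epoch-aligned window start
        key = (reading["sensor_id"], window_start)
        sums[key] += reading["temperature"]
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}
```

If your Flink job uses event-time windows driven by the same timestamp field, the two should agree. With processing-time windows, records are assigned by arrival time instead, so results can differ for readings near a window edge.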

Deploying the Flink Application

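  1. Package the application and its dependencies, including the flink-connector-kinesis module, into a single (fat) JAR file, for example with the Maven Shade plugin.
  2. Upload the JAR file to the EMR master node using SCP:
scp -i your-key-pair.pem your-flink-app.jar hadoop@<emr-master-node-dns>:/home/hadoop/
  3. From the master node, submit the Flink job to YARN:
flink run -m yarn-cluster -c IoTStreamProcessor your-flink-app.jar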

Step 5: Integrating Machine Learning Inference

To integrate ML inference, we will use a pre-trained model deployed on Amazon SageMaker.

Steps for ML Inference

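  1. Deploy your model to a SageMaker real-time endpoint.
  2. Invoke the endpoint from the Flink application. The request body and response parsing below assume the model accepts {"temperature": ..., "humidity": ...} and returns a single number; adjust both to match your model's inference container: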
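import com.amazonaws.services.sagemakerruntime.AmazonSageMakerRuntime;
import com.amazonaws.services.sagemakerruntime.AmazonSageMakerRuntimeClientBuilder;
import com.amazonaws.services.sagemakerruntime.model.InvokeEndpointRequest;
import com.amazonaws.services.sagemakerruntime.model.InvokeEndpointResult;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class MLInference {
    // Build the client once and reuse it; creating one per prediction is expensive
    private static final AmazonSageMakerRuntime SAGEMAKER =
            AmazonSageMakerRuntimeClientBuilder.defaultClient();

    public static double predict(double temperature, double humidity) {
        // Locale.ROOT guarantees a '.' decimal separator regardless of the JVM locale
        String payload = String.format(Locale.ROOT,
                "{\"temperature\": %f, \"humidity\": %f}", temperature, humidity);

        // The SDK expects the request body as a ByteBuffer
        InvokeEndpointRequest request = new InvokeEndpointRequest()
                .withEndpointName("your-sagemaker-endpoint")
                .withContentType("application/json")
                .withBody(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));

        InvokeEndpointResult result = SAGEMAKER.invokeEndpoint(request);
        // Decode only the buffer's readable bytes; assumes a bare number such as "0.87"
        String response = StandardCharsets.UTF_8.decode(result.getBody()).toString();
        return Double.parseDouble(response.trim());
    }
}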
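  3. Call predict from an operator in your Flink job, such as a map over the parsed readings. The aggregated stream from Step 4 carries only temperature, so keep humidity in the stream if your model needs both features. Because each call blocks on a network round trip, consider Flink's Async I/O API (AsyncDataStream) for high-throughput streams.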
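Before wiring the endpoint into Flink, it can help to test it from Python. The sketch below calls invoke_endpoint on a boto3 sagemaker-runtime client and assumes the same request shape as the Java example plus a bare-number response; the endpoint name is a placeholder and the helper names are illustrative. Because the client is passed in, the small stand-in class lets you check request building and response parsing offline.

```python
import io
import json


def build_payload(temperature: float, humidity: float) -> bytes:
    # ASSUMED request shape; it must match what your inference container expects
    return json.dumps({"temperature": temperature, "humidity": humidity}).encode("utf-8")


def predict(client, endpoint_name: str, temperature: float, humidity: float) -> float:
    """Invoke a SageMaker endpoint and parse a bare-number response (e.g. b"0.87")."""
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=build_payload(temperature, humidity),
    )
    # response["Body"] is a readable stream of the model's output bytes
    return float(response["Body"].read().decode("utf-8").strip())


class FakeRuntime:
    """Offline stand-in for boto3.client("sagemaker-runtime")."""

    def __init__(self, reply: bytes):
        self.reply = reply
        self.last_request = None

    def invoke_endpoint(self, **kwargs):
        self.last_request = kwargs
        return {"Body": io.BytesIO(self.reply)}


# Against a real endpoint (needs AWS credentials and a deployed model):
#   import boto3
#   runtime = boto3.client("sagemaker-runtime", region_name="us-east-1")
#   print(predict(runtime, "your-sagemaker-endpoint", 31.5, 48.0))
```

Using json.dumps for the payload also sidesteps the locale-dependent number formatting that String.format can introduce on the Java side.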

Step 6: Storing and Visualizing Results

Storing Results in Amazon S3

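To store the processed data in Amazon S3, add a file sink to your Flink application. The snippet below is one way to do it with Flink's StreamingFileSink, writing each result as a CSV line under a placeholder bucket and prefix; it needs imports for StreamingFileSink, SimpleStringEncoder, and org.apache.flink.core.fs.Path. The sink only finalizes files when a checkpoint completes, so also enable checkpointing (for example, env.enableCheckpointing(60000)). Depending on your EMR release, you may also need to enable Flink's S3 file-system plugin:

aggregatedStream.map(t -> t.f0 + "," + t.f1)  // one "sensorId,avgTemperature" line per result
        .addSink(StreamingFileSink.forRowFormat(new Path("s3://your-bucket/iot-aggregates/"), new SimpleStringEncoder<String>("UTF-8")).build());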

Visualizing Results with Amazon QuickSight

  1. Create a dataset in Amazon QuickSight from the S3 location where the processed data is stored. QuickSight's S3 data source reads a manifest file that lists the files or prefixes to import.
  2. Build dashboards to visualize the aggregated sensor data and ML predictions.
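A QuickSight S3 manifest is a small JSON document. The sketch below generates one, assuming the job writes headerless CSV lines such as 42,31.5 under a single prefix; the bucket, prefix, and function name are placeholders, and the field names follow the manifest format described in the QuickSight documentation, so check them against the current version.

```python
import json


def quicksight_manifest(uri_prefixes, fmt="CSV", delimiter=",", contains_header=False):
    """Build a QuickSight S3 manifest that imports every file under the given prefixes."""
    return json.dumps(
        {
            "fileLocations": [{"URIPrefixes": list(uri_prefixes)}],
            "globalUploadSettings": {
                "format": fmt,
                "delimiter": delimiter,
                # QuickSight expects this flag as the string "true" or "false"
                "containsHeader": "true" if contains_header else "false",
            },
        },
        indent=2,
    )


if __name__ == "__main__":
    # Save the output as e.g. manifest.json and point the QuickSight S3 data source at it.
    print(quicksight_manifest(["s3://your-bucket/iot-aggregates/"]))
```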

Real-Life Use Case: Predictive Maintenance in Manufacturing

In a manufacturing plant, IoT sensors monitor the temperature and vibration of machinery. By processing this data in real time with the pipeline described above, the plant can:

  1. Detect anomalies in machine behavior.
  2. Predict equipment failures before they occur.
  3. Schedule maintenance proactively, reducing downtime and costs.
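As a simple statistical stand-in for the first item (not the SageMaker model from Step 5), the sketch below flags a window average that lies far from a sensor's recent history using a z-score. The threshold of 3 standard deviations, the minimum of five prior windows, and the function name are illustrative assumptions.

```python
import statistics


def is_anomalous(history, latest, threshold=3.0, min_history=5):
    """Flag `latest` if it is more than `threshold` sample standard deviations
    from the mean of `history` (earlier window averages for the same sensor).

    Returns False until at least `min_history` points are available.
    """
    if len(history) < min_history:
        return False
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    if stdev == 0:
        # A perfectly flat history: any change at all is unusual
        return latest != mean
    return abs(latest - mean) / stdev > threshold
```

Fed with the per-sensor window averages from Step 4, this catches sudden jumps but not slow drift; a trained model is better suited to predicting failures from subtler patterns.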

Conclusion

This article demonstrated how to build a real-time analytics pipeline for IoT sensor data using Amazon Kinesis Data Streams, Apache Flink on Amazon EMR, and machine learning inference with Amazon SageMaker. The steps and code examples give you a scalable starting point for real-time data processing and analytics that you can extend for your own workloads. Whether you are monitoring industrial equipment, smart homes, or connected vehicles, the same architecture applies.
