DEV Community

Juan Luis Cano Rodríguez for Kedro

Originally published at kedro.org

A new Kedro dataset for Spark Structured Streaming

This article guides data practitioners on how to set up a Kedro project to use the new Kedro dataset for Spark Structured Streaming, SparkStreamingDataSet, with example use cases and a deep dive into some design considerations. It's meant for data practitioners who are already familiar with Kedro, so we won't cover the basics of a project, but you can familiarise yourself with them in the Kedro documentation.

What is Kedro?

Kedro is an open-source Python toolbox that applies software engineering principles to data science code. It makes it easier for a team to apply those principles consistently, which reduces the time spent rewriting data science experiments so that they are fit for production.

Kedro was born at QuantumBlack to solve the challenges faced regularly in data science projects and promote teamwork through standardised team workflows. It is now hosted by the LF AI & Data Foundation as an incubating project.

What are Kedro datasets?

Kedro datasets are abstractions for loading and saving data, designed to decouple these operations from your business logic. They handle reading and writing data to and from a variety of sources, while also ensuring consistency, tracking, and versioning. They allow users to focus on core data processing and leave data I/O tasks to Kedro.

What is Spark Structured Streaming?

Spark Structured Streaming is built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data, and the Spark SQL engine will run it incrementally and continuously and update the final result as streaming data continues to arrive.
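Conceptually, "incrementally and continuously" means the result table is updated with each micro-batch of newly arrived rows instead of being recomputed from scratch. The plain-Python sketch below models that idea with a running count (similar in spirit to Spark's "complete" output mode). It is a toy model, not Spark code, and `incremental_counts` is a hypothetical name.

```python
from collections import Counter
from typing import Dict, Iterable, List


def incremental_counts(micro_batches: Iterable[List[str]]) -> List[Dict[str, int]]:
    """Toy model of an incrementally maintained streaming aggregation.

    Each micro-batch holds only the newly arrived rows; the running state is
    updated with them and the full, updated result table is emitted.
    """
    state: Counter = Counter()
    snapshots: List[Dict[str, int]] = []
    for batch in micro_batches:
        state.update(batch)            # process only the new rows
        snapshots.append(dict(state))  # emit the complete, updated result
    return snapshots


if __name__ == "__main__":
    batches = [["apple", "pear"], ["apple"], ["kiwi", "pear"]]
    for snapshot in incremental_counts(batches):
        print(snapshot)
```

The last snapshot equals what a one-off batch count over all the rows would return, which is why the same query logic can serve both batch and streaming inputs.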

Integrating Kedro and Spark Structured Streaming

Kedro is easily extensible for your own workflows, and this article explains one of the ways to add new functionality. To enable Kedro to work with Spark Structured Streaming, a team inside QuantumBlack Labs developed a new Spark streaming dataset, because the existing Kedro Spark dataset was not compatible with Spark Streaming use cases. To ensure seamless streaming, the new dataset lets you specify a checkpoint location, which avoids duplicating data when a stream is restarted, and it calls .start() at the end of its _save method to launch the stream.
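To see why a checkpoint location prevents duplicates, consider a file source: Spark records in the checkpoint which input files and offsets have already been processed, so a query restarted with the same checkpoint resumes where it left off instead of reprocessing everything. The plain-Python sketch below models that bookkeeping with a single JSON file. `process_new_files` and its checkpoint layout are hypothetical and far simpler than Spark's actual checkpoint format.

```python
import json
from pathlib import Path
from typing import Callable, List, Union

PathLike = Union[str, Path]


def process_new_files(
    source_dir: PathLike,
    checkpoint_file: PathLike,
    handle: Callable[[Path], None],
) -> List[str]:
    """Process only the files not yet recorded in the checkpoint.

    Returns the names of the files processed in this run.
    """
    checkpoint = Path(checkpoint_file)
    done = set(json.loads(checkpoint.read_text())) if checkpoint.exists() else set()

    new_files = sorted(
        p.name for p in Path(source_dir).iterdir() if p.is_file() and p.name not in done
    )
    for name in new_files:
        handle(Path(source_dir) / name)

    # Record progress only after the files were handled, so a crash mid-run
    # leads to reprocessing rather than silently skipping data.
    checkpoint.write_text(json.dumps(sorted(done | set(new_files))))
    return new_files
```

Calling it twice on the same directory handles each file once; deleting the checkpoint file makes it start over, which is roughly what happens to a Spark query that is given a fresh checkpoint location.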

Set up a project to integrate Kedro with Spark Structured Streaming

The project uses a Kedro dataset to build a structured data pipeline that can read and write data streams with Spark Structured Streaming and process them in real time. To make the Kedro project function as a streaming application, you need to add two separate Hooks to it.

Integration involves the following steps:

  1. Create a Kedro project.
  2. Register the necessary PySpark and streaming related Hooks.
  3. Configure the custom dataset in the catalog.yml file, defining the streaming sources and sinks.
  4. Use Kedro’s new dataset for Spark Structured Streaming to store intermediate dataframes generated during the Spark streaming process.

Create a Kedro project

Ensure you have installed Kedro version 0.18.9 or later and kedro-datasets version 1.4.0 or later.

pip install "kedro~=0.18.9" "kedro-datasets~=1.4"
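If you are unsure which versions are installed, a small standard-library check such as the one below can confirm the minimums before you go further. It is a rough sketch: `release_tuple` and `check_versions` are hypothetical helpers that compare only the numeric major, minor and patch parts and ignore pre-release suffixes.

```python
import re
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Optional, Tuple

# Minimum versions stated in this article.
MINIMUMS: Dict[str, Tuple[int, int, int]] = {
    "kedro": (0, 18, 9),
    "kedro-datasets": (1, 4, 0),
}


def release_tuple(v: str) -> Tuple[int, int, int]:
    """Turn '0.18.12' (or '1.4.0rc1') into (0, 18, 12) for comparison."""
    parts = []
    for piece in v.split(".")[:3]:
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    parts += [0] * (3 - len(parts))
    return (parts[0], parts[1], parts[2])


def check_versions(
    minimums: Dict[str, Tuple[int, int, int]] = MINIMUMS,
) -> Dict[str, Optional[bool]]:
    """Map each package to True/False (meets the minimum?) or None if missing."""
    results: Dict[str, Optional[bool]] = {}
    for package, minimum in minimums.items():
        try:
            installed = version(package)
        except PackageNotFoundError:
            results[package] = None
            continue
        results[package] = release_tuple(installed) >= minimum
    return results


if __name__ == "__main__":
    for package, ok in check_versions().items():
        print(package, "missing" if ok is None else ("ok" if ok else "too old"))
```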

Create a new Kedro project using the Kedro pyspark starter:

kedro new --starter=pyspark

Register the necessary PySpark and streaming related Hooks

To work with multiple streaming nodes, you need two Hooks. The first integrates PySpark; see Build a Kedro pipeline with PySpark in the Kedro documentation for details. The second keeps the streaming queries running after the pipeline run has finished, until a query terminates or fails with an exception.

Add the following code to src/$your_kedro_project_name/hooks.py:

from pyspark import SparkConf
from pyspark.sql import SparkSession

from kedro.framework.hooks import hook_impl


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.
        """

        # Load the spark configuration in spark.yaml using the config loader
        parameters = context.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context._package_name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )

        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")


class SparkStreamsHook:
    @hook_impl
    def after_pipeline_run(self) -> None:
        """Blocks once the pipeline run has finished, keeping the SparkSession
        and its streaming queries alive until any query terminates.
        """

        spark = SparkSession.builder.getOrCreate()
        spark.streams.awaitAnyTermination()

Register the Hooks in src/$your_kedro_project_name/settings.py:

"""Project settings. There is no need to edit this file unless you want to change values
from the Kedro defaults. For further information, including these default values, see
https://kedro.readthedocs.io/en/stable/kedro_project_setup/settings.html."""

from .hooks import SparkHooks, SparkStreamsHook

# Instantiated project hooks: SparkHooks creates the SparkSession and
# SparkStreamsHook keeps the streaming queries alive after the run.
HOOKS = (SparkHooks(), SparkStreamsHook())

# Installed plugins for which to disable hook auto-registration.
# DISABLE_HOOKS_FOR_PLUGINS = ("kedro-viz",)

# Class that manages storing KedroSession data.
# from kedro.framework.session.shelvestore import ShelveStore
# SESSION_STORE_CLASS = ShelveStore
# Keyword arguments to pass to the `SESSION_STORE_CLASS` constructor.
# SESSION_STORE_ARGS = {
#     "path": "./sessions"
# }

# Class that manages Kedro's library components.
# from kedro.framework.context import KedroContext
# CONTEXT_CLASS = KedroContext

# Directory that holds configuration.
# CONF_SOURCE = "conf"

# Class that manages how configuration is loaded.
# CONFIG_LOADER_CLASS = ConfigLoader
# Keyword arguments to pass to the `CONFIG_LOADER_CLASS` constructor.
# CONFIG_LOADER_ARGS = {
#       "config_patterns": {
#           "spark" : ["spark*/"],
#           "parameters": ["parameters*", "parameters*/**", "**/parameters*"],
#       }
# }

# Class that manages the Data Catalog.
# from kedro.io import DataCatalog
# DATA_CATALOG_CLASS = DataCatalog

How to set up your Kedro project to read data from streaming sources

Once you have set up your project, you can use the new Kedro Spark streaming dataset. To read from a streaming source of JSON files, configure the Data Catalog in conf/base/catalog.yml as follows:

raw_json:
  type: spark.SparkStreamingDataSet
  filepath: data/01_raw/stream/inventory/
  file_format: json

Additional options can be passed via the load_args key. For example, to read a stream of CSV files that have a header row:

int.new_inventory:
   type: spark.SparkStreamingDataSet
   filepath: data/02_intermediate/inventory/
   file_format: csv
   load_args:
     header: True
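Under the hood, entries like the ones above correspond to a Spark `readStream` call. The sketch below shows roughly what loading such an entry looks like in plain PySpark; `read_stream` is a hypothetical helper, not the dataset's actual code. One Spark behaviour to keep in mind: file-based streaming sources such as JSON and CSV need an explicit schema unless `spark.sql.streaming.schemaInference` is enabled, so the helper accepts one.

```python
from typing import Any, Dict, Optional


def read_stream(
    spark: Any,  # a pyspark.sql.SparkSession
    filepath: str,
    file_format: str,
    load_args: Optional[Dict[str, Any]] = None,
    schema: Any = None,  # e.g. a pyspark.sql.types.StructType
) -> Any:
    """Hypothetical helper: open a directory of files as a streaming DataFrame."""
    reader = spark.readStream.format(file_format)
    if schema is not None:
        # File sources need a schema unless spark.sql.streaming.schemaInference
        # is set to true.
        reader = reader.schema(schema)
    # Everything else (e.g. header=True for CSV) is passed as reader options.
    reader = reader.options(**(load_args or {}))
    # Nothing is read yet; this only defines the streaming source.
    return reader.load(filepath)
```

For example, `read_stream(spark, "data/02_intermediate/inventory/", "csv", {"header": True}, schema=inventory_schema)`, where `inventory_schema` is a StructType you define yourself.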

How to set up your Kedro project to write data to streaming sinks

To write to a streaming sink, put all the additional arguments, including the output mode and the checkpoint location, under the save_args key:

processed.sensor:
   type: spark.SparkStreamingDataSet
   file_format: csv
   filepath: data/03_primary/processed_sensor/
   save_args:
     output_mode: append
     checkpoint: data/04_checkpoint/processed_sensor
     header: True
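For reference, the save_args above plausibly map onto Spark's `writeStream` API as follows: `output_mode` to `outputMode`, `checkpoint` to the `checkpointLocation` option, and the remaining keys (such as `header`) to sink options, with `.start()` called last to launch the query. This mapping is an assumption based on the option names rather than a copy of the dataset's source, and `start_streaming_write` is a hypothetical helper.

```python
from typing import Any, Dict, Optional


def start_streaming_write(
    df: Any,  # a streaming pyspark.sql.DataFrame
    filepath: str,
    file_format: str,
    save_args: Optional[Dict[str, Any]] = None,
) -> Any:
    """Hypothetical helper: write a streaming DataFrame to a file sink.

    Returns the StreamingQuery handle produced by ``start()``.
    """
    args = dict(save_args or {})  # copy so the caller's dict is untouched
    output_mode = args.pop("output_mode", "append")
    checkpoint = args.pop("checkpoint", None)
    if checkpoint is None:
        # Without a checkpoint a restarted query cannot resume safely.
        raise ValueError("save_args must include a 'checkpoint' location")

    writer = (
        df.writeStream.format(file_format)
        .outputMode(output_mode)
        .option("checkpointLocation", checkpoint)
        .options(**args)  # remaining sink options, e.g. header=True
    )
    # The query only begins running once start() is called.
    return writer.start(filepath)
```

With the catalog entry above, the equivalent call would be `start_streaming_write(df, "data/03_primary/processed_sensor/", "csv", {"output_mode": "append", "checkpoint": "data/04_checkpoint/processed_sensor", "header": True})`.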

Note that when you use the Kafka format, the corresponding connector package must be added to the spark.yml configuration as follows (the connector's Scala and Spark versions, here 2.12 and 3.3.1, should match your installation):

spark.jars.packages: org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1
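Once the connector package is on the classpath, Kafka topics can be read with Spark's `kafka` source. The plain-PySpark sketch below is illustrative only: `read_kafka_stream` is hypothetical, and the broker address and topic name in the usage line are placeholders. The option names (`kafka.bootstrap.servers`, `subscribe`, `startingOffsets`) are those of Spark's Kafka integration.

```python
from typing import Any


def read_kafka_stream(
    spark: Any,  # a pyspark.sql.SparkSession with the Kafka connector loaded
    bootstrap_servers: str,
    topic: str,
    starting_offsets: str = "latest",
) -> Any:
    """Hypothetical helper: subscribe to one Kafka topic as a streaming DataFrame."""
    raw = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", starting_offsets)  # e.g. "earliest" or "latest"
        .load()
    )
    # Kafka keys and values arrive as binary; expose the value as a string.
    return raw.selectExpr("CAST(value AS STRING) AS value")
```

For example, `events = read_kafka_stream(spark, "localhost:9092", "sensor-readings")`.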

Design considerations

Pipeline design

In order to benefit from Spark's internal query optimisation, we recommend that any interim datasets are stored as memory datasets.

All streams start at the same time, so any nodes that have a dependency on another node that writes to a file sink (i.e. the input to that node is the output of another node) will fail on the first run. This is because there are no files in the file sink for the stream to process when it starts.

We recommend that you either keep intermediate datasets in memory or split the processing into two pipelines and start by triggering the first pipeline to build up some initial history.
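The first-run failure described above can be spotted before running anything by checking which nodes consume a dataset that another node writes to a streaming file sink. The sketch below does this over a simplified, hypothetical description of a pipeline (node name, inputs, outputs) and a dict shaped like catalog.yml; it is not a Kedro API. Datasets missing from the catalog are treated as in-memory, which is how Kedro handles datasets that are not registered in the catalog.

```python
from typing import Dict, List, Tuple

# (node name, input dataset names, output dataset names)
NodeSpec = Tuple[str, List[str], List[str]]

STREAMING_TYPE = "spark.SparkStreamingDataSet"


def nodes_reading_file_sinks(
    nodes: List[NodeSpec], catalog: Dict[str, dict]
) -> List[str]:
    """Names of nodes whose inputs are streaming datasets written by another node.

    Such nodes find no files to read on the first run. Datasets absent from
    the catalog are treated as in-memory and therefore safe to chain.
    """
    produced = {out for _, _, outputs in nodes for out in outputs}
    flagged = []
    for name, inputs, _ in nodes:
        for dataset in inputs:
            entry = catalog.get(dataset, {})
            if dataset in produced and entry.get("type") == STREAMING_TYPE:
                flagged.append(name)
                break
    return flagged
```

Any node it flags is a candidate for either keeping its input in memory or moving into a second pipeline.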

Feature creation

Be aware that windowing operations only allow windowing on time columns.

Stream-stream joins need watermarks to bound the state Spark keeps (for outer joins they are mandatory). Only certain types of joins are allowed, and these depend on whether the inputs are streaming or static (stream-stream, stream-static), which can make joining multiple tables complex at times. For further information or advice about join types and watermarking, take a look at the PySpark documentation or reach out on the Kedro Slack workspace.
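In PySpark, these features are expressed with `DataFrame.withWatermark` and `pyspark.sql.functions.window`. To make the semantics concrete without a Spark cluster, the sketch below models a watermarked tumbling-window count in plain Python. It is a simplification, not Spark's implementation: windows are aligned to the Unix epoch as Spark's tumbling windows are by default, the watermark is the maximum event time seen in earlier micro-batches minus the allowed delay, and events whose window ended before the watermark are dropped. The function names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, List, Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime, size: timedelta) -> datetime:
    """Start of the tumbling window of length `size` containing `ts` (tz-aware)."""
    return ts - ((ts - EPOCH) % size)


def windowed_counts(
    micro_batches: Iterable[List[datetime]],
    size: timedelta,
    delay: timedelta,
) -> Dict[datetime, int]:
    """Count events per tumbling window, dropping events that arrive too late."""
    counts: Dict[datetime, int] = defaultdict(int)
    max_seen: Optional[datetime] = None
    for batch in micro_batches:
        # The watermark used for this batch comes from earlier batches only.
        watermark = max_seen - delay if max_seen is not None else None
        for ts in batch:
            start = window_start(ts, size)
            if watermark is not None and start + size < watermark:
                continue  # window already closed: the late event is dropped
            counts[start] += 1
        if batch:
            latest = max(batch)
            max_seen = latest if max_seen is None else max(max_seen, latest)
    return dict(counts)
```

For example, with 10-minute windows and a 10-minute delay, an event stamped 12:03 that arrives after events up to 12:22 have been seen falls into the 12:00 to 12:10 window, which ends before the 12:12 watermark, so it is dropped.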

Logging

When the pipeline first starts, Spark downloads the JAR required for its Kafka integration (the package listed in spark.jars.packages). After the first run, it won't download the file again but will reuse the previously downloaded copy.

For each node, Kedro shows the usual logs: loading data, running the node, saving data, and "Completed x out of y tasks".

The "Completed" message doesn't mean that the stream processing in that node has stopped. It means that the Spark plan has been created and, if the output dataset is being saved to a sink, that the stream has started.
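To confirm that a node's stream really is running after Kedro reports it as completed, you can inspect the session's active queries. A minimal sketch, assuming you have a handle on the same SparkSession (for example via `SparkSession.builder.getOrCreate()`, as SparkStreamsHook does); `describe_active_streams` is hypothetical, while `spark.streams.active` and the `name`, `id` and `status` properties of a `StreamingQuery` are part of PySpark.

```python
from typing import Any, Dict, List


def describe_active_streams(spark: Any) -> List[Dict[str, Any]]:
    """Summarise every streaming query currently running in `spark`."""
    summaries = []
    for query in spark.streams.active:
        status = query.status  # dict with 'message', 'isDataAvailable', ...
        summaries.append(
            {
                "name": query.name,  # None unless a query name was set
                "id": str(query.id),
                "message": status.get("message"),
                "data_available": status.get("isDataAvailable"),
            }
        )
    return summaries
```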


Once Kedro has run through all the nodes and the full Spark execution plan has been created, you'll see the message "INFO Pipeline execution completed successfully".

This doesn't mean the stream processing has stopped, because the after_pipeline_run Hook (SparkStreamsHook) keeps the SparkSession alive. As new data comes in, new Spark logs will be shown, even after the "Pipeline execution completed" log.
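SparkStreamsHook blocks indefinitely, which is what you want for a long-running application. For local experiments you may prefer a bounded wait; the sketch below is a hypothetical variant (the timeout behaviour is an assumption of this example, not part of the article's Hook) built on PySpark's `StreamingQueryManager.awaitAnyTermination(timeout)`, which returns whether a query terminated within the timeout, and `StreamingQuery.stop()`.

```python
from typing import Any, Optional


def wait_for_streams(spark: Any, timeout: Optional[float] = None) -> bool:
    """Block like SparkStreamsHook, optionally giving up after `timeout` seconds.

    Returns True if a query terminated, False if the timeout elapsed first,
    in which case every still-active query is stopped.
    """
    if timeout is None:
        # Blocks until any query stops; re-raises the error if one failed.
        spark.streams.awaitAnyTermination()
        return True
    terminated = spark.streams.awaitAnyTermination(timeout)
    if not terminated:
        for query in spark.streams.active:
            query.stop()
    return bool(terminated)
```

You could call it from `after_pipeline_run` in place of the bare `awaitAnyTermination()` call, passing a timeout only when running locally.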


If there is an error in the input data, the Spark error logs will come through and Kedro will shut down the SparkContext and all the streams within it.


In summary

In this article, we showed one way to extend Kedro: building a new dataset to create streaming pipelines. We created a new Kedro project using the Kedro pyspark starter and showed how to add Hooks to the project so that it functions as a streaming application. We then configured the new dataset through the Kedro Data Catalog to define the streaming sources and sinks.

There are currently some limitations: the dataset is not yet ready for use with a message broker such as Kafka, because an additional JAR package is required.

If you want to find out more about the ways to extend Kedro, take a look at the advanced Kedro documentation for more about Kedro plugins, datasets, and Hooks.

Contributors

This post was created by Tingting Wan, Tom Kurian, and Haris Michailidis, who are all Data Engineers in the London office of QuantumBlack, AI by McKinsey.
