DEV Community

Cover image for Study Notes dlt Fundamentals Course: Lesson 5 & 6 - Write Disposition, Incremental Loading, How dlt works
Pizofreude
Pizofreude

Posted on

Study Notes dlt Fundamentals Course: Lesson 5 & 6 - Write Disposition, Incremental Loading, How dlt works

Lesson 5 Write Disposition and Incremental Loading

Write Disposition

A write disposition in DLT (Data Loading Tool) defines how data should be written to the destination.

Types of Write Dispositions

  • Append: This is the default disposition. It appends the data to the existing data in the destination.
  • Replace: This disposition replaces the data in the destination with the data from the resource. It deletes all the data and recreates the schema before loading the data.
  • Merge: This write disposition merges the data from the resource with the data at the destination. For the merge disposition, you need to specify a primary key for the resource.

Choosing the Write Disposition

The write disposition you choose depends on the dataset and how you can extract it.

Specifying the Write Disposition

A write disposition in DLT can be specified in the resource decorator or directly in the pipeline run. In case you specify both, the write disposition specified at the pipeline run level will override the write disposition specified at the resource level.

Incremental Loading

Incremental loading is the act of loading only new or changed data and not old records that we already loaded.

Example Use Case for Incremental Loading

Imagine you have a dataset of Pokémon, each with a unique ID, their name, size (height and weight), and when they were "caught" (created_at field). You want to load only Pokémon caught after January 1, 2024, skipping the ones you already have.

Steps for Incremental Loading

  1. Adding the created_at Field: Load only Pokémon caught after a certain date.
  2. Defining the Incremental Logic: Set up an incremental filter to only fetch Pokémon caught after a certain date.
  3. Running the Pipeline: Run the pipeline and load the fresh Pokémon data.

Why Incremental Loading Matters

Incremental loading matters because it helps you update and deduplicate your data. It finds new data and adds it to the database, ignoring any updates to user information.

Merge Write Disposition

The merge write disposition can be used to merge new data into the destination, using a merge key and/or deduplicating/updating new data using a primary key.

Strategies for Merge Write Disposition

The merge write disposition can be used with three different strategies:

  • Incremental Loading: Load only new or changed data and not old records that we already loaded.
  • Deduplicating/Updating: Use a primary key to identify unique records and update the existing records with the new data instead of creating duplicate entries.
  • Slowly Changing Dimensions (SCD): Use the merge write disposition with the scd2 strategy to maintain a history of changes to attribute values over time.

Incremental Cursor

The incremental cursor keeps track of the latest created_at or updated_at value processed. It will skip records older than this date in future runs.

Attributes of the Incremental Cursor

The dlt.sources.incremental instance has the following attributes:

  • since: A parameter used to tell the source to fetch data from a specific point in time.
  • cursor_date.last_value: Holds the last cursor_date value from the previous run.
  • apply_hints: Can be used to define an incremental field.

Exercise 1: Make the GitHub API Pipeline Incremental

Transform the GitHub API pipeline to use incremental loading, so it fetches only new or updated data. This means:

  • Use the since parameter to tell GitHub which issues you are interested in.
  • Use the cursor_date.last_value to tell GitHub which issues you are interested in.
  • Apply incremental hints to define an incremental field.

Lesson 6 How dlt works

Overview

This lesson covers the internal steps of the pipeline.run() method in dlt, including the extract, normalize, and load stages.

Pipeline.run() Method

The pipeline.run() method executes the entire pipeline, encompassing the extract, normalize, and load stages.

Extract Stage

  • The extract stage is the first step in the pipeline.
  • It performs the following operations:
    • Creates intermediate files based on the number of resources and file rotation settings.
    • Logs the extraction progress with real-time metrics.
  • The extract stage can be run individually using the extract command on the pipeline.

Normalize Stage

  • The normalize stage transforms the structure of the input data.
  • It converts the transformed data into a relational structure that can be easily loaded into the destination.
  • The normalize stage depends on having a completed extract phase and will not do anything if there is no extracted data.
  • It can be run individually using the normalize command on the pipeline.

Load Stage

  • The load stage is responsible for taking the normalized data and loading it into the chosen destination.
  • It depends on having a completed normalize phase and will not do anything if there is no normalized data.
  • The load stage can be run individually using the load command on the pipeline.

Intermediary File Formats

dlt supports four intermediary file formats:

JSONL

  • Definition: JSON Delimited is a file format that stores several JSON documents in one file.
  • The JSON documents are separated by a new line.
  • Compression: enabled by default.
  • Data type handling: By default used by the dlt configuration.

Parquet

  • Definition: Apache Parquet is a free and open-source column-oriented data storage format in the Apache Hadoop ecosystem.
  • Prerequisite: To use this format, you need a pyarrow package.
  • Default version: 2.4, which coerces timestamps to microseconds and silently truncates nanoseconds for better compatibility with databases and pandas.
  • Supported by: dlt configuration, config.toml or secrets.toml, environment variables, and resource decorators.

CSV

  • Definition: CSV is a file format that stores data in a comma-separated values format.
  • Two implementations: pyarrow csv writer and python stdlib writer.
  • Default settings: delimiter (default: ','), include_header (default: True), and quoting (default: quote_needed).

SQL INSERT File Format

  • Definition: This file format contains an INSERT...VALUES statement to be executed on the destination during the load stage.
  • Additional data types are stored as follows.
  • Compression: enabled by default.
  • Default for: dlt configuration, config.toml or secrets.toml, environment variables, and resource decorators.

Timestamps and Timezones

dlt adds UTC adjustments to all timestamps, creating timezone-aware timestamp columns in destinations (except DuckDB). Disable timezone/UTC adjustments by setting flavor to spark or timestamp_timezone to an empty string (DATA_WRITER__TIMESTAMP_TIMEZONE="").

Writer Settings

dlt uses the pyarrow Parquet writer for file creation. You can adjust the writer's behavior with the following options:

  • flavor: adjusts schema and compatibility settings for different target systems.
  • version: selects Parquet logical types based on the Parquet format version.
  • data_page_size: sets the target size for data pages within a column chunk (in bytes).
  • timestamp_timezone: specifies the timestamp based on selected timezone.
  • coerce_timestamps: sets the timestamp resolution (s, ms, us, ns).
  • allow_truncated_timestamps: raises an error if precision is lost on truncated timestamps.

Example Configurations

You can adjust the writer's behavior using the following options:

  • In configs.toml or secrets.toml:

    [writer_settings]
    flavor = "spark"
    version = "2.6"
    data_page_size = 1048576
    timestamp_timezone = "UTC"
    coerce_timestamps = "us"
    allow_truncated_timestamps = true
    
  • Via environment variables:

export DATA_WRITER__FLAVOR="spark"
export DATA_WRITER__VERSION="2.6"
export DATA_WRITER__DATA_PAGE_SIZE="1048576"
export DATA_WRITER__TIMESTAMP_TIMEZONE="UTC"
export DATA_WRITER__COERCE_TIMESTAMPS="us"
export DATA_WRITER__ALLOW_TRUNCATED_TIMESTAMPS="true"

Enter fullscreen mode Exit fullscreen mode
  • Specify directly in the resource decorator:

    from dlt import Resource
    
    class MyResource(Resource):
    writer_settings = {
    "flavor": "spark",
    "version": "2.6",
    "data_page_size": 1048576,
    "timestamp_timezone": "UTC",
    "coerce_timestamps": "us",
    "allow_truncated_timestamps": True
    }
    

Conclusion

In this lesson, we covered the internal steps of the pipeline.run() method in dlt, including the extract, normalize, and load stages. We also discussed the intermediary file formats supported by dlt, including JSONL, Parquet, CSV, and SQL INSERT file format. Additionally, we touched on writer settings and example configurations for adjusting the writer's behavior.

Next Steps

  • Practice running the pipeline.run() method with different intermediary file formats and writer settings.
  • Experiment with adjusting the writer's behavior using the example configurations provided.
  • Explore the official documentation for more information on dlt and its features.

Top comments (0)