Lesson 5 Write Disposition and Incremental Loading
Write Disposition
A write disposition in dlt (data load tool) defines how data should be written to the destination.
Types of Write Dispositions
- Append: This is the default disposition. It appends the data to the existing data in the destination.
- Replace: This disposition replaces the data in the destination with the data from the resource. It deletes all the data and recreates the schema before loading the data.
- Merge: This write disposition merges the data from the resource with the data at the destination. For the merge disposition, you need to specify a primary key for the resource.
Choosing the Write Disposition
The write disposition you choose depends on how the dataset changes and how you are able to extract it: full snapshots suit replace, append-only event data suits append, and records that get updated and have a stable key suit merge.
Specifying the Write Disposition
A write disposition in dlt can be specified in the resource decorator or directly in the pipeline run. If you specify both, the write disposition given at the pipeline run level overrides the one specified at the resource level.
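As a minimal sketch (the resource, pipeline, and dataset names here are made up for illustration), both places look like this:

```python
import dlt

# Disposition set at the resource level (append is also the default).
@dlt.resource(write_disposition="append")
def pokemon():
    yield {"id": 1, "name": "bulbasaur"}

pipeline = dlt.pipeline(
    pipeline_name="pokemon_pipeline",
    destination="duckdb",
    dataset_name="pokemon_data",
)

# A disposition passed to run() overrides the one declared on the resource.
pipeline.run(pokemon(), write_disposition="replace")
```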
Incremental Loading
Incremental loading means loading only new or changed data, rather than reloading records that have already been loaded.
Example Use Case for Incremental Loading
Imagine you have a dataset of Pokémon, each with a unique ID, their name, size (height and weight), and when they were "caught" (created_at field). You want to load only Pokémon caught after January 1, 2024, skipping the ones you already have.
Steps for Incremental Loading
- Adding the created_at Field: Load only Pokémon caught after a certain date.
- Defining the Incremental Logic: Set up an incremental filter to only fetch Pokémon caught after a certain date.
- Running the Pipeline: Run the pipeline and load the fresh Pokémon data.
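A minimal sketch of these steps, assuming each record carries a `created_at` field (the data and names below are illustrative):

```python
import dlt

@dlt.resource(write_disposition="append")
def pokemon(
    # The incremental filter keeps only records with created_at after the
    # initial value (or after the last value seen in a previous run).
    created_at=dlt.sources.incremental("created_at", initial_value="2024-01-01T00:00:00Z"),
):
    yield [
        {"id": 25, "name": "pikachu", "created_at": "2024-02-10T12:00:00Z"},
        {"id": 1, "name": "bulbasaur", "created_at": "2023-12-01T08:00:00Z"},  # filtered out
    ]

pipeline = dlt.pipeline(
    pipeline_name="pokemon_incremental",
    destination="duckdb",
    dataset_name="pokemon_data",
)
pipeline.run(pokemon())
```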
Why Incremental Loading Matters
Incremental loading matters because it saves time and resources: each run fetches and loads only the new records instead of the whole dataset. On its own, an incremental append picks up new rows but ignores updates to existing ones; to also update and deduplicate records, combine it with the merge write disposition described below.
Merge Write Disposition
The merge write disposition merges new data into the destination using a merge key and/or deduplicates/updates records using a primary key.
Strategies for Merge Write Disposition
The merge write disposition can be used with three different strategies:
- Incremental Loading: Load only new or changed data rather than records that were already loaded.
- Deduplicating/Updating: Use a primary key to identify unique records and update the existing records with the new data instead of creating duplicate entries.
- Slowly Changing Dimensions (SCD): Use the merge write disposition with the scd2 strategy to maintain a history of changes to attribute values over time.
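For example, a merge keyed on a primary key updates existing rows instead of appending duplicates (a sketch with made-up data):

```python
import dlt

# primary_key identifies unique records: re-running with changed rows
# updates them in place rather than creating duplicate entries.
@dlt.resource(write_disposition="merge", primary_key="id")
def trainers():
    yield [
        {"id": 1, "name": "Ash", "city": "Pallet Town"},
        {"id": 2, "name": "Misty", "city": "Cerulean City"},
    ]

pipeline = dlt.pipeline(pipeline_name="merge_demo", destination="duckdb", dataset_name="crm")
pipeline.run(trainers())
```

The `scd2` strategy is selected by passing a dict as the disposition, e.g. `write_disposition={"disposition": "merge", "strategy": "scd2"}`.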
Incremental Cursor
The incremental cursor keeps track of the latest `created_at` or `updated_at` value processed; records at or before that value are skipped on future runs.
Attributes of the Incremental Cursor
Working with the incremental cursor involves three pieces:
- `since`: a query parameter used to tell the source API to return data from a specific point in time onward (used with the GitHub API in the exercise below).
- `cursor_date.last_value`: an attribute of the `dlt.sources.incremental` instance (named `cursor_date` in this example) that holds the last cursor value from the previous run.
- `apply_hints`: a resource method that can be used to define an incremental field on an existing resource.
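A sketch of how these pieces fit together, loosely modeled on the GitHub issues endpoint used in the exercise below (the repository URL is just an example):

```python
import dlt
import requests

@dlt.resource(write_disposition="append")
def issues(
    cursor_date=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z"),
):
    # last_value holds the newest updated_at processed so far, so the API
    # is asked only for issues changed since then.
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    response = requests.get(url, params={"since": cursor_date.last_value, "state": "all"})
    response.raise_for_status()
    yield response.json()

pipeline = dlt.pipeline(pipeline_name="github_incremental", destination="duckdb", dataset_name="github_data")
pipeline.run(issues())
```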
Exercise 1: Make the GitHub API Pipeline Incremental
Transform the GitHub API pipeline to use incremental loading, so it fetches only new or updated data. This means:
- Use the `since` parameter to tell the GitHub API which issues you are interested in.
- Pass `cursor_date.last_value` as the `since` value so only issues changed after the previous run are fetched.
- Apply incremental hints (`apply_hints`) to define an incremental field.
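One possible shape of a solution using `apply_hints` (the resource here is a stand-in for the course's GitHub API resource):

```python
import dlt

@dlt.resource(write_disposition="append")
def github_issues():
    # Stand-in data; in the exercise this resource calls the GitHub API.
    yield [
        {"id": 101, "title": "Fix bug", "updated_at": "2024-03-01T10:00:00Z"},
        {"id": 102, "title": "Old issue", "updated_at": "2023-11-15T09:00:00Z"},
    ]

resource = github_issues()
# Define the incremental field after the fact, without changing the resource code.
resource.apply_hints(
    incremental=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z")
)

pipeline = dlt.pipeline(pipeline_name="github_issues_pipeline", destination="duckdb", dataset_name="github_data")
pipeline.run(resource)
```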
Lesson 6 How dlt works
Overview
This lesson covers the internal steps of the `pipeline.run()` method in dlt, including the `extract`, `normalize`, and `load` stages.
Pipeline.run() Method
The `pipeline.run()` method executes the entire pipeline, encompassing the `extract`, `normalize`, and `load` stages.
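A minimal example that exercises all three stages in one call (names are illustrative):

```python
import dlt

pipeline = dlt.pipeline(pipeline_name="run_demo", destination="duckdb", dataset_name="demo_data")

# run() extracts the data, normalizes it into a relational schema,
# and loads it into the destination, returning load info for inspection.
load_info = pipeline.run([{"id": 1, "value": "a"}, {"id": 2, "value": "b"}], table_name="items")
print(load_info)
```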
Extract Stage
- The `extract` stage is the first step in the pipeline.
- It performs the following operations:
  - Creates intermediate files based on the number of resources and file rotation settings.
  - Logs the extraction progress with real-time metrics.
- The `extract` stage can be run individually using the `extract()` method on the pipeline.
Normalize Stage
- The `normalize` stage transforms the structure of the input data.
- It converts the extracted data into a relational structure that can be easily loaded into the destination.
- The `normalize` stage depends on a completed `extract` phase and does nothing if there is no extracted data.
- It can be run individually using the `normalize()` method on the pipeline.
Load Stage
- The `load` stage takes the normalized data and loads it into the chosen destination.
- It depends on a completed `normalize` phase and does nothing if there is no normalized data.
- The `load` stage can be run individually using the `load()` method on the pipeline.
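The stages can also be invoked one by one on the pipeline object, which makes it easier to inspect what each step produces (a sketch; names are illustrative):

```python
import dlt

pipeline = dlt.pipeline(pipeline_name="stages_demo", destination="duckdb", dataset_name="demo_data")

data = [{"id": 1, "name": "bulbasaur"}, {"id": 2, "name": "ivysaur"}]

pipeline.extract(data, table_name="pokemon")  # writes intermediate extract files
pipeline.normalize()                          # converts them into normalized load packages
info = pipeline.load()                        # loads the packages into the destination
print(info)
```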
Intermediary File Formats
dlt supports four intermediary file formats:
JSONL
- Definition: JSON Delimited is a file format that stores several JSON documents in one file.
- The JSON documents are separated by a new line.
- Compression: enabled by default.
- Data type handling: values are serialized to JSON-compatible representations using `dlt`'s default configuration.
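For illustration, a JSONL file with two documents looks like this:

```
{"id": 1, "name": "bulbasaur"}
{"id": 2, "name": "ivysaur"}
```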
Parquet
- Definition: Apache Parquet is a free and open-source column-oriented data storage format in the Apache Hadoop ecosystem.
- Prerequisite: To use this format, you need the `pyarrow` package installed.
- Default version: 2.4, which coerces timestamps to microseconds and silently truncates nanoseconds for better compatibility with databases and pandas.
- Supported by: the `dlt` configuration (`config.toml` or `secrets.toml`), environment variables, and resource decorators.
CSV
- Definition: CSV is a file format that stores data in a comma-separated values format.
- Two implementations: the `pyarrow` CSV writer and the Python standard library writer.
- Default settings: `delimiter` (default: `,`), `include_header` (default: `True`), and `quoting` (default: `quote_needed`).
SQL INSERT File Format
- Definition: This file format contains an INSERT...VALUES statement to be executed on the destination during the load stage.
- Data type handling: additional data types (such as dates, decimals, and binary values) are stored in text representations accepted by the destination.
- Compression: enabled by default.
- Configured via: the `dlt` configuration (`config.toml` or `secrets.toml`), environment variables, and resource decorators.
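Regardless of the format, the intermediary file format can be chosen per run with the `loader_file_format` argument (a sketch; Parquet requires `pyarrow` to be installed):

```python
import dlt

pipeline = dlt.pipeline(pipeline_name="formats_demo", destination="duckdb", dataset_name="demo_data")

# Stage the data as Parquet instead of the destination's default format.
pipeline.run([{"id": 1, "name": "bulbasaur"}], table_name="pokemon", loader_file_format="parquet")
```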
Timestamps and Timezones
dlt adds UTC adjustments to all timestamps, creating timezone-aware timestamp columns in destinations (except DuckDB). You can disable timezone/UTC adjustments by setting `flavor` to `spark` or `timestamp_timezone` to an empty string (`DATA_WRITER__TIMESTAMP_TIMEZONE=""`).
Writer Settings
dlt uses the `pyarrow` Parquet writer to create files. You can adjust the writer's behavior with the following options:
- `flavor`: adjusts schema and compatibility settings for different target systems.
- `version`: selects Parquet logical types based on the Parquet format version.
- `data_page_size`: sets the target size for data pages within a column chunk (in bytes).
- `timestamp_timezone`: specifies the timezone applied to timestamp columns.
- `coerce_timestamps`: sets the timestamp resolution (s, ms, us, ns).
- `allow_truncated_timestamps`: when left disabled (the default), an error is raised if precision is lost by truncating timestamps.
Example Configurations
These options can be set in any of the following ways:
- In `config.toml` or `secrets.toml`:

```toml
[normalize.data_writer]
flavor = "spark"
version = "2.6"
data_page_size = 1048576
timestamp_timezone = "UTC"
coerce_timestamps = "us"
allow_truncated_timestamps = true
```
- Via environment variables:

```sh
export DATA_WRITER__FLAVOR="spark"
export DATA_WRITER__VERSION="2.6"
export DATA_WRITER__DATA_PAGE_SIZE="1048576"
export DATA_WRITER__TIMESTAMP_TIMEZONE="UTC"
export DATA_WRITER__COERCE_TIMESTAMPS="us"
export DATA_WRITER__ALLOW_TRUNCATED_TIMESTAMPS="true"
```
- Directly in the resource decorator. As a minimal sketch (assuming a recent dlt version; writer options such as `flavor` and `version` are typically set via configuration or environment variables as shown above, while the decorator can pick the file format for a resource):

```python
import dlt

# Illustrative resource that asks dlt to stage its data as Parquet.
@dlt.resource(file_format="parquet")
def my_resource():
    yield {"id": 1, "value": "example"}
```
Conclusion
In this lesson, we covered the internal steps of the `pipeline.run()` method in dlt, including the `extract`, `normalize`, and `load` stages. We also discussed the intermediary file formats supported by dlt, including JSONL, Parquet, CSV, and the SQL INSERT file format. Additionally, we touched on writer settings and example configurations for adjusting the writer's behavior.
Next Steps
- Practice running the `pipeline.run()` method with different intermediary file formats and writer settings.
- Experiment with adjusting the writer's behavior using the example configurations provided.
- Explore the official documentation for more information on dlt and its features.