apply_changes_from_snapshot

Important

This functionality is in Public Preview.

The apply_changes_from_snapshot function uses DLT change data capture (CDC) functionality to process source data from database snapshots. See How is CDC implemented with the APPLY CHANGES FROM SNAPSHOT API?.

Important

You must have a target streaming table for this operation. You can optionally specify the columns and their types for your target table. When specifying the columns and their types for the apply_changes_from_snapshot() target table, you must also include the __START_AT and __END_AT columns with the same data type as the snapshot version returned by the source.

To create the required target table, you can use the create_streaming_table() function.
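
For example, a minimal sketch of an SCD type 2 target table; the table name, business columns, and integer-typed snapshot version are assumptions for illustration:

import dlt

dlt.create_streaming_table(
  name = "target",
  schema = """
    userId INT,
    name STRING,
    city STRING,
    __START_AT INT,
    __END_AT INT
  """
)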

Syntax

import dlt

dlt.apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Note

For APPLY CHANGES FROM SNAPSHOT processing, the default behavior is to insert a new row when a matching record with the same key(s) does not exist in the target. If a matching record does exist, it is updated only if any of the values in the row have changed. Rows with keys present in the target but no longer present in the source are deleted.
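
For example, if the previously processed snapshot contains the keyed rows (1, "abc") and (2, "def"), and the next snapshot contains (1, "xyz") and (3, "ghi"), then processing the next snapshot updates the row with key 1, inserts a row with key 3, and deletes the row with key 2.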

To learn more about CDC processing with snapshots, see The APPLY CHANGES APIs: Simplify change data capture with DLT. For examples of using the apply_changes_from_snapshot() function, see the periodic snapshot ingestion and historical snapshot ingestion examples.
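
The following is a minimal sketch of periodic snapshot ingestion; the target name, source table, and key column are assumptions for illustration, not part of the API:

import dlt

dlt.create_streaming_table("customers")

dlt.apply_changes_from_snapshot(
  target = "customers",
  # Hypothetical source table: its current contents are read as a new
  # snapshot on each pipeline update.
  source = "catalog.schema.customer_snapshots",
  keys = ["customerId"],
  stored_as_scd_type = 1
)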

Parameters

target
  Type: str
  Required. The name of the table to be updated. You can use the create_streaming_table() function to create the target table before executing the apply_changes_from_snapshot() function.

source
  Type: str or lambda function
  Required. Either the name of a table or view to snapshot periodically, or a Python lambda function that returns the snapshot DataFrame to be processed and the snapshot version. See Implement the source argument.

keys
  Type: list
  Required. The column or combination of columns that uniquely identifies a row in the source data. This is used to identify which CDC events apply to specific records in the target table. You can specify either:
  • A list of strings: ["userId", "orderId"]
  • A list of Spark SQL col() functions: [col("userId"), col("orderId")]
  Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId").

stored_as_scd_type
  Type: str or int
  Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2. The default is SCD type 1.

track_history_column_list or track_history_except_column_list
  Type: list
  A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to be tracked. Use track_history_except_column_list to specify the columns to be excluded from tracking. You can declare either value as a list of strings or as Spark SQL col() functions:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")]
  Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId"). The default is to include all columns in the target table when no track_history_column_list or track_history_except_column_list argument is passed to the function.
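
For example, a sketch of an SCD type 2 call that tracks history for every column except an assumed updated_at bookkeeping column; all table and column names here are illustrative:

dlt.apply_changes_from_snapshot(
  target = "users",
  source = "catalog.schema.user_snapshots",  # hypothetical source table
  keys = ["userId"],
  stored_as_scd_type = 2,
  track_history_except_column_list = ["updated_at"]
)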

Implement the source argument

The apply_changes_from_snapshot() function accepts a source argument. To process historical snapshots, the source argument must be a Python lambda function that returns two values to the apply_changes_from_snapshot() function: a Spark DataFrame containing the snapshot data to be processed and the snapshot version.

The following is the signature of the lambda function:

lambda Any => Optional[(DataFrame, Any)]
  • The argument to the lambda function is the most recently processed snapshot version.
  • The return value of the lambda function is None or a tuple of two values: The first value of the tuple is a DataFrame containing the snapshot to be processed. The second value of the tuple is the snapshot version that represents the logical order of the snapshot.

An example that implements the lambda function and passes it as the source argument:

from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  if latest_snapshot_version is None:
    # First call: return the initial snapshot and its version.
    return (spark.read.format("csv").load("filename.csv"), 1)
  else:
    # No further snapshots: end processing.
    return None

dlt.apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

The DLT runtime performs the following steps each time the pipeline that contains the apply_changes_from_snapshot() function is triggered:

  1. Runs the next_snapshot_and_version function to load the next snapshot DataFrame and the corresponding snapshot version.
  2. If the lambda function returns None, the run is terminated and the pipeline update is marked as complete.
  3. Detects the changes in the new snapshot and incrementally applies them to the target table.
  4. Returns to step #1 to load the next snapshot and its version.
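
To make this loop concrete, the following is a minimal sketch of a source lambda for historical snapshot ingestion. The storage path, Parquet format, and fixed version count are assumptions for illustration, not part of the API:

from typing import Optional, Tuple
from pyspark.sql import DataFrame

SNAPSHOT_ROOT = "/data/snapshots"  # hypothetical path: one directory per version
LAST_VERSION = 3                   # hypothetical number of stored snapshots

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  # Start at version 1 on the first call, then advance one version per call.
  next_version = 1 if latest_snapshot_version is None else latest_snapshot_version + 1
  if next_version > LAST_VERSION:
    # Returning None ends the run and marks the pipeline update as complete.
    return None
  return (spark.read.format("parquet").load(f"{SNAPSHOT_ROOT}/{next_version}"), next_version)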