Important
This functionality is in Public Preview.
The `apply_changes_from_snapshot` function uses DLT change data capture (CDC) functionality to process source data from database snapshots. See How is CDC implemented with the APPLY CHANGES FROM SNAPSHOT API?.
Important
You must have a target streaming table for this operation. You can optionally specify the columns and their types for your target table. When specifying the columns and their types for the `apply_changes_from_snapshot()` target table, you must also include the `__START_AT` and `__END_AT` columns with the same data type as the `sequence_by` field.

To create the required target table, you can use the `create_streaming_table()` function.
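For example, a minimal sketch of creating a target table with an explicit schema (the `userId`, `name`, and `city` columns are illustrative, and the example assumes an integer snapshot version, so `__START_AT` and `__END_AT` are declared as `BIGINT`):

```python
import dlt

# Sketch: a target streaming table with an explicit schema. The __START_AT and
# __END_AT columns are required when a schema is specified, and their type
# must match the snapshot version (assumed here to be an integer).
dlt.create_streaming_table(
  name = "target",
  schema = """
    userId BIGINT,
    name STRING,
    city STRING,
    __START_AT BIGINT,
    __END_AT BIGINT
  """
)
```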
Syntax
```python
import dlt

dlt.apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)
```
Note
For `APPLY CHANGES FROM SNAPSHOT` processing, the default behavior is to insert a new row when a matching record with the same key(s) does not exist in the target. If a matching record does exist, it is updated only if any of the values in the row have changed. Rows with keys present in the target but no longer present in the source are deleted.
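For example, with a hypothetical key column `id`: if snapshot 1 contains rows `(1, "a")` and `(2, "b")`, and snapshot 2 contains `(1, "a")` and `(3, "c")`, then processing snapshot 2 leaves row `1` untouched (no values changed), inserts row `3`, and deletes row `2`, whose key is no longer present in the source.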
To learn more about CDC processing with snapshots, see The APPLY CHANGES APIs: Simplify change data capture with DLT. For examples of using the `apply_changes_from_snapshot()` function, see the periodic snapshot ingestion and historical snapshot ingestion examples.
Parameters
| Parameter | Type | Description |
|---|---|---|
| `target` | `str` | Required. The name of the table to be updated. You can use the `create_streaming_table()` function to create the target table before executing the `apply_changes_from_snapshot()` function. |
| `source` | `str` or lambda function | Required. Either the name of a table or view to snapshot periodically, or a Python lambda function that returns the snapshot DataFrame to be processed and the snapshot version. See Implement the source argument. |
| `keys` | `list` | Required. The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. You can specify either a list of strings, for example `["userId", "orderId"]`, or a list of Spark SQL `col()` functions, for example `[col("userId"), col("orderId")]`. |
| `stored_as_scd_type` | `str` or `int` | Whether to store records as SCD type 1 or SCD type 2. Set to `1` for SCD type 1 or `2` for SCD type 2. The default is SCD type 1. |
| `track_history_column_list` or `track_history_except_column_list` | `list` | A subset of output columns to be tracked for history in the target table. Use `track_history_column_list` to specify the complete list of columns to be tracked. Use `track_history_except_column_list` to specify the columns to be excluded from tracking. You can declare either value as a list of strings or as Spark SQL `col()` functions. Arguments to `col()` functions cannot include qualifiers. For example, you can use `col(userId)`, but you cannot use `col(source.userId)`. The default is to include all columns in the target table when no `track_history_column_list` or `track_history_except_column_list` argument is passed to the function. |
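As an illustration, a sketch of a complete call, assuming a target table named `target`, a key column `userId`, and a source function `next_snapshot_and_version` as defined in Implement the source argument below (all of these names are illustrative):

```python
import dlt
from pyspark.sql.functions import col

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,  # defined below in Implement the source argument
  keys = ["userId"],
  stored_as_scd_type = 2,
  track_history_except_column_list = [col("city")]  # track history for all columns except city
)
```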
Implement the `source` argument

The `apply_changes_from_snapshot()` function includes the `source` argument. For processing historical snapshots, the `source` argument is expected to be a Python lambda function that returns two values to the `apply_changes_from_snapshot()` function: a Python DataFrame containing the snapshot data to be processed and a snapshot version.
The following is the signature of the lambda function:
```
lambda Any => Optional[(DataFrame, Any)]
```
- The argument to the lambda function is the most recently processed snapshot version.
- The return value of the lambda function is `None` or a tuple of two values: the first value of the tuple is a DataFrame containing the snapshot to be processed; the second value of the tuple is the snapshot version that represents the logical order of the snapshot.
An example that implements and calls the lambda function:
```python
from typing import Optional, Tuple

import dlt
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  # First call: no snapshot has been processed yet, so return the initial
  # snapshot together with its version.
  if latest_snapshot_version is None:
    return (spark.read.load("filename.csv"), 1)
  else:
    # No further snapshots to process; returning None ends the update.
    return None

dlt.apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)
```
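The same pattern extends to processing a sequence of historical snapshots. A minimal sketch, assuming a hypothetical layout in which each snapshot is stored under a versioned path such as `/snapshots/v1`, `/snapshots/v2`, and so on:

```python
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  # Assumed layout: each snapshot lives at /snapshots/v<version> (hypothetical path).
  next_version = 1 if latest_snapshot_version is None else latest_snapshot_version + 1
  try:
    # Load the next snapshot in logical order and return it with its version.
    return (spark.read.format("parquet").load(f"/snapshots/v{next_version}"), next_version)
  except Exception:
    # No snapshot at that path: nothing left to process, so end this update.
    return None
```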
The DLT runtime performs the following steps each time the pipeline that contains the `apply_changes_from_snapshot()` function is triggered:

1. Runs the `next_snapshot_and_version` function to load the next snapshot DataFrame and the corresponding snapshot version.
2. If no DataFrame is returned, the run is terminated and the pipeline update is marked as complete.
3. Detects the changes in the new snapshot and incrementally applies them to the target table.
4. Returns to step 1 to load the next snapshot and its version.
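Conceptually, this loop can be sketched as follows. This is a simplified illustration of the behavior described above, not the actual DLT implementation; `get_last_processed_version` and `apply_snapshot_changes` are hypothetical placeholders:

```python
# Simplified sketch of the runtime loop; not actual DLT internals.
latest_version = get_last_processed_version()    # hypothetical: reads pipeline state

while True:
  result = next_snapshot_and_version(latest_version)
  if result is None:
    break                                        # no new snapshot: update is complete
  snapshot_df, version = result
  apply_snapshot_changes(snapshot_df, version)   # hypothetical: diff snapshot against target and apply changes
  latest_version = version
```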