apply_changes

The apply_changes() function uses DLT change data capture (CDC) functionality to process source data from a change data feed (CDF).

Important

You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of the apply_changes() target table, you must include the __START_AT and __END_AT columns with the same data type as the sequence_by field.

To create the required target table, you can use the create_streaming_table() function in the DLT Python interface.
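
A minimal sketch of declaring the target with an explicit schema. The table and column names are illustrative; with the schema specified, __START_AT and __END_AT must match the data type of the sequence_by column (BIGINT here):

import dlt

dlt.create_streaming_table(
  name = "target",
  schema = """
    userId BIGINT,
    name STRING,
    city STRING,
    __START_AT BIGINT,
    __END_AT BIGINT
    """
)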

Syntax

import dlt

dlt.apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

For apply_changes processing, the default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the apply_as_deletes parameter.
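
A minimal sketch of the default upsert behavior plus DELETE handling, assuming an illustrative source view named users_cdc_clean with userId, operation, and sequenceNum columns (these names are not part of the API):

import dlt
from pyspark.sql.functions import col, expr

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users_cdc_clean",
  keys = ["userId"],
  # Out-of-order events are resolved using sequenceNum.
  sequence_by = col("sequenceNum"),
  # Rows with operation = 'DELETE' remove the matching target row
  # instead of upserting it.
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"]
)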

To learn more about CDC processing with a change feed, see The APPLY CHANGES APIs: Simplify change data capture with DLT. For an example of using the apply_changes() function, see Example: SCD type 1 and SCD type 2 processing with CDF source data.

Parameters

target (str)
Required. The name of the table to be updated. You can use the create_streaming_table() function to create the target table before executing the apply_changes() function.

source (str)
Required. The data source containing CDC records.

keys (list)
Required. The column or combination of columns that uniquely identifies a row in the source data. This is used to determine which CDC events apply to specific records in the target table. You can specify either:
  • A list of strings: ["userId", "orderId"]
  • A list of Spark SQL col() functions: [col("userId"), col("orderId")]
Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId").

sequence_by (str, col(), or struct())
Required. The column specifying the logical order of CDC events in the source data. DLT uses this sequencing to handle change events that arrive out of order. The specified column must be a sortable data type. You can specify either:
  • A string: "sequenceNum"
  • A Spark SQL col() function: col("sequenceNum"). Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId").
  • A struct() combining multiple columns to break ties: struct("timestamp_col", "id_col"). Ordering uses the first struct field, then the second field to break ties, and so on; see the sketch after this entry.
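
A sketch of struct()-based sequencing, assuming an illustrative source view named users_cdc_clean with event_timestamp and change_id columns (not part of the API):

import dlt
from pyspark.sql.functions import struct

dlt.apply_changes(
  target = "target",
  source = "users_cdc_clean",
  keys = ["userId"],
  # Order events by event_timestamp; change_id breaks ties between
  # events that share the same timestamp.
  sequence_by = struct("event_timestamp", "change_id")
)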

ignore_null_updates (bool)
Allows ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null value retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is False, existing values are overwritten with null values. The default is False.

apply_as_deletes (str or expr())
Specifies when a CDC event should be treated as a DELETE rather than an upsert. You can specify either:
  • A string: "Operation = 'DELETE'"
  • A Spark SQL expr() function: expr("Operation = 'DELETE'")
To handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the pipelines.cdc.tombstoneGCThresholdInSeconds table property, as shown in the sketch below.
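
A sketch of tuning tombstone retention on the target table. The property name comes from this page; the five-day value is illustrative:

import dlt

dlt.create_streaming_table(
  name = "target",
  table_properties = {
    # Keep DELETE tombstones for 5 days (432000 seconds) before cleanup.
    "pipelines.cdc.tombstoneGCThresholdInSeconds": "432000"
  }
)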

apply_as_truncates (str or expr())
Specifies when a CDC event should be treated as a full table TRUNCATE. You can specify either:
  • A string: "Operation = 'TRUNCATE'"
  • A Spark SQL expr() function: expr("Operation = 'TRUNCATE'")
Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality. The apply_as_truncates parameter is supported only for SCD type 1. SCD type 2 does not support truncate operations.
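
A minimal sketch of truncate handling, assuming an illustrative source view named users_cdc_clean with an Operation column (not part of the API). Supported only with SCD type 1:

import dlt
from pyspark.sql.functions import expr

dlt.apply_changes(
  target = "target",
  source = "users_cdc_clean",
  keys = ["userId"],
  sequence_by = "sequenceNum",
  # Rows with Operation = 'TRUNCATE' wipe the entire target table.
  apply_as_truncates = expr("Operation = 'TRUNCATE'"),
  stored_as_scd_type = 1
)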

column_list or except_column_list (list)
A subset of columns to include in the target table. Use column_list to specify the complete list of columns to include. Use except_column_list to specify the columns to exclude. You can declare either value as a list of strings or as Spark SQL col() functions:
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")]
Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId"). The default is to include all columns in the target table when no column_list or except_column_list argument is passed to the function.

stored_as_scd_type (str or int)
Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2. The default is SCD type 1.

track_history_column_list or track_history_except_column_list (list)
A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to track. Use track_history_except_column_list to specify the columns to exclude from tracking. You can declare either value as a list of strings or as Spark SQL col() functions:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")]
Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId"). The default is to include all columns in the target table when no track_history_column_list or track_history_except_column_list argument is passed to the function.
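
A hedged SCD type 2 sketch that tracks history only for the city column, so changes to other columns update the current row in place instead of opening a new history row. The source view and column names are illustrative:

import dlt
from pyspark.sql.functions import col

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users_cdc_clean",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  stored_as_scd_type = "2",
  # Only changes to city open a new history row; changes to other
  # columns overwrite the current row.
  track_history_column_list = ["city"]
)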