append_flow

The @dlt.append_flow decorator creates append flows or backfills for the tables in your DLT pipeline. The decorated function must return an Apache Spark streaming DataFrame. See Load and process data incrementally with DLT flows.

Append flows can target streaming tables or sinks.
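For instance, the simplest case appends one streaming source to a streaming table. The sketch below assumes a DLT Python notebook, where spark is available implicitly; the table names are illustrative only:

import dlt

# Target streaming table; required only if it doesn't already exist.
dlt.create_streaming_table("orders")

# The flow name defaults to the function name ("orders_ingest").
@dlt.append_flow(target = "orders")
def orders_ingest():
  return spark.readStream.table("raw_orders")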

Syntax

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| function | function | Required. A function that returns an Apache Spark streaming DataFrame from a user-defined query. |
| target | str | Required. The name of the table or sink that is the target of the append flow. |
| name | str | The flow name. If not provided, defaults to the function name. |
| comment | str | A description for the flow. |
| spark_conf | dict | A dictionary of Spark configurations for the execution of this query. |
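As a sketch of how the optional parameters fit together (the flow name, table names, and configuration value below are illustrative, not prescriptive):

import dlt

dlt.create_streaming_table("sales")

@dlt.append_flow(
  target = "sales",
  name = "sales_from_raw",                              # overrides the default (function) name
  spark_conf = {"spark.sql.shuffle.partitions": "64"},  # applies only to this flow's query
  comment = "Appends new rows from the raw sales feed.")
def append_sales():
  return spark.readStream.table("raw_sales")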

Examples

import dlt
from pyspark.sql import functions as F

# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Create a Kafka sink
dlt.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dlt.append_flow(name = "kafka_flow", target = "my_kafka_sink")
def myFlow():
  return spark.readStream.table("<source-table-name>").select(F.to_json(F.struct("*")).alias("value"))
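
Multiple append flows can also write to a single streaming table, for example to combine several streaming sources into one target. A minimal sketch, assuming the source tables raw_orders_west and raw_orders_east exist (the names are hypothetical):

import dlt

# One target streaming table shared by both flows.
dlt.create_streaming_table("all_orders")

@dlt.append_flow(target = "all_orders", name = "orders_west")
def orders_west():
  return spark.readStream.table("raw_orders_west")

@dlt.append_flow(target = "all_orders", name = "orders_east")
def orders_east():
  return spark.readStream.table("raw_orders_east")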