The `@dlt.append_flow` decorator creates append flows or backfills for your DLT tables. The function must return an Apache Spark streaming DataFrame. See Load and process data incrementally with DLT flows.
Append flows can target streaming tables or sinks.
Syntax
import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to the function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)
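Filled in with concrete names, a minimal flow might look like the following sketch; the names `raw_orders` and `orders_us_raw` are hypothetical placeholders, not part of the API.

import dlt

# Hypothetical target table, created here because it is not defined elsewhere in the pipeline.
dlt.create_streaming_table("raw_orders")

# The flow name defaults to the function name ("append_us_orders").
@dlt.append_flow(target = "raw_orders")
def append_us_orders():
  # "orders_us_raw" is a hypothetical source table available in the catalog.
  return spark.readStream.table("orders_us_raw")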
Parameters
| Parameter | Type | Description |
|---|---|---|
| `function` | `function` | Required. A function that returns an Apache Spark streaming DataFrame from a user-defined query. |
| `target` | `str` | Required. The name of the table or sink that is the target of the append flow. |
| `name` | `str` | The flow name. If not provided, defaults to the function name. |
| `comment` | `str` | A description for the flow. |
| `spark_conf` | `dict` | A dictionary of Spark configurations for the execution of this query. |
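A sketch combining the optional parameters, assuming hypothetical table names and an illustrative Spark configuration value:

import dlt

dlt.create_streaming_table("events")

@dlt.append_flow(
  target = "events",
  name = "backfill_events",  # overrides the default (function) name
  comment = "One-time backfill of historical events.",
  spark_conf = {"spark.sql.shuffle.partitions": "64"}  # illustrative value only
)
def load_historical_events():
  # "events_archive" is a hypothetical source table.
  return spark.readStream.table("events_archive")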
Examples
import dlt
from pyspark.sql import functions as F

# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Create a Kafka sink
dlt.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dlt.append_flow(name = "kafka_flow", target = "my_kafka_sink")
def myFlow():
  # Serialize all columns into a single JSON string in the Kafka "value" field
  return spark.readStream.table("xxx").select(F.to_json(F.struct("*")).alias("value"))
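The examples above target sinks. Append flows can also target a streaming table, which is how several sources, or a backfill, can be combined into one table. A sketch, assuming the hypothetical source tables `orders_us` and `orders_eu`:

import dlt

dlt.create_streaming_table("all_orders")

# Each flow appends its source into the same streaming table.
@dlt.append_flow(target = "all_orders")
def us_orders():
  return spark.readStream.table("orders_us")

@dlt.append_flow(target = "all_orders")
def eu_orders():
  return spark.readStream.table("orders_eu")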