The dlt module implements much of its core functionality using decorators. These decorators accept a function that defines either a streaming or batch query and returns an Apache Spark DataFrame. The following syntax shows a simple example for defining a DLT dataset:
import dlt

@dlt.table()
def function_name(): # This is the function decorated
    return (<query>) # This is the query logic that defines the dataset
This page provides an overview of the functions and queries that define datasets in DLT. For a complete list of available decorators, see DLT developer reference.
The functions you use to define datasets should not include arbitrary Python logic unrelated to the dataset, including calls to third-party APIs. DLT runs these functions multiple times during planning, validation, and updates. Including arbitrary logic can lead to unexpected results.
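For example, the following sketch shows the kind of side effect to avoid; the requests call and endpoint URL are hypothetical and not part of the DLT API. Because DLT may invoke the function during planning, validation, and each update, the request could run several times:
import dlt
import requests

@dlt.table()
def orders():
    # Avoid this pattern: the side effect runs every time DLT evaluates the function,
    # not only when the table is refreshed.
    requests.post("https://example.com/notify", json={"status": "starting"})  # hypothetical endpoint
    return spark.read.table("catalog_name.schema_name.orders_raw")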
Read data to begin a dataset definition
Functions used to define DLT datasets typically begin with a spark.read or spark.readStream operation. These read operations return a static or streaming DataFrame object that you use to define additional transformations before returning the DataFrame. Other examples of spark operations that return a DataFrame include spark.table or spark.range.
Functions should never reference DataFrames defined outside the function. Attempting to reference DataFrames defined at a different scope might result in unexpected behavior. For an example of a metaprogramming pattern for creating multiple tables, see Create tables in a for loop.
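As a rough sketch of that pattern (the table names raw_a and raw_b are placeholders), each table can be defined inside a helper function so the loop variable is captured correctly:
import dlt

def create_cleaned_table(source_table_name):
    # Defining the table inside a helper ensures each iteration captures its own name.
    @dlt.table(name=f"cleaned_{source_table_name}")
    def cleaned():
        return spark.read.table(f"catalog_name.schema_name.{source_table_name}")

for source_table_name in ["raw_a", "raw_b"]:
    create_cleaned_table(source_table_name)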
The following examples show the basic syntax for reading data using batch or streaming logic:
import dlt

# Batch read on a table
@dlt.table()
def function_name():
    return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dlt.table()
def function_name():
    return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")

# Streaming read on a table
@dlt.table()
def function_name():
    return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dlt.table()
def function_name():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
    )
If you need to read data from an external REST API, implement this connection using a Python custom data source. See PySpark custom data sources.
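A minimal sketch of that approach might look like the following; the endpoint URL, field names, and the rest_api format name are illustrative, and the example assumes a runtime where the Python data source API (pyspark.sql.datasource) is available:
import dlt
from pyspark.sql.datasource import DataSource, DataSourceReader

class RestApiDataSource(DataSource):
    @classmethod
    def name(cls):
        return "rest_api"

    def schema(self):
        return "id int, value string"

    def reader(self, schema):
        return RestApiReader(self.options)

class RestApiReader(DataSourceReader):
    def __init__(self, options):
        self.options = options

    def read(self, partition):
        # Fetch records from the hypothetical REST endpoint and yield rows.
        import requests
        for record in requests.get(self.options["url"]).json():
            yield (record["id"], record["value"])

spark.dataSource.register(RestApiDataSource)

@dlt.table()
def api_data():
    return spark.read.format("rest_api").option("url", "https://example.com/records").load()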
Note
It is possible to create arbitrary Apache Spark DataFrames from Python collections of data, including pandas DataFrames, dicts, and lists. These patterns might be useful during development and testing, but most production DLT dataset definitions should begin by loading data from files, an external system, or an existing table or view.
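For example, during development you might define a small dataset directly from an in-memory list; the column names and values here are placeholders:
import dlt

@dlt.table()
def sample_data():
    # Handy for development and testing; replace with a real source in production.
    return spark.createDataFrame(
        [(1, "a"), (2, "b")],
        ["id", "value"],
    )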
Chaining transformations
DLT supports nearly all Apache Spark DataFrame transformations. You can include any number of transformations in your dataset definition function, but you should ensure that the methods you use always return a DataFrame object.
If you have an intermediary transformation that drives several downstream workloads but you don't need to materialize it as a table, use @dlt.view() to add a temporary view to your pipeline. You can then reference this view using spark.read.table("temp_view_name") in multiple downstream dataset definitions. The following syntax demonstrates this pattern:
import dlt

@dlt.view()
def a():
    return spark.read.table("source").filter(...)

@dlt.table()
def b():
    return spark.read.table("a").groupBy(...)

@dlt.table()
def c():
    return spark.read.table("a").groupBy(...)
This ensures that DLT has full awareness of the transformations in your view during pipeline planning and prevents potential issues related to arbitrary Python code running outside dataset definitions.
Within your function, you can chain DataFrames together to make new DataFrames without writing incremental results as views, materialized views, or streaming tables, as in the following example:
import dlt

@dlt.table()
def multiple_transformations():
    df1 = spark.read.table("source").filter(...)
    df2 = df1.groupBy(...)
    return df2.filter(...)
If all your DataFrames perform their initial reads using batch logic, your return result is a static DataFrame. If you have any queries that are streaming, your return result is a streaming DataFrame.
Return a DataFrame
For the @dlt.table() decorator, returning a static DataFrame means that you are defining a materialized view. Returning a streaming DataFrame means that you are defining a streaming table. Most decorators work on both streaming and static DataFrames, while others require a streaming DataFrame.
The function used to define a dataset must return a Spark DataFrame. Never use methods that save or write to files or tables as part of your DLT dataset code.
Examples of Apache Spark operations that should never be used in DLT code:
- collect()
- count()
- toPandas()
- save()
- saveAsTable()
- start()
- toTable()
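As an illustration, instead of calling a write method such as saveAsTable inside the function, return the DataFrame and let the decorator determine how the dataset is materialized; the table and function names below are placeholders:
import dlt

@dlt.table()
def cleaned_orders():
    df = spark.read.table("catalog_name.schema_name.orders_raw").filter("amount > 0")
    # Do not call df.write.saveAsTable(...) here; return the DataFrame
    # and let DLT manage writing the dataset.
    return df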
Note
DLT also supports using Pandas on Spark for dataset definition functions. See Pandas API on Spark.
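A minimal sketch of that approach, assuming a source table with a numeric amount column, is shown below; pandas_api() converts the Spark DataFrame to pandas-on-Spark, and to_spark() converts it back before returning:
import dlt

@dlt.table()
def pandas_on_spark_example():
    psdf = spark.read.table("catalog_name.schema_name.table_name").pandas_api()
    psdf["amount_doubled"] = psdf["amount"] * 2  # hypothetical column
    return psdf.to_spark()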
Use SQL in a Python pipeline
PySpark supports the spark.sql operator to write DataFrame code using SQL. When you use this pattern in DLT source code, it compiles to materialized views or streaming tables. The following code example is equivalent to using spark.read.table("catalog_name.schema_name.table_name") for the dataset query logic:
@dlt.table
def my_table():
    return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read and dlt.read_stream (legacy)
The dlt module includes dlt.read() and dlt.read_stream() functions that were introduced to support functionality in the legacy pipeline publishing mode. These methods are supported, but Databricks recommends always using the spark.read.table() and spark.readStream.table() functions because of the following:
- The dlt functions have limited support for reading datasets defined outside of the current pipeline.
- The spark functions support specifying options, such as skipChangeCommits, to read operations. Specifying options is not supported by the dlt functions.
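For comparison, the following sketch shows both styles reading a hypothetical streaming table named orders; the skipChangeCommits option is only available with the spark reader:
import dlt

# Legacy style: still supported, but does not accept reader options.
@dlt.table()
def orders_legacy():
    return dlt.read_stream("orders")

# Recommended style: supports reader options such as skipChangeCommits.
@dlt.table()
def orders_recommended():
    return spark.readStream.option("skipChangeCommits", "true").table("orders")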