Functions to define datasets

The dlt module implements much of its core functionality using decorators. These decorators accept a function that defines either a streaming or batch query and returns an Apache Spark DataFrame. The following syntax shows a simple example for defining a DLT dataset:

import dlt

@dlt.table()
def function_name(): # This is the decorated function
  return (<query>) # This is the query logic that defines the dataset

This page provides an overview of the functions and queries that define datasets in DLT. For a complete list of available decorators, see DLT developer reference.

The functions you use to define datasets should not include arbitrary Python logic unrelated to the dataset, including calls to third-party APIs. DLT runs these functions multiple times during planning, validation, and updates. Including arbitrary logic can lead to unexpected results.
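
As an illustration, the following sketch shows the kind of side effect to avoid (the audit endpoint is hypothetical). Because DLT can evaluate the function more than once per update, the call may run multiple times:

import dlt
import requests

@dlt.table()
def orders():
  # Avoid: this call is unrelated to the dataset and runs every time
  # DLT evaluates the function during planning, validation, and updates.
  requests.post("https://example.com/audit", json={"event": "defining orders"})
  return spark.read.table("catalog_name.schema_name.orders")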

Read data to begin a dataset definition

Functions used to define DLT datasets typically begin with a spark.read or spark.readStream operation. These read operations return a static or streaming DataFrame object that you use to define additional transformations before returning the DataFrame. Other examples of spark operations that return a DataFrame include spark.table and spark.range.

Functions should never reference DataFrames defined outside the function. Attempting to reference DataFrames defined at a different scope might result in unexpected behavior. For an example of a metaprogramming pattern for creating multiple tables, see Create tables in a for loop.
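
A minimal sketch of that metaprogramming pattern follows (table and source names are hypothetical). Wrapping the decorated function in a helper ensures each iteration captures its own values:

import dlt

def define_table(table_name, source_table):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(source_table)

for table_name, source_table in [
  ("clean_orders", "catalog_name.schema_name.raw_orders"),
  ("clean_customers", "catalog_name.schema_name.raw_customers"),
]:
  define_table(table_name, source_table)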

The following examples show the basic syntax for reading data using batch or streaming logic:

import dlt

# Batch read on a table
@dlt.table()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dlt.table()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dlt.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dlt.table()
def function_name():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

If you need to read data from an external REST API, implement this connection using a Python custom data source. See PySpark custom data sources.
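
The following is a rough sketch of that approach; the class, endpoint, and schema are hypothetical, and the exact interface is described in PySpark custom data sources:

from pyspark.sql.datasource import DataSource, DataSourceReader

class RestApiDataSource(DataSource):
  # Hypothetical batch data source that reads JSON records from a REST endpoint.
  @classmethod
  def name(cls):
    return "rest_api"

  def schema(self):
    return "id INT, name STRING"

  def reader(self, schema):
    return RestApiReader(self.options)

class RestApiReader(DataSourceReader):
  def __init__(self, options):
    self.url = options.get("url")

  def read(self, partition):
    import requests  # imported here so it is available where the read runs
    # Assumes the endpoint returns a JSON array of objects with id and name fields.
    for record in requests.get(self.url).json():
      yield (record["id"], record["name"])

spark.dataSource.register(RestApiDataSource)

import dlt

@dlt.table()
def from_rest_api():
  return spark.read.format("rest_api").option("url", "https://example.com/records").load()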

Note

It is possible to create arbitrary Apache Spark DataFrames from Python collections of data, including pandas DataFrames, dicts, and lists. These patterns might be useful during development and testing, but most production DLT dataset definitions should begin by loading data from files, an external system, or an existing table or view.
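
For example, during development you might define a small dataset from an inline list (the values here are hypothetical):

import dlt

@dlt.table()
def sample_data():
  # Handy while developing or testing; replace with a real read for production.
  return spark.createDataFrame(
    [(1, "alpha"), (2, "beta")],
    ["id", "label"],
  )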

Chaining transformations

DLT supports nearly all Apache Spark DataFrame transformations. You can include any number of transformations in your dataset definition function, but you should ensure that the methods you use always return a DataFrame object.

If you have an intermediary transformation that drives several downstream workloads but you don't need to materialize it as a table, use @dlt.view() to add a temporary view to your pipeline. You can then reference this view using spark.read.table("temp_view_name") in multiple downstream dataset definitions. The following syntax demonstrates this pattern:

import dlt

@dlt.view()
def a():
  return spark.read.table("source").filter(...)

@dlt.table()
def b():
  return spark.read.table("b").groupBy(...)

@dlt.table()
def c():
  return spark.read.table("c").groupBy(...)

This ensures that DLT has full awareness of the transformations in your view during pipeline planning and prevents potential issues related to arbitrary Python code running outside dataset definitions.

Within your function, you can chain DataFrames together to make new DataFrames without writing incremental results as views, materialized views, or streaming tables, as in the following example:

import dlt

@dlt.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

If all your DataFrames perform their initial reads using batch logic, your return result is a static DataFrame. If any of your reads are streaming, your return result is a streaming DataFrame.
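
For example, the following sketch (table and column names are hypothetical) combines a streaming read with a batch read. Because one of the reads is streaming, the function returns a streaming DataFrame and therefore defines a streaming table:

import dlt

@dlt.table()
def enriched_events():
  events = spark.readStream.table("catalog_name.schema_name.raw_events")  # streaming read
  users = spark.read.table("catalog_name.schema_name.users")              # batch read
  # A stream-static join produces a streaming DataFrame.
  return events.join(users, "user_id")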

Return a DataFrame

For the @dlt.table() decorator, returning a static DataFrame means that you are defining a materialized view. Returning a streaming DataFrame means that you are defining a streaming table. Most decorators work on both streaming and static DataFrames, while others require a streaming DataFrame.

The function used to define a dataset must return a Spark DataFrame. Never use methods that save or write to files or tables as part of your DLT dataset code.

Examples of Apache Spark operations that should never be used in DLT code:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()
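
For example, a minimal sketch of the correct pattern (names are hypothetical) reads and transforms the data, then returns the DataFrame instead of writing it:

import dlt

@dlt.table()
def target_table():
  df = spark.read.table("catalog_name.schema_name.source")
  # Do not call df.write.saveAsTable(...) or similar here;
  # return the DataFrame and let DLT materialize the result.
  return df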

Note

DLT also supports using Pandas on Spark for dataset definition functions. See Pandas API on Spark.
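
A minimal sketch of that approach follows (table and column names are hypothetical). It converts the read into a pandas-on-Spark DataFrame for the transformation and converts back before returning:

import dlt

@dlt.table()
def pandas_style_summary():
  psdf = spark.read.table("catalog_name.schema_name.sales").pandas_api()
  summary = psdf.groupby("region")["amount"].sum().reset_index()
  return summary.to_spark()  # the function must still return a Spark DataFrame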

Use SQL in a Python pipeline

PySpark supports the spark.sql function for writing DataFrame code using SQL. When you use this pattern in DLT source code, it compiles to materialized views or streaming tables.

The following code example is equivalent to using spark.read.table("catalog_name.schema_name.table_name") for the dataset query logic:

@dlt.table()
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read and dlt.read_stream (legacy)

The dlt module includes dlt.read() and dlt.read_stream() functions that were introduced to support functionality in the legacy pipeline publishing mode. These methods are supported, but Databricks recommends always using the spark.read.table() and spark.readStream.table() functions because of the following:

  • The dlt functions have limited support for reading datasets defined outside of the current pipeline.
  • The spark functions support specifying options, such as skipChangeCommits, on read operations. The dlt functions do not support specifying options. A sketch of this pattern follows this list.
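
For example, a minimal sketch (the table name is hypothetical) that passes skipChangeCommits to a streaming read:

import dlt

@dlt.table()
def function_name():
  return (spark.readStream
    .option("skipChangeCommits", "true")
    .table("catalog_name.schema_name.table_name")
  )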