table

The @table decorator can be used to define both materialized views and streaming tables. In Python, DLT determines whether to update a dataset as a materialized view or streaming table based on the defining query.

To define a materialized view in Python, apply @table to a query that performs a static read against a data source. To define a streaming table, apply @table to a query that performs a streaming read against a data source or use the create_streaming_table() function.
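
For example, the same decorator yields a materialized view when the function performs a batch read and a streaming table when it performs a streaming read. The following is a minimal sketch; the source table name is a placeholder for a table available to your pipeline.

import dlt

# A batch read produces a materialized view.
@dlt.table(comment="Materialized view built from a batch read")
def orders_snapshot():
  # Placeholder source table; substitute a table your pipeline can read.
  return spark.read.table("main.default.raw_orders")

# A streaming read produces a streaming table.
@dlt.table(comment="Streaming table built from an incremental read")
def orders_stream():
  return spark.readStream.table("main.default.raw_orders")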

Syntax

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect(...)
def <function-name>():
    return (<query>)

Parameters

@dlt.expect() is an optional DLT expectation clause. You can include multiple expectations. See Expectations.
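
For example, expectation decorators can be stacked on a single table definition. The following is a minimal sketch; the expectation names, conditions, and source table are placeholders.

import dlt

@dlt.table(comment="Sales rows that pass basic quality checks")
@dlt.expect("valid_order_number", "order_number IS NOT NULL")
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
def validated_sales():
  # Placeholder source table; substitute a table your pipeline can read.
  return spark.read.table("main.default.raw_sales")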

function (function): Required. A function that returns an Apache Spark DataFrame or a streaming DataFrame from a user-defined query.
name (str): The table name. If not provided, defaults to the function name.
comment (str): A description for the table.
spark_conf (dict): Spark configurations for the execution of this query.
table_properties (dict): Table properties for the table.
path (str): A storage location for table data. If not set, the managed storage location for the schema containing the table is used.
partition_cols (list): One or more columns to use for partitioning the table.
cluster_by (list): Enables liquid clustering on the table and defines the columns to use as clustering keys. See Use liquid clustering for Delta tables.
schema (str or StructType): A schema definition for the table. Schemas can be defined as a SQL DDL string or with a Python StructType.
temporary (bool): Creates a table but does not publish it to the metastore. The table is available to the pipeline but is not accessible outside the pipeline. Temporary tables persist for the lifetime of the pipeline. The default is False.
row_filter (str): (Public Preview) A row filter clause for the table. See Publish tables with row filters and column masks.
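
The examples below focus on schemas, clustering, partitioning, constraints, and row filters. A minimal sketch of some of the remaining options (name, spark_conf, table_properties, and temporary) follows; the configuration values and source table are placeholders.

import dlt

@dlt.table(
  name="sales_staging",
  comment="Temporary staging table, not published to the metastore",
  spark_conf={"spark.sql.shuffle.partitions": "48"},
  table_properties={"quality": "bronze"},
  temporary=True)
def staging_sales():
  # Placeholder source table; substitute a table your pipeline can read.
  return spark.read.table("main.default.raw_sales")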

Specifying a schema is optional and can be done with a PySpark StructType or SQL DDL. When you specify a schema, you can optionally include generated columns, column masks, and primary and foreign keys, as shown in the examples below.

Examples

import dlt
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")