The `@table` decorator can be used to define both materialized views and streaming tables. In Python, DLT determines whether to update a dataset as a materialized view or a streaming table based on the defining query.

To define a materialized view in Python, apply `@table` to a query that performs a static read against a data source. To define a streaming table, apply `@table` to a query that performs a streaming read against a data source, or use the `create_streaming_table()` function.
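For example, the following sketch shows that distinction, assuming the pipeline-provided `spark` session and hypothetical source tables `samples.tpch.customer` and `samples.tpch.orders`:

import dlt

# Static read: DLT updates this dataset as a materialized view.
@dlt.table(comment="Customer snapshot (hypothetical source)")
def customers():
  return spark.read.table("samples.tpch.customer")

# Streaming read: DLT updates this dataset as a streaming table.
@dlt.table(comment="Incremental orders (hypothetical source)")
def orders():
  return spark.readStream.table("samples.tpch.orders")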
Syntax
import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect(...)
def <function-name>():
  return (<query>)
Parameters
`@dlt.expect()` is an optional DLT expectation clause. You can include multiple expectations. See Expectations.
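For illustration, the sketch below stacks multiple expectations on a single table definition; the constraint names, expressions, and source table are hypothetical:

import dlt

@dlt.table(comment="Sales rows that pass basic quality checks")
@dlt.expect("valid_order_number", "order_number IS NOT NULL")  # hypothetical constraint
@dlt.expect("valid_line_items", "number_of_line_items > 0")    # hypothetical constraint
def validated_sales():
  return spark.read.table("samples.sales_raw")                 # hypothetical source table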
| Parameter | Type | Description |
|---|---|---|
| `function` | `function` | Required. A function that returns an Apache Spark DataFrame or streaming DataFrame from a user-defined query. |
| `name` | `str` | The table name. If not provided, defaults to the function name. |
| `comment` | `str` | A description for the table. |
| `spark_conf` | `dict` | A list of Spark configurations for the execution of this query. |
| `table_properties` | `dict` | A `dict` of table properties for the table. |
| `path` | `str` | A storage location for table data. If not set, uses the managed storage location for the schema containing the table. |
| `partition_cols` | `list` | A list of one or more columns to use for partitioning the table. |
| `cluster_by` | `list` | Enable liquid clustering on the table and define the columns to use as clustering keys. See Use liquid clustering for Delta tables. |
| `schema` | `str` or `StructType` | A schema definition for the table. Schemas can be defined as a SQL DDL string or with a Python `StructType`. |
| `temporary` | `bool` | Create a table, but do not publish the table to the metastore. The table is available to the pipeline but is not accessible outside the pipeline. Temporary tables persist for the lifetime of the pipeline. The default is `False`. |
| `row_filter` | `str` | (Public Preview) A row filter clause for the table. See Publish tables with row filters and column masks. |
Specifying a schema is optional and can be done with PySpark `StructType` or SQL DDL. When you specify a schema, you can optionally include generated columns, column masks, and primary and foreign keys. See:

- Delta Lake generated columns
- Constraints on Azure Databricks
- Publish tables with row filters and column masks
Examples
import dlt
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dlt.table(
  schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
  return ("...")

# Specify a row filter and column mask
@dlt.table(
  schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
  return ("...")
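Parameters not shown above, such as `name`, `table_properties`, and `temporary`, combine in the same way. The following is an illustrative sketch with placeholder values for the table name and property:

import dlt

# Specify a table name, table properties, and a temporary table (illustrative values)
@dlt.table(
  name="sales_raw_bronze",                  # hypothetical table name
  comment="Raw data on sales",
  table_properties={"quality": "bronze"},   # illustrative table property
  temporary=True)
def sales():
  return ("...")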