If we create the reporting dimensional and fact tables in the gold layer, many teams can access those tables by creating materialized views on top of them.
The code defines a materialized view named dlt_daily_pricing_materialized_view.
Here's a breakdown of what the highlighted code is doing:
.format("json")
: Specifies that the data source is in JSON format..schema(...)
: Defines the schema of the JSON data being read. It lists the names and data types of the columns:ARRIVAL_IN_TONNES
: stringDATETIME_OF_PRICING
: stringMARKET_NAME
: stringMAXIMUM_PRICE
: doubleMINIMUM_PRICE
: doubleMODAL_PRICE
: doubleORIGIN_STATE
: stringDUCTGROUP_NAME
: stringPRODUCT_NAME
: stringROW_ID
: longSTATE_NAME
: stringVARIETY
: stringsource_stream_load_datetime
: string
.load("abfss://bronze@adlsudadatalakehousedev.dfs.core.windows.net/daily-pricing-streaming-source-data")
: Specifies the location of the JSON data to be read. This looks like an Azure Blob File System (ABFS) path.
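Assembled from that breakdown, the materialized view definition might look roughly like the following. This is only a sketch based on the table name mentioned above; the schema string is abbreviated and any transformation logic is omitted:

import dlt

@dlt.table(name = "dlt_daily_pricing_materialized_view")
def dlt_daily_pricing_materialized_view():
    # Batch read of the bronze JSON files; DLT manages this as a materialized view
    return (
        spark.read
            .format("json")
            .schema("DATETIME_OF_PRICING string, MARKET_NAME string, MODAL_PRICE double, ROW_ID long")  # abbreviated; full column list above
            .load("abfss://bronze@adlsudadatalakehousedev.dfs.core.windows.net/daily-pricing-streaming-source-data")
    )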
After clicking the create pipeline option, the following page appears. After filling in all the details, click the create option to create the pipeline. Then go to the respective storage account, copy the access key, and paste it into the pipeline's JSON settings to link the storage account. The screenshot above shows the resulting workflow. Only one task was created because we defined only one materialized view.
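The walkthrough links the storage account through the pipeline's JSON settings. For reference, the equivalent Spark configuration set directly in a notebook would look roughly like this; the key value is a placeholder and, in practice, should come from a secret scope rather than being pasted in plain text:

# Equivalent notebook-level configuration for ADLS Gen2 access with an account key.
# "<access-key>" is a placeholder value.
spark.conf.set(
    "fs.azure.account.key.adlsudadatalakehousedev.dfs.core.windows.net",
    "<access-key>"
)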
Let's break down the updated code:
@dlt.table(name = "dlt_daily_pricing_streaming_table")
: This decorator from Delta Live Tables defines a streaming table nameddlt_daily_pricing_streaming_table
. DLT will manage the creation and updates of this table.def create_streaming_table():
: This defines the Python function that contains the logic for creating the streaming table.return (
: This starts the Spark read stream definition.spark
: This refers to the SparkSession..readStream
: As we discussed before, this indicates a streaming data source..format("cloudFiles")
: This specifies the use of Auto Loader for efficient file discovery and processing..schema(...)
: This explicitly defines the schema of the data in the files being ingested. This is important because while Auto Loader can infer the schema, providing it explicitly ensures consistency and can prevent unexpected schema evolution issues..option("cloudFiles.format", "json")
: This option tells Auto Loader that the underlying files it will be discovering and processing are in JSON format..load("abfss://bronze@adlsudadatalakehousedev.dfs.core.windows.net/daily-pricing-streaming-source-data")
: This specifies the cloud storage path where Auto Loader will look for new JSON files to process.
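Putting those pieces together, a minimal sketch of the streaming table definition might look like this (the function and table names follow the breakdown above; the schema string is abbreviated):

import dlt

@dlt.table(name = "dlt_daily_pricing_streaming_table")
def create_streaming_table():
    return (
        spark.readStream
            .format("cloudFiles")                    # Auto Loader
            .schema("DATETIME_OF_PRICING string, MODAL_PRICE double, ROW_ID long, source_stream_load_datetime string")  # abbreviated schema
            .option("cloudFiles.format", "json")     # source files are JSON
            .load("abfss://bronze@adlsudadatalakehousedev.dfs.core.windows.net/daily-pricing-streaming-source-data")
    )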
This Delta Live Tables code leverages the Auto Loader feature (.format("cloudFiles")) to efficiently ingest streaming data. Unlike standard PySpark Structured Streaming, where you would typically need to configure checkpointing manually with .option("checkpointLocation", "/path/to/checkpoints") to track the processed files and manage state, Delta Live Tables with Auto Loader largely abstracts away this checkpoint management. DLT handles the necessary state management and tracking behind the scenes, simplifying the setup of robust streaming pipelines. You still benefit from the fault tolerance and exactly-once processing guarantees that checkpointing provides, but DLT takes care of the configuration for you. This is, in essence, how a streaming table created with the Auto Loader option works within Delta Live Tables.
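For contrast, a plain Structured Streaming version of the same ingestion outside DLT would have to manage checkpointing itself. A rough sketch, where the checkpoint path and target table name are illustrative placeholders:

# Plain Structured Streaming (non-DLT): the checkpoint location is configured by hand.
(spark.readStream
    .format("json")
    .schema("DATETIME_OF_PRICING string, MODAL_PRICE double, ROW_ID long")   # abbreviated schema
    .load("abfss://bronze@adlsudadatalakehousedev.dfs.core.windows.net/daily-pricing-streaming-source-data")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/daily_pricing")           # manual checkpoint path (placeholder)
    .toTable("daily_pricing_streaming_bronze"))                               # placeholder target table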
Here's a breakdown of how this works:
- Incremental Load Detection: Because the streaming table is configured with .format("cloudFiles") (Auto Loader), it continuously monitors the source location (abfss://bronze@adlsudadatalakehousedev.dfs.core.windows.net/daily-pricing-streaming-source-data in this example) for new files.
- Automatic Streaming: When new files (in JSON format, as specified by .option("cloudFiles.format", "json")) land in the source location, Auto Loader automatically detects them and makes them available as a stream of data to the DLT pipeline.
- Data Ingestion: The DLT pipeline then processes this stream of new data according to the transformations defined (though not shown in the snippet). The processed data is ingested into the target streaming table (dlt_daily_pricing_streaming_table).
- Materialized View Update: If the streaming table (dlt_daily_pricing_streaming_table) is used as a source for another Delta Live Table, such as the materialized view (dlt_daily_pricing_materialized_view) shown earlier, any new data ingested into the streaming table will automatically be processed and reflected in the materialized view based on its definition.
In essence, the combination of a streaming table with Auto Loader in Delta Live Tables creates a continuously running pipeline that automatically ingests and processes new data as it arrives at the source, and any downstream tables (like materialized views) will be updated accordingly. This eliminates the need for manual triggering or scheduling of data ingestion for new files.
@dlt.table
def dlt_daily_pricing_streaming_trans_2():
    return (
        spark.sql("""SELECT * FROM LIVE.dlt_daily_pricing_streaming_table""")
    )
- @dlt.table: Again, this marks it as a Delta Live Table definition.
- def dlt_daily_pricing_streaming_trans_2(): This defines another DLT table, likely another transformation step.
- return ( spark.sql("""SELECT * FROM LIVE.dlt_daily_pricing_streaming_table""") ): This uses Spark SQL to query data from dlt_daily_pricing_streaming_table. Notice the LIVE. prefix. In Delta Live Tables, tables defined within the pipeline are referenced with the LIVE. keyword, which ensures the query operates on the managed tables within the DLT environment. The SELECT * indicates that all columns from the streaming table are being selected.
What this table does: Similar to the first transformation table, dlt_daily_pricing_streaming_trans_2 also reads all the data from dlt_daily_pricing_streaming_table, this time using Spark SQL. Like the previous one, without any further SQL clauses (WHERE, GROUP BY, etc.), it acts as a direct pass-through of the streaming data, potentially for further processing, for registering the data under a different name, or for lineage tracking within the DLT pipeline.
In summary: Both dlt_daily_pricing_streaming_trans_1
and dlt_daily_pricing_streaming_trans_2
are reading the stream of data coming from the initial streaming table (dlt_daily_pricing_streaming_table
). They don't perform any immediate filtering or aggregation in these definitions. They likely represent intermediate steps in a more complex data processing pipeline within Delta Live Tables.
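dlt_daily_pricing_streaming_trans_1 itself is not shown in this section; based on the description above, it presumably performs the same pass-through using the DLT Python read API rather than Spark SQL. A purely hypothetical reconstruction:

import dlt

@dlt.table
def dlt_daily_pricing_streaming_trans_1():
    # Hypothetical reconstruction: a direct pass-through of the streaming table
    return dlt.read_stream("dlt_daily_pricing_streaming_table")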
This section demonstrates how Delta Live Tables handles data quality by defining and applying data quality rules to the streaming data. Let's break down each part:
Code Block 1 (Defining Data Quality Rules):
dqRules = { "Valid_DATETIME_OF_PRICING": "DATETIME_OF_PRICING IS NOT NULL" }

@dlt.table
@dlt.expect_all_or_drop(dqRules)
def dlt_daily_pricing_streaming_trans_3():
    return (
        dlt.read_stream("dlt_daily_pricing_streaming_table")
            .dropDuplicates(["DATETIME_OF_PRICING", "ROW_ID"])
    )
- dqRules = { "Valid_DATETIME_OF_PRICING": "DATETIME_OF_PRICING IS NOT NULL" }: This Python dictionary defines a data quality rule.
  - "Valid_DATETIME_OF_PRICING": This is the name given to the rule.
  - "DATETIME_OF_PRICING IS NOT NULL": This is the SQL expression that defines the data quality constraint. It checks that the DATETIME_OF_PRICING column has a non-null value.
- @dlt.table: This decorator signifies the definition of a Delta Live Table.
- @dlt.expect_all_or_drop(dqRules): This is a DLT expectation. It applies the data quality rules defined in the dqRules dictionary to the data flowing through this table (dlt_daily_pricing_streaming_trans_3). The expect_all_or_drop variant means that if a record violates any of the rules defined in dqRules, the entire record is dropped from the output of this table.
- def dlt_daily_pricing_streaming_trans_3(): This defines the function for this DLT table.
- return ( dlt.read_stream("dlt_daily_pricing_streaming_table") .dropDuplicates(["DATETIME_OF_PRICING", "ROW_ID"]) ): This reads the stream from the initial streaming table and then applies a dropDuplicates() transformation based on the combined values of the DATETIME_OF_PRICING and ROW_ID columns to remove duplicate records.
In summary, dlt_daily_pricing_streaming_trans_3
reads the streaming data, removes duplicates based on the specified columns, and then applies a data quality check to ensure that the DATETIME_OF_PRICING
column is not null. Any record failing this check will be dropped.
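As a side note, since only one rule is involved here, the same behaviour could also be expressed with the singular form of the expectation decorator, which takes a rule name and condition directly instead of a dictionary. A minimal sketch (the function name is hypothetical):

import dlt

@dlt.table
@dlt.expect_or_drop("Valid_DATETIME_OF_PRICING", "DATETIME_OF_PRICING IS NOT NULL")
def dlt_daily_pricing_streaming_trans_3_single_rule():
    # Hypothetical single-rule equivalent of dlt_daily_pricing_streaming_trans_3
    return (
        dlt.read_stream("dlt_daily_pricing_streaming_table")
            .dropDuplicates(["DATETIME_OF_PRICING", "ROW_ID"])
    )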
Code Block 2 (Redirecting Bad Records):
badRecordsrules = {}
badRecordsrules["incorrect_record"] = f"NOT ({' AND '.join(dqRules.values())})"

@dlt.table
@dlt.expect_all_or_drop(badRecordsrules)
def dlt_daily_pricing_streaming_bad_records():
    return (
        dlt.read_stream("dlt_daily_pricing_streaming_table")
    )
- badRecordsrules = {}: An empty dictionary is initialized.
- badRecordsrules["incorrect_record"] = f"NOT ({' AND '.join(dqRules.values())})": This dynamically creates a data quality rule that identifies records violating the rules defined in dqRules (see the small example after this list).
  - dqRules.values(): Gets the SQL expressions from the dqRules dictionary.
  - ' AND '.join(...): Joins these expressions with the AND operator.
  - f"NOT (...)": Negates the combined condition. So this rule is true for records that do not satisfy all the conditions in dqRules.
- @dlt.table: Defines a Delta Live Table.
- @dlt.expect_all_or_drop(badRecordsrules): Applies the dynamically created badRecordsrules. Any record that violates the "incorrect_record" condition (i.e., a record that passes the original dqRules) is dropped from this table's output, leaving only the bad records.
- def dlt_daily_pricing_streaming_bad_records(): Defines the function for this DLT table.
- return ( dlt.read_stream("dlt_daily_pricing_streaming_table") ): This reads the stream from the initial streaming table.
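To make the string construction concrete, here is what the dynamically built expression evaluates to for the single rule defined earlier (a standalone sketch you can run in any Python session):

# Standalone illustration of how the inverted rule string is assembled
dqRules = {"Valid_DATETIME_OF_PRICING": "DATETIME_OF_PRICING IS NOT NULL"}

badRecordsrules = {}
badRecordsrules["incorrect_record"] = f"NOT ({' AND '.join(dqRules.values())})"

print(badRecordsrules["incorrect_record"])
# Output: NOT (DATETIME_OF_PRICING IS NOT NULL)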
Note how @dlt.expect_all_or_drop behaves when used this way to capture bad records. Because the rule has been inverted with NOT (...), records that pass the original dqRules now violate the "incorrect_record" expectation and are dropped, while records that fail the original rules satisfy it and are retained. The dlt_daily_pricing_streaming_bad_records table therefore ends up containing only the invalid records, effectively acting as a quarantine table.
An alternative pattern for capturing bad records would involve:
- Using @dlt.expect (without _or_drop or _or_fail) to validate the data and track metrics on validity without dropping any records.
- Then, in a subsequent table, filtering the original streaming data on the negation of the data quality rules to explicitly select the "bad" records.
The second code block, using NOT (DATETIME_OF_PRICING IS NOT NULL), identifies bad records by inverting the rule; applying @dlt.expect_all_or_drop with that inverted rule drops the records that satisfy the original dqRules, so dlt_daily_pricing_streaming_bad_records holds only the records that fail them. The filter-based variant of redirecting bad records looks like this (conceptual):
@dlt.table
@dlt.expect("valid_datetime", "DATETIME_OF_PRICING IS NOT NULL")
def dlt_daily_pricing_streaming_validated():
    return dlt.read_stream("dlt_daily_pricing_streaming_table")

@dlt.table
def dlt_daily_pricing_bad_records():
    return (
        dlt.read_stream("dlt_daily_pricing_streaming_table")
            .filter("NOT (DATETIME_OF_PRICING IS NOT NULL)")
    )
In this conceptual example:
- dlt_daily_pricing_streaming_validated checks the data quality and tracks metrics on validity.
- dlt_daily_pricing_bad_records explicitly filters the original stream to include only records where DATETIME_OF_PRICING is null.
The approach in the second code block achieves the same result by inverting the logic inside an expectation that drops records; the filter-based version above simply makes the selection logic more explicit.
The image shows code for automating Change Data Capture (CDC) using the APPLY CHANGES API within a Databricks environment, using Delta Live Tables (DLT).
Here's a breakdown of what the code is doing based on the instructions and the Python snippet:
Instructions (Steps 1-4):
- Use dlt.create_streaming_table() to define the target table: This step creates the target Delta table where the changes will be applied. The data in this table will be updated based on the changes captured from the source.
- Read the source Delta Live Table using spark.readStream.table(): This indicates that the source of the changes is another Delta Live Table, which is capturing streaming data.
- Use dlt.apply_changes to automatically perform the Change Data Capture between the source and target tables: This is the core of the CDC process. The dlt.apply_changes function handles identifying and applying changes (inserts, updates, deletes) from the source to the target table.
- Configure dlt.apply_changes with keys and sequence_by columns to identify existing records and select changed records from the source:
  - keys: These columns uniquely identify records in both the source and target tables. They are used to match records for updates or to identify records that need to be inserted.
  - sequence_by: This column indicates the order of changes in the source data. It is essential for correctly applying updates and ensuring that the latest change is reflected in the target table.
Python Code Snippet:
import dlt
from pyspark.sql.functions import col

@dlt.view
def dlt_daily_pricing_source_view():
    # Source view: reads the cleaned streaming table that feeds the CDC step
    return dlt.read_stream("dlt_daily_pricing_streaming_trans_3")

# Target streaming table that apply_changes keeps in sync with the source
dlt.create_streaming_table("dlt_daily_pricing_streaming_trans_4")

dlt.apply_changes(
    target = "dlt_daily_pricing_streaming_trans_4",
    source = "dlt_daily_pricing_source_view",
    keys = ["DATETIME_OF_PRICING", "ROW_ID"],
    sequence_by = col("source_stream_load_datetime"),
    stored_as_scd_type = "1"
)
Explanation of the Code:
- @dlt.view: This decorator defines a temporary, read-only view within the DLT pipeline.
- def dlt_daily_pricing_source_view(): This defines a view named dlt_daily_pricing_source_view.
- return dlt.read_stream("dlt_daily_pricing_streaming_trans_3"): This view reads a streaming table named dlt_daily_pricing_streaming_trans_3, which is the source of the changes.
- dlt.create_streaming_table("dlt_daily_pricing_streaming_trans_4"): This creates a streaming Delta table named dlt_daily_pricing_streaming_trans_4, the target table where the changes will be applied.
- dlt.apply_changes(...): This is the core function for applying CDC.
  - target = "dlt_daily_pricing_streaming_trans_4": Specifies the target Delta table.
  - source = "dlt_daily_pricing_source_view": Specifies the source view containing the changes.
  - keys = ["DATETIME_OF_PRICING", "ROW_ID"]: Defines the columns that uniquely identify records. In this case, the combination of DATETIME_OF_PRICING and ROW_ID is used as the primary key.
  - sequence_by = col("source_stream_load_datetime"): Specifies the column (source_stream_load_datetime) that indicates the order of changes in the source data. DLT uses this to determine the latest version of a record. The col() function refers to a column within the Spark DataFrame.
  - stored_as_scd_type = "1": This parameter indicates that the target table should be maintained as a Type 1 Slowly Changing Dimension (SCD). In a Type 1 SCD, when a change occurs to an existing record, the old value is overwritten with the new value.
In summary, the code defines a DLT pipeline that reads streaming data from dlt_daily_pricing_streaming_trans_3
, applies changes to it based on the DATETIME_OF_PRICING
and ROW_ID
as keys and the source_stream_load_datetime
as the sequence, and updates the target streaming table dlt_daily_pricing_streaming_trans_4
using a Type 1 SCD approach. This automates the process of keeping the target table synchronized with the changes in the source table.
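For comparison, if the requirement were to preserve the history of changes instead of overwriting old values, the same call could request a Type 2 SCD. A hedged sketch, with the target table name being a hypothetical variant:

import dlt
from pyspark.sql.functions import col

# Hypothetical SCD Type 2 variant: changed records add new rows with validity
# tracking instead of overwriting the existing values.
dlt.create_streaming_table("dlt_daily_pricing_streaming_trans_4_scd2")

dlt.apply_changes(
    target = "dlt_daily_pricing_streaming_trans_4_scd2",
    source = "dlt_daily_pricing_source_view",
    keys = ["DATETIME_OF_PRICING", "ROW_ID"],
    sequence_by = col("source_stream_load_datetime"),
    stored_as_scd_type = "2"
)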