
Azure Delta Live Tables - Session 12

 







Once the dimensional and fact tables for reporting have been created in the gold layer, many teams can access those tables by creating materialized views.

A materialized view is a database object that contains the results of a query. Unlike a regular (or virtual) view, which runs the underlying query every time it's accessed, a materialized view stores the results of the query on disk (or in memory, depending on the database system).

Create the notebook as usual and write the query. To define it as a Delta Live Table, decorate the function with @dlt.table.






The highlighted code defines a materialized view named dlt_daily_pricing_materialized_view.

Here's a breakdown of what it is doing (a hedged reconstruction of the full cell follows the breakdown):

  • .format("json"): Specifies that the data source is in JSON format.
  • .schema(...): Defines the schema of the JSON data being read. It lists the names and data types of the columns:
    • ARRIVAL_IN_TONNES: string
    • DATETIME_OF_PRICING: string
    • MARKET_NAME: string
    • MAXIMUM_PRICE: double
    • MINIMUM_PRICE: double
    • MODAL_PRICE: double
    • ORIGIN_STATE: string
    • DUCTGROUP_NAME: string
    • PRODUCT_NAME: string
    • ROW_ID: long
    • STATE_NAME: string
    • VARIETY: string
    • source_stream_load_datetime: string
  • .load("abfss://bronze@adlsudadatalakehousedev.dfs.core.windows.net/daily-pricing-streaming-source-data"): Specifies the location of the JSON data to be read. This looks like an Azure Blob File System (ABFS) path.
After creating the table, we can't run it directly. For that, we need to create the pipeline.


Go to Delta Live Tables under Data Engineering and click Create pipeline.



After clicking Create pipeline, the following page appears.




The destination is initially set to the default option, but when working on real projects we have to change it to Unity Catalog.




After filling in all the details, click Create to create the pipeline.


Thus, the Delta Live Tables pipeline was created.

Now we need to connect the Azure storage account to the Delta Live Tables pipeline we just created.
After the pipeline is created, go to Settings.



The screenshot above shows how the PySpark notebook is connected to the storage account.

The pipeline settings view defaults to the UI editor; switch it to JSON.

Go to the respective storage account, copy the access key, and paste it into the JSON settings to link the storage account.
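
One common way to do this (a hedged illustration, not the exact settings from the screenshot) is to add the access key as a Spark configuration entry in the pipeline's JSON settings; the pipeline name and the key value below are placeholders.

JSON
{
  "name": "daily-pricing-dlt-pipeline",
  "configuration": {
    "fs.azure.account.key.adlsudadatalakehousedev.dfs.core.windows.net": "<storage-account-access-key>"
  }
}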






Run the pipeline to verify that the code works properly.


The screenshot above shows the workflow. Only one task was created because we have created only one materialized view.




After the pipeline run completes, go to Catalog; there we can see that the table was created, and we can work with it using the Query option available under Open in dashboard.











Let's break down the updated code (a hedged reconstruction of the cell follows the breakdown):

  • @dlt.table(name = "dlt_daily_pricing_streaming_table"): This decorator from Delta Live Tables defines a streaming table named dlt_daily_pricing_streaming_table. DLT will manage the creation and updates of this table.
  • def create_streaming_table():: This defines the Python function that contains the logic for creating the streaming table.
  • return (: This starts the Spark read stream definition.
  • spark: This refers to the SparkSession.
  • .readStream: As we discussed before, this indicates a streaming data source.
  • .format("cloudFiles"): This specifies the use of Auto Loader for efficient file discovery and processing.
  • .schema(...): This explicitly defines the schema of the data in the files being ingested. This is important because while Auto Loader can infer the schema, providing it explicitly ensures consistency and can prevent unexpected schema evolution issues.
  • .option("cloudFiles.format", "json"): This option tells Auto Loader that the underlying files it will be discovering and processing are in JSON format.
  • .load("abfss://bronze@adlsudadatalakehousedev.dfs.core.windows.net/daily-pricing-streaming-source-data"): This specifies the cloud storage path where Auto Loader will look for new JSON files to process.



This Delta Live Tables code leverages the Auto Loader feature (.format("cloudFiles")) to efficiently ingest streaming data. Unlike standard PySpark structured streaming, where you would typically need to manually configure checkpointing with the .option("checkpointLocation", "/path/to/checkpoints") to track the processed files and manage state, Delta Live Tables with Auto Loader largely abstracts away this manual checkpoint management. DLT handles the necessary state management and tracking behind the scenes, simplifying the setup of robust streaming pipelines. You still benefit from the fault-tolerance and exactly-once processing guarantees that checkpointing provides, but DLT takes care of the configuration for you.
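
For comparison, here is a minimal sketch of what the same Auto Loader ingestion would need outside DLT, where the checkpoint location must be configured manually; the checkpoint path and target table name are illustrative placeholders, and pricing_schema_ddl is the DDL string from the earlier sketch.

Python
(
  spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(pricing_schema_ddl)
    .load("abfss://bronze@adlsudadatalakehousedev.dfs.core.windows.net/daily-pricing-streaming-source-data")
    .writeStream
    .option("checkpointLocation", "/tmp/checkpoints/daily_pricing")  # manual checkpoint management
    .trigger(availableNow=True)                                      # process available files, then stop
    .toTable("bronze_daily_pricing_raw")                             # illustrative target table
)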


Rerun the pipeline to test the streaming table.
 



Here we can see that two tasks were created under the pipeline, and we can see the details of the table as a streaming table.


This is how a streaming table created with the Auto Loader option works within Delta Live Tables.

Here's a breakdown:

  1. Incremental Load Detection: Because the streaming table is configured with .format("cloudFiles") (Auto Loader), it continuously monitors the source location (abfss://bronze@adlsudadatalakehousedev.dfs.core.windows.net/daily-pricing-streaming-source-data in your example) for new files.
  2. Automatic Streaming: When new files (in JSON format, as specified by .option("cloudFiles.format", "json")) land in the source location, Auto Loader automatically detects these new files and makes them available as a stream of data to your DLT pipeline.
  3. Data Ingestion: The DLT pipeline then processes this stream of new data according to the transformations defined (though not shown in the snippet). This processed data is then ingested into the target streaming table (dlt_daily_pricing_streaming_table).
  4. Materialized Table Update: If the streaming table (dlt_daily_pricing_streaming_table) is used as a source for another Delta Live Table, such as the materialized view (dlt_daily_pricing_materialized_view) you showed earlier, any new data ingested into the streaming table will automatically be processed and reflected in the materialized view based on its definition.

In essence, the combination of a streaming table with Auto Loader in Delta Live Tables creates a continuously running pipeline that automatically ingests and processes new data as it arrives at the source, and any downstream tables (like materialized views) will be updated accordingly. This eliminates the need for manual triggering or scheduling of data ingestion for new files.




Python
@dlt.table
def dlt_daily_pricing_streaming_trans_2():
  return (
    spark.sql("""SELECT * FROM LIVE.dlt_daily_pricing_streaming_table""")
  )
  • @dlt.table: Again, this marks it as a Delta Live Table definition.
  • def dlt_daily_pricing_streaming_trans_2():: This defines another DLT, likely another transformation step.
  • return ( spark.sql("""SELECT * FROM LIVE.dlt_daily_pricing_streaming_table""") ): This uses Spark SQL to query data from the dlt_daily_pricing_streaming_table. Notice the LIVE. prefix. In Delta Live Tables, tables defined within the pipeline are often referenced with the LIVE. keyword. This ensures that the query operates on the managed tables within the DLT environment. The SELECT * indicates that all columns from the streaming table are being selected.

What this table does: Similar to the first transformation table, dlt_daily_pricing_streaming_trans_2 also reads all the data from the dlt_daily_pricing_streaming_table. It achieves this using Spark SQL. Like the previous one, without any further SQL clauses (like WHERE, GROUP BY, etc.), it acts as a direct pass-through of the streaming data, potentially for further processing or to register it with a different name or for lineage tracking within the DLT pipeline.

In summary: Both dlt_daily_pricing_streaming_trans_1 and dlt_daily_pricing_streaming_trans_2 are reading the stream of data coming from the initial streaming table (dlt_daily_pricing_streaming_table). They don't perform any immediate filtering or aggregation in these definitions. They likely represent intermediate steps in a more complex data processing pipeline within Delta Live Tables.
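
The definition of dlt_daily_pricing_streaming_trans_1 appears only in a screenshot above; a plausible sketch, assuming it is a simple pass-through that uses dlt.read_stream instead of Spark SQL:

Python
@dlt.table
def dlt_daily_pricing_streaming_trans_1():
  # Pass-through read of the streaming table defined earlier in the pipeline.
  return dlt.read_stream("dlt_daily_pricing_streaming_table")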



This section demonstrates how Delta Live Tables handles data quality by defining and applying data quality rules to the streaming data. Let's break down each part:

Code Block 1 (Defining Data Quality Rules):

Python
dqRules = { "Valid_DATETIME_OF_PRICING": "DATETIME_OF_PRICING IS NOT NULL" }

@dlt.table
@dlt.expect_all_or_drop(dqRules)
def dlt_daily_pricing_streaming_trans_3():
  return (
    dlt.read_stream("dlt_daily_pricing_streaming_table")
    .dropDuplicates(["DATETIME_OF_PRICING", "ROW_ID"])
  )
  • dqRules = { "Valid_DATETIME_OF_PRICING": "DATETIME_OF_PRICING IS NOT NULL" }: This Python dictionary defines a data quality rule.
    • "Valid_DATETIME_OF_PRICING": This is a name you give to the rule.
    • "DATETIME_OF_PRICING IS NOT NULL": This is the actual SQL expression that defines the data quality constraint. It checks if the DATETIME_OF_PRICING column has a non-null value.
  • @dlt.table: This decorator signifies the definition of a Delta Live Table.
  • @dlt.expect_all_or_drop(dqRules): This is a DLT expectation. It applies the data quality rules defined in the dqRules dictionary to the data flowing through this table (dlt_daily_pricing_streaming_trans_3).
    • expect_all_or_drop: This specific expectation means that if a record violates any of the rules defined in dqRules, the entire record will be dropped from the output of this table.
  • def dlt_daily_pricing_streaming_trans_3():: This defines the function for this DLT.
  • return ( dlt.read_stream("dlt_daily_pricing_streaming_table") .dropDuplicates(["DATETIME_OF_PRICING", "ROW_ID"]) ): This reads the stream from our initial streaming table and then applies a dropDuplicates() transformation based on the combined values of the DATETIME_OF_PRICING and ROW_ID columns to remove any duplicate records.

In summary, dlt_daily_pricing_streaming_trans_3 reads the streaming data, removes duplicates based on the specified columns, and then applies a data quality check to ensure that the DATETIME_OF_PRICING column is not null. Any record failing this check will be dropped.
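
For context (not part of the original notebook), DLT offers three flavours of expectation decorators. A hedged sketch of how the same dqRules dictionary behaves under each, using illustrative table names:

Python
@dlt.table
@dlt.expect_all(dqRules)            # warn: keep violating rows, record metrics only
def dq_warn_example():
  return dlt.read_stream("dlt_daily_pricing_streaming_table")

@dlt.table
@dlt.expect_all_or_drop(dqRules)    # drop: remove violating rows from the output
def dq_drop_example():
  return dlt.read_stream("dlt_daily_pricing_streaming_table")

@dlt.table
@dlt.expect_all_or_fail(dqRules)    # fail: abort the update if any row violates the rules
def dq_fail_example():
  return dlt.read_stream("dlt_daily_pricing_streaming_table")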

Code Block 2 (Redirecting Bad Records):

Python
badRecordsrules = {}
badRecordsrules["incorrect_record"] = f"NOT ({' AND '.join(dqRules.values())})"

@dlt.table
@dlt.expect_all_or_drop(badRecordsrules)
def dlt_daily_pricing_streaming_bad_records():
  return (
    dlt.read_stream("dlt_daily_pricing_streaming_table")
  )
  • badRecordsrules = {}: An empty dictionary is initialized.
  • badRecordsrules["incorrect_record"] = f"NOT ({' AND '.join(dqRules.values())})": This dynamically creates a data quality rule to capture records that violate the rules defined in dqRules.
    • dqRules.values(): Gets the SQL expressions from the dqRules dictionary.
    • ' AND '.join(...): Joins these expressions with the AND operator.
    • f"NOT (...)": Negates the combined condition. So, this rule will be true for records that do not satisfy all the conditions in dqRules.
  • @dlt.table: Defines a Delta Live Table.
  • @dlt.expect_all_or_drop(badRecordsrules): Applies the dynamically created badRecordsrules. Any record that satisfies the "incorrect_record" condition (i.e., fails the original dqRules) will be dropped from this table's output.
  • def dlt_daily_pricing_streaming_bad_records():: Defines the function for this DLT.
  • return ( dlt.read_stream("dlt_daily_pricing_streaming_table") ): This reads the stream from the initial streaming table.

Note carefully how @dlt.expect_all_or_drop behaves here when used to capture bad records: the expectation drops the records that violate its rule from the current table. Because badRecordsrules negates the original conditions, the records dropped here are the valid ones, and only the records that fail the original dqRules remain in the output of this table.

An alternative, more explicit pattern for capturing bad records involves:

  1. Using @dlt.expect (without _or_drop or _or_fail) to validate the data and track metrics on the validity.
  2. Then, in a subsequent table, filtering the original streaming data based on the negation of the data quality rules to explicitly select the "bad" records.

The second code block, using NOT (DATETIME_OF_PRICING IS NOT NULL), identifies the bad records; because @dlt.expect_all_or_drop drops the records that violate this inverted rule, it is the valid records that get dropped, and the dlt_daily_pricing_streaming_bad_records table retains exactly the records that failed the original rules.

A more explicit way to redirect bad records is a pattern like this (conceptual):

Python
@dlt.table
@dlt.expect("valid_datetime", "DATETIME_OF_PRICING IS NOT NULL")
def dlt_daily_pricing_streaming_validated():
  return dlt.read_stream("dlt_daily_pricing_streaming_table")

@dlt.table
def dlt_daily_pricing_bad_records():
  return (
    dlt.read_stream("dlt_daily_pricing_streaming_table").filter("NOT (DATETIME_OF_PRICING IS NOT NULL)")
  )

In this conceptual example:

  • dlt_daily_pricing_streaming_validated checks the data quality and potentially tracks metrics.
  • dlt_daily_pricing_bad_records explicitly filters the original stream to include only records where DATETIME_OF_PRICING is null.

The second code block achieves the same result by inverting the logic inside an expectation that drops records; the explicit filter above simply expresses the intent more directly.




The image shows code for automating Change Data Capture (CDC) using the APPLY CHANGES API within a Databricks environment, likely using Delta Live Tables (DLT).

Here's a breakdown of what the code is doing based on the instructions and the Python snippet:

Instructions (Steps 1-4):

  1. Use dlt.create_streaming_table() To Define Target Table: This step involves creating the target Delta table where the changes will be applied. The data in this table will be updated based on the changes captured from the source.
  2. Read the Source Delta Live Table using spark.readStream.table(): This step indicates that the source of the changes is another Delta Live Table that is likely capturing streaming data.
  3. Use dlt.apply_changes to automatically perform the Change Data Capture between Source and Target Tables: This is the core of the CDC process. The dlt.apply_changes function handles identifying and applying changes (inserts, updates, deletes) from the source to the target table.
  4. Configure dlt.apply_changes with keys and sequence_by columns to identify existing records and to select the changed records from the source using the sequence_by column:
    • keys: These columns uniquely identify records in both the source and target tables. They are used to match records for updates or to identify records that need to be inserted.
    • sequence_by: This column indicates the order of changes in the source data. It's essential for correctly applying updates and ensuring that the latest change is reflected in the target table.

Python Code Snippet:

Python
from pyspark.sql.functions import col  # needed for the sequence_by column reference below

@dlt.view
def dlt_daily_pricing_source_view():
    return dlt.read_stream("dlt_daily_pricing_streaming_trans_3")

dlt.create_streaming_table("dlt_daily_pricing_streaming_trans_4")

dlt.apply_changes(
    target = "dlt_daily_pricing_streaming_trans_4",
    source = "dlt_daily_pricing_source_view",
    keys = ["DATETIME_OF_PRICING", "ROW_ID"],
    sequence_by = col("source_stream_load_datetime"),
    stored_as_scd_type = "1"
)

Explanation of the Code:

  • @dlt.view: This decorator defines a temporary, read-only view within the DLT pipeline.
  • def dlt_daily_pricing_source_view():: This defines a view named dlt_daily_pricing_source_view.
  • return dlt.read_stream("dlt_daily_pricing_streaming_trans_3"): This view reads a streaming table named dlt_daily_pricing_streaming_trans_3. This is the source of the changes.
  • dlt.create_streaming_table("dlt_daily_pricing_streaming_trans_4"): This creates a streaming Delta table named dlt_daily_pricing_streaming_trans_4. This is the target table where the changes will be applied.
  • dlt.apply_changes(...): This is the core function for applying CDC.
    • target = "dlt_daily_pricing_streaming_trans_4": Specifies the target Delta table.
    • source = "dlt_daily_pricing_source_view": Specifies the source view containing the changes.
    • keys = ["DATETIME_OF_PRICING", "ROW_ID"]: Defines the columns that uniquely identify records. In this case, a combination of DATETIME_OF_PRICING and ROW_ID is used as the primary key.
    • sequence_by = col("source_stream_load_datetime"): Specifies the column (source_stream_load_datetime) that indicates the order of changes in the source data. DLT uses this to determine the latest version of a record. The col() function from pyspark.sql.functions refers to this column in the source.
    • stored_as_scd_type = "1": This parameter indicates that the target table should be maintained as a Type 1 Slowly Changing Dimension (SCD). In a Type 1 SCD, when a change occurs to an existing record, the old value is overwritten with the new value.

In summary, the code defines a DLT pipeline that reads streaming data from dlt_daily_pricing_streaming_trans_3, applies changes to it based on the DATETIME_OF_PRICING and ROW_ID as keys and the source_stream_load_datetime as the sequence, and updates the target streaming table dlt_daily_pricing_streaming_trans_4 using a Type 1 SCD approach. This automates the process of keeping the target table synchronized with the changes in the source table.
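
To make the Type 1 behaviour concrete, here is a conceptual batch equivalent (not the pipeline's code): assuming df is a DataFrame with the same columns, keeping only the row with the greatest source_stream_load_datetime per key mirrors what the overwrite does incrementally.

Python
from pyspark.sql import Window
from pyspark.sql import functions as F

# Conceptual SCD Type 1 on a static DataFrame `df` (assumed to exist with the same columns):
# keep only the latest row for each (DATETIME_OF_PRICING, ROW_ID) key.
w = Window.partitionBy("DATETIME_OF_PRICING", "ROW_ID") \
          .orderBy(F.col("source_stream_load_datetime").desc())

latest_per_key = (
  df.withColumn("_rn", F.row_number().over(w))
    .filter("_rn = 1")
    .drop("_rn")
)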





