
Ingest source data from a web service (HTTP) in Azure

 



Let us create a new notebook to ingest the data. Copy and paste the URL for the CSV from the PySpark introduction, and then modularize the code (modularizing the code means breaking it into smaller pieces so it is easier to manage).





sourceFileURL = 'https://retailpricing.blob.core.windows.net/labs/lab1/PW_MW_DR_01012023.csv'
bronzelayerCSVFilePath = 'abfss://workinglabs@ssw.dfs.core.windows.net/bronze/daily-pricing/csv'

We divide the URL into three segments and concatenate them, because in a real-time project many files arrive within a single day.
dailyPricingSourceBaseURL = 'https://retailpricing.blob.core.windows.net/'
dailyPricingSourceFolder = 'daily-pricing/'
#daiilyPricingSourceFileDate = datetime.strptime(str(nextSourceFileDateDF.select('NEXT_SOURCE_FILE_DATE').collect()[0]['NEXT_SOURCE_FILE_DATE']),'%Y-%m-%d').strftime('%m%d%Y')
#daiilyPricingSourceFileName = f"PW_MW_DR_{daiilyPricingSourceFileDate}.csv"
daiilyPricingSourceFileName = 'PW_MW_DR_01012023.csv'

daiilyPricingSinkLayerName = 'bronze'
daiilyPricingSinkStorageAccountName = 'ssw'
daiilyPricingSinkFolderName = 'pricinganalyisis'


daily_pricingsourceURL = dailyPricingSourceBaseURL+dailyPricingSourceFolder+daiilyPricingSourceFileName
print(daily_pricingsourceURL)

Import pandas to read the CSV file directly from the source URL:


import pandas as pd
dailyPricingPandasDF = pd.read_csv(daily_pricingsourceURL)

Convert the pandas DataFrame into a Spark DataFrame:

dailypricingSparkDF = spark.createDataFrame(dailyPricingPandasDF)

Now we give the path inside Azure where the source data will be written:

daiilyPricingSinkLayerFolderPath = f"abfss://{daiilyPricingSinkLayerName}@{daiilyPricingSinkStorageAccountName}.dfs.core.windows.net/{daiilyPricingSinkFolderName}"
dailypricingSparkDF.write.mode("overwrite").option('header', 'True').csv(daiilyPricingSinkLayerFolderPath)


To understand dbutils widgets:


dbutils.widgets.text('prm_daily_pricing_date','')

When we run dbutils.widgets.text, a new text box appears at the top of the notebook, as shown below.


This only creates the box; to read the value we enter in that text box, we have to use the widgets get method.


daiilyPricingSourceDate = dbutils.widgets.get('prm_daily_pricing_date')
#daiilyPricingSourceFileDate = datetime.strptime(str(nextSourceFileDateDF.select('NEXT_SOURCE_FILE_DATE').collect()[0]['NEXT_SOURCE_FILE_DATE']),'%Y-%m-%d').strftime('%m%d%Y')
daiilyPricingSourceFileName = f"PW_MW_DR_{daiilyPricingSourceDate}.csv"
#daiilyPricingSourceFileName = 'PW_MW_DR_01012023.csv'

To use the input from the text box, we build the source file name from the input date using Python string formatting. The value in the text box will be supplied during orchestration.
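For example, the parameterized file name feeds into the same URL concatenation used earlier (a sketch reusing the variables defined above):

# Rebuild the full source URL from the parameter-driven file name
daily_pricingsourceURL = dailyPricingSourceBaseURL+dailyPricingSourceFolder+daiilyPricingSourceFileName
print(daily_pricingsourceURL)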

If the date format is different, we can use Python's datetime functions to convert it.

In Python, datetime.strptime converts the string to a date datatype, because dbutils.widgets.text supports only strings:
from datetime import datetime
daiilyPricingSourceFileDate = datetime.strptime(str(daiilyPricingSourceFileName.strip().split('_')[-1].split('.')[0]),'%m%d%Y').strftime('%Y-%m-%d')


The data is properly ingested into the bronze layer.
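As a quick sanity check (a sketch; bronzeCheckDF is just an illustrative name), we can read the files back from the bronze folder path used in the write step and count the rows:

# Read back the CSV files written to the bronze folder and verify the row count
bronzeCheckDF = spark.read.option('header', 'true').csv(daiilyPricingSinkLayerFolderPath)
print(bronzeCheckDF.count())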



Incremental load 

In many cases, the source has a column we can use to identify the incremental load. In this case we don't have any such column, but the information is available in our source URL (the file date in the file name).

CDC (Change Data Capture)

  • CDC is about capturing data changes, not full datasets.
  • It's more efficient than batch processing.
  • It enables real-time or near-real-time data movement.
  • It's used in various data integration scenarios.






  • Create a table to store the processed source file dates, so we capture every date that has been ingested.


    After creating the table, insert the initial values into it.

    Then query the table to check that the data was inserted.

    Finally, define how the next incremental load date will be derived (a sketch of these steps follows below).
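    A minimal PySpark sketch of these steps, assuming a hypothetical control table bronze.daily_pricing_processed_dates with a single PROCESSED_SOURCE_FILE_DATE column (the names in your workspace may differ; nextSourceFileDateDF matches the commented-out code earlier in the notebook):

# Create a control table that records the processed source file dates (hypothetical names)
spark.sql("""
CREATE TABLE IF NOT EXISTS bronze.daily_pricing_processed_dates (
  PROCESSED_SOURCE_FILE_DATE DATE
)
""")

# Insert the date of the file we have already ingested
spark.sql("INSERT INTO bronze.daily_pricing_processed_dates VALUES (DATE'2023-01-01')")

# Check that the data was inserted
spark.sql("SELECT * FROM bronze.daily_pricing_processed_dates").show()

# Define the next incremental date as the day after the latest processed date
nextSourceFileDateDF = spark.sql("""
SELECT date_add(MAX(PROCESSED_SOURCE_FILE_DATE), 1) AS NEXT_SOURCE_FILE_DATE
FROM bronze.daily_pricing_processed_dates
""")
nextSourceFileDateDF.show()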




     Now that we have defined what the next incremental load will be, we are going to refactor the code (that is, update the existing code).

    While refactoring, always create a copy of the code.


    Then delete the unwanted print statements and the dbutils.help() cell, and remove any unwanted dbutils widgets by calling dbutils.widgets.remove() or by deleting the cell that created that parameter.
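    For example, a widget can be dropped by name (prm_test_widget below is a hypothetical leftover widget, shown only to illustrate the call):

# Remove a widget that is no longer needed (hypothetical widget name)
dbutils.widgets.remove('prm_test_widget')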





    For the incremental load to work, we need to change the write mode from overwrite to append.
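    A sketch of the updated write call, reusing the DataFrame and sink path variables from the ingestion code above:

# Append the new file's data to the bronze folder instead of overwriting the existing data
dailypricingSparkDF.write.mode("append").option('header', 'True').csv(daiilyPricingSinkLayerFolderPath)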

    Since this SQL cell needs to be run only once, we move it to the SQL editor rather than keeping it in the notebook that runs on a regular basis.



    After refactoring the code, we need to test it end-to-end, debug any issues, and fix them.



    Now our code is ready for orchestration; we can schedule it using the Databricks Workflows component.


    Create a job to automate the ingestion every day; the job name has to be unique.



    While creating the task, give it a unique task name. For Type there are multiple options such as Python, SQL, another job, or a Delta Live Tables pipeline; for Source we can choose Workspace or Git provider; and in Path give the workspace path of the notebook.

    This parameter is the one we defined using dbutils.widgets inside the notebook.



    This is the workflow we have created; if we click Run now, it runs the notebook automatically.

    In the Job runs tab we are able to see the history of the job runs.


    Now that we know how to run the job manually, let us run it automatically.


    We have to change the cluster to a job cluster, because the all-purpose cluster we created stays alive for more than 10 minutes.

    A new cluster is created for the job, and this cluster automatically terminates after the execution of the job.

    To automate the workflow, we add a schedule to trigger the job.



    We can change the schedule to run every 12 hours.


    Let us create the tables for the daily pricing data.
    Open the CSV files and copy the header to know the contents of the table.

    In order to create the table that maintains the details for orchestration, go to the workspace (default), then go to Catalog; under SQL choose the Queries option, then click Create query.

    Like Hive, if we want to use a particular catalog we can give the command:

    USE CATALOG pricing_analysis;   (catalog name)

    Then create a schema, because we are newly creating the table for the bronze layer:

    CREATE SCHEMA IF NOT EXISTS bronze;   (bronze schema name)

    Then create the table:

    CREATE TABLE IF NOT EXISTS bronze.pricing_analysis (
      -- copy and paste the header and give each column the proper datatype
      col1 datatype,
      col2 datatype
    )
    USING CSV
    OPTIONS ('delimiter' = ',',
             'header' = 'true')
    LOCATION 'folder path'
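    A quick way to verify the new table from a notebook, assuming the catalog, schema, and table names used above:

# Query the newly created external table to confirm it reads the bronze CSV files
spark.sql("SELECT COUNT(*) AS row_count FROM bronze.pricing_analysis").show()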






    Module summary :
    1. How did you manage to connect to different Source Systems from Databricks?

      Depending on the type of source systems you mentioned in your CV, you need to tailor the answer; below are some of the source systems used in our course.

      • Web Services (or) Websites

        • We can connect using certificates created by the web service and use them in the Databricks notebook to connect

        • We can use username/password to connect to the Web Services

        • More secure web services use OAuth to connect externally

        • Please explore whichever authentication method applies to your project and be ready to explain it properly. (I have not included this in the course because you would get confused between the code used for the security setup and the ingestion process; now that you properly know how ingestion works, please explore how to connect, and note that it is a one-time setup.)

    2.  What type of ingestion load is performed in the project?

      • Incremental Ingestion (Delta Load)

        • When we can use some column values (latest_updated_datetime, maximum(primary-key-value), datetime values in the files) to identify incremental data from the source, mention that Incremental Data Load notebooks are developed. This is the recommended way of developing ingestion pipelines

        • For some migration projects, or when there is no data attribute to identify incremental source data, we end up developing Full Load Ingestion notebooks, and in this case we ingest all source data on every run

    3.  How did you perform the incremental load?

      • Explain in detail, depending on the data attribute (latest_updated_datetime, maximum(primary-key-value), datetime values in the files) from the source used to identify incremental data. In the course we used processed source file dates to identify new source files from the source system

    4. How to migrate (or) load data into multiple environments (dev/test)?

      • Using Notebook Parameters is the answer for any question related to developing or using the code that works in multiple environments

    5. How did you automate the run of the ingestion notebook, and how frequently does the notebook run?

      • Use Databricks Workflows to run the notebooks and also schedule them to run regularly

      • Scheduling frequency depends on how frequently the source data changes (if it changes at the end of the day, schedule the job to run once a day; if the source data changes every 30 minutes, we need to schedule it to run every 30 minutes)

    6. What type of cluster is used to run Databricks Workflows (or) what is the difference between an All Purpose Cluster and a Job Cluster?

      Please find below the differences and use cases for both of these clusters; we always use Job Clusters for scheduled jobs in Databricks Workflows.

      All Purpose Cluster

      • Running Interactive workloads , Ad-Hoc Tasks - Mainly Used for Development Activities

      • Data Engineers during development , Data Analysts & Data scientists for Ad-Hoc Tasks

      • Manually started/stopped, can be shared with other users; not cost-efficient due to the manual start/stop process; sometimes runs for a long time


      Job Clusters

      • Running Automated Jobs , scheduled tasks , scheduled jobs and Batch processing Jobs

      • Automatically created, started, and terminated; more cost-efficient due to the automated start/stop



