
Ingestion of Data from the Database

There are two ways to read data from a database; which one we choose depends on the data source.

1. ODBC

If you're using ODBC to ingest data into a pipeline, you generally need the following information; a minimal usage sketch follows the component list below.

1. ODBC Connection String 

connection_string = (
    "Driver={ODBC Driver 17 for SQL Server};"
    "Server=my_server.database.windows.net;"
    "Database=my_database;"
    "UID=my_user;"
    "PWD=my_password;"
)

Key components of a connection string often include:

  • Driver: The name of the specific ODBC driver installed on the system (e.g., {ODBC Driver 17 for SQL Server}, {PostgreSQL Unicode}, {MySQL ODBC 8.0 Unicode Driver}).
  • Server (or Host): The hostname or IP address of the database server.
  • Port: The port number on which the database server is listening (if not default).
  • Database (or Initial Catalog): The name of the specific database you want to connect to within the server.
  • UID (User ID): The username for authentication.
  • PWD (Password): The password for authentication.
  • Other parameters like Encrypt, TrustServerCertificate, Connection Timeout, etc., for security and performance.
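
As a rough sketch of how such a connection string can be used to pull rows into Python (assuming the pyodbc package and the ODBC driver are installed; the table name here is purely illustrative):

import pyodbc

# Same illustrative connection string as above
connection_string = (
    "Driver={ODBC Driver 17 for SQL Server};"
    "Server=my_server.database.windows.net;"
    "Database=my_database;"
    "UID=my_user;"
    "PWD=my_password;"
)

# Open the connection and read a few rows from an illustrative table
conn = pyodbc.connect(connection_string)
cursor = conn.cursor()
cursor.execute("SELECT TOP 10 * FROM masterdata.market_address")
for row in cursor.fetchall():
    print(row)
conn.close()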

2. JDBC

Here we are going to read the source data from the database. For that, we need to know the JDBC URL, the source schema, and the table name.

  • General Format: jdbc:<subprotocol>:<subname>

    • jdbc: The fixed protocol for JDBC URLs.
    • <subprotocol>: Identifies the specific JDBC driver (e.g., mysql, postgresql, sqlserver, oracle).
    • <subname>: Driver-specific way to identify the data source (server address, port, database name, etc.).
  • Examples of JDBC Connection URLs:

    • MySQL: jdbc:mysql://hostname:port/database_name?key=value&key2=value2
      • Example: jdbc:mysql://localhost:3306/mydatabase
    • PostgreSQL: jdbc:postgresql://hostname:port/database_name?key=value
      • Example: jdbc:postgresql://my_pg_server:5432/my_pg_database
    • SQL Server: jdbc:sqlserver://hostname:port;databaseName=database_name;key=value
      • Example: jdbc:sqlserver://my_server.database.windows.net:1433;databaseName=my_database;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;
    • Oracle: jdbc:oracle:thin:@hostname:port:sid (for SID) or jdbc:oracle:thin:@//hostname:port/service_name (for Service Name)
      • Example (SID): jdbc:oracle:thin:@localhost:1521:XE
      • Example (Service Name): jdbc:oracle:thin:@//my_oracle_server:1521/ORCLPDB1
    • Snowflake: jdbc:snowflake://<account_identifier>.snowflakecomputing.com/?db=<database_name>&warehouse=<warehouse_name>&role=<role_name>
      • Example: jdbc:snowflake://xyz12345.snowflakecomputing.com/?db=SALES_DB&warehouse=COMPUTE_WH

Key components embedded in the URL (or passed separately):

  • hostname (or server): The hostname or IP address of the database server.
  • port: The port number on which the database server is listening (if not default).
  • database_name: The name of the specific database you want to connect to.
  • username: The user ID for authentication (often passed as a separate parameter to DriverManager.getConnection()).
  • password: The password for authentication (often passed as a separate parameter to DriverManager.getConnection()).
  • Driver-specific properties: Many drivers allow additional parameters to be appended to the URL (e.g., encrypt=true, useSSL=true, autoReconnect=true, warehouse, role for Snowflake).

Syntax:

from pyspark.sql import SparkSession

# 1. Initialize SparkSession
spark = SparkSession.builder \
    .appName("JdbcReadExample") \
    .config("spark.driver.extraClassPath", "/path/to/your/jdbc_driver.jar") \
    .getOrCreate()

# --- OR, if submitting with --jars ---
# spark = SparkSession.builder \
#     .appName("JdbcReadExample") \
#     .getOrCreate()
# ------------------------------------

# 2. JDBC connection details
jdbc_url = "jdbc:postgresql://localhost:5432/your_database"  # Replace with your URL
jdbc_user = "your_username"                                  # Replace with your DB username
jdbc_password = "your_password"                              # Replace with your DB password
jdbc_driver = "org.postgresql.Driver"                        # Replace with your specific driver class
table_name = "your_schema.your_table"                        # Replace with the schema.table to read

# 3. Read the table into a DataFrame
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", table_name) \
    .option("user", jdbc_user) \
    .option("password", jdbc_password) \
    .option("driver", jdbc_driver) \
    .load()

In the course example, the same pattern is used to read the pricing reference source table:

pricingReferenceSourceTableDF = (
    spark
    .read
    .format("jdbc")
    .option("url", JDBCconnectionUrl)   # JDBC URL built earlier in the notebook
    .option("dbtable", "masterdata.market_address")
    .load()
)

🔍 Line-by-Line Breakdown

spark.read: Starts the process of reading data with Spark.

.format("JDBC"): Specifies that you're reading from a JDBC (Java Database Connectivity) source.

.option ("url", JDBCconnectionUrl): Provides the JDBC URL to connect to the database (e.g., PostgreSQL, SQL Server, etc.).

.option("dbtable", "masterdata.market_address"): Indicates the schema and table to read from the database.

.load(): Executes the load operation and returns a DataFrame.

pricingReferenceSourceTableDF: The resulting DataFrame containing the data from the specified table.

In order to modularize the code, we assign the source table name to a variable and use string formatting in case we need to ingest multiple tables:

pricingReferenceSourceTableName = f"masterdata.market_address"

But in real time we need to parameterize which source table to ingest, so we use widgets.

We can create widgets in two ways. One is through the GUI: Edit --> Add parameter --> give the parameter name and click OK.

Or programmatically:

dbutils.widgets.text("pricingReferenceSourceTableName", "")
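
A minimal sketch of reading the widget value back and plugging it into the JDBC read (assuming JDBCconnectionUrl is already defined earlier in the notebook):

# Read the widget value supplied at run time
pricingReferenceSourceTableName = dbutils.widgets.get("pricingReferenceSourceTableName")

pricingReferenceSourceTableDF = (
    spark
    .read
    .format("jdbc")
    .option("url", JDBCconnectionUrl)
    .option("dbtable", pricingReferenceSourceTableName)   # table name now comes from the widget
    .load()
)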




We are using variables like:

python

pricingReferenceSinkLayerName = 'bronze'                            # container
pricingReferenceSinkStorageAccountName = 'adlsudadatalakehousedev'  # storage account
pricingReferenceSinkFolderName = 'reference-data'                   # folder to be created in the container

These are used to form a path to ADLS, such as:

python

f"abfss://{pricingReferenceSinkLayerName}@{pricingReferenceSinkStorageAccountName}.dfs.core.windows.net/{pricingReferenceSinkFolderName}"

🛠️ To use the local filesystem in Community Edition, replace that with a DBFS path, like this:


✅ Step-by-Step Change

❌ Original (ADLS path – won’t work in Community Edition)

python

output_path = f"abfss://{pricingReferenceSinkLayerName}@{pricingReferenceSinkStorageAccountName}.dfs.core.windows.net/{pricingReferenceSinkFolderName}"

✅ Replacement (Local DBFS path)

python

# Replace cloud path with DBFS path
pricingReferenceSinkFolderName = 'reference-data'
output_path = f"/FileStore/{pricingReferenceSinkFolderName}"

We then write the output file to this path when we are using the local file system.
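
A minimal write sketch (assuming the DataFrame and output_path from above, and Parquet as an illustrative output format):

(
    pricingReferenceSourceTableDF
    .write
    .format("parquet")
    .mode("overwrite")      # overwrite any previous output for this table
    .save(output_path)
)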

Now our notebook is ready to automate.
Open Workflows in Databricks and give the task a unique, readable name, set the type as Notebook, and provide the notebook path. For the cluster, initially select the single-node cluster (or whichever cluster we are using), then change it to a job cluster, because a job cluster terminates automatically after the job execution, whereas the normal all-purpose cluster only terminates after roughly 10 minutes of inactivity.






Give the parameter name as the key and the table name to be ingested as the value, then click Create task.
If we want to create the task for multiple tables, we can use the Clone option.


We can chain the tasks into a flow by setting the Depends on value for each task.

After the job runs, go and check that all the tables were ingested properly.
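
For reference, the same task setup can also be expressed programmatically as a Python dict shaped like a Databricks Jobs API payload; all names, paths, and cluster values below are illustrative assumptions, not the course's actual configuration:

# Illustrative job settings (Jobs API 2.1 shape); every value here is an assumption
job_settings = {
    "name": "ingest-pricing-reference",
    "job_clusters": [
        {
            "job_cluster_key": "ingestion_job_cluster",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",   # illustrative runtime
                "node_type_id": "Standard_DS3_v2",     # illustrative node type
                "num_workers": 1,
            },
        }
    ],
    "tasks": [
        {
            "task_key": "ingest_market_address",
            "job_cluster_key": "ingestion_job_cluster",
            "notebook_task": {
                "notebook_path": "/Repos/project/ingestion_notebook",   # hypothetical path
                "base_parameters": {
                    "pricingReferenceSourceTableName": "masterdata.market_address"
                },
            },
        },
        {
            # Cloned task for a second table, running after the first via depends_on
            "task_key": "ingest_market_segment",
            "depends_on": [{"task_key": "ingest_market_address"}],
            "job_cluster_key": "ingestion_job_cluster",
            "notebook_task": {
                "notebook_path": "/Repos/project/ingestion_notebook",
                "base_parameters": {
                    "pricingReferenceSourceTableName": "masterdata.market_segment"
                },
            },
        },
    ],
}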
Module Summary
  1.  How did you manage to connect to different Source Systems from Databricks?

    Depending on the type of source systems you mentioned in your CV, you need to tailor the answer; below are some of the source systems used in our course.


    • Databases

      • We use the database username/password along with the database host name and database name (configured in this module)

      • We store all database credentials in Azure Key Vault (covered in a later module, Section 19)

  2.  How did you perform the Incremental load from the source database table?

    • Nowadays, most database tables include a latest_updated_datetime column to identify when a specific record was inserted or updated. For each ingestion, if we store the maximum of this column before starting ingestion and select the records from the source table WHERE latest_updated_datetime > prev_max_latest_updated_datetime AND latest_updated_datetime <= current_max_latest_updated_datetime, then for each load only new records from the source table get selected (see the PySpark sketch after this list)

    • If we don't have the latest_updated_datetime column, the next choice is to use the primary key column values in the source table. The logic is the same: for each ingestion, if we store the maximum of this column before starting ingestion and select the records from the source table WHERE primary_key > prev_max_primary_key AND primary_key <= current_max_primary_key, then for each load only new records from the source table get selected
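
A minimal PySpark sketch of the datetime-based watermark approach, reusing the spark session and JDBCconnectionUrl from earlier; the table name, column name, and watermark values are illustrative assumptions (in practice prev_max is loaded from wherever the previous run stored it, and current_max is the MAX captured from the source before ingestion starts):

prev_max = "2024-01-01 00:00:00"      # watermark stored by the previous run (illustrative)
current_max = "2024-01-02 00:00:00"   # MAX(latest_updated_datetime) captured before this run (illustrative)

# Push the filter down to the source database by wrapping a query in the dbtable option
incremental_query = f"""
    (SELECT *
     FROM masterdata.market_address
     WHERE latest_updated_datetime >  '{prev_max}'
       AND latest_updated_datetime <= '{current_max}') AS incremental_load
"""

incrementalDF = (
    spark.read
    .format("jdbc")
    .option("url", JDBCconnectionUrl)
    .option("dbtable", incremental_query)
    .load()
)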

