There are two ways to read data from a database; which one we use depends on the data source.
1. ODBC
When you're using ODBC to ingest data into a pipeline, you generally need the following information:
1. ODBC Connection String
connection_string = (
    "Driver={ODBC Driver 17 for SQL Server};"
    "Server=my_server.database.windows.net;"
    "Database=my_database;"
    "UID=my_user;"
    "PWD=my_password;"
)
Key components of a connection string often include:
- Driver: The name of the specific ODBC driver installed on the system (e.g., {ODBC Driver 17 for SQL Server}, {PostgreSQL Unicode}, {MySQL ODBC 8.0 Unicode Driver}).
- Server (or Host): The hostname or IP address of the database server.
- Port: The port number on which the database server is listening (if not the default).
- Database (or Initial Catalog): The name of the specific database you want to connect to within the server.
- UID (User ID): The username for authentication.
- PWD (Password): The password for authentication.
- Other parameters like Encrypt, TrustServerCertificate, Connection Timeout, etc., for security and performance.
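As a minimal sketch of putting the connection string to work (assuming the pyodbc package is installed; dbo.my_table is a hypothetical table name):

import pyodbc

# Build the ODBC connection string (placeholder credentials; replace with your own)
connection_string = (
    "Driver={ODBC Driver 17 for SQL Server};"
    "Server=my_server.database.windows.net;"
    "Database=my_database;"
    "UID=my_user;"
    "PWD=my_password;"
)

# Open a connection and fetch a few rows to verify connectivity
conn = pyodbc.connect(connection_string)
cursor = conn.cursor()
cursor.execute("SELECT TOP 10 * FROM dbo.my_table")  # dbo.my_table is hypothetical
for row in cursor.fetchall():
    print(row)
conn.close()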
2. JDBC
Here we read the source data from the database. For that, we need to know the JDBC URL, the source schema, and the table name.
General Format:
jdbc:&lt;subprotocol&gt;:&lt;subname&gt;
- jdbc: The fixed protocol for JDBC URLs.
- &lt;subprotocol&gt;: Identifies the specific JDBC driver (e.g., mysql, postgresql, sqlserver, oracle).
- &lt;subname&gt;: Driver-specific way to identify the data source (server address, port, database name, etc.).
Examples of JDBC Connection URLs:
- MySQL: jdbc:mysql://hostname:port/database_name?key=value&key2=value2
  - Example: jdbc:mysql://localhost:3306/mydatabase
- PostgreSQL: jdbc:postgresql://hostname:port/database_name?key=value
  - Example: jdbc:postgresql://my_pg_server:5432/my_pg_database
- SQL Server: jdbc:sqlserver://hostname:port;databaseName=database_name;key=value
  - Example: jdbc:sqlserver://my_server.database.windows.net:1433;databaseName=my_database;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;
- Oracle: jdbc:oracle:thin:@hostname:port:sid (for SID) or jdbc:oracle:thin:@//hostname:port/service_name (for Service Name)
  - Example (SID): jdbc:oracle:thin:@localhost:1521:XE
  - Example (Service Name): jdbc:oracle:thin:@//my_oracle_server:1521/ORCLPDB1
- Snowflake: jdbc:snowflake://<account_identifier>.snowflakecomputing.com/?db=<database_name>&warehouse=<warehouse_name>&role=<role_name>
  - Example: jdbc:snowflake://xyz12345.snowflakecomputing.com/?db=SALES_DB&warehouse=COMPUTE_WH
Key components embedded in the URL (or passed separately):
- hostname (or server): The hostname or IP address of the database server.
- port: The port number on which the database server is listening (if not the default).
- database_name: The name of the specific database you want to connect to.
- username: The user ID for authentication (often passed as a separate parameter to DriverManager.getConnection()).
- password: The password for authentication (often passed as a separate parameter to DriverManager.getConnection()).
- Driver-specific properties: Many drivers allow additional parameters to be appended to the URL (e.g., encrypt=true, useSSL=true, autoReconnect=true, warehouse, role for Snowflake).
Syntax:

from pyspark.sql import SparkSession

# 1. Initialize SparkSession
spark = SparkSession.builder \
    .appName("JdbcReadExample") \
    .config("spark.driver.extraClassPath", "/path/to/your/jdbc_driver.jar") \
    .getOrCreate()

# --- OR, if submitting with --jars ---
# spark = SparkSession.builder \
#     .appName("JdbcReadExample") \
#     .getOrCreate()
# ------------------------------------

# 2. JDBC connection details
jdbc_url = "jdbc:postgresql://localhost:5432/your_database"  # Replace with your URL
jdbc_user = "your_username"            # Replace with your DB username
jdbc_password = "your_password"        # Replace with your DB password
jdbc_driver = "org.postgresql.Driver"  # Replace with your specific driver class
table_name = "your_schema.your_table"  # Replace with the schema-qualified table name

# 3. Read the table into a DataFrame
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", table_name) \
    .option("user", jdbc_user) \
    .option("password", jdbc_password) \
    .option("driver", jdbc_driver) \
    .load()
pricingReferenceSourceTableDF = (
    spark
    .read
    .format("jdbc")
    .option("url", JDBCconnectionUrl)
    .option("dbtable", "masterdata.market_address")
    .load()
)
🔍 Line-by-Line Breakdown
spark.read: Starts the process of reading data with Spark.
.format("JDBC"): Specifies that you're reading from a JDBC (Java Database Connectivity) source.
.option ("url", JDBCconnectionUrl): Provides the JDBC URL to connect to the database (e.g., PostgreSQL, SQL Server, etc.).
.option("dbtable", "masterdata.market_address"): Indicates the schema and table to read from the database.
.load(): Executes the load operation and returns a DataFrame.
pricingReferenceSourceTableDF: The resulting DataFrame containing the data from the specified table.
In order to modularize the code, we assign the source schema and table name to variables and use string formatting, in case we need to ingest multiple tables, as sketched below.
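A minimal sketch of that parameterization (the table list is hypothetical; JDBCconnectionUrl is the URL variable used above):

# Parameterize the schema and table names so the same read works for many tables
source_schema = "masterdata"
source_tables = ["market_address", "market_contact"]  # hypothetical table list

for source_table in source_tables:
    df = (
        spark
        .read
        .format("jdbc")
        .option("url", JDBCconnectionUrl)
        .option("dbtable", f"{source_schema}.{source_table}")
        .load()
    )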
You're using variables like these to form a path to ADLS.
🛠️ To use the local filesystem in Community Edition, replace that with a DBFS path.
✅ Step-by-Step Change: swap the original ADLS path (❌ won't work in Community Edition) for a local DBFS path (✅), as sketched below.
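The storage account, container, and folder names below are hypothetical placeholders:

# ❌ Original (ADLS path – won't work in Community Edition)
source_path = "abfss://raw@mystorageaccount.dfs.core.windows.net/masterdata/market_address"

# ✅ Replacement (local DBFS path)
source_path = "dbfs:/FileStore/masterdata/market_address"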
How did you manage to connect to different Source Systems from Databricks?
The answer depends on the types of source systems you mentioned in your CV. Below are some of the source systems used in this course.
Databases
We use a database username/password together with the database host name and database name (configured in this module).
We store all database credentials in Azure Key Vault (covered in a later module, Section 19).
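In a Databricks notebook, those credentials can be read through a Key Vault-backed secret scope; a minimal sketch, assuming a scope named my-keyvault-scope and secret names db-username/db-password (all hypothetical):

# Hypothetical Key Vault-backed secret scope and secret names
jdbc_user = dbutils.secrets.get(scope="my-keyvault-scope", key="db-username")
jdbc_password = dbutils.secrets.get(scope="my-keyvault-scope", key="db-password")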
How did you perform the Incremental load from the source database table?
Nowadays, most database tables include a latest_updated_datetime column that identifies when a specific record was inserted or updated. For each ingestion, we store the maximum of this column before starting, then select records from the source table WHERE latest_updated_datetime > prev_max_latest_updated_datetime AND latest_updated_datetime <= current_max_latest_updated_datetime. That way, each load selects only the new or updated records from the source table.
If we don't have a latest_updated_datetime column, the next choice is to use the primary key column values in the source table. The logic is the same: for each ingestion, we store the maximum of this column before starting, then select records from the source table WHERE primary_key > prev_max_primary_key AND primary_key <= current_max_primary_key, so each load selects only new records from the source table. A sketch of the timestamp-based variant follows below.
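A minimal sketch of the timestamp-based approach with Spark's JDBC reader, reusing the jdbc_url and credential variables from the syntax block above (the table name and watermark values are placeholders; in practice the watermarks would come from a control table or the previous run):

# Hypothetical watermark values, e.g. loaded from a control table
prev_max = "2024-01-01 00:00:00"
current_max = "2024-01-02 00:00:00"

# Push the incremental filter down to the source database as a subquery
incremental_query = f"""
    (SELECT *
     FROM masterdata.market_address
     WHERE latest_updated_datetime >  '{prev_max}'
       AND latest_updated_datetime <= '{current_max}') AS incremental_load
"""

incremental_df = (
    spark
    .read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", incremental_query)
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .option("driver", jdbc_driver)
    .load()
)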