Read CSV File from Azure Data Lake Storage Account
CSV Source File Path : "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/csv"
JSON File Path : "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/json"
Spark Methods
GenericDataFrameReader:
- json
- csv
- option (header, inferSchema)
- schema

PySparkSQLFunctions:
- col
- alias
- cast
DataFrame transformations and actions:
| Spark Method | Purpose |
|---|---|
| select | Projects a set of columns or expressions and returns a new DataFrame |
| drop | Returns a new DataFrame with a column dropped |
| filter, where | Filters rows using the given condition |
| sort, orderBy | Returns a new DataFrame sorted by the given expressions |
| show, display | Displays the top n rows of the DataFrame in a tabular form |
| count | Returns the number of rows in the DataFrame |
| dropDuplicates, distinct | Returns a new DataFrame with duplicate rows removed |
| withColumnRenamed | Returns a new DataFrame with a column renamed |
| withColumn | Returns a new DataFrame by adding a column or replacing the existing column that has the same name |
| first, head | Returns the first row |
| collect | Returns a list that contains all rows of the DataFrame |
| limit, take | Returns the first n rows (limit returns a new DataFrame, take returns a list of Row objects) |
| groupBy | Groups the DataFrame using the specified columns, so we can run aggregations on them |
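As a quick illustration of a few of these transformations and actions, here is a minimal sketch using a small made-up DataFrame (the data and column names are hypothetical, and a SparkSession named spark is assumed, as in a Databricks notebook):

```python
# Minimal sketch of common DataFrame transformations and actions.
# The data and column names here are made up for illustration only.
from pyspark.sql.functions import col

data = [("Tomato", "KERALA", 12.5), ("Onion", "KERALA", 30.0), ("Tomato", "KERALA", 12.5)]
df = spark.createDataFrame(data, ["PRODUCT_NAME", "STATE_NAME", "MODAL_PRICE"])

# Transformations: these only build the execution plan (lazy evaluation)
result = (df
          .filter(col("MODAL_PRICE") > 10)          # keep rows matching a condition
          .select("PRODUCT_NAME", "MODAL_PRICE")    # project only the needed columns
          .dropDuplicates()                         # remove duplicate rows
          .orderBy(col("MODAL_PRICE").desc()))      # sort by price, descending

# Actions: these trigger the actual computation
result.show()
print(result.count())
```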
Other DataFrame methods:
- printSchema
- display
- createOrReplaceTempView

GenericDataFrameWriter:
- json
- csv
- mode (overwrite, append)
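Putting the reader and writer methods together, a minimal sketch for reading the daily-pricing CSV files and writing them out as JSON might look like the following. Only the two paths are taken from this lab; the variable names are my own:

```python
# Source and destination paths from this lab
sourceCSVFilePath = "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/csv"
targetJSONFilePath = "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/json"

# Read the daily-pricing CSV files from the bronze layer
dailyPricingDF = (spark.read
                  .option("header", True)        # first line contains column names
                  .option("inferSchema", True)   # let Spark infer the column data types
                  .csv(sourceCSVFilePath))

dailyPricingDF.printSchema()

# Write the same data out as JSON, overwriting any previous output
(dailyPricingDF.write
 .mode("overwrite")
 .json(targetJSONFilePath))
```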
Use withColumnRenamed() to change column names, and withColumn() along with .cast() to change the data types of columns.

Imagine you're giving instructions to a team to build a car. Instead of telling them to weld each part as you name it, you first give them the complete blueprint (the DAG of transformations). Then, once you say "build the car!" (an action), the team can look at the entire plan and figure out the most efficient way to assemble it, potentially doing multiple steps in parallel or reordering tasks for better flow.

In summary, lazy evaluation is a fundamental design principle in Spark that allows for significant performance optimizations, efficient resource utilization, and a resilient data processing model. It is the reason why Spark can handle massive datasets effectively. You define the transformations you want, and Spark intelligently figures out the best way to execute them only when you explicitly ask for a result through an action, as in the sketch below.
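A minimal sketch of renaming and casting columns, assuming the dailyPricingDF from the earlier read, a column named ARRIVAL_IN_TONNES (used again later in this module), and an arbitrary target type; note that nothing executes until the action at the end:

```python
from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType

# Transformations only build the plan; no Spark job runs yet (lazy evaluation)
renamedDF = (dailyPricingDF
             .withColumnRenamed("ARRIVAL_IN_TONNES", "ARRIVALS_IN_TONNES")      # rename a column
             .withColumn("ARRIVALS_IN_TONNES",
                         col("ARRIVALS_IN_TONNES").cast(DecimalType(18, 2))))   # change its data type (target type is illustrative)

# The action below triggers the actual execution of the whole plan
renamedDF.show(5)
```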
This time, after granting access to the storage account and setting the storage account configuration, we write the destination file as Parquet.

To read the Parquet file, call spark (the entry point to the Spark core libraries), then .read (the generic DataFrameReader), then .load() with the file path and file format, e.g. spark.read.load(parquetFilePath, format="parquet").

By default the output is displayed without treating the first line as a header. To pick up the header we add an option() call next to read, e.g. spark.read.option("header", True).load(sourceCSVFilePath, format="csv"), as in the sketch below.
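A minimal sketch of both patterns; the Parquet path below is a guess following the same folder convention as the CSV and JSON paths above:

```python
# Assumed path following the bronze/daily-pricing folder convention
parquetFilePath = "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/parquet"

# Read a Parquet file using the generic load() method
parquetDF = spark.read.load(parquetFilePath, format="parquet")

# Read a CSV file and treat the first line as the header
csvDF = (spark.read
         .option("header", True)
         .load(sourceCSVFilePath, format="csv"))

# Equivalent shortcut using the csv() reader
csvDF = spark.read.option("header", True).csv(sourceCSVFilePath)

csvDF.show(5)
```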
Date and Timestamp Data Types:
- DateType: Represents dates in the format YYYY-MM-DD.
- TimestampType: Represents timestamps, which include both date and time, typically in the format YYYY-MM-DD HH:MM:SS.ffffff (with optional fractional seconds).

When you read data into a Spark DataFrame, PySpark might infer the data type of date and timestamp columns as strings. You often need to explicitly cast these columns to DateType or TimestampType for proper date/time manipulation, as in the sketch below.
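A minimal sketch of casting string columns to DateType and TimestampType (the sample data and column names are made up):

```python
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, TimestampType

# Sample data read in as plain strings
data = [("2025-05-05", "2025-05-05 10:30:45")]
df = spark.createDataFrame(data, ["order_date", "order_ts"])

# Cast the string columns to proper date/time types
typed_df = (df
            .withColumn("order_date", col("order_date").cast(DateType()))
            .withColumn("order_ts", col("order_ts").cast(TimestampType())))

typed_df.printSchema()
typed_df.show(truncate=False)
```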
2. Working with pyspark.sql.functions:
The pyspark.sql.functions module provides a wide array of functions for working with dates and timestamps. Here are some of the most commonly used ones:
- Current Date and Timestamp:
  - current_date(): Returns the current date at the time of query evaluation as a DateType.
  - current_timestamp(): Returns the current timestamp at the time of query evaluation as a TimestampType.
```python
from pyspark.sql.functions import current_date, current_timestamp

df = spark.range(1).select(current_date().alias("today"), current_timestamp().alias("now"))
df.show()
df.printSchema()
```
- Extracting Date and Time Components:
  - year(date): Extracts the year from a date or timestamp.
  - month(date): Extracts the month from a date or timestamp (1-12).
  - dayofmonth(date): Extracts the day of the month from a date or timestamp (1-31).
  - dayofweek(date): Extracts the day of the week from a date or timestamp (1 for Sunday, 7 for Saturday).
  - dayofyear(date): Extracts the day of the year from a date or timestamp (1-366).
  - hour(timestamp): Extracts the hour from a timestamp (0-23).
  - minute(timestamp): Extracts the minute from a timestamp (0-59).
  - second(timestamp): Extracts the second from a timestamp (0-59).
```python
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second, col
from pyspark.sql.types import TimestampType

data = [("2025-05-05 10:30:45",)]
df = spark.createDataFrame(data, ["ts"]).withColumn("ts", col("ts").cast(TimestampType()))

df.select(
    year("ts"), month("ts"), dayofmonth("ts"),
    hour("ts"), minute("ts"), second("ts")
).show()
```
- Date and Time Arithmetic:
  - date_add(start_date, days): Returns the date that is days after start_date.
  - date_sub(start_date, days): Returns the date that is days before start_date.
  - add_months(start_date, months): Returns the date that is months after start_date.
  - months_between(date1, date2): Returns the number of months between date1 and date2.
```python
from pyspark.sql.functions import date_add, date_sub, add_months, months_between, current_date, lit
from pyspark.sql.types import DateType

df = spark.range(1).select(lit("2025-05-05").cast(DateType()).alias("dt"))

df.select(
    date_add("dt", 7).alias("add_week"),
    date_sub("dt", 3).alias("subtract_days"),
    add_months("dt", 2).alias("add_two_months"),
    months_between(current_date(), "dt").alias("months_diff")
).show()
```
- Date and Timestamp Formatting:
  - date_format(date_col, format): Converts a date/timestamp column to a string based on the specified format. The format follows the Java SimpleDateFormat patterns (e.g., "yyyy-MM-dd", "MM/dd/yyyy HH:mm:ss").
```python
from pyspark.sql.functions import date_format, lit
from pyspark.sql.types import DateType, TimestampType

date_df = spark.range(1).select(lit("2025-05-05").cast(DateType()).alias("dt"))
date_df.select(date_format("dt", "dd/MM/yyyy")).show()

timestamp_df = spark.range(1).select(lit("2025-05-05 10:30:45").cast(TimestampType()).alias("ts"))
timestamp_df.select(date_format("ts", "yyyy-MM-dd HH:mm")).show()
```
- Converting to Date/Timestamp:
  - to_date(column, format): Converts a string column to a DateType based on the specified format. If the format is omitted, it defaults to "yyyy-MM-dd".
  - to_timestamp(column, format): Converts a string column to a TimestampType based on the specified format. If the format is omitted, it defaults to "yyyy-MM-dd HH:mm:ss".
```python
from pyspark.sql.functions import to_date, to_timestamp

string_dates = [("2025-05-05",), ("2025/05/06",)]
string_dates_df = spark.createDataFrame(string_dates, ["date_str"])
string_dates_df.select(to_date("date_str").alias("date")).show()
string_dates_df.select(to_date("date_str", "yyyy/MM/dd").alias("formatted_date")).show()

string_timestamps = [("2025-05-05 10:30:45",), ("05/06/2025 12:00:00",)]
string_timestamps_df = spark.createDataFrame(string_timestamps, ["ts_str"])
string_timestamps_df.select(to_timestamp("ts_str").alias("timestamp")).show()
string_timestamps_df.select(to_timestamp("ts_str", "MM/dd/yyyy HH:mm:ss").alias("formatted_ts")).show()
```
- Date and Timestamp Differences:
  - datediff(endDate, startDate): Returns the number of days between endDate and startDate.
  - timestampdiff(unit, startTime, endTime): Returns the difference between two timestamps in the specified unit (e.g., SECOND, MINUTE, HOUR, DAY). Available as a built-in PySpark function only in recent Spark versions.
```python
from pyspark.sql.functions import datediff, timestampdiff, lit
from pyspark.sql.types import DateType, TimestampType

date_diff_df = spark.range(1).select(
    lit("2025-05-10").cast(DateType()).alias("end_date"),
    lit("2025-05-05").cast(DateType()).alias("start_date")
)
date_diff_df.select(datediff("end_date", "start_date")).show()

ts_diff_df = spark.range(1).select(
    lit("2025-05-05 11:00:00").cast(TimestampType()).alias("end_ts"),
    lit("2025-05-05 10:30:00").cast(TimestampType()).alias("start_ts")
)
# timestampdiff is only available as a PySpark function in recent Spark releases
ts_diff_df.select(timestampdiff("SECOND", "start_ts", "end_ts")).show()
```
3. Handling Time Zones (Important Consideration):
- Spark stores timestamps internally as UTC (Coordinated Universal Time).
- When you display or write timestamps, Spark might convert them to the local time zone of the driver or worker nodes, depending on the configuration.
- If your data involves different time zones, you might need to perform explicit time zone conversions. PySpark itself doesn't have built-in functions for arbitrary time zone conversion directly within SQL functions. You might need to rely on external libraries (like pytz in Python UDFs) or handle time zone conversions at the application level before or after Spark processing.
- Spark provides the from_utc_timestamp and to_utc_timestamp functions to handle conversions between UTC and a specified time zone.

```python
from pyspark.sql.functions import from_utc_timestamp, to_utc_timestamp, lit
from pyspark.sql.types import TimestampType

ts_utc_df = spark.range(1).select(lit("2025-05-05 10:00:00").cast(TimestampType()).alias("utc_ts"))

ts_utc_df.select(
    from_utc_timestamp("utc_ts", "America/Los_Angeles").alias("la_time"),
    to_utc_timestamp(lit("2025-05-05 10:00:00").cast(TimestampType()), "America/Los_Angeles").alias("back_to_utc")
).show()
```
4. User Defined Functions (UDFs) for Complex Logic:
For more complex date and time manipulations that are not directly available in pyspark.sql.functions, you can create User Defined Functions (UDFs) using Python. However, be mindful of the performance implications of UDFs compared to built-in Spark functions.

```python
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StringType
import datetime

# UDF that formats a Python date as, e.g., "05-May-2025"
def format_date_udf(dt):
    if dt:
        return dt.strftime("%d-%b-%Y")
    return None

format_date = udf(format_date_udf, StringType())

date_df = spark.range(1).select(lit(datetime.date(2025, 5, 5)).alias("dt"))
date_df.select(format_date("dt")).show()
```
Best Practices:
- Explicitly Define Schemas: When reading data with dates and timestamps, explicitly define the schema with DateType and TimestampType if possible to avoid implicit (and potentially incorrect) type inference (see the sketch after this list).
- Use Built-in Functions: Leverage the optimized built-in functions in pyspark.sql.functions whenever possible for better performance than UDFs.
- Handle Time Zones Carefully: Be aware of the time zones of your data and how Spark handles them. Use the appropriate functions (from_utc_timestamp, to_utc_timestamp) or external libraries if necessary.
- Understand Date/Time Formats: When converting strings to dates/timestamps or vice versa, ensure you provide the correct format string. Refer to Java's SimpleDateFormat patterns.
- Performance Considerations: Operations on date and timestamp columns can be computationally intensive on large datasets. Optimize your queries by filtering early and using efficient functions.
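A minimal sketch of explicitly defining a schema with DateType and TimestampType when reading a CSV file; the column names below are hypothetical and the path is reused from earlier:

```python
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType

# Hypothetical schema for a daily pricing feed
pricingSchema = StructType([
    StructField("PRODUCT_NAME", StringType(), True),
    StructField("PRICE_DATE", DateType(), True),
    StructField("LOAD_TIMESTAMP", TimestampType(), True),
])

pricingDF = (spark.read
             .option("header", True)
             .schema(pricingSchema)     # no inferSchema: types are fixed up front
             .csv(sourceCSVFilePath))   # path defined earlier; replace with your own

pricingDF.printSchema()
```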
Most of the PySpark code-writing questions come from this module. Please practise hands-on in this module as many times as possible to get familiar with the PySpark function syntax.
Why are we using PySpark and not Python in data analytics projects?
Python
- A general-purpose programming language that processes multiple transactions on small-sized data quickly
- Used in web-based transactional systems design and normally suited for OLTP systems design
PySpark
- Uses the Apache Spark engine, specially designed for large-scale data processing
- Uses a distributed compute engine to process large-scale data efficiently
- Best suited for data analytics projects, as the data involved is always huge
How to read data from source files that have a different delimiter (not comma)?
We need to use the option method on the spark.read function to specify the delimiter. E.g., to read a source file having pipe (|) as the delimiter we need to use the code below.

```python
sourceCSVFileDF = (spark.
                   read.
                   option("header", "true").
                   option("delimiter", "|").
                   csv(sourceCSVFilePath)
                   )
```
How to create a new column in a dataframe from available source columns?
We need to use the withColumn method on the source DataFrame to add a new column to the dataframe. If we need to derive the new column from some operation on existing columns, then we need to import the col function from the pyspark.sql.functions package to build the expression. Refer to the code below for an example.

```python
from pyspark.sql.functions import col

(sourceCSVFileDF.
 withColumn("ARRIVALS_IN_KILOGRAMS", col("ARRIVAL_IN_TONNES") * 1000)
 )
```
How to read a dataframe from fixed-width source columns (source columns are separated by data length, without any delimiter between the columns)?
If we know the exact length and data type of the source file columns, then we can define the schema for the source file with the specific data types and lengths and pass it into the spark.read.csv method using the schema option. Refer to the example code below.

```python
sourceFixedWidthFileDF = (spark.
                          read.
                          schema(sourceFixedWidthFileSchemaDDL).
                          csv(sourceFixedWidthFilePath)
                          )
```

The other way to handle this type of source data is to do some pre-processing: define a first dataframe that reads each record as one line per row, then in a second dataframe split out the individual columns using the substr function (see the sketch below).
Reading the entire row as one line happens naturally when we use the standard spark.read.csv method, because with no delimiter present all of the column values are read as a single column.
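A minimal sketch of that two-step approach, assuming a hypothetical fixed-width layout where the first 10 characters are the product name and the next 8 are the price date:

```python
from pyspark.sql.functions import col

# Step 1: read each record as a single string column (no delimiter is applied)
rawLinesDF = spark.read.text(sourceFixedWidthFilePath)   # the column is named "value"

# Step 2: slice the fixed-width positions into individual columns using substr
# (the positions and column names below are made up for illustration)
fixedWidthDF = rawLinesDF.select(
    col("value").substr(1, 10).alias("PRODUCT_NAME"),   # characters 1-10
    col("value").substr(11, 8).alias("PRICE_DATE")      # characters 11-18
)

fixedWidthDF.show(truncate=False)
```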
Many code-writing questions involve joining two dataframes and aggregating data in a source dataframe. How do we handle these?
There is sample code available for aggregation in this module, and joining DataFrames is covered in Sections 14 & 16 (we will revisit joins in the module summary of those lessons). Please refer to the PySpark aggregation sample code below and try to remember where the groupBy function comes in and where we apply the aggregation function. Also remember that we need to import aggregation functions from the pyspark.sql.functions package before using them.

```python
from pyspark.sql.functions import sum

(sourceCSVFileTransDF.
 groupBy("STATE_NAME", "PRODUCT_NAME").
 agg(sum("ARRIVALS_IN_KILOGRAMS")).
 show())
```
How to run (or) write code using PySpark and Spark SQL in the same notebook?
This is the easiest one: we use magic commands, such as %python to run PySpark code and %sql to run Spark SQL code, in the Databricks notebook cells, as sketched below.
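A minimal sketch of two cells in the same Databricks notebook (the temporary view name is hypothetical; the SQL cell is shown as comments here because this sketch is written as a single Python block):

```python
# Cell 1 (notebook default language or %python): PySpark code
dailyPricingDF = spark.read.option("header", True).csv(sourceCSVFilePath)
dailyPricingDF.createOrReplaceTempView("daily_pricing")   # expose the DataFrame to Spark SQL

# Cell 2 (starts with the %sql magic command): Spark SQL code
# %sql
# SELECT STATE_NAME, COUNT(*) AS row_count
# FROM daily_pricing
# GROUP BY STATE_NAME
```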
How to check the files existing on the Cloud storage account from a notebook (without opening the Cloud storage account)?
We can use the dbutils.fs utility (for example dbutils.fs.ls) to view the files on the Cloud storage account from a Databricks notebook, as in the sketch below.
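A minimal sketch, reusing the bronze daily-pricing CSV path from the top of this lab:

```python
# List the files under the bronze daily-pricing CSV folder
files = dbutils.fs.ls("abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/csv")

for f in files:
    print(f.name, f.size)   # each entry is a FileInfo with name, path and size
```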