
05-Spark-DataFrame-Transformations-and-Actions in Azure Databricks notebook

 

Read CSV File from Azure Data Lake Storage Account

CSV Source File Path : "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/csv"

JSON File Path : "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/json"

Spark Methods

Spark Method : Purpose
select : Returns a new DataFrame by computing the given expression for each element
drop : Returns a new DataFrame with a column dropped
filter / where : Filters rows using the given condition
sort / orderBy : Returns a new DataFrame sorted by the given expressions
show / display : Displays the top n rows of the DataFrame in a tabular form
count : Returns the number of rows in the DataFrame
dropDuplicates / distinct : Returns a new DataFrame with duplicate rows removed
withColumnRenamed : Returns a new DataFrame with a column renamed
withColumn : Returns a new DataFrame by adding a column or replacing the existing column that has the same name
first / head : Returns the first row
collect : Returns an array that contains all rows in this DataFrame
limit / take : Returns the first n rows (limit gives a new DataFrame, take gives a list of Rows)
groupBy : Groups the DataFrame using the specified columns, so we can run aggregations on them

Once again, declare the access key and set the Spark configuration for the storage container to connect Databricks to the storage account.
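A minimal sketch of that setup, assuming account-key authentication and placeholder names for the storage account and secret scope (the real values live in the lab's configuration):

    Python
    # placeholder storage account name and an assumed secret scope/key
    storage_account_name = "datalakestorageaccountname"
    access_key = dbutils.secrets.get(scope="my-scope", key="adls-access-key")

    # account-key authentication for ADLS Gen2 from Databricks
    spark.conf.set(
        f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
        access_key
    )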

If we want to define the schema for the entire DataFrame up front, we use StructType.
If we just want to override column names and data types in a compact way, we can use a DDL-formatted schema string.


Create the DDL schema string with the column names and the modified data types, and pass it to the read via the schema attribute (the name of the modified DDL string) so the changes are applied while loading,
or
  • Read the JSON file (if it exists) with its original schema (or let Spark infer it).
  • Use Spark transformations like withColumnRenamed() to change the column names.
  • Use withColumn() along with .cast() to change the data types of the columns.
  • When chaining transformations, always wrap the expression in ( and ) brackets as a best practice; we can select columns using
  • df.select("column name"), as shown in the sketch below,
  • and likewise drop a column using df.drop("column name").
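  • A minimal sketch of both approaches, assuming the daily-pricing JSON path from above and hypothetical column names (rowID, PRODUCT_NAME, DATE_OF_PRICING); the real schema in the course files may differ:

    Python
    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType, StructField, StringType, LongType, DateType

    json_path = "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/json"

    # Approach 1: define the schema up front, either as a StructType ...
    struct_schema = StructType([
        StructField("ROW_ID", LongType(), True),
        StructField("PRODUCT_NAME", StringType(), True),
        StructField("DATE_OF_PRICING", DateType(), True),
    ])
    # ... or as a compact DDL string, and pass it to the reader
    ddl_schema = "ROW_ID BIGINT, PRODUCT_NAME STRING, DATE_OF_PRICING DATE"
    pricing_df = spark.read.schema(ddl_schema).json(json_path)

    # Approach 2: read with the inferred schema, then rename and cast
    pricing_df = (
        spark.read.json(json_path)
            .withColumnRenamed("rowID", "ROW_ID")                                    # change the column name
            .withColumn("DATE_OF_PRICING", col("DATE_OF_PRICING").cast(DateType()))  # change the data type
    )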

  • The common term for applying multiple transformations sequentially in data processing frameworks like Spark or Pandas is chaining transformations, or using a fluent interface.
  • df.select(column names).filter(condition, like a WHERE clause in SQL).where(another condition)

  • Another sample:

    Python
    # Chaining multiple transformations
    from pyspark.sql.functions import upper, col, when

    transformed_df = (
        df.withColumn("upper_name", upper(col("name")))
          .where(col("age") > 28)
          .withColumn("country", when(col("city") == "London", "UK").otherwise("France"))
          .select("upper_name", "age", "country")
    )
  • If we want to sort in descending order, import the desc function from pyspark.sql.functions and use it with the df.sort() transformation.
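  • A minimal sketch, assuming a hypothetical UNIT_PRICE column:

    Python
    from pyspark.sql.functions import col, desc

    # two equivalent ways to sort in descending order
    df.sort(desc("UNIT_PRICE")).show()
    df.sort(col("UNIT_PRICE").desc()).show()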

  • Imagine you're giving instructions to a team to build a car. Instead of telling them to immediately weld each part as you name it, you first give them the complete blueprint (the DAG of transformations). Then, once you say "build the car!" (an action), the team can look at the entire plan and figure out the most efficient way to assemble it, potentially doing multiple steps in parallel or reordering tasks for better flow.

    In summary, lazy evaluation is a fundamental design principle in Spark that allows for significant performance optimizations, efficient resource utilization, and a resilient data processing model. It's the reason why Spark can handle massive datasets effectively. You define the transformations you want, and Spark intelligently figures out the best way to execute them only when you explicitly ask for a result through an action
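    A minimal sketch of lazy evaluation in practice (df is any DataFrame; the column names are hypothetical):

    Python
    from pyspark.sql.functions import col

    # transformations only build the logical plan; no job runs yet
    filtered_df = df.filter(col("age") > 28).select("name", "age")

    # the action triggers Spark to optimize the whole plan and execute it
    print(filtered_df.count())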


  • To select distinct values: df.select("column name").distinct()

  • first() or head() returns the first row, take(10) returns the first 10 rows as a list, limit(10) returns a new DataFrame with the first 10 rows, and collect() returns all the rows of the DataFrame to the driver.


  • This time, after granting storage account access and setting the storage account configuration, we write the output to the destination as a Parquet file.

  • To read a Parquet file, use spark (to call the Spark session) .read (the generic read function) .load(file path, file format).
    For a CSV read this way the output displays without the header row; in order to pick up the header we add option() to the read, e.g. spark.read.option("header", True).load(source file path, "csv").

  • Even when we use the option header = "true", Spark automatically identifies the header row, but sometimes the data types are not accurate; that is why we also use inferSchema = "true". If we give the inferSchema option, Spark looks into the records and decides the data type of each column, as in the sketch below.
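  • A minimal sketch of the generic loader, assuming the CSV path from above and an assumed Parquet folder (the exact destination may differ):

    Python
    csv_path = "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/csv"
    parquet_path = "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/silver/daily-pricing"  # assumed path

    # Parquet files carry their own schema, so no header/inferSchema options are needed
    parquet_df = spark.read.format("parquet").load(parquet_path)

    # CSV needs header and inferSchema so Spark picks up column names and data types
    csv_df = (
        spark.read
            .option("header", True)
            .option("inferSchema", True)
            .format("csv")
            .load(csv_path)
    )
    csv_df.printSchema()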

  • If we need to add a new column derived from a specific column, we use the withColumn() transformation; for that we need to import the col function from pyspark.sql.functions.
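  • A minimal sketch, with hypothetical column names (UNIT_PRICE, QUANTITY):

    Python
    from pyspark.sql.functions import col

    # new column derived from existing columns
    df_with_total = df.withColumn("TOTAL_PRICE", col("UNIT_PRICE") * col("QUANTITY"))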

  • Aggregate functions:
  • If we want to use an aggregate function we need to import the particular aggregate functions, like count, sum, min, max, from the pyspark.sql.functions package.
  • The syntax is df.groupBy("col1", "col2").agg(sum("col name"))
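  • A minimal sketch, assuming hypothetical grouping and measure columns:

    Python
    from pyspark.sql.functions import sum, count, min, max

    agg_df = (
        df.groupBy("PRODUCT_NAME", "DATE_OF_PRICING")          # grouping columns
          .agg(
              sum("UNIT_PRICE").alias("total_price"),
              count("*").alias("row_count"),
              min("UNIT_PRICE").alias("min_price"),
              max("UNIT_PRICE").alias("max_price"),
          )
    )
    agg_df.show()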


  • Instead of the sort() transformation we can use orderBy(); both order the DataFrame by the given columns.


  • To write the output into a Parquet file:
  • df.write.mode("overwrite").parquet(destination_file_path)




  • If you open the Parquet file directly, it is in binary format and you cannot read it; if you want to see the contents of the Parquet file, we have to use the read function again with the show() action to display what is inside.
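  • A minimal sketch of the write-and-read-back round trip, assuming a hypothetical destination path:

    Python
    destination_path = "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/silver/daily-pricing"  # assumed path

    # write as Parquet, replacing any previous output
    df.write.mode("overwrite").parquet(destination_path)

    # the files on storage are binary, so read them back to inspect the contents
    spark.read.parquet(destination_path).show(10)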

  • Date and Time Handling 


    Now we create the new column using the withColumn() transformation and the current_timestamp() function, which we import from the pyspark.sql.functions package.

  • new_df = df.withColumn("new_column_name", current_timestamp())

  • Date and Timestamp Data Types:

    • DateType: Represents dates in the format YYYY-MM-DD.
    • TimestampType: Represents timestamps, which include both date and time, typically in the format YYYY-MM-DD HH:MM:SS.ffffff (with optional fractional seconds).

    When you read data into a Spark DataFrame, PySpark might infer the data type of date and timestamp columns as strings. You often need to explicitly cast these columns to DateType or TimestampType for proper date/time manipulation.

    2. Working with pyspark.sql.functions:

    The pyspark.sql.functions module provides a wide array of functions for working with dates and timestamps. Here are some of the most commonly used ones:

    • Current Date and Timestamp:

      • current_date(): Returns the current date at the time of query evaluation as a DateType.
      • current_timestamp(): Returns the current timestamp at the time of query evaluation as a TimestampType.
      Python
      from pyspark.sql.functions import current_date, current_timestamp
      
      df = spark.range(1).select(current_date().alias("today"), current_timestamp().alias("now"))
      df.show()
      df.printSchema()
      
    • Extracting Date and Time Components:

      • year(date): Extracts the year from a date or timestamp.
      • month(date): Extracts the month from a date or timestamp (1-12).
      • dayofmonth(date): Extracts the day of the month from a date or timestamp (1-31).
      • dayofweek(date): Extracts the day of the week from a date or timestamp (1 for Sunday, 7 for Saturday).
      • dayofyear(date): Extracts the day of the year from a date or timestamp (1-366).
      • hour(timestamp): Extracts the hour from a timestamp (0-23).
      • minute(timestamp): Extracts the minute from a timestamp (0-59).
      • second(timestamp): Extracts the second from a timestamp (0-59).
      Python
      from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second, col
      from pyspark.sql.types import TimestampType
      
      data = [("2025-05-05 10:30:45",)]
      df = spark.createDataFrame(data, ["ts"]).withColumn("ts", col("ts").cast(TimestampType()))
      
      df.select(
          year("ts"),
          month("ts"),
          dayofmonth("ts"),
          hour("ts"),
          minute("ts"),
          second("ts")
      ).show()
      


    • Date and Time Arithmetic:

      • date_add(start_date, days): Returns the date that is days after start_date.
      • date_sub(start_date, days): Returns the date that is days before start_date.
      • add_months(start_date, months): Returns the date that is months after start_date.
      • months_between(date1, date2): Returns the number of months between date1 and date2.
      Python
      from pyspark.sql.functions import date_add, date_sub, add_months, months_between, current_date, lit
      from pyspark.sql.types import DateType
      
      df = spark.range(1).select(lit("2025-05-05").cast(DateType()).alias("dt"))
      
      df.select(
          date_add("dt", 7).alias("add_week"),
          date_sub("dt", 3).alias("subtract_days"),
          add_months("dt", 2).alias("add_two_months"),
          months_between(current_date(), "dt").alias("months_diff")
      ).show()
      
    • Date and Timestamp Formatting:

      • date_format(date_col, format): Converts a date/timestamp column to a string based on the specified format. The format follows the Java SimpleDateFormat patterns (e.g., "yyyy-MM-dd", "MM/dd/yyyy HH:mm:ss").
      Python
      from pyspark.sql.functions import date_format, lit
      from pyspark.sql.types import DateType, TimestampType
      
      date_df = spark.range(1).select(lit("2025-05-05").cast(DateType()).alias("dt"))
      date_df.select(date_format("dt", "dd/MM/yyyy")).show()
      
      timestamp_df = spark.range(1).select(lit("2025-05-05 10:30:45").cast(TimestampType()).alias("ts"))
      timestamp_df.select(date_format("ts", "yyyy-MM-dd HH:mm")).show()
      
    • Converting to Date/Timestamp:

      • to_date(column, format): Converts a string column to a DateType based on the specified format. If the format is omitted, it defaults to "yyyy-MM-dd".
      • to_timestamp(column, format): Converts a string column to a TimestampType based on the specified format. If the format is omitted, it defaults to "yyyy-MM-dd HH:mm:ss".
      Python
      from pyspark.sql.functions import to_date, to_timestamp, lit
      
      string_dates = [("2025-05-05",), ("2025/05/06",)]
      string_dates_df = spark.createDataFrame(string_dates, ["date_str"])
      string_dates_df.select(to_date("date_str").alias("date")).show()
      string_dates_df.select(to_date("date_str", "yyyy/MM/dd").alias("formatted_date")).show()
      
      string_timestamps = [("2025-05-05 10:30:45",), ("05/06/2025 12:00:00",)]
      string_timestamps_df = spark.createDataFrame(string_timestamps, ["ts_str"])
      string_timestamps_df.select(to_timestamp("ts_str").alias("timestamp")).show()
      string_timestamps_df.select(to_timestamp("ts_str", "MM/dd/yyyy HH:mm:ss").alias("formatted_ts")).show()
      
    • Date and Timestamp Differences:

      • datediff(endDate, startDate): Returns the number of days between endDate and startDate.
      • timestampdiff(unit, startTime, endTime): Returns the difference between two timestamps in the specified unit (e.g., SECOND, MINUTE, HOUR, DAY); available in pyspark.sql.functions from Spark 3.5.
      Python
      from pyspark.sql.functions import datediff, timestampdiff, lit
      from pyspark.sql.types import DateType, TimestampType
      
      date_diff_df = spark.range(1).select(
          lit("2025-05-10").cast(DateType()).alias("end_date"),
          lit("2025-05-05").cast(DateType()).alias("start_date")
      )
      date_diff_df.select(datediff("end_date", "start_date")).show()
      
      ts_diff_df = spark.range(1).select(
          lit("2025-05-05 11:00:00").cast(TimestampType()).alias("end_ts"),
          lit("2025-05-05 10:30:00").cast(TimestampType()).alias("start_ts")
      )
      ts_diff_df.select(timestampdiff("SECOND", "start_ts", "end_ts")).show()
      

    3. Handling Time Zones (Important Consideration):

    • Spark stores timestamps internally as UTC (Coordinated Universal Time).

    • When you display or write timestamps, Spark might convert them to the local time zone of the driver or worker nodes, depending on the configuration.

    • If your data involves different time zones, you might need to perform explicit time zone conversions. For per-record time zone logic beyond UTC conversions you may need external libraries (like pytz or zoneinfo in Python UDFs) or handle the conversion at the application level before or after Spark processing.

    • Spark provides the from_utc_timestamp and to_utc_timestamp functions to handle conversions between UTC and a specified time zone.

      Python
      from pyspark.sql.functions import from_utc_timestamp, to_utc_timestamp, lit
      from pyspark.sql.types import TimestampType
      
      ts_utc_df = spark.range(1).select(lit("2025-05-05 10:00:00").cast(TimestampType()).alias("utc_ts"))
      
      ts_utc_df.select(
          from_utc_timestamp("utc_ts", "America/Los_Angeles").alias("la_time"),
          to_utc_timestamp(lit("2025-05-05 10:00:00").cast(TimestampType()), "America/Los_Angeles").alias("back_to_utc")
      ).show()
      

    4. User Defined Functions (UDFs) for Complex Logic:

    For more complex date and time manipulations that are not directly available in pyspark.sql.functions, you can create User Defined Functions (UDFs) using Python. However, be mindful of the performance implications of UDFs compared to built-in Spark functions.

    Python
    from pyspark.sql.functions import udf, lit
    from pyspark.sql.types import StringType
    import datetime
    
    def format_date_udf(dt):
        if dt:
            return dt.strftime("%d-%b-%Y")
        return None
    
    format_date = udf(format_date_udf, StringType())
    
    date_df = spark.range(1).select(lit(datetime.date(2025, 5, 5)).alias("dt"))
    date_df.select(format_date("dt")).show()
    

    Best Practices:

    • Explicitly Define Schemas: When reading data with dates and timestamps, explicitly define the schema with DateType and TimestampType if possible to avoid implicit (and potentially incorrect) type inference.
    • Use Built-in Functions: Leverage the optimized built-in functions in pyspark.sql.functions whenever possible for better performance than UDFs.
    • Handle Time Zones Carefully: Be aware of the time zones of your data and how Spark handles them. Use the appropriate functions (from_utc_timestamp, to_utc_timestamp in Spark 3.0+) or external libraries if necessary.
    • Understand Date/Time Formats: When converting strings to dates/timestamps or vice versa, ensure you provide the correct format string. Refer to Java's SimpleDateFormat patterns.
    • Performance Considerations: Operations on date and timestamp columns can be computationally intensive on large datasets. Optimize your queries by filtering early and using efficient functions.
    Databricks Utilities (dbutils)


    dbutils can be used from any notebook language in Databricks (Scala, Python, R).
  • If we want to know more about dbutils, we just have to run dbutils.help().

  • If we want to know more about the file system utilities, we run dbutils.fs.help().



  • If we want to check the files without opening the Azure portal, we can simply run dbutils.fs.ls("filepath") in Databricks.
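  • A minimal sketch, assuming the bronze daily-pricing folder used in this lab:

    Python
    # list the files in the folder directly from the notebook
    files = dbutils.fs.ls("abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/csv")
    for f in files:
        print(f.name, f.size)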
  • Module Summary

    Most of the PySpark code-writing questions come from this module. Please practise hands-on in this module as many times as possible to get familiar with the PySpark function syntax.

    1.   Why we are using PySpark and not Using Python in data analytics Projects?

      • Python

        • General-purpose programming language that processes multiple transactions with small-sized data quickly

        • Used in Web based transactional systems design and normally suited for OLTP systems design

      • PySpark

        • Uses Apache Spark Engine specially designed for large scale data processing

        • Uses Distributed Compute engine to process large scale data efficiently

        • Best suited for data analytics projects, as the data involved is always huge

    2.   How to read data from the source files got different delimiters(not comma)?

      • We need to use the option() method on the spark.read function to specify a different delimiter, e.g. to read a source file having pipe (|) as the delimiter we need to use the code below.
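        A minimal sketch, assuming a hypothetical pipe-delimited file path:

        Python
        pipe_df = (
            spark.read
                .option("header", True)
                .option("delimiter", "|")    # "sep" is accepted as well
                .csv("abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/sample-pipe-file")  # hypothetical path
        )
        pipe_df.show(5)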

    3.   How to create new column in dataframe from available source columns?

      • We need to use the withColumn method on the source DataFrame to add a new column into the DataFrame. If we need to derive the new column based on some operation on existing columns then we need to import the col

        function from the pyspark.sql.functions package to build the expression. Refer to the code below for an example.
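        A minimal sketch, with hypothetical column names:

        Python
        from pyspark.sql.functions import col, round

        # derive new columns from existing ones
        df_new = (
            df.withColumn("TOTAL_AMOUNT", round(col("UNIT_PRICE") * col("QUANTITY"), 2))
              .withColumn("PRICE_WITH_TAX", col("UNIT_PRICE") * 1.1)
        )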

    4.   How to read dataframe from fixed width source columns(Source columns are separated by length of data without any delimiter between the columns)?

      • If we know the exact length and data type of the source file columns then we can define the schema for the source file with the specific data types and lengths and pass it into the spark.read.csv method using the schema option. Refer to the example code for this.


      • The other way to handle this type of data is to do some pre-processing of the source data: define a first DataFrame that reads all the columns as one line per row, and in a second DataFrame split out the individual columns using the substr function.

      • Reading the entire row as one line naturally happens when we use the standard spark.read.csv method: as there is no delimiter, all of the column values are read as one line. See the sketch below.
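        A minimal sketch of the pre-processing approach, assuming a hypothetical fixed-width layout (column widths 5, 10, and 8) and a hypothetical file path:

        Python
        from pyspark.sql.functions import col

        # each row comes back as one string column (_c0) because there is no delimiter
        raw_df = spark.read.csv("abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/fixed-width-file")
        raw_df = raw_df.withColumnRenamed("_c0", "line")

        # split the line by position with substr(start, length); positions are 1-based
        fixed_df = raw_df.select(
            col("line").substr(1, 5).alias("PRODUCT_ID"),
            col("line").substr(6, 10).alias("PRODUCT_NAME"),
            col("line").substr(16, 8).cast("double").alias("UNIT_PRICE"),
        )
        fixed_df.show()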

    5.   Many code writing questions related to Joining two dataframes and aggregating data in source dataframe?

      • There is sample code available for aggregation in this module, and joining DataFrames is covered in Sections 14 & 16 (we will revisit the joins in the Module Summary of those lessons). Please refer to the PySpark aggregation sample code and try to remember where the groupBy function comes in and where we apply the aggregation function. Also remember we need to import aggregation functions from the pyspark.sql.functions package before using them.

    6.   How to run (or) Write the code using PySpark and Spark SQL in the same notebook?

      • This is the easiest one: we use the magic command %python to run PySpark code and %sql to run Spark SQL code in the same notebook.

    7.   How to check the files existing on Cloud Storage account from notebook(Without opening Cloud Storage Account)?

      • We can use the dbutils.fs.ls command to view the files in the Cloud Storage account from a Databricks notebook.
