
PySpark, Databricks sample program

Databricks was built on top of Apache Spark. Spark itself is written in Java and Scala, and since we are going to use it with Python, we use PySpark.




To make it even better and faster, Spark provides higher-level abstracted libraries called modules: Spark DataFrames, Spark SQL, Spark Streaming, MLlib, GraphX, and the Pandas API on Spark.



Databricks was designed on top of Apache Spark, so it simplifies everything: creating the cluster, configuring it, and storing the data.


Azure Databricks is a unified, open analytics platform for building, deploying, sharing, and maintaining enterprise-grade data, analytics, and AI solutions at scale. The Databricks Data Intelligence Platform integrates with cloud storage and security in your cloud account and manages and deploys cloud infrastructure for you.

After creating the Databricks resource in the Azure account, click Launch Databricks, and the following screen appears.





Code management --> Workspace (activities like reading, writing, importing, exporting, and updating code, storing it in a repo, etc.)

Data management --> Catalog (the code we develop manages or processes data, and that data is managed under the Catalog as files and databases)

Compute management --> Compute (the cluster that runs Spark)











Under the New option, we can see the list of frequently used actions.


Notebooks:


To create a new notebook, click on Workspace; under it there will be two folders:
1. Workspace --> the internal Azure workspace
2. Repos --> code imported from an external source like Git




Under Users, right-click and create a new notebook (it looks similar to a Jupyter Notebook).
To execute the code, we need to attach the notebook to an existing cluster or SQL warehouse.

The programming languages supported in an Azure Databricks notebook are Python, SQL, Scala, and R; here we use Python (PySpark).




We can read the file from the source system in Spark using the .read attribute 

In Spark, you read data from various source systems using the .read attribute of a SparkSession (the older SparkContext reads files with methods like textFile; SparkSession is the modern and recommended entry point).

For example, if you want to read a CSV file, you would typically do something like this:

Python
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("ReadFile").getOrCreate()

# Read a CSV file
df = spark.read.csv("path/to/your/file.csv")

# To specify options like header and delimiter:
df_with_options = spark.read.option("header", "true").option("delimiter", ",").csv("path/to/your/file.csv")

# You can do something similar for other file formats like JSON, Parquet, etc.
json_df = spark.read.json("path/to/your/file.json")
parquet_df = spark.read.parquet("path/to/your/file.parquet")

# Don't forget to stop the SparkSession when you're done
spark.stop()

The .read attribute provides access to DataFrameReader methods for reading data in different formats. You can also use .option() to configure parameters specific to the data source.

Using plain Python, we can also read the file with the csv, requests, and pandas libraries.

Read CSV File from Azure Data Lake Storage Account

CSV Source File Path : "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/csv"

JSON Target File Path : "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/json"

Spark Methods


The source file URL is defined first.
Import pandas.
Read the file using the pandas library and store it in a pandas DataFrame.
Create the Spark session (SparkSession is the entry point for the abstracted libraries; SparkContext is for the core RDD library).
Create the Spark DataFrame, passing in the DataFrame that was created using pandas (a sketch of these steps follows below).
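
A minimal sketch of these steps, assuming a placeholder URL for the source file and that display() is run inside a Databricks notebook:

Python
import pandas as pd
from pyspark.sql import SparkSession

# Placeholder URL -- replace with the actual daily-pricing source file location
source_file_url = "https://example.com/daily-pricing.csv"

# Read the file with the pandas library into a pandas DataFrame
pandas_df = pd.read_csv(source_file_url)

# Create the SparkSession (entry point for the abstracted libraries;
# in a Databricks notebook, spark is already available)
spark = SparkSession.builder.appName("PandasToSpark").getOrCreate()

# Create the Spark DataFrame from the pandas DataFrame
spark_df = spark.createDataFrame(pandas_df)
display(spark_df)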


To access the storage account in Azure, we need the access key 

Give the correct storage account name and container name in the PySpark program.
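
A minimal sketch of this configuration, assuming the placeholder account and container names from the paths above and that spark is the notebook's SparkSession:

Python
# Placeholder values -- use your own storage account, container, and access key
storage_account_name = "datalakestorageaccountname"
container_name = "working-labs"
access_key = "<storage-account-access-key>"

# Configure Spark to authenticate to ADLS Gen2 with the access key
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    access_key
)

# Build the abfss path and read the source CSV file
sourceCSVFilePath = (
    f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net"
    "/bronze/daily-pricing/csv"
)
sourceCSVFileDF = spark.read.option("header", "true").csv(sourceCSVFilePath)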




After running the program, we can see the job has been executed across different nodes in the cluster.


In the write function, we set the mode to overwrite so that the output is overwritten each time.


To find the access key, go to the particular storage account; under Security + networking > Access keys you can find it.




The read method can be used in a Spark program by starting with the spark keyword, and after read we give .csv or any other format (spark.read.csv).


Without a header, it automatically generates column names that start with an underscore (_c0, _c1, ...).


display() is used to see the contents of the DataFrame.



Now add the option header set to True.

This makes the file's column headers become the DataFrame's column names: with header enabled, the first row is picked as the column names instead of being treated as data. A short sketch of both cases follows below.
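
A short sketch contrasting the two cases, assuming the sourceCSVFilePath defined earlier:

Python
# Without the header option: the first row is treated as data and
# columns are auto-named _c0, _c1, ...
df_no_header = spark.read.csv(sourceCSVFilePath)
display(df_no_header)

# With header=True: the first row of the file becomes the column names
df_with_header = spark.read.option("header", "true").csv(sourceCSVFilePath)
display(df_with_header)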



If we need to manually set the schema of the table, we use StructType.

StructType, in Spark SQL, is how you define the structure and data types of your DataFrames. It provides a way to explicitly specify the schema, which is important for data integrity, performance, and working with complex or unstructured data sources.


Think of StructType as the blueprint or the schema definition for your tabular data in Spark. It tells Spark the names and types of the columns it should expect.

Key Characteristics of StructType:

  • Ordered Collection of Fields: A StructType is an ordered collection of StructField objects. The order in which you define the fields matters for some operations and for how the data is interpreted.
  • StructField Components: Each field within a StructType is defined by a StructField. A StructField has three main attributes:
    • name (String): The name of the column.
    • dataType (DataType): The data type of the column (e.g., StringType, IntegerType, BooleanType, DateType, another nested StructType, ArrayType, MapType, etc.).
    • nullable (Boolean): Indicates whether the column can contain null values (True) or not (False).
    • metadata (Map[String, Any], optional): A map to store extra information about the field.

How StructType is Used in Spark:

  1. Defining DataFrame Schemas: When creating DataFrames from sources that don't inherently provide schema information (like CSV or JSON files without schema inference, or from RDDs), you explicitly define the schema using a StructType.

    Python
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    schema = StructType([
        StructField("name", StringType(), False),
        StructField("age", IntegerType(), True),
        StructField("city", StringType(), True)
    ])
    
    # Assuming you have an RDD called 'data_rdd'
    df = spark.createDataFrame(data_rdd, schema)
    df.printSchema()
    
  2. Schema Inference (Implicit): Spark can sometimes infer the schema of your data automatically, especially when reading structured files like Parquet or when you have header rows in CSV files. However, explicitly defining the schema with StructType is often recommended for:

    • Performance: Explicit schemas can sometimes lead to better performance as Spark doesn't need to sample the data to infer types.
    • Data Integrity: You can ensure the data types are interpreted correctly, preventing potential data type mismatches and errors.
    • Clarity and Maintainability: Explicit schemas make your code more readable and easier to understand.
  3. Working with Nested Data: StructType allows you to define schemas for nested data structures. A StructField can have its dataType set to another StructType, allowing you to represent hierarchical data.

    Python
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    address_schema = StructType([
        StructField("street", StringType(), True),
        StructField("zipcode", StringType(), True)
    ])
    
    person_schema = StructType([
        StructField("name", StringType(), False),
        StructField("age", IntegerType(), True),
        StructField("address", address_schema, True)
    ])
    
    # ... create DataFrame with this schema ...
    
  4. Defining Complex Data Types: You can also use StructType in conjunction with other complex data types like ArrayType (for lists) and MapType (for key-value pairs) to create sophisticated schemas.

    Python
    from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
    
    schema_with_array = StructType([
        StructField("name", StringType(), False),
        StructField("scores", ArrayType(IntegerType()), True)
    ])
    
    # ... create DataFrame ...




Here, True and False define whether the column can contain null values or not.

Now that we have created the schema using StructType, we have to attach it to the DataFrame while reading
by using .schema(schemaName), as shown in the sketch below.
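
A minimal sketch, with hypothetical column names for the daily-pricing file:

Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical schema -- replace the field names and types with the real ones
pricing_schema = StructType([
    StructField("PRODUCT_NAME", StringType(), True),
    StructField("UNIT_PRICE", DoubleType(), True)
])

# Attach the schema while reading, instead of inferring it
sourceCSVFileDF = (spark.read
                   .option("header", "true")
                   .schema(pricing_schema)
                   .csv(sourceCSVFilePath))

sourceCSVFileDF.printSchema()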


To see the schema of the DataFrame, use the df.printSchema() function.

To write the DataFrame to a file, use df.write.json("filepath").
Like in the read function, we have option("header", "true"), and to overwrite the file again and again, we use mode("overwrite"):

(sourceCSVFileDF
 .write
 .option("header", "true")
 .mode("overwrite")
 .csv(sourceCSVFilePath)
 )
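
Similarly, the DataFrame can be written as JSON to the target path defined earlier; a sketch, assuming a targetJSONFilePath variable holding that path:

Python
# JSON target path from above
targetJSONFilePath = "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/daily-pricing/json"

(sourceCSVFileDF
 .write
 .mode("overwrite")
 .json(targetJSONFilePath)
 )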



All the jobs run on the cluster we created before writing this program.
