
PySpark


Follow the link above for sample PySpark code.

Transformations create new datasets; actions return values to the driver or write data to external storage.

Transformations:

  • Definition:
    • Transformations are operations that create a new RDD or DataFrame from an existing one.
    • They are "lazy," meaning they don't execute immediately. Instead, Spark builds a lineage graph (DAG - Directed Acyclic Graph) of transformations.
Examples (a short sketch follows this list):
  • map(): Applies a function to each element.
  • filter(): Selects elements based on a condition.
  • flatMap(): Similar to map(), but flattens the results.
  • groupByKey(): Groups elements by key.
  • reduceByKey(): Reduces elements by key.
  • join(): Joins two datasets.
  • distinct(): Removes duplicate elements.
  • union(): Combines two datasets.
  • coalesce(): Reduces the number of partitions.
  • repartition(): Increases or decreases the number of partitions.
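
A minimal sketch of lazy transformations, assuming an existing SparkSession named spark (the sample data is made up for illustration):

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.appName("transformations-demo").getOrCreate()
  sc = spark.sparkContext

  rdd = sc.parallelize(["a b", "b c", "a c", "a b"])

  # Each step below only extends the lineage graph (DAG); nothing executes yet.
  words  = rdd.flatMap(lambda line: line.split(" "))   # flatMap(): split each line and flatten
  pairs  = words.map(lambda w: (w, 1))                 # map(): one (word, 1) pair per word
  counts = pairs.reduceByKey(lambda a, b: a + b)       # reduceByKey(): sum the counts per word
  unique = words.distinct()                            # distinct(): remove duplicate words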

Actions:

  • Definition:
    • Actions trigger execution of the lineage graph, returning a result to the driver program or writing data to external storage.
  • Examples (see the sketch after this list):
  • collect(): Returns all elements as an array to the driver.
  • count(): Returns the number of elements.
  • first(): Returns the first element.
  • take(n): Returns the first 'n' elements.
  • reduce(): Reduces elements using a function.
  • foreach(): Applies a function to each element.
  • saveAsTextFile(): Writes elements to a text file.
  • show(): Displays the contents of a DataFrame.
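
Continuing the sketch above, the following actions force the lineage to execute (names such as counts and pairs come from that sketch):

  print(counts.count())      # count(): number of distinct words
  print(counts.take(2))      # take(n): first n elements returned to the driver
  print(counts.collect())    # collect(): all elements to the driver (only safe for small data)

  total = pairs.map(lambda kv: kv[1]).reduce(lambda a, b: a + b)   # reduce(): total word occurrences
  print(total)

  df = counts.toDF(["word", "count"])
  df.show()                  # show(): display the contents of a DataFrame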

  • Repartitioning Mechanisms

    Spark provides two primary methods for repartitioning (a short sketch follows the list below):

    • repartition():
      • This method is used to either increase or decrease the number of partitions.  
      • It performs a full shuffle of the data, meaning that data is redistributed across all partitions. This can be an expensive operation, especially for large datasets, as it involves significant network communication.  
      • repartition() aims to distribute data evenly across the new partitions.
      • It is typically used when you need to redistribute data to achieve a more balanced workload or when you are increasing the number of partitions.
    • coalesce():
      • This method is primarily used to decrease the number of partitions.  
      • It attempts to minimize data shuffling by combining existing partitions.  
      • It is generally more efficient than repartition() when reducing the number of partitions.
      • However, coalesce() may result in unevenly sized partitions.
      • It is best used after a filter has reduced the size of the dataset, leaving more partitions than the data needs.
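
    A minimal sketch contrasting the two, assuming an existing SparkSession named spark (the numbers are arbitrary):

      df = spark.range(0, 1_000_000)          # example DataFrame
      wide = df.repartition(200)              # full shuffle; data spread evenly over 200 partitions
      print(wide.rdd.getNumPartitions())      # -> 200

      filtered = wide.where("id % 100 = 0")   # filtering leaves many near-empty partitions
      narrow = filtered.coalesce(10)          # merges existing partitions, avoiding a full shuffle
      print(narrow.rdd.getNumPartitions())    # -> 10
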
    Optimization techniques:
    In Apache Spark, cache() and persist() are fundamental optimization techniques used to store intermediate results of computations, thereby avoiding redundant recomputation. While they are closely related, there is a key distinction:

    • cache(): uses a fixed, default storage level with no options; if memory fills up, partitions that do not fit may be recomputed later, whereas persist() can be configured to spill the overflow to disk.
    • cache() is essentially shorthand for persist(StorageLevel.MEMORY_ONLY) (for RDDs) or persist(StorageLevel.MEMORY_AND_DISK) (for DataFrames and Datasets).
      • It stores the data in memory, if possible. If there's not enough memory, some partitions might be recomputed.
      • It's a convenient way to quickly cache data when you're primarily concerned with in-memory storage.
    • persist(): once data is persisted, it must be released explicitly with unpersist() when it is no longer needed.
      • persist() provides more fine-grained control over storage. You can specify different storage levels, such as:
        • MEMORY_ONLY: Store data in memory only.
        • MEMORY_AND_DISK: Store data in memory, and spill to disk if memory is insufficient.
        • DISK_ONLY: Store data only on disk.
        • And variations of these, including serialization and replication.
      • This allows you to choose the storage strategy that best suits your needs, balancing performance and fault tolerance.

    In essence:

    • cache() is a simple way to use the default storage level.
    • persist() offers the flexibility to choose from various storage levels (a short sketch follows).
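
    A minimal sketch of both, assuming an existing SparkSession named spark (the datasets are made up for illustration):

      from pyspark import StorageLevel

      logs_a = spark.range(0, 1_000_000)                        # hypothetical dataset for cache()
      logs_b = spark.range(0, 1_000_000)                        # second dataset for persist()

      logs_a.cache()                                            # default storage level
      logs_a.count()                                            # first action materializes the cache

      logs_b.persist(StorageLevel.MEMORY_AND_DISK)              # explicit level: spill to disk if memory runs out
      logs_b.count()

      logs_a.unpersist()                                        # release storage explicitly when done
      logs_b.unpersist()
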
    Broadcast Join:

    A broadcast join in Apache Spark is an optimization technique designed to significantly speed up join operations when one of the DataFrames involved is relatively small. In a standard shuffle join, both DataFrames are repartitioned by the join key and moved across the network, which is expensive for large datasets. A short sketch follows the list below.

    The Solution: Broadcast Joins

    • A broadcast join avoids this shuffling by copying the smaller DataFrame to every executor node in the Spark cluster.
    • This means each executor has a complete copy of the small DataFrame in its memory.
    • Then, when the join is performed, each executor can perform the join locally, without needing to shuffle the larger DataFrame.
    • Improved Performance: By eliminating shuffling, broadcast joins can dramatically reduce the execution time of join operations.
    • Reduced Network Traffic: Network traffic is minimized, as the larger DataFrame does not need to be shuffled.
    • Limitations

      • Memory Constraints: The smaller DataFrame must fit in the memory of each executor node. If it's too large, you'll encounter out-of-memory errors.
      • Suitability: Broadcast joins are not suitable for joining two large DataFrames. In those cases, shuffle joins are necessary.
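
    A minimal sketch using the broadcast() hint, assuming an existing SparkSession named spark (the table names and columns are made up):

      from pyspark.sql.functions import broadcast

      large_df = spark.range(0, 1_000_000).withColumnRenamed("id", "customer_id")              # hypothetical large table
      small_df = spark.createDataFrame([(0, "gold"), (1, "silver")], ["customer_id", "tier"])  # small lookup table

      joined = large_df.join(broadcast(small_df), "customer_id")   # small_df is copied to every executor
      joined.explain()                                             # physical plan should show BroadcastHashJoin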
