
Session 11 - Spark Structured Streaming - Real-Time Data Processing.

101. Streaming Source Data Generation.

Normally, a streaming process involves streaming the data into a messaging service such as Kafka, Kinesis (AWS), or Pub/Sub (GCP).


The data flows continuously into the bronze layer. This kind of data is called unbounded data.






So far, we have only read data in PySpark using spark.read.format(<file format>).load(<file name>), and we have written data to the destination using DataFrame.write.

So to read streaming data, we have to use spark.readStream, and to write it we have to use writeStream on the streaming DataFrame.
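
As a minimal sketch of what the streaming read might look like (the schema and landing path below are illustrative assumptions, not taken from the lesson; streaming file sources need an explicit schema unless schema inference is enabled):

Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Illustrative schema and landing path - adjust to your own data and storage account.
pricingSchema = StructType([
    StructField("productName", StringType(), True),
    StructField("price", DoubleType(), True)
])

sourceStreamJSONFileDF = (spark.readStream
                        .format("json")            # file-based streaming source
                        .schema(pricingSchema)     # streaming file sources require an explicit schema
                        .load("abfss://bronze@<storage-account>.dfs.core.windows.net/daily-pricing-streaming-data/json/landing/")
                        )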



If we haven't specified any format, the default format will be Delta on Azure Databricks. For a normal batch write, we use .mode("overwrite"); in Spark Structured Streaming, we have to use .outputMode("append").
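
For comparison, a minimal batch write (with an illustrative DataFrame name and path, not from the lesson) uses .mode(), whereas the streaming write in the next snippet uses .outputMode():

Python
# Batch write: .mode() controls how existing data at the destination is handled.
(dailyPricingBatchDF.write
                    .format("delta")        # Delta is the default format on Databricks
                    .mode("overwrite")      # replace whatever already exists at the destination
                    .save("abfss://bronze@<storage-account>.dfs.core.windows.net/daily-pricing-batch/")
                    )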

The spark writeStream call will be:




Python
sinkStreamJSONCheckpointPath = 'abfss://bronze@adlsudadatalakedev.dfs.core.windows.net/daily-pricing-streaming-data/json/checkpoint/'

streamProcessingQuery = (sourceStreamJSONFileDF
                       .writeStream
                       .outputMode("append")                                        # write only newly arrived rows each micro-batch
                       .format("json")                                              # sink file format
                       .queryName("stream-processing")                              # logical name shown in the Spark UI
                       .trigger(processingTime = "5 Minutes")                       # run a micro-batch every 5 minutes
                       .option("checkpointLocation", sinkStreamJSONCheckpointPath)  # offsets/state for fault tolerance
                       .start(sinkStreamJSONFilePath)                               # start the query, writing to the sink path
                       )

The key additions compared to the previous snippet are:

  1. .queryName("stream-processing"):

    • This line assigns a logical name to your streaming query: "stream-processing".
    • Benefits of setting a query name:
      • Monitoring and Management: It makes it easier to identify and monitor your specific streaming query in the Spark UI (e.g., the Structured Streaming tab or the Spark History Server).
      • Debugging: When troubleshooting issues, the query name helps in distinguishing logs and metrics related to this particular stream.
      • Programmatic Access: You can potentially use the query name to programmatically interact with or manage the running stream (though this is less common in basic write scenarios).
  2. .trigger(processingTime = "5 Minutes"):

    • This line configures the trigger interval for your streaming query.
    • processingTime = "5 Minutes" specifies that Spark will attempt to process new data and update the sink every 5 minutes.
    • Trigger Modes: Spark Structured Streaming supports different trigger modes:
      • processingTime: As used here, it triggers processing at fixed time intervals.
      • once: The query will process all available data in the source and then terminate. This is useful for one-time batch-like processing on streaming data.
      • continuous (experimental): For low-latency processing with certain sources and sinks.
      • availableNow (as of Spark 3.3): Processes all data available when the query starts, split across multiple micro-batches, and then stops; a more scalable alternative to once (see the short sketch after this list).
    • Impact: Setting a trigger interval controls the latency of your streaming pipeline. A shorter interval leads to lower latency but potentially higher resource consumption. A longer interval reduces resource usage but increases latency. Choosing the appropriate trigger interval depends on your specific application requirements.
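
As a sketch (reusing the variables defined above; it should not be run concurrently with the 5-minute query against the same checkpoint), the same pipeline could be run as a one-shot job with the availableNow trigger:

Python
oneShotQuery = (sourceStreamJSONFileDF
              .writeStream
              .outputMode("append")
              .format("json")
              .queryName("stream-processing-availablenow")
              .trigger(availableNow = True)                 # drain the current backlog in micro-batches, then stop
              .option("checkpointLocation", sinkStreamJSONCheckpointPath)
              .start(sinkStreamJSONFilePath)
              )

oneShotQuery.awaitTermination()                             # returns once all available data has been processed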

The rest of the code remains the same and performs the following actions:

  • Reads a streaming DataFrame (sourceStreamJSONFileDF).
  • Writes the output in "append" mode (only new data is written).
  • Formats the output as "json" files.
  • Uses the specified sinkStreamJSONCheckpointPath for maintaining the query's state for fault tolerance.
  • Starts the streaming query and writes the output to sinkStreamJSONFilePath (assumed to be defined elsewhere).

In summary, this updated code snippet enhances the previous one by:

  • Providing a logical name to the streaming query for better monitoring and management.
  • Defining a specific processing interval of 5 minutes, controlling how frequently new data is processed and written to the sink.

This configuration is more explicit about how the streaming pipeline will operate in terms of processing frequency and provides better metadata for monitoring.




The output of streamProcessingQuery.id shows the unique identifier of the Spark Structured Streaming query that was just started. This confirms that the .start() action successfully launched the streaming process. The query is now running in the background, continuously processing data according to the configured trigger and writing the results to the specified sink.
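
A few standard StreamingQuery attributes (plus the query-name lookup mentioned earlier) that are useful for checking on the running stream:

Python
print(streamProcessingQuery.id)            # unique id, stable across restarts from the same checkpoint
print(streamProcessingQuery.runId)         # changes on every (re)start of the query
print(streamProcessingQuery.status)        # is the query actively processing or waiting for the next trigger?
print(streamProcessingQuery.lastProgress)  # metrics for the most recent micro-batch

# Look up the query by the name set with .queryName()
for query in spark.streams.active:
    if query.name == "stream-processing":
        print(query.status)

# streamProcessingQuery.stop()             # gracefully stop the stream when finished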



Module Summary

Spark Structured Streaming Functionalities:

  1. Spark Structured Streaming Use Case Scenarios

  2. Spark Structured Streaming Reader / Writer Configurations

  3. Spark Structured Streaming Checkpoint Options: Purpose and Usability

