
Session 15 - Datalake Bronze Layer Load - Ingest Weather Data from API Source

 The URL of the API.


The response of the API.






Here we simply declare separate variables for the source and destination paths.
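
For reference, a minimal sketch of what these declarations might look like is shown below. The actual values are defined in the notebook; the base URL, query options, and sink folder shown here are only hypothetical placeholders.

    # Illustrative sketch only -- the real values are set in the notebook, not here.
    weatherDataSourceAPIBaseURL = "<weather-api-base-url>"           # placeholder: source API base URL
    weatherDataSourceAPIURLOptions = "<additional-query-options>"    # placeholder: extra query-string options appended to each request
    weatherDataSinkFolderPath = "abfss://bronze@<storageaccount>.dfs.core.windows.net/weather-data"  # placeholder: bronze container sink path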



spark.sql(...): This is the entry point in PySpark for executing SQL queries against DataFrames or tables registered in the Spark metastore. "SELECT latitude, longitude, marketName FROM pricing_analytics.silver.geo_location_silver LIMIT 20" is the SQL query being executed (see the sketch after this list).
    • SELECT latitude, longitude, marketName: Selects three specific columns: latitude, longitude, and marketName.
    • FROM pricing_analytics.silver.geo_location_silver: Specifies the table to query. The naming convention suggests this is a table geo_location_silver located in a schema named silver under the top-level catalog pricing_analytics; this likely corresponds to the silver layer we've discussed previously.
    • LIMIT 20: Restricts the result to a maximum of 20 rows.

  • Reading the rows of the DataFrame with a for loop and printing each of the collected elements.
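
    A minimal sketch of the cell described above, assuming the catalog, schema, and table pricing_analytics.silver.geo_location_silver already exist and that spark is the active SparkSession:

        # Pull a small sample of market coordinates from the silver layer
        geolocationsDF = spark.sql(
            "SELECT latitude, longitude, marketName "
            "FROM pricing_analytics.silver.geo_location_silver LIMIT 20"
        )
        geolocationsDF.show(truncate=False)   # preview the 20 market locations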



    Now we pass the coordinates as input in the URL and get the response back in JSON format.

  • for geolocations in geolocationsDF.collect()::

    • geolocationsDF.collect(): This retrieves all the rows from the geolocationsDF DataFrame (which was limited to 20 rows in the previous step) and brings them into the driver's memory as a list of Row objects. Caution: Using collect() on a large DataFrame can cause memory issues on the driver.
    • for geolocations in ...: The code then iterates through each Row object in this collected list.
  • print(geolocations["marketName"], geolocations["latitude"], geolocations["longitude"]): Inside the loop, this line prints the marketName, latitude, and longitude for the current location.
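
    As a small aside, each element returned by collect() is a pyspark.sql.Row, which supports dictionary-style access by column name. A minimal sketch with made-up values:

        from pyspark.sql import Row

        sample = Row(marketName="Hypothetical Market", latitude=12.97, longitude=77.59)  # illustrative values only
        print(sample["marketName"], sample["latitude"], sample["longitude"])   # dictionary-style access
        print(sample.marketName)                                               # attribute-style access also works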

  • weatherDataSourceAPIURL = f"{weatherDataSourceAPIBaseURL}({geolocations['latitude']},{geolocations['longitude']})&start_date=2023-01-01&end_date=2023-12-31{weatherDataSourceAPIURLOptions}":

    • This line constructs a URL for a weather data API. It uses f-strings to embed the latitude and longitude of the current location into the URL.
    • It also includes a start_date and end_date parameter, setting the date range for the weather data to the entire year of 2023.
    • weatherDataSourceAPIBaseURL and weatherDataSourceAPIURLOptions are assumed to be previously defined variables containing the base URL and other API parameters, respectively.
  • weatherDataAPIResponse = requests.get(weatherDataSourceAPIURL).json():

    • This line uses the requests library to make an HTTP GET request to the constructed weatherDataSourceAPIURL.
    • .json() parses the JSON response from the weather API into a Python dictionary and stores it in the weatherDataAPIResponse variable.
  • weatherDataAPIResponse["marketName"] = geolocations["marketName"]:

    • This line adds a new key-value pair to the weatherDataAPIResponse dictionary. It takes the marketName of the current location from the geolocations row and adds it to the weather data response. This likely serves to associate the fetched weather data with the specific market.
  • weatherDataAPIResponsejson = json.dumps(weatherDataAPIResponse):

    • This line uses the json.dumps() function to convert the weatherDataAPIResponse Python dictionary into a JSON-formatted string and store it in the weatherDataAPIResponsejson variable.
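
    A minimal sketch of this conversion with made-up values. The JSON-string form matters because sc.parallelize() plus spark.read.json() in the next cell expect one JSON string per element:

        import json

        example = {"latitude": 12.97, "longitude": 77.59, "marketName": "Hypothetical Market"}  # illustrative values only
        exampleJson = json.dumps(example)
        print(type(exampleJson))   # <class 'str'>
        print(exampleJson)         # {"latitude": 12.97, "longitude": 77.59, "marketName": "Hypothetical Market"}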



  • import json
    import requests  # these imports may already exist earlier in the notebook
    
    # Collect one JSON string per market before handing them to Spark
    weatherDataAPIResponseList = []
    
    for geolocations in geolocationsDF.collect():
        print(geolocations["marketName"], geolocations["latitude"], geolocations["longitude"])
        # Build the request URL for this market's coordinates and the 2023 date range
        weatherDataSourceAPIURL = f"{weatherDataSourceAPIBaseURL}({geolocations['latitude']},{geolocations['longitude']})&start_date=2023-01-01&end_date=2023-12-31{weatherDataSourceAPIURLOptions}"
        try:
            weatherDataAPIResponse = requests.get(weatherDataSourceAPIURL, timeout=120).json()
            weatherDataAPIResponse["marketName"] = geolocations["marketName"]  # tag the payload with its market
            weatherDataAPIResponsejson = json.dumps(weatherDataAPIResponse)
            print(weatherDataAPIResponsejson)
            if isinstance(weatherDataAPIResponse, dict):
                weatherDataAPIResponseList.append(weatherDataAPIResponsejson)
        except requests.exceptions.RequestException as e:
            print(f"Error fetching data for {geolocations['marketName']}: {e}")
    
    # Parallelize the JSON strings, infer a schema, and write to the bronze sink
    weatherDataRDD = sc.parallelize(weatherDataAPIResponseList)
    weatherDataSparkDF = spark.read.json(weatherDataRDD)
    
    weatherDataSparkDF \
        .write \
        .mode("overwrite") \
        .json(weatherDataSinkFolderPath)
    

    Explanation:

    1. weatherDataAPIResponseList = []: Initializes an empty list to store the JSON responses from the weather API.

    2. for geolocations in geolocationsDF.collect():: Iterates through the rows collected from the geolocationsDF (which contained market names, latitudes, and longitudes). Note: As discussed before, collect() can be inefficient for large DataFrames.

    3. print(...): Prints the market name, latitude, and longitude for each location being processed.

    4. weatherDataSourceAPIURL = f"...": Constructs the URL for the weather API, embedding the latitude and longitude. It also specifies the date range for the data (2023-01-01 to 2023-12-31).

    5. try...except requests.exceptions.RequestException as e:: This block handles potential errors during the API request.

      • weatherDataAPIResponse = requests.get(weatherDataSourceAPIURL, timeout=120).json(): Makes a GET request to the weather API with a timeout of 120 seconds and parses the JSON response.
      • weatherDataAPIResponse["marketName"] = geolocations["marketName"]: Adds the marketName to the weather data response.
      • weatherDataAPIResponsejson = json.dumps(weatherDataAPIResponse): Converts the Python dictionary response to a JSON string.
      • print(weatherDataAPIResponsejson): Prints the JSON response.
      • if isinstance(weatherDataAPIResponse, dict): weatherDataAPIResponseList.append(weatherDataAPIResponsejson): If the API response is a dictionary (implying a successful JSON parse), its JSON string representation is appended to the weatherDataAPIResponseList.
      • except requests.exceptions.RequestException as e:: If an error occurs during the API request, an error message including the market name and the exception details is printed.
    6. weatherDataRDD = sc.parallelize(weatherDataAPIResponseList): Creates a Spark RDD from the list of JSON response strings.

    7. weatherDataSparkDF = spark.read.json(weatherDataRDD): Reads the JSON data from the RDD into a Spark DataFrame. Spark will infer the schema from the JSON strings.

    8. weatherDataSparkDF.write.mode("overwrite").json(weatherDataSinkFolderPath): Writes the weatherDataSparkDF to the location specified by weatherDataSinkFolderPath in JSON format, overwriting any existing files.
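
    As an optional follow-up (not part of the original notebook), a quick sanity-check sketch that reads the bronze output back and inspects it, assuming weatherDataSinkFolderPath is readable from the cluster:

        # Read the freshly written bronze JSON back and inspect it
        bronzeWeatherDF = spark.read.json(weatherDataSinkFolderPath)
        bronzeWeatherDF.printSchema()                                    # schema inferred from the ingested JSON
        print(bronzeWeatherDF.count(), "records ingested into bronze")   # number of rows written by the job
        bronzeWeatherDF.select("marketName").distinct().show(truncate=False)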

    Output:

    Below the code cell, you can see a series of lines starting with market names followed by latitude and longitude. This is the output of the print() statement at the beginning of the loop. It shows the coordinates for which the weather data is being fetched.

    The absence of any error messages in the output suggests that the API requests for these initial locations were successful. The code is designed to catch requests.exceptions.RequestException, so if there were network issues or problems with the API for these first few locations, error messages would likely have been printed.

    In summary, this code fetches weather data for a set of locations, handles potential API errors, and then saves the fetched weather data (along with the market name) as JSON files using Spark. The try...except block is a good practice for handling potential network issues when interacting with external APIs.
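
    If you also want HTTP error statuses (4xx/5xx) to land in the except block rather than being parsed as if they were valid weather payloads, a slightly more defensive variant (a sketch, not the original code) is to call raise_for_status() before parsing. HTTPError is a subclass of RequestException, so the existing handler would still catch it:

        response = requests.get(weatherDataSourceAPIURL, timeout=120)
        response.raise_for_status()               # raises requests.exceptions.HTTPError on a 4xx/5xx status
        weatherDataAPIResponse = response.json()  # parse the JSON body only after a successful status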

    Feature         | sc.parallelize() (PySpark)                                                | pd.json_normalize() (Pandas)
    Library         | PySpark                                                                   | Pandas
    Input           | Python collection (list, etc.)                                            | JSON object or list of JSON objects (often as Python dictionaries)
    Output          | Spark RDD                                                                 | Pandas DataFrame
    Purpose         | Distribute local data for parallel processing                             | Transform nested JSON into a flat table
    Processing      | Distributed across the Spark cluster                                      | In-memory on a single machine
    Data Structure  | Unstructured or semi-structured elements                                  | Structured, potentially nested JSON data
    Use Case        | Initializing RDDs from local data and parallelizing existing Python data  | Handling and flattening complex JSON responses, creating tabular data from JSON
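
    A side-by-side sketch of the two approaches on the same small, made-up payload, assuming a Databricks-style environment where spark and sc are already available and pandas is installed:

        import json
        import pandas as pd

        sample = {"marketName": "Hypothetical Market", "daily": {"time": ["2023-01-01"], "temp_max": [21.4]}}  # made-up payload

        # PySpark: distribute JSON strings across the cluster and let Spark infer the schema
        sparkDF = spark.read.json(sc.parallelize([json.dumps(sample)]))
        sparkDF.printSchema()

        # Pandas: flatten the nested JSON in driver memory into a single-row table
        pandasDF = pd.json_normalize(sample)
        print(pandasDF.columns.tolist())   # ['marketName', 'daily.time', 'daily.temp_max']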






    In the Spark UI, we can see the driver executing the jobs; inside each job, there are a number of stages. Spark jobs are the high-level actions, and they are broken down into stages, which represent the sequences of tasks needed to perform the computations, often separated by shuffles or dependencies. The Spark UI provides a detailed view of this execution flow, allowing you to monitor and troubleshoot your Spark applications.

    The data was finally ingested into the bronze container.




