Skip to main content

Session 13 Datalake bronze Layer load - ingest Geo -location API Source data

 







It appears you've provided an image containing code snippets and configuration details related to fetching geocoding data. Let's break down what's happening here:

Section 1: Configuration Variables

This section defines several variables that seem to be used for configuring a process to retrieve and store geocoding data.

  • source API URL: "https://geocoding-api.open-meteo.com/v1/search?name=kovilpatti&count=10&language=en&format=json"

    • This is the URL of an API endpoint likely used to search for geographical locations.
    • It seems to be using the Open-Meteo Geocoding API.
    • The parameters in the URL suggest a search for locations named "kovilpatti", returning a maximum of 10 results, in English, and in JSON format.
  • JSON Target File Path: "abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/geo-location/"

    • This looks like a path in Azure Blob File System (ABFS).
    • It indicates where JSON files containing the geocoding data will be stored.
    • The path suggests a hierarchical structure within a data lake storage account named datalakestorageaccountname, under the container working-labs, and within the bronze/geo-location/ folder.

Section 2: More Configuration Variables (Likely within a Notebook or Script)

This section defines similar configuration variables, possibly within a Databricks notebook or a Python script.

  • geolocationSourceAPIURL: "https://geocoding-api.open-meteo.com/v1/search?name=kovilpatti&count=10&language=en&format=json"

    • This is the same API URL as defined earlier.
  • geolocationSinkLayerName: 'bronze'

    • This likely refers to a data lake layer, suggesting a bronze layer for raw or minimally processed data.
  • geolocationSinkStorageAccountName: 'adlsudadatalakehousedev'

    • This specifies the name of the Azure Data Lake Storage account.
  • geolocationSinkFolderName: 'geo-location'

    • This is the folder name within the storage account where the data will be stored.
  • geolocationSinkFolderPath: f'abfss://{geolocationSinkLayerName}@{geolocationSinkStorageAccountName}.dfs.core.windows.net/{geolocationSinkFolderName}'

    • This line constructs the full ABFS path dynamically using the previously defined variables. This is a more programmatic way to define the target file path.

Section 3: Python Imports

This section shows the import statements in a Python code cell.

  • import requests: This imports the requests library, which is commonly used in Python to make HTTP requests (like fetching data from an API).
  • import jsonThis imports the json library for working with JSON data (encoding and decoding).
  • import pandas as pd This imports the pandas library, a powerful tool for data manipulation and analysis, often used to work with data in a tabular format (like DataFrames).

In summary, the image shows the configuration and initial setup for a process that likely does the following:

  1. Fetches geocoding data for the location "kovilpatti" from the Open-Meteo API using the specified URL.
  2. Parses the JSON response from the API.
  3. Stores the JSON data as files in the specified Azure Data Lake Storage Gen2 path (abfss://working-labs@datalakestorageaccountname.dfs.core.windows.net/bronze/geo-location/ or dynamically constructed using geolocationSinkFolderPath).
  4. The Python code suggests that the requests library will be used to make the API call, json to handle the response, and pandas might be used for further processing or structuring of the data before storage.

This setup is typical for an ETL (Extract, Transform, Load) or data ingestion pipeline where data is being pulled from an external source (the API) and landed in a data lake for further processing and analysis.



 unnecessary intermediate step, there are several reasons why you might choose to create a Pandas DataFrame before a Spark DataFrame when dealing with data from external sources like APIs:

1. Handling Nested or Semi-Structured Data:

  • API Responses Often Have Complex Structures: API responses, especially in JSON format, can have nested lists, dictionaries, and varying structures within the data.
  • Pandas Flexibility in Parsing: Pandas provides more straightforward and flexible ways to handle such semi-structured data directly from JSON. You can easily normalize nested JSON structures using functions like pd.json_normalize(), which can flatten nested records into a tabular format that Spark can then readily understand.
  • Direct Spark JSON Reading Limitations: While Spark can read JSON directly, dealing with complex nesting and variations in the schema within records can be more cumbersome and might require more intricate schema definitions upfront. Pandas offers a more dynamic approach to initially exploring and structuring this kind of data.

2. Data Exploration and Initial Transformation:

  • Pandas for Exploratory Data Analysis (EDA): Pandas is excellent for initial data exploration, inspection, and quick transformations on smaller datasets. You can easily print parts of the DataFrame, check data types, handle missing values, and perform basic filtering and cleaning before moving to Spark for distributed processing.
  • Debugging and Understanding the Data: Creating a Pandas DataFrame allows you to get a tangible, in-memory representation of the data returned by the API. This can be very helpful for debugging and understanding the structure and content of the data before scaling up with Spark.

3. Handling Schema Evolution or Variability:

  • API Schema Changes: APIs can sometimes change their response structure. Pandas can be more forgiving in handling slight variations in the schema during the initial ingestion. You can then use Pandas' capabilities to standardize the schema before creating the Spark DataFrame.
  • Dynamic Schemas: If the schema of the API response isn't strictly fixed, Pandas can often handle this more gracefully than Spark's more rigid schema requirements for direct JSON reading.

4. Integration with Python Ecosystem:

  • Leveraging Pandas Libraries: You might want to use other Python libraries that work well with Pandas DataFrames for initial data manipulation or enrichment before moving to Spark.

5. Control over Data Types and Parsing:

  • Explicit Type Conversion: Pandas allows for more explicit control over data type conversion during the initial DataFrame creation. This can be beneficial if you need to handle specific data type nuances from the API response before Spark infers the schema.

Why Direct Spark Reading is Possible (and sometimes preferred):

It's important to note that Spark can directly read JSON data using spark.read.json(). This is often the preferred method when:

  • The JSON data is well-structured and has a consistent schema.
  • The dataset is already large, and you want to avoid bringing the entire dataset into the memory of a single node (as Pandas does).
  • You have a well-defined schema for the JSON data.

In the specific example you provided:

The code first retrieves the JSON response into a Python dictionary/list, then creates a Pandas DataFrame, and finally a Spark DataFrame. This suggests that the API response might have a structure that is more easily handled and explored initially using Pandas. Perhaps the "results" key contains a list of dictionaries that Pandas can readily convert into rows and columns.

In summary, while directly creating a Spark DataFrame from a JSON API response is possible, using Pandas as an intermediary step often provides more flexibility for handling complex or semi-structured data, facilitates initial exploration and transformation, and can be more robust to schema variations. The choice depends on the complexity of the API response, the size of the data, and the desired level of initial processing and inspection.


geolocationAPIResponse = requests.get(geolocationSourceAPIURL).json()
geolocationPandasDF = pd.DataFrame(geolocationAPIResponse)
geolocationSparkDF = spark.createDataFrame(geolocationPandasDF)

Explanation of the Code:

  1. geolocationAPIResponse = requests.get(geolocationSourceAPIURL).json():

    • This line uses the requests library to make an HTTP GET request to the API endpoint defined in the geolocationSourceAPIURL variable (which we saw in the previous image: "https://geocoding-api.open-meteo.com/v1/search?name=kovilpatti&count=10&language=en&format=json").
    • .json() is then called on the response object to parse the JSON content returned by the API into a Python dictionary or list of dictionaries. This data is stored in the geolocationAPIResponse variable.
  2. geolocationPandasDF = pd.DataFrame(geolocationAPIResponse):

    • This line uses the pandas library to create a Pandas DataFrame named geolocationPandasDF from the geolocationAPIResponse. Pandas DataFrames are powerful for tabular data manipulation in Python. The JSON response from the API is likely structured as a list of dictionaries, where each dictionary represents a location and its attributes. Pandas can directly convert such a structure into a DataFrame.
  3. geolocationSparkDF = spark.createDataFrame(geolocationPandasDF):

    • This line uses the spark object (which is typically available in a Databricks or Spark environment) to create a Spark DataFrame named geolocationSparkDF from the geolocationPandasDF. Spark DataFrames are distributed data structures optimized for large-scale data processing. This step converts the in-memory Pandas DataFrame into a distributed Spark DataFrame, making it suitable for processing with Spark's parallel processing capabilities.

Output: geolocationSparkDF: pyspark.sql.dataframe.DataFrame





marketNames = [dailyPricingMarketNames["MARKET_NAME"] for dailyPricingMarketNames in dailyPricingMarketNamesDF.collect()]
print(marketNames)

Explanation of the Code:

  1. dailyPricingMarketNamesDF.collect():

    • The .collect() action in Spark retrieves all the rows from the dailyPricingMarketNamesDF DataFrame and brings them into the memory of the driver node as a list of Row objects. It's important to be cautious when using collect() on very large DataFrames, as it can lead to out-of-memory errors on the driver.
  2. for dailyPricingMarketNames in dailyPricingMarketNamesDF.collect():

    • This is a Python list comprehension that iterates through each Row object retrieved by collect(). In each iteration, the current Row object is assigned to the variable dailyPricingMarketNames.
  3. dailyPricingMarketNames["MARKET_NAME"]:

    • For each Row object (dailyPricingMarketNamesThis part accesses the value of the column named "MARKET_NAME". Spark Row Objects can be accessed like dictionaries using column names as keys.
  4. marketNames = [...]:

    • The entire list comprehension creates a new Python list named marketNames. This list will contain the values from the "MARKET_NAME" column of each row in the dailyPricingMarketNamesDF.
  5. print(marketNames):

    • Finally, this line prints the marketNames list to the console.

Output:

Below the code cell, you can see the output of the print(marketNames) statement:

["Ahmedabad(Fruit Market", "Anand(Veg", "Binny Mill (F&V)", "Flower Market", "Live Stock", "New Grain Market", "Pat an(Veg", "Sangli(Phale", "Sesamum(Sesame", "A lot", "Aatpadi", "Abhanpur", "Abohar", "Achalda", "Achnera", "Adampur", "A gar", "Agra", "Ahirora", "Ahmedabad"]

This output is a Python list containing strings. Each string represents a market name extracted from the "MARKET_NAME" column of the dailyPricingMarketNamesDF.

In summary, this code snippet reads all the data from the dailyPricingMarketNamesDF into the driver's memory, extracts the values from the "MARKET_NAME" column for each row, stores these names in a Python list called marketNames, and then prints this list to the output.

This image shows an updated version of the previous code cell, where an API call is now being made within the loop for each marketName.

Python Code Cell (Likely Still Cell 7, Updated):

Python
for marketName in marketNames:
    print(marketName)
    geolocationSourceAPIURL = f"{geolocationSourceAPIBaseURL}({marketName}){geolocationSourceAPIURLOptions}"
    geolocationAPIResponse = requests.get(geolocationSourceAPIURL).json()

Changes and Explanation:

The first two lines remain the same as before:

  1. for marketName in marketNames:: Iterates through each market name in the marketNames list.
  2. print(marketName): Prints the current marketName.
  3. geolocationSourceAPIURL = f"{geolocationSourceAPIBaseURL}({marketName}){geolocationSourceAPIURLOptions}": Constructs the API URL for the current marketName.

The new line added is:

  1. geolocationAPIResponse = requests.get(geolocationSourceAPIURL).json():
    • This line uses the requests.get() function to make an HTTP GET request to the geolocationSourceAPIURL that was just constructed.
    • .json() is then called on the response object. This parses the JSON content returned by the API into a Python dictionary or list of dictionaries and stores it in the geolocationAPIResponse variable.

In summary, this updated code cell now iterates through each market name, prints the name, constructs a specific API URL for that market, and then immediately makes a request to that API endpoint to retrieve the geocoding data in JSON format. The geolocationAPIResponse variable will hold the data returned by the API for each market in the loop.



marketNames = [dailyPricingMarketNames["MARKET_NAME"] for dailyPricingMarketNames in dailyPricingMarketNamesDF.collect()]
# print(marketNames)

geolocationAPIResponseList = []

for marketName in marketNames:
    print(marketName)
    geolocationSourceAPIURL = f"{geolocationSourceAPIBaseURL}({marketName}){geolocationSourceAPIURLOptions}"
    geolocationAPIResponse = requests.get(geolocationSourceAPIURL).json()
    print(geolocationAPIResponse)
    if isinstance(geolocationAPIResponse, dict):
        geolocationAPIResponseList.append(geolocationAPIResponse)

geolocationPandasDF = pd.DataFrame(geolocationAPIResponseList)
geolocationSparkDF = spark.createDataFrame(geolocationPandasDF)
geolocationSparkDF \
    .write \
    .mode("overwrite") \
    .json(geolocationSinkFolderPath)

Explanation of the Code:

  1. marketNames = [...]: This line (as seen before) extracts a list of market names from the dailyPricingMarketNamesDF. The # print(marketNames) line is commented out, so the list itself won't be printed directly.

  2. geolocationAPIResponseList = []: An empty Python list is initialized to store the JSON responses received from the API for each market.

  3. for marketName in marketNames:: The code iterates through each marketName in the marketNames list.

  4. print(marketName): The current marketName being processed is printed.

  5. geolocationSourceAPIURL = f"{geolocationSourceAPIBaseURL}({marketName}){geolocationSourceAPIURLOptions}": The API URL is constructed for the current marketName.

  6. geolocationAPIResponse = requests.get(geolocationSourceAPIURL).json(): An HTTP GET request is made to the constructed API URL, and the JSON response is parsed and stored in the geolocationAPIResponse variable.

  7. print(geolocationAPIResponse): The JSON response received from the API for the current market is printed. This is useful for monitoring and debugging.

  8. if isinstance(geolocationAPIResponse, dict):: This line checks if the geolocationAPIResponse is a Python dictionary. This is a basic error handling step to ensure that only valid JSON responses (which are typically represented as dictionaries or lists of dictionaries in Python) are processed further.

  9. geolocationAPIResponseList.append(geolocationAPIResponse): If the geolocationAPIResponse is a dictionary, it's appended to the geolocationAPIResponseList. This list will eventually hold all the valid JSON responses received for all the market names.

  10. geolocationPandasDF = pd.DataFrame(geolocationAPIResponseList): After the loop has processed all the market names, this line creates a Pandas DataFrame (geolocationPandasDF) from the list of API responses (geolocationAPIResponseList). Pandas can handle a list of dictionaries to create a tabular DataFrame.

  11. geolocationSparkDF = spark.createDataFrame(geolocationPandasDF): The Pandas DataFrame is then converted into a Spark DataFrame (geolocationSparkDF) for distributed processing.

  12. geolocationSparkDF.write.mode("overwrite").json(geolocationSinkFolderPath): Finally, the geolocationSparkDF is written to the location specified by geolocationSinkFolderPath in JSON format.

    • .write: Initiates the write operation for the Spark DataFrame.
    • .mode("overwrite"): Specifies that if files already exist at the target path, they should be overwritten.
    • .json(geolocationSinkFolderPath): Specifies that the DataFrame should be written as JSON files to the directory defined by geolocationSinkFolderPath.

Output (Partial):

Below the code cell, you can see a snippet of the data that is likely part of one of the geolocationAPIResponse dictionaries printed during the loop. It shows information about a location, including:

  • country: 'BD' (Bangladesh)
  • admin1: 'Chittagong'
  • admin2: 'Comilla'
  • name: 'Burichang'
  • latitude: 23.61667
  • longitude: 91.11667
  • ... and other geographical details.

In summary, this code fetches geocoding data for a list of market names, stores the valid responses, converts them into a Spark DataFrame, and then saves this DataFrame as JSON files in the specified data lake storage path, overwriting any existing files.



Data files got ingested into the bronze layer at geo location folder as we defined .


since it has too many small files its may overloaded the spark driver . the efficiednt way is to stored it in delta live tables .

Ingesting data as numerous small JSON files directly into the bronze layer can indeed lead to performance issues in Spark due to the "small files problem." This can overwhelm the Spark driver with metadata management and result in inefficient data reading and processing.

Storing the data in Delta Live Tables (DLT) is a much more efficient and recommended approach for several reasons:

Benefits of Using Delta Live Tables for Ingested Data:

  • Automatic Optimization: DLT automatically handles many of the challenges associated with small files. It can compact small files into larger ones in the background through auto-compaction, optimizing read performance.
  • Schema Evolution: DLT provides robust schema evolution capabilities, allowing you to handle changes in the incoming data schema gracefully without breaking your pipeline.
  • Data Quality Enforcement: You can define expectations on your data within DLT pipelines to ensure data quality. Records that fail these expectations can be quarantined or dropped, preventing them from polluting downstream tables.
  • Simplified Pipeline Management: DLT manages the underlying infrastructure and execution of your data pipeline, simplifying development and operations.
  • Lineage and Monitoring: DLT automatically tracks data lineage and provides monitoring capabilities, making it easier to understand the flow of data and identify potential issues.
  • Declarative Approach: DLT allows you to define your data transformations declaratively using SQL or Python, and it handles the underlying execution and optimization.
  • Integration with Delta Lake: DLT leverages the benefits of Delta Lake, such as ACID transactions, time travel, and improved query performance.


just declaring the array variable to store the incoming data at fost of this code.

go to bronze layer and delete the entire file  and rerun the above code .



Now we got the error , we have to debug check at the end of the error we can see the option as diagonse error.

it give some suggestion to get some ideas .

inorder to know the complete details in the driver logs . go to workspace --> compute --> deriver logs .


Log Type Filters (Highlighted in Green): This is the key part of the image. There are buttons to filter the logs by different output streams:
  • All: Shows all log types.
  • stdout: Shows the standard output stream of the driver. This typically includes print() statements and other general output from your Spark application. The green arrow is currently pointing to this button, indicating that the "stdout" logs are likely being viewed or selected.
  • stderr: Shows the standard error stream of the driver. This usually contains error messages, exceptions, and stack traces.
  • log4j: Shows the logs generated by the Log4j logging framework, which is commonly used by Spark and applications running on it for more structured and detailed logging.

analyse the error here the error is datatype while creating te spark dataframe .

here to debug we used spark rdd instead of pandas dataframe and we read the spark RDD to create the spark dataframe .

geolocationAPIResponseList = []

for marketName in marketNames:
    geolocationSourceAPIURL = f"{geolocationSourceAPIBaseURL}({marketName}){geolocationSourceAPIURLOptions}"
    geolocationAPIResponse = requests.get(geolocationSourceAPIURL).json()
    if isinstance(geolocationAPIResponse, dict):
        geolocationAPIResponseList.append(geolocationAPIResponse)

geolocationSparkRDD = sc.parallelize(geolocationAPIResponseList)
geolocationSparkDF = spark.read.json(geolocationSparkRDD)

geolocationSparkDF \
    .write \
    .mode("overwrite") \
    .json(geolocationSinkFolderPath)

Explanation of the Highlighted Lines:

  • geolocationSparkRDD = sc.parallelize(geolocationAPIResponseList) (Line 16):

    • sc is the SparkContext, the entry point to Spark functionality.
    • .parallelize() is an RDD operation that takes a Python list (in this case, geolocationAPIResponseList, which contains the JSON responses as Python dictionaries) and distributes it across the Spark cluster as an RDD of Python objects.
  • geolocationSparkDF = spark.read.json(geolocationSparkRDD) (Line 17):

    • spark is the SparkSession, the entry point for working with structured data (DataFrames and SQL).
    • .read.json() is a function that can read JSON data into a DataFrame.
    • Crucially, in this case, it's reading the JSON data from the geolocationSparkRDD. This means that instead of reading from a file path, Spark is interpreting each element of the RDD (which is a Python dictionary representing a JSON object) as a row in the DataFrame. Spark will infer the schema of the DataFrame based on the structure of the JSON data in the RDD.

Why This Approach for Debugging?

As we discussed, this approach allows for more control and visibility into the data:

  • Inspecting the Raw Data: By creating an RDD from the geolocationAPIResponseList, you can use RDD-specific operations to examine the raw Python dictionaries before they are structured into a DataFrame. This can help identify any inconsistencies or unexpected structures in the API responses.
  • Avoiding Potential Pandas Interference: By skipping the intermediate Pandas DataFrame creation (pd.DataFrame(geolocationAPIResponseList)), you are directly feeding the raw Python objects into Spark, potentially avoiding any schema interpretations or transformations that Pandas might have introduced.
  • Direct Spark Schema Inference: spark.read.json() directly infers the schema from the JSON data in the RDD. If there were issues with how Pandas was structuring the data or if you wanted more direct control over Spark's schema inference, this approach would be beneficial.

In summary, the highlighted code confirms the strategy of using a Spark RDD (geolocationSparkRDD) as an intermediate step to create the geolocationSparkDF. This is a common debugging technique to gain more control over the data and understand how Spark interprets the raw JSON responses, potentially bypassing complexities or issues that might arise from using Pandas as an intermediary.

after debugging and changes in the code , now we can see the less files got stored in the geo location table under the bronze layer .



we can find some of records are empty , inoder to aviod that we can use the filter option while write the data into the bronze layer .


now rerun the entire code now the records are ingested without null records this activity ensure the data quality .


delete the entire results in the geo location and click run all to check entire flow.

takeaway from this sessin : we cannot dely on AI assitant to debug all the error , we can use it for syntax errors .




Comments

Popular posts from this blog

session 19 Git Repository

  🔁 Steps to Create a Branch in Databricks, Pull from Git, and Merge into a Collaborative Branch Create a New Branch in Databricks: Go to the Repos tab in your workspace. Navigate to the Git-linked repo. Click the Git icon (or three dots ⋮) and choose "Create Branch." Give your branch a name (e.g., feature-xyz ) and confirm. Pull the Latest Changes from Git: With your new branch selected, click the Git icon again. Select “Pull” to bring the latest updates from the remote repository into your local Databricks environment. Make Changes & Commit: Edit notebooks or files as needed in your branch. Use the "Commit & Push" option to push changes to the remote repo. Merge into the Collaborative Branch: Switch to the collaborative branch (e.g., dev or main ) in Git or from the Databricks UI. Click "Pull & Merge" . Choose the branch you want to merge into the collaborative branch. Review the c...

Session 18 monitering and logging - Azure Monitor , Log analytics , and job notification

 After developing the code, we deploy it into the production environment. To monitor and logging the jobs run in the real time systems in azure  we have scheduled the jobs under the workflow , we haven't created any monitoring or any matrics . After a few times, the job failed, but we don't know because we haven't set up any monitoring, and every time we can't navigate to workspace-> workflows, under runs to see to check whether the job has been successfully running or not and in real time there will be nearly 100 jobs or more jobs to run  In real time, the production support team will monitor the process. Under the workflow, there is an option called Job notification. After setting the job notification, we can set a notification to email . if we click the date and time its takes us to the notebook which is scheduled there we can able to see the error where it happens . order to see more details, we need to under Spark tab, where we have the option to view logs ( tha...

Transformation - section 6 - data flow

  Feature from Slide Explanation ✅ Code-free data transformations Data Flows in ADF allow you to build transformations using a drag-and-drop visual interface , with no need for writing Spark or SQL code. ✅ Executed on Data Factory-managed Databricks Spark clusters Internally, ADF uses Azure Integration Runtimes backed by Apache Spark clusters , managed by ADF, not Databricks itself . While it's similar in concept, this is not the same as your own Databricks workspace . ✅ Benefits from ADF scheduling and monitoring Data Flows are fully integrated into ADF pipelines, so you get all the orchestration, parameterization, logging, and alerting features of ADF natively. ⚠️ Important Clarification Although it says "executed on Data Factory managed Databricks Spark clusters," this does not mean you're using your own Azure Databricks workspace . Rather: ADF Data Flows run on ADF-managed Spark clusters. Azure Databricks notebooks (which you trigger via an "Exe...