In the cells above we declared the variables this notebook relies on: the URL of the weather API, the variable that will hold the API's response, and separate variables for the source and destination paths.
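Those declarations aren't reproduced here, but based on the names used in later cells they might look roughly like this (the values are placeholders, not the notebook's real ones):

# Source: pieces used to build each weather API request URL (placeholder values).
weatherDataSourceAPIBaseURL = "<weather API base URL>"
weatherDataSourceAPIURLOptions = "&<extra query-string options>"

# Destination: folder where Spark will write the fetched JSON (placeholder path).
weatherDataSinkFolderPath = "/mnt/pricing_analytics/bronze/weather"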
- `spark.sql(...)`: This is the entry point in PySpark to execute SQL queries against DataFrames or tables registered in the Spark metastore.
- `"SELECT latitude, longitude, marketName FROM pricing_analytics.silver.geo_location_silver LIMIT 20"`: This is the SQL query being executed.
  - `SELECT latitude, longitude, marketName`: It selects three specific columns: `latitude`, `longitude`, and `marketName`.
  - `FROM pricing_analytics.silver.geo_location_silver`: It specifies the table to query. The naming convention suggests this is a table `geo_location_silver` located within a database or schema named `silver` under a top-level catalog `pricing_analytics`. This likely corresponds to the silver layer we've discussed previously.
  - `LIMIT 20`: This clause restricts the number of rows the query returns to a maximum of 20.
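For reference, a minimal sketch of this cell (assuming the active `spark` session; the result is assigned to `geolocationsDF`, the name used in the loop below):

# Query the silver-layer table for a small sample of market coordinates.
geolocationsDF = spark.sql(
    "SELECT latitude, longitude, marketName "
    "FROM pricing_analytics.silver.geo_location_silver "
    "LIMIT 20"
)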
Next, we read the rows of the DataFrame with a for loop and print the elements of each row.
- `for geolocations in geolocationsDF.collect():`:
  - `geolocationsDF.collect()`: This retrieves all the rows from the `geolocationsDF` DataFrame (which was limited to 20 rows in the previous step) and brings them into the driver's memory as a list of `Row` objects. Caution: using `collect()` on a large DataFrame can cause memory issues on the driver.
  - `for geolocations in ...`: The code then iterates through each `Row` object in this collected list.
- `print(geolocations["marketName"], geolocations["latitude"], geolocations["longitude"])`: Inside the loop, this line prints the `marketName`, `latitude`, and `longitude` for the current location.
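If the query weren't capped at 20 rows, a gentler alternative (a sketch) is `toLocalIterator()`, which streams rows to the driver one partition at a time instead of materializing the whole list at once:

# Streams rows to the driver incrementally; avoids holding all rows in memory.
for geolocations in geolocationsDF.toLocalIterator():
    print(geolocations["marketName"], geolocations["latitude"], geolocations["longitude"])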
- `weatherDataSourceAPIURL = f"{weatherDataSourceAPIBaseURL}({geolocations['latitude']},{geolocations['longitude']})&start_date=2023-01-01&end_date=2023-12-31{weatherDataSourceAPIURLOptions}"`:
  - This line constructs a URL for a weather data API. It uses an f-string to embed the `latitude` and `longitude` of the current location into the URL.
  - It also includes `start_date` and `end_date` parameters, setting the date range for the weather data to the entire year of 2023.
  - `weatherDataSourceAPIBaseURL` and `weatherDataSourceAPIURLOptions` are assumed to be previously defined variables containing the base URL and other API parameters, respectively.
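The same f-string is easier to read split across adjacent string literals (functionally identical to the one-liner above):

weatherDataSourceAPIURL = (
    f"{weatherDataSourceAPIBaseURL}"
    f"({geolocations['latitude']},{geolocations['longitude']})"
    f"&start_date=2023-01-01&end_date=2023-12-31"
    f"{weatherDataSourceAPIURLOptions}"
)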
- `weatherDataAPIResponse = requests.get(weatherDataSourceAPIURL).json()`:
  - This line uses the `requests` library to make an HTTP GET request to the constructed `weatherDataSourceAPIURL`.
  - `.json()` parses the JSON response from the weather API into a Python dictionary and stores it in the `weatherDataAPIResponse` variable.
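A slightly more defensive variant (a sketch): `raise_for_status()` converts HTTP 4xx/5xx responses into a `requests.exceptions.HTTPError`, which is a subclass of `RequestException`, so the `except` clause used later would still catch it:

import requests

response = requests.get(weatherDataSourceAPIURL, timeout=120)
response.raise_for_status()               # raise on 4xx/5xx status codes
weatherDataAPIResponse = response.json()  # parse the JSON body into a dict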
- `weatherDataAPIResponse["marketName"] = geolocations["marketName"]`:
  - This line adds a new key-value pair to the `weatherDataAPIResponse` dictionary. It takes the `marketName` of the current location from the `geolocations` row and adds it to the weather data response, associating the fetched weather data with the specific market.
- `weatherDataAPIResponsejson = json.dumps(weatherDataAPIResponse)`:
  - This line uses the `json.dumps()` function to convert the `weatherDataAPIResponse` Python dictionary into a JSON-formatted string and stores it in the `weatherDataAPIResponsejson` variable.
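A tiny round-trip illustration with made-up values:

import json

d = {"marketName": "SampleMarket", "latitude": 0.0}  # hypothetical content
s = json.dumps(d)     # -> '{"marketName": "SampleMarket", "latitude": 0.0}'
back = json.loads(s)  # json.loads reverses the conversion
assert back == d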
import json
import requests

weatherDataAPIResponseList = []  # collects one JSON string per market

for geolocations in geolocationsDF.collect():
    print(geolocations["marketName"], geolocations["latitude"], geolocations["longitude"])
    weatherDataSourceAPIURL = f"{weatherDataSourceAPIBaseURL}({geolocations['latitude']},{geolocations['longitude']})&start_date=2023-01-01&end_date=2023-12-31{weatherDataSourceAPIURLOptions}"
    try:
        weatherDataAPIResponse = requests.get(weatherDataSourceAPIURL, timeout=120).json()
        weatherDataAPIResponse["marketName"] = geolocations["marketName"]
        weatherDataAPIResponsejson = json.dumps(weatherDataAPIResponse)
        print(weatherDataAPIResponsejson)
        if isinstance(weatherDataAPIResponse, dict):
            weatherDataAPIResponseList.append(weatherDataAPIResponsejson)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data for {geolocations['marketName']}: {e}")

weatherDataRDD = sc.parallelize(weatherDataAPIResponseList)
weatherDataSparkDF = spark.read.json(weatherDataRDD)

weatherDataSparkDF \
    .write \
    .mode("overwrite") \
    .json(weatherDataSinkFolderPath)
Explanation:

- `weatherDataAPIResponseList = []`: Initializes an empty list to store the JSON responses from the weather API.
- `for geolocations in geolocationsDF.collect():`: Iterates through the rows collected from `geolocationsDF` (which contains market names, latitudes, and longitudes). Note: as discussed before, `collect()` can be inefficient for large DataFrames.
- `print(...)`: Prints the market name, latitude, and longitude for each location being processed.
- `weatherDataSourceAPIURL = f"..."`: Constructs the URL for the weather API, embedding the latitude and longitude. It also specifies the date range for the data (2023-01-01 to 2023-12-31).
- `try...except requests.exceptions.RequestException as e:`: This block handles potential errors during the API request.
  - `weatherDataAPIResponse = requests.get(weatherDataSourceAPIURL, timeout=120).json()`: Makes a GET request to the weather API with a timeout of 120 seconds and parses the JSON response.
  - `weatherDataAPIResponse["marketName"] = geolocations["marketName"]`: Adds the `marketName` to the weather data response.
  - `weatherDataAPIResponsejson = json.dumps(weatherDataAPIResponse)`: Converts the Python dictionary response to a JSON string.
  - `print(weatherDataAPIResponsejson)`: Prints the JSON response.
  - `if isinstance(weatherDataAPIResponse, dict): weatherDataAPIResponseList.append(weatherDataAPIResponsejson)`: If the API response is a dictionary (implying a successful JSON parse), its JSON string representation is appended to `weatherDataAPIResponseList`.
  - `except requests.exceptions.RequestException as e:`: If an error occurs during the API request, an error message including the market name and the exception details is printed.
- `weatherDataRDD = sc.parallelize(weatherDataAPIResponseList)`: Creates a Spark RDD from the list of JSON response strings.
- `weatherDataSparkDF = spark.read.json(weatherDataRDD)`: Reads the JSON data from the RDD into a Spark DataFrame; Spark infers the schema from the JSON strings.
- `weatherDataSparkDF.write.mode("overwrite").json(weatherDataSinkFolderPath)`: Writes `weatherDataSparkDF` to the location specified by `weatherDataSinkFolderPath` in JSON format, overwriting any existing files.
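As a quick sanity check (a sketch, assuming the write succeeded), the files can be read back from the sink path:

# Read the written files back and confirm the schema and markets look right.
checkDF = spark.read.json(weatherDataSinkFolderPath)
checkDF.printSchema()
checkDF.select("marketName").distinct().show()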
Output:
Below the code cell, you can see a series of lines starting with market names followed by latitude and longitude. This is the output of the `print()` statement at the beginning of the loop; it shows the coordinates for which the weather data is being fetched.
The absence of any error messages in the output suggests that the API requests for these initial locations were successful. The code is designed to catch `requests.exceptions.RequestException`, so if there were network issues or problems with the API for these first few locations, error messages would likely have been printed.
In summary, this code fetches weather data for a set of locations, handles potential API errors, and then saves the fetched weather data (along with the market name) as JSON files using Spark. The `try...except` block is good practice for handling potential network issues when interacting with external APIs.
| Feature | `sc.parallelize()` (PySpark) | `pd.json_normalize()` (Pandas) |
|---|---|---|
| Library | PySpark | Pandas |
| Input | Python collection (list, etc.) | JSON object or list of JSON objects (often as Python dictionaries) |
| Output | Spark RDD | Pandas DataFrame |
| Purpose | Distribute local data for parallel processing | Transform nested JSON into a flat table |
| Processing | Distributed across the Spark cluster | In-memory on a single machine |
| Data Structure | Unstructured or semi-structured elements | Structured, potentially nested JSON data |
| Use Case | Initializing RDDs from local data and parallelizing existing Python data | Handling and flattening complex JSON responses; creating tabular data from JSON |
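A side-by-side sketch with a hypothetical pair of nested responses (`sc` and `spark` as in the notebook; `pd` is pandas):

import json
import pandas as pd

# Two tiny nested responses; the shape here is made up for illustration.
sample = [
    {"marketName": "m1", "daily": {"temperature_2m_max": [21.5]}},
    {"marketName": "m2", "daily": {"temperature_2m_max": [18.0]}},
]

# PySpark: distribute the JSON strings across the cluster, then infer a schema.
sparkDF = spark.read.json(sc.parallelize([json.dumps(d) for d in sample]))

# Pandas: flatten the nested structure into dotted column names on the driver.
pandasDF = pd.json_normalize(sample)
print(list(pandasDF.columns))  # ['marketName', 'daily.temperature_2m_max']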