First, we declare a variable for the URL of the weather API and another to hold its response. We also declare separate variables for the source and destination paths.
`spark.sql(...)`: This is the entry point in PySpark to execute SQL queries against DataFrames or tables registered in the Spark metastore. Here it executes `"SELECT latitude, longitude, marketName FROM pricing_analytics.silver.geo_location_silver LIMIT 20"`:

- `SELECT latitude, longitude, marketName` selects three specific columns: `latitude`, `longitude`, and `marketName`.
- `FROM pricing_analytics.silver.geo_location_silver` specifies the table to query from. The naming convention suggests this is a table `geo_location_silver` located within a database or schema named `silver` under a top-level catalog `pricing_analytics`. This likely corresponds to the silver layer we've discussed previously.
- `LIMIT 20` restricts the number of rows the query returns to a maximum of 20.
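As a sketch, this step would look like the following, assuming the result is assigned to the `geolocationsDF` DataFrame that the loop below iterates over:

```python
# Pull a small sample of market coordinates from the silver layer.
geolocationsDF = spark.sql(
    "SELECT latitude, longitude, marketName "
    "FROM pricing_analytics.silver.geo_location_silver "
    "LIMIT 20"
)
```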
Next, we read the rows of the DataFrame with a for loop and print the fields of each row.
for geolocations in geolocationsDF.collect():
`geolocationsDF.collect()`: This retrieves all the rows from the `geolocationsDF` DataFrame (which was limited to 20 rows in the previous step) and brings them into the driver's memory as a list of `Row` objects. Caution: using `collect()` on a large DataFrame can cause memory issues on the driver. `for geolocations in ...`: The code then iterates through each `Row` object in this collected list.
`print(geolocations["marketName"], geolocations["latitude"], geolocations["longitude"])`: Inside the loop, this line prints the `marketName`, `latitude`, and `longitude` for the current location.
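Given the caution above about `collect()`, a memory-friendlier variant is sketched below; `toLocalIterator()` streams rows to the driver one partition at a time instead of materializing all of them at once:

```python
# Iterate without pulling the whole DataFrame into driver memory at once.
for geolocations in geolocationsDF.toLocalIterator():
    print(geolocations["marketName"], geolocations["latitude"], geolocations["longitude"])
```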
weatherDataSourceAPIURL = f"{weatherDataSourceAPIBaseURL}({geolocations['latitude']},{geolocations['longitude']})&start_date=2023-01-01&end_date=2023-12-31{weatherDataSourceAPIURLOptions}":
- This line constructs a URL for a weather data API. It uses f-strings to embed the `latitude` and `longitude` of the current location into the URL.
- It also includes `start_date` and `end_date` parameters, setting the date range for the weather data to the entire year of 2023.
- `weatherDataSourceAPIBaseURL` and `weatherDataSourceAPIURLOptions` are assumed to be previously defined variables containing the base URL and other API parameters, respectively.
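To make the string assembly concrete, here is a toy illustration. The base URL, option string, and coordinates below are hypothetical placeholders; the real values are defined in an earlier cell:

```python
# Hypothetical placeholder values, for illustration only.
weatherDataSourceAPIBaseURL = "https://weather.example.com/v1/history?coordinates="
weatherDataSourceAPIURLOptions = "&units=metric&format=json"
lat, lon = 32.77, -96.80  # example coordinates

url = f"{weatherDataSourceAPIBaseURL}({lat},{lon})&start_date=2023-01-01&end_date=2023-12-31{weatherDataSourceAPIURLOptions}"
print(url)
# https://weather.example.com/v1/history?coordinates=(32.77,-96.8)&start_date=2023-01-01&end_date=2023-12-31&units=metric&format=json
```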
weatherDataAPIResponse = requests.get(weatherDataSourceAPIURL).json():
- This line uses the `requests` library to make an HTTP GET request to the constructed `weatherDataSourceAPIURL`.
- `.json()` parses the JSON response from the weather API into a Python dictionary and stores it in the `weatherDataAPIResponse` variable.
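One design note: chaining `.json()` directly onto `requests.get()` will happily parse an error body too. A slightly more defensive sketch, using the standard `raise_for_status()` call from `requests`, looks like this:

```python
import requests

response = requests.get(weatherDataSourceAPIURL, timeout=120)
response.raise_for_status()  # turn HTTP 4xx/5xx into a RequestException subclass
weatherDataAPIResponse = response.json()
```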
weatherDataAPIResponse["marketName"] = geolocations["marketName"]:
- This line adds a new key-value pair to the `weatherDataAPIResponse` dictionary. It takes the `marketName` of the current location from the `geolocations` row and adds it to the weather data response. This likely serves to associate the fetched weather data with the specific market.
weatherDataAPIResponsejson = json.dumps(weatherDataAPIResponse):
- This line uses the `json.dumps()` function to convert the `weatherDataAPIResponse` Python dictionary into a JSON-formatted string and store it in the `weatherDataAPIResponsejson` variable.
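A toy example of that conversion (the fields here are made up purely for illustration):

```python
import json

sample = {"temperature_2m_max": 31.4, "marketName": "Dallas"}
print(json.dumps(sample))
# {"temperature_2m_max": 31.4, "marketName": "Dallas"}
```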
```python
import json
import requests

# Collect JSON responses (as strings) for every market location.
weatherDataAPIResponseList = []

for geolocations in geolocationsDF.collect():
    print(geolocations["marketName"], geolocations["latitude"], geolocations["longitude"])
    weatherDataSourceAPIURL = f"{weatherDataSourceAPIBaseURL}({geolocations['latitude']},{geolocations['longitude']})&start_date=2023-01-01&end_date=2023-12-31{weatherDataSourceAPIURLOptions}"
    try:
        weatherDataAPIResponse = requests.get(weatherDataSourceAPIURL, timeout=120).json()
        weatherDataAPIResponse["marketName"] = geolocations["marketName"]
        weatherDataAPIResponsejson = json.dumps(weatherDataAPIResponse)
        print(weatherDataAPIResponsejson)
        if isinstance(weatherDataAPIResponse, dict):
            weatherDataAPIResponseList.append(weatherDataAPIResponsejson)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data for {geolocations['marketName']}: {e}")

# Distribute the JSON strings, infer a schema, and persist as JSON files.
weatherDataRDD = sc.parallelize(weatherDataAPIResponseList)
weatherDataSparkDF = spark.read.json(weatherDataRDD)
weatherDataSparkDF \
    .write \
    .mode("overwrite") \
    .json(weatherDataSinkFolderPath)
```
Explanation:
- `weatherDataAPIResponseList = []`: Initializes an empty list to store the JSON responses from the weather API.
- `for geolocations in geolocationsDF.collect():`: Iterates through the rows collected from `geolocationsDF` (which contains market names, latitudes, and longitudes). Note: as discussed before, `collect()` can be inefficient for large DataFrames.
- `print(...)`: Prints the market name, latitude, and longitude for each location being processed.
- `weatherDataSourceAPIURL = f"..."`: Constructs the URL for the weather API, embedding the latitude and longitude. It also specifies the date range for the data (2023-01-01 to 2023-12-31).
- `try...except requests.exceptions.RequestException as e:`: This block handles potential errors during the API request.
  - `weatherDataAPIResponse = requests.get(weatherDataSourceAPIURL, timeout=120).json()`: Makes a GET request to the weather API with a timeout of 120 seconds and parses the JSON response.
  - `weatherDataAPIResponse["marketName"] = geolocations["marketName"]`: Adds the `marketName` to the weather data response.
  - `weatherDataAPIResponsejson = json.dumps(weatherDataAPIResponse)`: Converts the Python dictionary response to a JSON string.
  - `print(weatherDataAPIResponsejson)`: Prints the JSON response.
  - `if isinstance(weatherDataAPIResponse, dict): ...`: If the API response is a dictionary (implying a successful JSON parse), its JSON string representation is appended to `weatherDataAPIResponseList`.
  - `except requests.exceptions.RequestException as e:`: If an error occurs during the API request, an error message including the market name and the exception details is printed.
- `weatherDataRDD = sc.parallelize(weatherDataAPIResponseList)`: Creates a Spark RDD from the list of JSON response strings.
- `weatherDataSparkDF = spark.read.json(weatherDataRDD)`: Reads the JSON data from the RDD into a Spark DataFrame. Spark will infer the schema from the JSON strings.
- `weatherDataSparkDF.write.mode("overwrite").json(weatherDataSinkFolderPath)`: Writes `weatherDataSparkDF` to the location specified by `weatherDataSinkFolderPath` in JSON format, overwriting any existing files.
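As a quick sanity check after the write, one could read the files back and inspect them; this is just a sketch using the same `weatherDataSinkFolderPath` variable:

```python
# Read the freshly written JSON files back and verify the inferred schema.
checkDF = spark.read.json(weatherDataSinkFolderPath)
checkDF.printSchema()
checkDF.select("marketName").show(5, truncate=False)
```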
Output:
Below the code cell, you can see a series of lines starting with market names followed by latitude and longitude. This is the output of the print() statement at the beginning of the loop. It shows the coordinates for which the weather data is being fetched.
The absence of any error messages in the output suggests that the API requests for these initial locations were successful. The code is designed to catch requests.exceptions.RequestException, so if there were network issues or problems with the API for these first few locations, error messages would likely have been printed.
In summary, this code fetches weather data for a set of locations, handles potential API errors, and then saves the fetched weather data (along with the market name) as JSON files using Spark. The `try...except` block is good practice for handling potential network issues when interacting with external APIs. As an aside, the table below contrasts `sc.parallelize()`, used above, with Pandas' `pd.json_normalize()`, another common way to turn JSON responses into tabular data:
| Feature | sc.parallelize() (PySpark) | pd.json_normalize() (Pandas) |
|---|---|---|
| Library | PySpark | Pandas |
| Input | Python collection (list, etc.) | JSON object or list of JSON objects (often as Python dictionaries) |
| Output | Spark RDD | Pandas DataFrame |
| Purpose | Distribute local data for parallel processing | Transform nested JSON into a flat table |
| Processing | Distributed across the Spark cluster | In-memory on a single machine |
| Data Structure | Unstructured or semi-structured elements | Structured, potentially nested JSON data |
| Use Case | Initializing RDDs from local data and parallelizing existing Python data | Handling and flattening complex JSON responses, creating tabular data from JSON |
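To illustrate the Pandas side of the comparison, here is a minimal sketch with a made-up nested response; `pd.json_normalize()` flattens the nested `daily` object into dotted column names on a single machine:

```python
import pandas as pd

# Made-up nested JSON, shaped loosely like a weather response.
nested = [{"marketName": "Dallas", "daily": {"temp_max": 31.4, "temp_min": 22.1}}]

flatDF = pd.json_normalize(nested)
print(flatDF.columns.tolist())
# ['marketName', 'daily.temp_max', 'daily.temp_min']
```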