#!/usr/bin/env python
# coding: utf-8
# In[133]:
#https://spark.apache.org/docs/latest/api/python/getting_started/install.html to check version
get_ipython().system('pip install pyspark')
get_ipython().system('pip install findspark')
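# findspark (installed above) is only needed when Spark lives outside this
# Python environment; a pip-installed pyspark does not require it. A sketch of
# how it would be used, before the SparkSession below is built:
# import findspark
# findspark.init()  # locates Spark via SPARK_HOME and puts it on sys.path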
from pyspark.sql.functions import broadcast
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("pipeline")
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
    .getOrCreate()
)
# In[139]:
# ingest the sales.csv file
df1 = spark.read.csv(r"C:\Users\keert\Downloads\sales.csv", header=True, inferSchema=True)  # inferSchema asks Spark to infer column types; without it every column is read as a string
df1.show()
df1.printSchema()
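# inferSchema costs an extra pass over the file. A minimal sketch of the
# alternative, an explicit schema. The column names here are assumptions taken
# from the joins and SQL further below; adjust to the real sales.csv layout
# before using this.
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
sales_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("region_id", IntegerType(), True),
    StructField("amount", DoubleType(), True),
])
# df1 = spark.read.csv(r"C:\Users\keert\Downloads\sales.csv", header=True, schema=sales_schema)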
# In[140]:
# ingest products file
df_products = spark.read.option("multiline", "true").json(r"C:\Users\keert\Downloads\products.json")  # "multiline" lets Spark parse JSON records that span multiple lines; the default reader expects one JSON object per line
df_products.show()
df_products.printSchema()  # to see the column data types and whether each column is nullable
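# printSchema only shows nullability flags, not actual null counts. A minimal
# sketch for counting nulls per column (works on whatever columns the file has):
from pyspark.sql import functions as F
df_products.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_products.columns]).show()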
# In[141]:
# ingest the regions file
df_regions = spark.read.csv(r"C:\Users\keert\Downloads\regions.csv", header=True, inferSchema=True)
df_regions.show()
df_regions.printSchema()
# In[142]:
# join sales with products; the result is the larger DataFrame that the broadcast join below is applied to
df_sale_details = df1.join(df_products, "product_id", "inner")
df_sale_details.show()
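# A quick check on the physical plan: explain() prints which join strategy
# Spark actually chose for the join above.
df_sale_details.explain()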
# In[143]:
# try to optimize by repartitioning, spreading the data across 4 partitions
optimized_df = df_sale_details.repartition(4)
optimized_df.show()
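# Sanity check that the repartition took effect: the underlying RDD should now
# report 4 partitions.
print(optimized_df.rdd.getNumPartitions())  # expected: 4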
# In[129]:
# optimize by broadcasting the small regions DataFrame to every executor instead of shuffling it
Joined_df = optimized_df.join(broadcast(df_regions), "region_id", "inner")
Joined_df.show()
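# Verify the broadcast happened: the plan should contain a BroadcastHashJoin on
# the broadcast(df_regions) side. Note that Spark also auto-broadcasts tables
# smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default), so a
# broadcast can appear even without the explicit hint.
Joined_df.explain()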
# In[144]:
# register a temp view so the joined result can be queried with SQL
Joined_df.createOrReplaceTempView("Joined_df")
# total sales per category
total_sales_per_category = spark.sql("SELECT category, SUM(amount) AS total_sales FROM Joined_df GROUP BY category")
total_sales_per_category.persist()  # persist before the action so the computed result is cached in memory/disk for later reuse (show, write)
total_sales_per_category.show()
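# persist() with no argument uses Spark's default storage level for DataFrames
# (memory-and-disk). A sketch of choosing a level explicitly, if a different
# memory/disk trade-off were wanted (a DataFrame's level cannot be changed once
# assigned, so this replaces the persist() call above rather than following it):
# from pyspark import StorageLevel
# total_sales_per_category.persist(StorageLevel.MEMORY_ONLY)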
# In[145]:
# Total sales per region
total_sales_per_region = spark.sql("SELECT region_id, SUM(amount) AS total_sales FROM Joined_df GROUP BY region_id")
total_sales_per_region.persist()  # persist before the action so the computed result is cached
total_sales_per_region.show()
# In[146]:
total_sales_per_category.write.csv(r"C:\Users\keert\Downloads\final.csv", header=True, mode="overwrite")  # write is an action, so it triggers the computation; the output path becomes a directory of part files
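# If a single CSV file is wanted instead of a directory of part files, collapse
# to one partition first. A sketch; the "final_single" path is an assumed
# example name:
# total_sales_per_category.coalesce(1).write.csv(r"C:\Users\keert\Downloads\final_single", header=True, mode="overwrite")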
# DataFrames have no saveAsTextFile; it is an RDD method, so convert with .rdd
# first. saveAsTextFile also refuses to write into an existing directory, hence
# the subfolder (an assumed example path).
total_sales_per_category.rdd.saveAsTextFile(r"C:\Users\keert\Downloads\final_text")
# Note that PySpark requires Java 8 (except prior to 8u371), 11 or 17 with JAVA_HOME properly set.
# If using JDK 11, set -Dio.netty.tryReflectionSetAccessible=true for Arrow related features.