------------------------------------------------------------------------------------------------
Entry Point
First, read the file into an RDD, then convert it into a DataFrame, and finally query it through a temporary table.
order_items = sc.textFile("retail_db/order_items/*")
order_items_clean = order_items.map(lambda val : val.split(","))
#Creating a DataFrame from RDD
from pyspark.sql import Row
order_items_df = order_items_clean.map( lambda val : Row( order_item_id = int(val[0]) , order_item_order_id = int(val[1]), order_item_product_id = int(val[2]), order_item_quantity = int(val[3]), order_item_subtotal = float(val[4]), order_item_product_price = float(val[5]) ) ).toDF()
#Registering the DataFrame as a temporary table.
order_items_df.registerTempTable("order_items")
#Then we can run SQL queries on it, for example:
sqlContext.sql("select * from order_items limit 10").show()
------------------------------------------------------------------------------------------------
Reading an Avro file in Spark
There is no way to read an Avro file using the SparkContext; we have to use the sqlContext (with the spark-avro package) instead.
val = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/customers/")
Reading a Parquet file in Spark
val = sqlContext.read.format("parquet").load("/user/cloudera/customer_parquet/").rdd
Reading a SequenceFile
What is a SequenceFile?
SequenceFile is a flat file consisting of binary key/value pairs. It is extensively used in MapReduce as input/output formats. It is also worth noting that, internally, the temporary outputs of maps are stored using SequenceFile.
There are 3 different SequenceFile formats:
Uncompressed key/value records.
Record compressed key/value records - only 'values' are compressed here.
Block compressed key/value records - both keys and values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.
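Since the notes describe SequenceFiles but do not show the read itself, here is a minimal sketch of my own; the path is a hypothetical example and sc.sequenceFile infers the Writable key/value types.
#Read a SequenceFile of key/value pairs into an RDD (path is illustrative)
seq_rdd = sc.sequenceFile("/user/cloudera/retail_db/orders_seq")
#Each element comes back as a (key, value) tuple
seq_rdd.take(5)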
-------------------------------------------------------------------------------------------------
Adding a new column to an RDD.
It's not straightforward to add a new column to an RDD. One way to do it is:
val.map(lambda val : (val , val['customer_fname'] + ' ' +val['customer_lname'] )).take(5)
This also works, but it returns a tuple whose first element is the original Row object and whose second element is the full name, rather than a Row with an extra field.
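If a proper Row (rather than a tuple) is wanted while staying at the RDD level, one option is to rebuild each Row from its dict form. This is a sketch of my own, assuming val is an RDD of Row objects with customer_fname and customer_lname fields.
from pyspark.sql import Row
#Rebuild each Row with an extra full_name field (pure-RDD approach, no DataFrame round trip)
def add_full_name(row):
    d = row.asDict()
    d['full_name'] = d['customer_fname'] + ' ' + d['customer_lname']
    return Row(**d)
val_with_name = val.map(add_full_name)
val_with_name.take(5)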
Another method is to convert the RDD to a DataFrame, add the column there, and then convert back to an RDD.
from pyspark.sql.functions import lit, col, concat
#Convert RDD to DataFrame
val = val.toDF()
#Add new column
val = val.withColumn('full_name', concat(col('customer_fname'),lit(' '),col('customer_lname')) )
#The line below would NOT give the desired concatenation: '+' is treated as numeric addition in Spark SQL, so applying it to string columns produces null in full_name
#val = val.withColumn('full_name', col('customer_fname') + lit('D ') + col('customer_lname') )
#Then convert back to rdd
val = val.rdd
-----------------------------------------------------------------------------------------------------
Finding the revenue generated by product categories, sorted by revenue ascending and category descending.
First we need the order_items, products and categories tables in HDFS.
Let's move these tables with Sqoop:
sqoop import --connect jdbc:mysql://quickstart.cloudera:3306/retail_db --username root --password cloudera --table products --target-dir '/user/cloudera/retail_db/products' --as-avrodatafile --compression-codec 'snappy'
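For completeness, the other two imports could look like the commands below (my sketch); the storage formats are assumptions inferred from how the files are read further down, i.e. order_items as delimited text and categories as Parquet.
sqoop import --connect jdbc:mysql://quickstart.cloudera:3306/retail_db --username root --password cloudera --table order_items --target-dir '/user/cloudera/retail_db/order_items' --as-textfile
sqoop import --connect jdbc:mysql://quickstart.cloudera:3306/retail_db --username root --password cloudera --table categories --target-dir '/user/cloudera/retail_db/categories' --as-parquetfile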
------Reading the files in spark----------
order_item_raw = sc.textFile('/user/cloudera/retail_db/order_items/*')
order_item_rdd = order_item_raw.map(lambda val : val.split(","))
products = sqlContext.read.format('com.databricks.spark.avro').load('/user/cloudera/retail_db/products/*.avro')
categories = sqlContext.read.parquet('/user/cloudera/retail_db/categories/*.parquet')
---------------Solving using RDD---------------
order_item_rdd = order_item_rdd.map(lambda val : ( int(val[2]), float(val[4]) ) )
products_rdd = products.rdd
products_rdd = products_rdd.map(lambda val : (val.product_id, (val.product_category_id, val.product_price ) ) )
categories_rdd = categories.rdd
categories_rdd = categories_rdd.map(lambda val : (val.category_id , val.category_name) )
#Joining
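The original notes stop at the join, so the remaining RDD steps below are my own completion (a sketch, not the author's code): join on product_id, re-key by category, attach the category names, sum the subtotals, and sort.
#Joining the keyed RDDs: (product_id, (subtotal, (category_id, price)))
joined = order_item_rdd.join(products_rdd)
#Re-key by category_id and sum the subtotals per category
category_revenue = joined.map(lambda kv : (kv[1][1][0], kv[1][0])).reduceByKey(lambda a, b : a + b)
#Attach the category names: (category_id, (revenue, category_name))
named = category_revenue.join(categories_rdd)
#(category_name, revenue) sorted by revenue ascending; a secondary category-descending sort would need a composite key
result = named.map(lambda kv : (kv[1][1], round(kv[1][0], 2))).sortBy(lambda x : x[1])
result.take(10)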
--------------Solving using dataframe--------------
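order_product is not defined in the original notes; I am assuming it is the order items DataFrame joined with the products DataFrame on the product id, roughly as below (the construction of order_item_df mirrors the Entry Point section and is my assumption).
from pyspark.sql import Row
#Assumed: build a typed DataFrame from the raw order_items text and join it with products
order_item_df = order_item_raw.map(lambda val : val.split(",")).map( lambda val : Row( order_item_id = int(val[0]) , order_item_order_id = int(val[1]), order_item_product_id = int(val[2]), order_item_quantity = int(val[3]), order_item_subtotal = float(val[4]), order_item_product_price = float(val[5]) ) ).toDF()
order_product = order_item_df.join(products, order_item_df.order_item_product_id == products.product_id)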
from pyspark.sql.types import DoubleType, FloatType
order_product.withColumn('order_item_subtotal' , order_product.order_item_subtotal.cast(DoubleType())).show(5)
order_product_group = order_product.groupBy('order_item_product_id','product_name').sum('order_item_subtotal')
order_product_group = order_product_group.withColumnRenamed( "sum(order_item_subtotal)" , 'product_subtotal')
order_product_group = order_product_group.withColumn("product_subtotal" , col("product_subtotal").cast(FloatType()))
order_product_group.orderBy("product_subtotal" , ascending = [0]).show(10)
--------------Solving using sparksql--------------
order_item_df.registerTempTable("order_items")
products.registerTempTable("products")
sqlContext.sql("show tables").show()
-----------------------------------------------------------------------------------------------------
Confusion between sqlContext, SparkContext, SparkSession
In older versions of Spark there were different contexts acting as entry points to the different APIs (SparkContext for the core API, SQLContext for the Spark SQL API, StreamingContext for the DStream API, and so on). This was a source of confusion for developers and a point of optimization for the Spark team, so in the most recent versions of Spark there is a single entry point (the SparkSession), and from it you can obtain the other entry points (the SparkContext, the StreamingContext, etc.).
Can I create a SparkSession from an existing SparkContext?
In Scala there is no public constructor that takes an existing SparkContext; the usual route is SparkSession.builder.getOrCreate(), which reuses an already running SparkContext under the hood.
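A minimal sketch of the Spark 2.x pattern (my addition, not part of the original notes): create or reuse the session via the builder and pull the other entry points from it.
from pyspark.sql import SparkSession
#getOrCreate() returns the existing session (and reuses its SparkContext) if one is already running
spark = SparkSession.builder.appName("spark_exam").getOrCreate()
sc = spark.sparkContext            #core API entry point
spark.sql("show tables").show()    #SQL entry point lives directly on the session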