
Commit 1c8c9c7 ("phase 2")
1 parent f73354e

File tree: 5 files changed, +123 -9 lines


phase_2/src/main/scala/cse512/Entrance.scala

+1 -1

@@ -12,7 +12,7 @@ object Entrance extends App {
   override def main(args: Array[String]) {
     val spark = SparkSession
       .builder()
-      .appName("CSE512-HotspotAnalysis-MYGROUPNAME") // YOU NEED TO CHANGE YOUR GROUP NAME
+      .appName("CSE512-HotspotAnalysis-Only_1_United") // YOU NEED TO CHANGE YOUR GROUP NAME
       .config("spark.some.config.option", "some-value")//.master("local[*]")
       .getOrCreate()

phase_2/src/main/scala/cse512/HotcellAnalysis.scala

+48 -2

@@ -42,8 +42,54 @@ def runHotcellAnalysis(spark: SparkSession, pointPath: String): DataFrame =
   val maxZ = 31
   val numCells = (maxX - minX + 1)*(maxY - minY + 1)*(maxZ - minZ + 1)

-  // YOU NEED TO CHANGE THIS PART
+  // Keep only the cells that fall inside the analysis cube
+  pickupInfo = spark.sql("select x,y,z from pickupInfoView where x>= " + minX + " and x<= " + maxX + " and y>= " + minY + " and y<= " + maxY + " and z>= " + minZ + " and z<= " + maxZ + " order by z,y,x")
+  pickupInfo.createOrReplaceTempView("selectedCellVals")
+  // pickupInfo.show()

-  return pickupInfo // YOU NEED TO CHANGE THIS PART
+  // Count pickups per cell
+  pickupInfo = spark.sql("select x, y, z, count(*) as hotCells from selectedCellVals group by x, y, z order by z,y,x")
+  pickupInfo.createOrReplaceTempView("selectedCellHotness")
+  // pickupInfo.show()
+
+  val sumOfSelectedCcells = spark.sql("select sum(hotCells) as sumHotCells from selectedCellHotness")
+  sumOfSelectedCcells.createOrReplaceTempView("sumOfSelectedCcells")
+  // sumOfSelectedCcells.show()
+
+  val mean = sumOfSelectedCcells.first().getLong(0).toDouble / numCells.toDouble
+  // println(mean)
+
+  spark.udf.register("squared", (inputX: Int) => (inputX * inputX).toDouble)
+
+  val sumOfSquares = spark.sql("select sum(squared(hotCells)) as sumOfSquares from selectedCellHotness")
+  sumOfSquares.createOrReplaceTempView("sumOfSquares")
+  // sumOfSquares.show()
+
+  val standardDeviation = scala.math.sqrt((sumOfSquares.first().getDouble(0) / numCells.toDouble) - (mean * mean))
+  // println(standardDeviation)
+
+  // Note: arguments are forwarded in the order declared by HotcellUtils.calculateAdjacentCells
+  spark.udf.register("adjacentCells", (inputX: Int, inputY: Int, inputZ: Int, minX: Int, maxX: Int, minY: Int, maxY: Int, minZ: Int, maxZ: Int) => HotcellUtils.calculateAdjacentCells(inputX, inputY, inputZ, minX, maxX, minY, maxY, minZ, maxZ))
+
+  // Self-join: for every cell, sum the pickup counts of the cell and its neighbours
+  val adjacentCells = spark.sql("select adjacentCells(sch1.x, sch1.y, sch1.z, " + minX + "," + maxX + "," + minY + "," + maxY + "," + minZ + "," + maxZ + ") as adjacentCellCount,"
+    + "sch1.x as x, sch1.y as y, sch1.z as z, "
+    + "sum(sch2.hotCells) as sumHotCells "
+    + "from selectedCellHotness as sch1, selectedCellHotness as sch2 "
+    + "where (sch2.x = sch1.x+1 or sch2.x = sch1.x or sch2.x = sch1.x-1) "
+    + "and (sch2.y = sch1.y+1 or sch2.y = sch1.y or sch2.y = sch1.y-1) "
+    + "and (sch2.z = sch1.z+1 or sch2.z = sch1.z or sch2.z = sch1.z-1) "
+    + "group by sch1.z, sch1.y, sch1.x "
+    + "order by sch1.z, sch1.y, sch1.x")
+  adjacentCells.createOrReplaceTempView("adjacentCells")
+  // adjacentCells.show()
+
+  spark.udf.register("zScore", (adjacentCellCount: Int, sumHotCells: Int, numCells: Int, x: Int, y: Int, z: Int, mean: Double, standardDeviation: Double) => HotcellUtils.calculateZScore(adjacentCellCount, sumHotCells, numCells, x, y, z, mean, standardDeviation))
+
+  // Rank cells by their Getis-Ord statistic, hottest first
+  pickupInfo = spark.sql("select zScore(adjacentCellCount, sumHotCells, " + numCells + ", x, y, z, " + mean + ", " + standardDeviation + ") as getisOrdStatistic, x, y, z from adjacentCells order by getisOrdStatistic desc")
+  pickupInfo.createOrReplaceTempView("zScore")
+  // pickupInfo.show()
+
+  pickupInfo = spark.sql("select x, y, z from zScore")
+  pickupInfo.createOrReplaceTempView("finalPickupInfo")
+  // pickupInfo.show()
+
+  return pickupInfo
 }
 }
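
For reference, the mean and standard deviation computed above are population statistics over all n = numCells cells in the space-time cube, with cells that recorded no pickups contributing zero:

\[
\bar{x} = \frac{1}{n}\sum_{j=1}^{n} x_j,
\qquad
S = \sqrt{\frac{1}{n}\sum_{j=1}^{n} x_j^{2} - \bar{x}^{2}}
\]

Here x_j is the hotCells count of cell j. Empty cells never appear in selectedCellHotness, but dividing the sums by numCells rather than by the number of non-empty cells makes both statistics cover the whole cube.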

phase_2/src/main/scala/cse512/HotcellUtils.scala

+34 -1

@@ -47,5 +47,38 @@ object HotcellUtils {
     return calendar.get(Calendar.DAY_OF_MONTH)
   }

-  // YOU NEED TO CHANGE THIS PART
+  // Returns how many neighbours a cell has in the 3-D grid:
+  // 26 for an interior cell, 17 on a face, 11 on an edge, 7 at a corner.
+  def calculateAdjacentCells(inputX: Int, inputY: Int, inputZ: Int, minX: Int, maxX: Int, minY: Int, maxY: Int, minZ: Int, maxZ: Int): Int =
+  {
+    var count = 0 // number of coordinates lying on the cube boundary
+
+    if (inputX == minX || inputX == maxX) {
+      count += 1
+    }
+
+    if (inputY == minY || inputY == maxY) {
+      count += 1
+    }
+
+    if (inputZ == minZ || inputZ == maxZ) {
+      count += 1
+    }
+
+    if (count == 1) {
+      return 17 // face cell
+    } else if (count == 2) {
+      return 11 // edge cell
+    } else if (count == 3) {
+      return 7 // corner cell
+    }
+
+    return 26 // interior cell
+  }
+
+  def calculateZScore(adjacentCellCount: Int, sumHotCells: Int, numCells: Int, x: Int, y: Int, z: Int, mean: Double, standardDeviation: Double): Double =
+  {
+    val dividend = sumHotCells.toDouble - (mean * adjacentCellCount.toDouble)
+    val divisor = standardDeviation * math.sqrt(((numCells.toDouble * adjacentCellCount.toDouble) - (adjacentCellCount.toDouble * adjacentCellCount.toDouble)) / (numCells.toDouble - 1.0))
+
+    return dividend / divisor
+  }
 }
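
calculateAdjacentCells returns the number of neighbours a cell has in the 3-D grid: an interior cell touches 3^3 - 1 = 26 others, a cell on one boundary face 17, a cell on an edge 11, and a corner cell 7. calculateZScore then evaluates the Getis-Ord statistic (the getisOrdStatistic column in HotcellAnalysis). With the binary weights used here, w_{ij} = 1 inside cell i's neighbourhood and 0 elsewhere, both \sum_j w_{ij} and \sum_j w_{ij}^2 reduce to adjacentCellCount, and the statistic takes the standard form

\[
G_i^{*} =
\frac{\sum_{j} w_{ij} x_j - \bar{x} \sum_{j} w_{ij}}
     {S \sqrt{\dfrac{n \sum_{j} w_{ij}^{2} - \bigl(\sum_{j} w_{ij}\bigr)^{2}}{n-1}}}
\]

with \sum_j w_{ij} x_j supplied as sumHotCells, \bar{x} as mean, S as standardDeviation, and n as numCells; this is exactly the dividend and divisor computed in the code.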

phase_2/src/main/scala/cse512/HotzoneAnalysis.scala

+5 -3

@@ -28,10 +28,12 @@ object HotzoneAnalysis {
   spark.udf.register("ST_Contains",(queryRectangle:String, pointString:String)=>(HotzoneUtils.ST_Contains(queryRectangle, pointString)))
   val joinDf = spark.sql("select rectangle._c0 as rectangle, point._c5 as point from rectangle,point where ST_Contains(rectangle._c0,point._c5)")
   joinDf.createOrReplaceTempView("joinResult")
+
+  // Group by rectangle and sort the joined dataframe
+  val sortDf = spark.sql("select rectangle, count(point) as numOfPoints from joinResult group by rectangle order by rectangle asc")
+  sortDf.createOrReplaceTempView("sortResult")

-  // YOU NEED TO CHANGE THIS PART
-
-  return joinDf // YOU NEED TO CHANGE THIS PART
+  return sortDf
 }

 }
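
For comparison, the added group-by/sort step could equivalently be written with the DataFrame API; a minimal sketch, not part of the commit:

  import org.apache.spark.sql.functions.count

  // select rectangle, count(point) as numOfPoints
  // from joinResult group by rectangle order by rectangle asc
  val sortDf = joinDf
    .groupBy("rectangle")
    .agg(count("point").as("numOfPoints"))
    .orderBy("rectangle") // ascending by default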

phase_2/src/main/scala/cse512/HotzoneUtils.scala

+35 -2

@@ -3,8 +3,41 @@ package cse512
 object HotzoneUtils {

   def ST_Contains(queryRectangle: String, pointString: String ): Boolean = {
-    // YOU NEED TO CHANGE THIS PART
-    return true // YOU NEED TO CHANGE THIS PART
+    // Parse the "x,y" point string
+    val point = pointString.split(",")
+    val point_x = point(0).trim().toDouble
+    val point_y = point(1).trim().toDouble
+
+    // Parse the "x1,y1,x2,y2" rectangle corners
+    val rectangle = queryRectangle.split(",")
+    val rectangle_x_1 = rectangle(0).trim().toDouble
+    val rectangle_y_1 = rectangle(1).trim().toDouble
+    val rectangle_x_2 = rectangle(2).trim().toDouble
+    val rectangle_y_2 = rectangle(3).trim().toDouble
+
+    // Normalize the corners so (min_x, min_y) and (max_x, max_y) bound the rectangle
+    var min_x: Double = 0
+    var max_x: Double = 0
+    if (rectangle_x_1 < rectangle_x_2) {
+      min_x = rectangle_x_1
+      max_x = rectangle_x_2
+    } else {
+      min_x = rectangle_x_2
+      max_x = rectangle_x_1
+    }
+
+    var min_y: Double = 0
+    var max_y: Double = 0
+    if (rectangle_y_1 < rectangle_y_2) {
+      min_y = rectangle_y_1
+      max_y = rectangle_y_2
+    } else {
+      min_y = rectangle_y_2
+      max_y = rectangle_y_1
+    }
+
+    // Contained iff the point lies within both coordinate ranges (boundary inclusive)
+    if (point_x >= min_x && point_x <= max_x && point_y >= min_y && point_y <= max_y) {
+      return true
+    } else {
+      return false
+    }
   }

   // YOU NEED TO CHANGE THIS PART
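
A quick sanity check of ST_Contains with illustrative coordinates (not taken from the repo's data):

  HotzoneUtils.ST_Contains("-74.0,40.7,-73.9,40.8", "-73.95,40.75") // true: point inside the rectangle
  HotzoneUtils.ST_Contains("-74.0,40.7,-73.9,40.8", "-73.50,40.75") // false: x falls outside [-74.0, -73.9]

Boundary points count as contained, since the comparisons use >= and <=.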
