
Commit f73354e: "phase 2 template"
1 parent 8ca5469
31 files changed: +110988 -0 lines
5 files renamed without changes.

phase_1/project/build.properties

Lines changed: 1 addition & 0 deletions
sbt.version=1.2.6
3 files renamed without changes.

phase_2/.gitignore

Lines changed: 4 additions & 0 deletions
/.idea/
/spark-warehouse/
/target/
/test/

phase_2/README.md

Lines changed: 138 additions & 0 deletions
# CSE512-Project-Hotspot-Analysis-Template

#### Version history

v1.1, Nov 16, Fix a bug in "Entrance.scala"

v1.0, Nov 13, Initial version

## Requirement

In this phase, you are required to do spatial hot spot analysis. In particular, you need to complete two different hot spot analysis tasks.

### Hot zone analysis

This task performs a range join operation on a rectangle dataset and a point dataset. For each rectangle, count the number of points located within it: the more points a rectangle contains, the hotter it is. The goal of this task is to calculate the hotness of all rectangles.

### Hot cell analysis

#### Description

This task applies spatial statistics to spatio-temporal big data in order to identify statistically significant spatial hot spots using Apache Spark. The topic of this task comes from ACM SIGSPATIAL GISCUP 2016.

The Problem Definition page is here: [http://sigspatial2016.sigspatial.org/giscup2016/problem](http://sigspatial2016.sigspatial.org/giscup2016/problem)

The Submit Format page is here: [http://sigspatial2016.sigspatial.org/giscup2016/submit](http://sigspatial2016.sigspatial.org/giscup2016/submit)

#### Special requirement (different from GIS CUP)

As stated in the Problem Definition page, in this task you are asked to implement a Spark program that calculates the Getis-Ord statistic of NYC Taxi Trip datasets. We call this "**Hot cell analysis**".

To reduce the required computation power, we made the following changes:

1. The input will be a monthly taxi trip dataset from 2009 - 2012, for example "yellow\_tripdata\_2009-01\_point.csv" or "yellow\_tripdata\_2010-02\_point.csv".
2. Each cell unit is 0.01 * 0.01 in terms of latitude and longitude degrees.
3. You should use 1 day as the time step size. The first day of a month is step 1. Assume every month has 31 days.
4. You only need to consider the pick-up location.
5. We don't use Jaccard similarity to check your answer.

However, you don't need to worry about how to decide the cell coordinates, because the code template already generates them. You just need to write the rest of the task.
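For reference, the Getis-Ord statistic defined on the Problem Definition page is (with x_j the attribute value, here the point count, of cell j; w_{i,j} = 1 when cell j is in the neighborhood of cell i and 0 otherwise; and n the total number of cells):

```
G_i^* = \frac{\sum_{j=1}^{n} w_{i,j} x_j - \bar{X} \sum_{j=1}^{n} w_{i,j}}
             {S \sqrt{\frac{n \sum_{j=1}^{n} w_{i,j}^2 - (\sum_{j=1}^{n} w_{i,j})^2}{n-1}}}

\bar{X} = \frac{1}{n} \sum_{j=1}^{n} x_j
\qquad
S = \sqrt{\frac{1}{n} \sum_{j=1}^{n} x_j^2 - \bar{X}^2}
```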
## Coding template specification

### Input parameters

1. Output path (Mandatory)
2. Task name: "hotzoneanalysis" or "hotcellanalysis"
3. Task parameters:
	1. Hot zone (2 parameters): NYC taxi data path, zone path
	2. Hot cell (1 parameter): NYC taxi data path

Example:

```
test/output hotzoneanalysis src/resources/point-hotzone.csv src/resources/zone-hotzone.csv hotcellanalysis src/resources/yellow_trip_sample_100000.csv
```

Note:

1. The number and order of tasks do not matter.
2. However, the first 7 of our final test cases will be hot zone analysis and the last 8 will be hot cell analysis.
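Judging from "Entrance.scala" (below), each task's result is written to the output path with the task's zero-based index appended. For the example arguments above that yields:

```
test/output0   (hot zone analysis result)
test/output1   (hot cell analysis result)
```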
### Input data format

The main entry point is the "cse512.Entrance" Scala file.

1. Point data: the input point dataset is the pickup points of the New York taxi trip datasets. The data format of this phase is the original NYC taxi trip format, which is different from the previous phase, but the coding template already parses it for you. Find the data in our ASU Google Drive shared folder: [https://drive.google.com/drive/folders/1W4GLKNsGlgXp7fHtDlhHEBdLVw_IuAXh?usp=sharing](https://drive.google.com/drive/folders/1W4GLKNsGlgXp7fHtDlhHEBdLVw_IuAXh?usp=sharing)

2. Zone data (only for hot zone analysis): in "src/resources/zone-hotzone" of the template

#### Hot zone analysis

The input point data can be any small subset of the NYC taxi dataset.

#### Hot cell analysis

The input point data is a monthly NYC taxi trip dataset (2009-2012) such as "yellow\_tripdata\_2009-01\_point.csv"

### Output data format

#### Hot zone analysis

All zones with their counts, sorted by the "rectangle" string in ascending order.

```
"-73.795658,40.743334,-73.753772,40.779114",1
"-73.797297,40.738291,-73.775740,40.770411",1
"-73.832707,40.620010,-73.746541,40.665414",20
```

#### Hot cell analysis

The coordinates of the top 50 hottest cells, sorted by their G score in descending order. Note: DO NOT OUTPUT the G score.

```
-7399,4075,15
-7399,4075,29
-7399,4075,22
```

### Example answers

An example input and answer are provided in the "testcase" folder of the coding template.

## Where you need to change

DO NOT DELETE any existing code in the coding template unless it is marked with "YOU NEED TO CHANGE THIS PART".

### Hot zone analysis

In the code template,

1. You need to change "**HotzoneAnalysis.scala** and **HotzoneUtils.scala**".
2. The coding template has loaded the data and written the first step, the range join query, for you. Please finish the rest of the task (see the sketch below).
3. You should sort the output DataFrame by the "rectangle" string.
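A minimal sketch of the remaining aggregation, assuming the template's joinResult view with columns rectangle and point (one possible completion, not the official solution):

```scala
// Count points per rectangle and sort by the rectangle string in ascending order.
val hotZones = spark.sql(
  """select rectangle, count(point) as pointCount
     from joinResult
     group by rectangle
     order by rectangle asc""")
```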
### Hot cell analysis

In the code template,

1. You need to change "**HotcellAnalysis.scala** and **HotcellUtils.scala**".
2. The coding template has loaded the data and computed the cell coordinates x, y, z along with their min and max values. Please finish the rest of the task (a sketch follows this list).
3. You should sort the output DataFrame by G-score; the coding template will take the first 50 rows for output. DO NOT OUTPUT the G-score.
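A rough sketch of one possible approach, continuing from the pickupInfo DataFrame and the min/max bounds that HotcellAnalysis.scala already computes. This is not the official solution; the names cells, cnt, nbrSum, nbrCnt, and gscore are ours, and the boundary-aware neighbor count is just one way to handle cells on the edge of the bounding box:

```scala
import org.apache.spark.sql.functions._

// 1. Count pickups per non-empty cell inside the bounding box.
val cells = pickupInfo
  .where(s"x between $minX and $maxX and y between $minY and $maxY and z between $minZ and $maxZ")
  .groupBy("x", "y", "z").count().withColumnRenamed("count", "cnt")
cells.createOrReplaceTempView("cells")

// 2. Mean and standard deviation over ALL numCells cells (empty cells count as 0).
val sumCnt   = cells.agg(sum("cnt")).first().getLong(0).toDouble
val sumCntSq = cells.agg(sum(col("cnt") * col("cnt"))).first().getLong(0).toDouble
val mean     = sumCnt / numCells
val sd       = math.sqrt(sumCntSq / numCells - mean * mean)

// 3. Sum each cell's neighborhood (Chebyshev distance <= 1) with a self-join.
//    Cells on a face of the bounding box have fewer than 27 neighbors, hence
//    the case expressions. Spark may need spark.sql.crossJoin.enabled=true here.
val gScores = spark.sql(
  s"""select a.x as x, a.y as y, a.z as z, sum(b.cnt) as nbrSum,
             (case when a.x in ($minX, $maxX) then 2 else 3 end) *
             (case when a.y in ($minY, $maxY) then 2 else 3 end) *
             (case when a.z in ($minZ, $maxZ) then 2 else 3 end) as nbrCnt
      from cells a, cells b
      where abs(a.x - b.x) <= 1 and abs(a.y - b.y) <= 1 and abs(a.z - b.z) <= 1
      group by a.x, a.y, a.z""")
  .withColumn("gscore",
    (col("nbrSum") - lit(mean) * col("nbrCnt")) /
      (lit(sd) * sqrt((lit(numCells) * col("nbrCnt") - col("nbrCnt") * col("nbrCnt")) / lit(numCells - 1))))

// 4. Sort by G-score in descending order; Entrance.scala keeps the first 50 rows.
val result = gScores.orderBy(desc("gscore")).select("x", "y", "z")
```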
## Submission

### Submission files

1. Submit your project jar package.
2. Submit your project source code onto Blackboard and follow the submission instructions on BB.
3. Note: you need to make sure your code can compile and package by running ```sbt clean assembly```. We will run the compiled package on our cluster directly using "spark-submit" with parameters. If your code cannot compile and package, you will not receive any points.

## Tips (Optional)

This section is the same as in Phase 1.

### How to debug your code in IDE

If you are using the Scala template:

1. Use IntelliJ IDEA with the Scala plug-in or any other Scala IDE.
2. Replace the logic of the User Defined Functions ST\_Contains and ST\_Within in SpatialQuery.scala.
3. Append ```.master("local[*]")``` after ```.config("spark.some.config.option", "some-value")``` to tell the IDE that the master IP is localhost.
4. In some cases, you may need to go to the "build.sbt" file and change ```% "provided"``` to ```% "compile"``` in order to debug your code in the IDE (see the snippet below).
5. Run your code in the IDE.
6. **You must revert Steps 3 and 4 above and recompile your code before using spark-submit!!!**
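For Step 4, the change in "build.sbt" looks like this (shown for both Spark dependencies; revert to ```% "provided"``` before packaging):

```
"org.apache.spark" %% "spark-core" % "2.2.0" % "compile",
"org.apache.spark" %% "spark-sql" % "2.2.0" % "compile",
```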
### How to submit your code to Spark

If you are using the Scala template:

1. Go to the project root folder.
2. Run ```sbt clean assembly```. You may need to install sbt in order to run this command.
3. Find the packaged jar at "./target/scala-2.11/CSE512-Project-Hotspot-Analysis-Template-assembly-0.1.0.jar"
4. Submit the jar to Spark using the Spark command "./bin/spark-submit". A pseudo-code example: ```./bin/spark-submit ~/GitHub/CSE512-Project-Hotspot-Analysis-Template/target/scala-2.11/CSE512-Project-Hotspot-Analysis-Template-assembly-0.1.0.jar test/output hotzoneanalysis src/resources/point-hotzone.csv src/resources/zone-hotzone.csv hotcellanalysis src/resources/yellow_tripdata_2009-01_point.csv```

phase_2/build.sbt

Lines changed: 25 additions & 0 deletions
import sbt.Keys.{libraryDependencies, scalaVersion, version}

lazy val root = (project in file(".")).
  settings(
    name := "CSE512-Hotspot-Analysis-Template",

    version := "0.1.0",

    scalaVersion := "2.11.11",

    organization := "org.datasyslab",

    publishMavenStyle := true,

    mainClass := Some("cse512.Entrance")
  )

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.4" % "test",
  "org.specs2" %% "specs2-core" % "2.4.16" % "test",
  "org.specs2" %% "specs2-junit" % "2.4.16" % "test"
)

phase_2/project/.gitignore

Lines changed: 3 additions & 0 deletions
/target/
/project/
*.properties

phase_2/project/plugins.sbt

Lines changed: 1 addition & 0 deletions
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
phase_2/src/main/scala/cse512/Entrance.scala

Lines changed: 63 additions & 0 deletions

package cse512

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Entrance extends App {
  Logger.getLogger("org.spark_project").setLevel(Level.WARN)
  Logger.getLogger("org.apache").setLevel(Level.WARN)
  Logger.getLogger("akka").setLevel(Level.WARN)
  Logger.getLogger("com").setLevel(Level.WARN)

  override def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("CSE512-HotspotAnalysis-MYGROUPNAME") // YOU NEED TO CHANGE YOUR GROUP NAME
      .config("spark.some.config.option", "some-value")//.master("local[*]")
      .getOrCreate()

    paramsParser(spark, args)
  }

  private def paramsParser(spark: SparkSession, args: Array[String]): Unit = {
    var paramOffset = 1
    var currentQueryParams = ""
    var currentQueryName = ""
    var currentQueryIdx = -1

    while (paramOffset <= args.length) {
      if (paramOffset == args.length || args(paramOffset).toLowerCase.contains("analysis")) {
        // Turn in the previous query
        if (currentQueryIdx != -1) queryLoader(spark, currentQueryName, currentQueryParams, args(0) + currentQueryIdx)

        // Start a new query call
        if (paramOffset == args.length) return

        currentQueryName = args(paramOffset)
        currentQueryParams = ""
        currentQueryIdx = currentQueryIdx + 1
      }
      else {
        // Keep appending query parameters
        currentQueryParams = currentQueryParams + args(paramOffset) + " "
      }
      paramOffset = paramOffset + 1
    }
  }

  private def queryLoader(spark: SparkSession, queryName: String, queryParams: String, outputPath: String) {
    val queryParam = queryParams.split(" ")
    if (queryName.equalsIgnoreCase("hotcellanalysis")) {
      if (queryParam.length != 1) throw new ArrayIndexOutOfBoundsException("[CSE512] Query " + queryName + " needs 1 parameters but you entered " + queryParam.length)
      HotcellAnalysis.runHotcellAnalysis(spark, queryParam(0)).limit(50).write.mode(SaveMode.Overwrite).csv(outputPath)
    }
    else if (queryName.equalsIgnoreCase("hotzoneanalysis")) {
      if (queryParam.length != 2) throw new ArrayIndexOutOfBoundsException("[CSE512] Query " + queryName + " needs 2 parameters but you entered " + queryParam.length)
      HotzoneAnalysis.runHotZoneAnalysis(spark, queryParam(0), queryParam(1)).write.mode(SaveMode.Overwrite).csv(outputPath)
    }
    else {
      throw new NoSuchElementException("[CSE512] The given query name " + queryName + " is wrong. Please check your input.")
    }
  }
}
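To trace paramsParser: any token containing "analysis" starts a new query, everything between such tokens is accumulated as that query's parameters, and each finished query is handed to queryLoader with args(0) plus the query's zero-based index as its output path. Illustratively, with the README's example arguments:

```scala
// args = Array("test/output",
//              "hotzoneanalysis", "point-hotzone.csv", "zone-hotzone.csv",
//              "hotcellanalysis", "yellow_trip_sample_100000.csv")
// paramsParser issues, in order:
//   queryLoader(spark, "hotzoneanalysis",
//               "point-hotzone.csv zone-hotzone.csv ", "test/output0")
//   queryLoader(spark, "hotcellanalysis",
//               "yellow_trip_sample_100000.csv ", "test/output1")
```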
phase_2/src/main/scala/cse512/HotcellAnalysis.scala

Lines changed: 49 additions & 0 deletions

package cse512

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions._

object HotcellAnalysis {
  Logger.getLogger("org.spark_project").setLevel(Level.WARN)
  Logger.getLogger("org.apache").setLevel(Level.WARN)
  Logger.getLogger("akka").setLevel(Level.WARN)
  Logger.getLogger("com").setLevel(Level.WARN)

  def runHotcellAnalysis(spark: SparkSession, pointPath: String): DataFrame =
  {
    // Load the original data from a data source
    var pickupInfo = spark.read.format("com.databricks.spark.csv").option("delimiter",";").option("header","false").load(pointPath);
    pickupInfo.createOrReplaceTempView("nyctaxitrips")
    pickupInfo.show()

    // Assign cell coordinates based on pickup points
    spark.udf.register("CalculateX",(pickupPoint: String)=>((
      HotcellUtils.CalculateCoordinate(pickupPoint, 0)
      )))
    spark.udf.register("CalculateY",(pickupPoint: String)=>((
      HotcellUtils.CalculateCoordinate(pickupPoint, 1)
      )))
    spark.udf.register("CalculateZ",(pickupTime: String)=>((
      HotcellUtils.CalculateCoordinate(pickupTime, 2)
      )))
    pickupInfo = spark.sql("select CalculateX(nyctaxitrips._c5),CalculateY(nyctaxitrips._c5), CalculateZ(nyctaxitrips._c1) from nyctaxitrips")
    var newCoordinateName = Seq("x", "y", "z")
    pickupInfo = pickupInfo.toDF(newCoordinateName:_*)
    pickupInfo.show()

    // Define the min and max of x, y, z
    val minX = -74.50/HotcellUtils.coordinateStep
    val maxX = -73.70/HotcellUtils.coordinateStep
    val minY = 40.50/HotcellUtils.coordinateStep
    val maxY = 40.90/HotcellUtils.coordinateStep
    val minZ = 1
    val maxZ = 31
    val numCells = (maxX - minX + 1)*(maxY - minY + 1)*(maxZ - minZ + 1)

    // YOU NEED TO CHANGE THIS PART

    return pickupInfo // YOU NEED TO CHANGE THIS PART
  }
}
phase_2/src/main/scala/cse512/HotcellUtils.scala

Lines changed: 51 additions & 0 deletions

package cse512

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Calendar

object HotcellUtils {
  val coordinateStep = 0.01

  def CalculateCoordinate(inputString: String, coordinateOffset: Int): Int =
  {
    // Configuration variable:
    // Coordinate step is the size of each cell on x and y
    var result = 0
    coordinateOffset match
    {
      case 0 => result = Math.floor((inputString.split(",")(0).replace("(","").toDouble/coordinateStep)).toInt
      case 1 => result = Math.floor(inputString.split(",")(1).replace(")","").toDouble/coordinateStep).toInt
      // We only consider the data from 2009 to 2012 inclusively, 4 years in total. Week 0 Day 0 is 2009-01-01
      case 2 => {
        val timestamp = HotcellUtils.timestampParser(inputString)
        result = HotcellUtils.dayOfMonth(timestamp) // Assume every month has 31 days
      }
    }
    return result
  }

  def timestampParser (timestampString: String): Timestamp =
  {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
    val parsedDate = dateFormat.parse(timestampString)
    val timeStamp = new Timestamp(parsedDate.getTime)
    return timeStamp
  }

  def dayOfYear (timestamp: Timestamp): Int =
  {
    val calendar = Calendar.getInstance
    calendar.setTimeInMillis(timestamp.getTime)
    return calendar.get(Calendar.DAY_OF_YEAR)
  }

  def dayOfMonth (timestamp: Timestamp): Int =
  {
    val calendar = Calendar.getInstance
    calendar.setTimeInMillis(timestamp.getTime)
    return calendar.get(Calendar.DAY_OF_MONTH)
  }

  // YOU NEED TO CHANGE THIS PART
}
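A worked example of the cell mapping with coordinateStep = 0.01 (the input values below are hypothetical but follow the "(longitude,latitude)" and timestamp formats this code expects):

```scala
HotcellUtils.CalculateCoordinate("(-73.99,40.75)", 0)      // x cell: floor(-73.99 / 0.01) = -7399
HotcellUtils.CalculateCoordinate("(-73.99,40.75)", 1)      // y cell: floor( 40.75 / 0.01) =  4075
HotcellUtils.CalculateCoordinate("2009-01-15 08:30:00", 2) // z cell: day of month = 15
```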
phase_2/src/main/scala/cse512/HotzoneAnalysis.scala

Lines changed: 37 additions & 0 deletions

package cse512

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object HotzoneAnalysis {

  Logger.getLogger("org.spark_project").setLevel(Level.WARN)
  Logger.getLogger("org.apache").setLevel(Level.WARN)
  Logger.getLogger("akka").setLevel(Level.WARN)
  Logger.getLogger("com").setLevel(Level.WARN)

  def runHotZoneAnalysis(spark: SparkSession, pointPath: String, rectanglePath: String): DataFrame = {

    var pointDf = spark.read.format("com.databricks.spark.csv").option("delimiter",";").option("header","false").load(pointPath);
    pointDf.createOrReplaceTempView("point")

    // Parse point data formats
    spark.udf.register("trim",(string : String)=>(string.replace("(", "").replace(")", "")))
    pointDf = spark.sql("select trim(_c5) as _c5 from point")
    pointDf.createOrReplaceTempView("point")

    // Load rectangle data
    val rectangleDf = spark.read.format("com.databricks.spark.csv").option("delimiter","\t").option("header","false").load(rectanglePath);
    rectangleDf.createOrReplaceTempView("rectangle")

    // Join two datasets
    spark.udf.register("ST_Contains",(queryRectangle:String, pointString:String)=>(HotzoneUtils.ST_Contains(queryRectangle, pointString)))
    val joinDf = spark.sql("select rectangle._c0 as rectangle, point._c5 as point from rectangle,point where ST_Contains(rectangle._c0,point._c5)")
    joinDf.createOrReplaceTempView("joinResult")

    // YOU NEED TO CHANGE THIS PART

    return joinDf // YOU NEED TO CHANGE THIS PART
  }

}
phase_2/src/main/scala/cse512/HotzoneUtils.scala

Lines changed: 12 additions & 0 deletions

package cse512

object HotzoneUtils {

  def ST_Contains(queryRectangle: String, pointString: String ): Boolean = {
    // YOU NEED TO CHANGE THIS PART
    return true // YOU NEED TO CHANGE THIS PART
  }

  // YOU NEED TO CHANGE THIS PART

}
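For orientation only, a minimal sketch of what the containment check could look like, assuming the rectangle string is "x1,y1,x2,y2" (corner order not guaranteed) and the point string is "x,y", as in the sample data. The real implementation is left to you:

```scala
def ST_Contains(queryRectangle: String, pointString: String): Boolean = {
  val r = queryRectangle.split(",").map(_.trim.toDouble) // x1, y1, x2, y2
  val p = pointString.split(",").map(_.trim.toDouble)    // x, y
  // Normalize corners so the check works regardless of corner order.
  val (minX, maxX) = (math.min(r(0), r(2)), math.max(r(0), r(2)))
  val (minY, maxY) = (math.min(r(1), r(3)), math.max(r(1), r(3)))
  p(0) >= minX && p(0) <= maxX && p(1) >= minY && p(1) <= maxY
}
```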

phase_2/src/resources/.gitignore

Lines changed: 1 addition & 0 deletions
/yellow_tripdata_2009-01_point.csv
