-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
88 lines (74 loc) · 2.44 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import os
from pyspark.sql import SparkSession
from perspective import Table as PerspectiveTable, PerspectiveManager
from time import sleep
from helpers.data import (
HOST,
MACHINES_PORT,
USAGE_PORT,
STATUS_PORT,
JOBS_PORT,
)
from helpers.spark import (
MACHINE_SCHEMA,
MACHINE_SCHEMA_SPARK,
USAGE_SCHEMA,
USAGE_SCHEMA_SPARK,
STATUS_SCHEMA,
STATUS_SCHEMA_SPARK,
JOBS_SCHEMA,
JOBS_SCHEMA_SPARK,
)
# Important imports
from helpers.spark import (
get_df_from_server,
push_to_perspective,
)
from helpers.fastapi import (
make_perspective_app,
perspective_spark_bridge,
start_server,
)
def main(host="localhost", port=8080, startup_delay=2.0):
    """Wire four Spark streaming DataFrames into hosted Perspective tables.

    Builds a Spark session, reads four socket streams (machines, usage,
    status, jobs), hosts one Perspective table per stream, bridges them
    into a FastAPI app, starts the server, then pushes each stream into
    its table. Blocks forever on the server thread.

    Args:
        host: Hostname the push clients connect to. Defaults to
            ``"localhost"`` (the server started below).
        port: Port the FastAPI server listens on and the push clients
            target. Defaults to ``8080``.
        startup_delay: Seconds to wait after starting the server before
            pushing data, giving it time to come up. Defaults to ``2.0``.
    """
    # Make a spark session
    spark = SparkSession.builder.appName("Perspective Demo").getOrCreate()
    # Construct a perspective manager
    manager = PerspectiveManager()
    # Get spark streaming dfs from the upstream data servers
    machines_df = get_df_from_server(spark, MACHINE_SCHEMA_SPARK, HOST, MACHINES_PORT)
    usage_df = get_df_from_server(spark, USAGE_SCHEMA_SPARK, HOST, USAGE_PORT)
    status_df = get_df_from_server(spark, STATUS_SCHEMA_SPARK, HOST, STATUS_PORT)
    jobs_df = get_df_from_server(spark, JOBS_SCHEMA_SPARK, HOST, JOBS_PORT)
    # Construct 4 separate perspective tables.
    # The first three are indexed by machine_id (rows update in place);
    # jobs is unindexed, so every push appends new rows.
    machines_table = PerspectiveTable(MACHINE_SCHEMA, index="machine_id")
    usage_table = PerspectiveTable(USAGE_SCHEMA, index="machine_id")
    status_table = PerspectiveTable(STATUS_SCHEMA, index="machine_id")
    jobs_table = PerspectiveTable(JOBS_SCHEMA)
    # Host these tables so websocket clients can subscribe to them
    manager.host_table("machines", machines_table)
    manager.host_table("usage", usage_table)
    manager.host_table("status", status_table)
    manager.host_table("jobs", jobs_table)
    # Bridge Perspective to Spark
    app = perspective_spark_bridge(
        {
            "machines": machines_table,
            "usage": usage_table,
            "status": status_table,
            "jobs": jobs_table,
        }
    )
    # Wrap with FastAPI
    make_perspective_app(manager, app)
    # Start the server (runs on a background thread)
    thread = start_server(app, port)
    # Give the server a moment to start accepting connections
    sleep(startup_delay)
    # Now push from spark to perspective
    push_to_perspective(machines_df, "machines", host, port)
    push_to_perspective(usage_df, "usage", host, port)
    push_to_perspective(status_df, "status", host, port)
    push_to_perspective(jobs_df, "jobs", host, port)
    # Block on the server thread so the process stays alive
    thread.join()
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()