-
Notifications
You must be signed in to change notification settings - Fork 2
/
dataflow.py
75 lines (63 loc) · 2.05 KB
/
dataflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from datetime import timedelta, datetime
import time
from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig, ManualOutputConfig
from bytewax.execution import run_main, spawn_cluster, cluster_main
import pandas as pd
from bytewax import parse
from app.dataflow import timescore, webscore, user, devicescore, pcscore
def input_builder(worker_index, worker_count, resume_state):
    """Yield every line of ``sample.csv`` as a ``(state, line)`` pair.

    Args:
        worker_index: Index of the calling worker (unused here).
        worker_count: Total number of workers (unused here).
        resume_state: Recovery state handed in by the caller (ignored).

    Yields:
        Tuples ``(None, line)``; ``line`` is the raw text of one row,
        including its trailing newline when present.

    Raises:
        OSError: If ``sample.csv`` cannot be opened.
    """
    state = None  # ignore recovery
    # The context manager guarantees the file handle is released when the
    # generator is exhausted *or* closed early, instead of relying on
    # garbage collection.
    with open("sample.csv") as source:
        for line in source:
            yield state, line
def get_msg(line):
    """Parse one comma-separated row into an event dictionary.

    The first five comma-separated fields are mapped, in order, to the keys
    ``id``, ``time``, ``user_id``, ``pc`` and ``activity``; any further
    fields are ignored. The first five characters of the user field are
    dropped and surrounding whitespace is stripped from the activity field.

    Raises:
        IndexError: If the row has fewer than five fields.
    """
    fields = line.split(",")
    return {
        "id": fields[0],
        "time": fields[1],
        # Keep only what follows the first five characters of the user field.
        "user_id": fields[2][5:],
        "pc": fields[3],
        # The last field normally carries the trailing newline of the row.
        "activity": fields[4].strip(),
    }
def get_role(json):
    """Attach the user's role to the event and return the same dictionary.

    The role is whatever ``user.get_role_from_uid`` returns for the event's
    ``user_id``; it is stored under the ``role`` key.
    """
    role = user.get_role_from_uid(json["user_id"])
    json["role"] = role
    return json
def score_time(json):
    """Store ``timescore.score`` of the event's ``time`` under ``time_score``.

    The dictionary is updated in place and returned.
    """
    time_score = timescore.score(json["time"])
    json["time_score"] = time_score
    return json
def score_activity(json):
    """Score the event's activity and store it under ``activity_score``.

    * Activities starting with ``"http"`` are scored by ``webscore.score``
      on the activity string.
    * The activity ``"Connect"`` is scored by ``devicescore.score`` on the
      event's ``role``.
    * Every other activity scores ``0``.

    The dictionary is updated in place and returned.
    """
    activity = json["activity"]
    score = 0  # default for anything that is neither a URL nor "Connect"
    if activity.startswith("http"):
        score = webscore.score(activity)
    elif activity == "Connect":
        score = devicescore.score(json["role"])
    json["activity_score"] = score
    return json
def score_pc(json):
    """Store the machine score for this event under ``pc_score``.

    The score comes from ``pcscore.score`` called with the event's
    ``user_id``, ``pc`` and ``role``. The dictionary is updated in place
    and returned.
    """
    uid, machine, role = json["user_id"], json["pc"], json["role"]
    json["pc_score"] = pcscore.score(uid, machine, role)
    return json
def sum_score(json):
    """Add the three partial scores and store the total under ``score``.

    Reads ``time_score``, ``activity_score`` and ``pc_score`` from the event,
    writes their sum to ``score`` and returns the same dictionary.
    """
    time_part = json["time_score"]
    activity_part = json["activity_score"]
    pc_part = json["pc_score"]
    json["score"] = time_part + activity_part + pc_part
    return json
def output_builder(worker_index: int, worker_count: int) -> callable:
    """Return the per-record output callback.

    Both arguments are accepted but not used. The returned callback prints
    a record to standard output when its ``score`` is strictly greater than
    2.5 and does nothing otherwise.
    """

    def write(json):
        # Only records above the threshold are reported.
        exceeds_threshold = json["score"] > 2.5
        if exceeds_threshold:
            print(json)

    return write
# Processing steps, applied to every input line in exactly this order:
# parse the row, look up the role, compute the three partial scores, then
# total them.
_PIPELINE_STEPS = (
    get_msg,
    get_role,
    score_time,
    score_activity,
    score_pc,
    sum_score,
)

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
for _step in _PIPELINE_STEPS:
    flow.map(_step)
flow.capture(ManualOutputConfig(output_builder))

# Run the dataflow only when the file is executed as a script.
if __name__ == "__main__":
    run_main(flow)