forked from typedb/typedb-driver-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmigrate_csv.py
executable file
·133 lines (113 loc) · 4.77 KB
/
migrate_csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# the Python client for Grakn
# https://github.com/graknlabs/client-python
from grakn.client import GraknClient
# Python's built in module for dealing with .csv files.
# we will use it to read the data source files.
# https://docs.python.org/3/library/csv.html#dialects-and-formatting-parameters
import csv
def build_phone_call_graph(inputs, data_path, keyspace_name):
    """
    gets the job done:
    1. creates a Grakn client
    2. creates a session to the targeted keyspace
    3. for each input:
      - a. constructs the full path to the data file
      - b. loads csv to Grakn

    The dictionaries in inputs are never modified: the full path is put on a
    copy, so the same list can be passed again on a later call.

    :param inputs as list of dictionaries: each dictionary contains details required to parse the data
        ("file": the data file name minus ".csv", "template": the function that builds the Graql query)
    :param data_path as string: prefix put in front of every "file" value
    :param keyspace_name as string: the keyspace the session is opened on
    """
    with GraknClient(uri="localhost:48555") as client:  # 1
        with client.session(keyspace=keyspace_name) as session:  # 2
            for source in inputs:
                # drop an already-present prefix so that an entry which carries
                # the full path does not end up prefixed twice
                file_name = source["file"].replace(data_path, "")
                # work on a copy: the caller's dictionary keeps its original "file"
                resolved = dict(source, file=data_path + file_name)  # 3a
                print("Loading from [" + resolved["file"] + ".csv] into Grakn ...")
                load_data_into_grakn(resolved, session)  # 3b
def load_data_into_grakn(input, session):
    """
    inserts every row of one csv file into Grakn, one transaction per row:
    1. parses the file into a list of dictionaries
    2. for each dictionary
      a. opens a write transaction on the session
      b. builds the Graql insert query with the input's template
      c. runs the query
      d. commits the transaction
    :param input as dictionary: holds "file" (path minus ".csv") and "template" (query builder)
    :param session: off of which the transactions are created
    """
    records = parse_data_to_dictionaries(input)  # 1

    for record in records:  # 2
        with session.transaction().write() as tx:  # a
            query = input["template"](record)  # b
            print("Executing Graql Query: " + query)
            tx.query(query)  # c
            tx.commit()  # d

    inserted_count = str(len(records))
    summary = "\nInserted " + inserted_count + " items from [ " + input["file"] + ".csv] into Grakn.\n"
    print(summary)
def company_template(company):
    """
    builds the Graql query that inserts one company.
    The name is placed between double quotes as is, without any escaping.
    :param company as dictionary: must contain "name"
    :returns the insert query as string
    """
    pieces = ['insert $company isa company, has name "', company["name"], '";']
    return "".join(pieces)
def person_template(person):
    """
    builds the Graql query that inserts one person.
    The phone number is always written; first name, last name, city and age
    are only written when "first_name" is not empty.
    :param person as dictionary: contains "phone_number", "first_name", "last_name", "city" and "age"
    :returns the insert query as string
    """
    clauses = ['insert $person isa person, has phone-number "' + person["phone_number"] + '"']

    if person["first_name"] != "":
        clauses.append(', has first-name "' + person["first_name"] + '"')
        clauses.append(', has last-name "' + person["last_name"] + '"')
        clauses.append(', has city "' + person["city"] + '"')
        # age is written without quotes
        clauses.append(", has age " + str(person["age"]))

    clauses.append(";")
    return "".join(clauses)
def contract_template(contract):
    """
    builds the Graql query that inserts one contract between a company
    (matched by name) and a person (matched by phone number).
    :param contract as dictionary: contains "company_name" and "person_id"
    :returns the match-insert query as string
    """
    match_company = 'match $company isa company, has name "' + contract["company_name"] + '";'
    match_customer = ' $customer isa person, has phone-number "' + contract["person_id"] + '";'
    insert_contract = " insert (provider: $company, customer: $customer) isa contract;"
    return match_company + match_customer + insert_contract
def call_template(call):
    """
    builds the Graql query that inserts one call between two persons,
    both matched by phone number.
    :param call as dictionary: contains "caller_id", "callee_id", "started_at" and "duration"
    :returns the match-insert query as string
    """
    match_caller = 'match $caller isa person, has phone-number "' + call["caller_id"] + '";'
    match_callee = ' $callee isa person, has phone-number "' + call["callee_id"] + '";'
    # started-at and duration are written without quotes
    insert_call = "".join([
        " insert $call(caller: $caller, callee: $callee) isa call; ",
        "$call has started-at ", call["started_at"], "; ",
        "$call has duration ", str(call["duration"]), ";",
    ])
    return match_caller + match_callee + insert_call
def parse_data_to_dictionaries(input):
    """
    1. reads the csv file at input.file + ".csv", using its first row as the keys,
    2. turns every following row into a dictionary and collects them in a list
    Spaces that directly follow a delimiter are skipped (skipinitialspace).
    :param input.file as string: the path to the data file, minus the format
    :returns items as list of dictionaries: each item representing a data item from the file at input.file
    """
    # newline="" leaves line endings to the csv module, as its documentation
    # asks for; otherwise newlines inside quoted fields get translated.
    with open(input["file"] + ".csv", newline="") as data:  # 1
        return [dict(row) for row in csv.DictReader(data, skipinitialspace=True)]  # 2
# One entry per csv file: "file" is the file name minus ".csv", "template" is
# the function that builds the Graql query for a row of that file.
# companies and people come first: the contract and call queries match on them.
Inputs = [
    {"file": file_name, "template": template}
    for file_name, template in (
        ("companies", company_template),
        ("people", person_template),
        ("contracts", contract_template),
        ("calls", call_template),
    )
]
if __name__ == "__main__":
    # when run as a script: migrate the phone-calls csv files into the phone_calls keyspace
    build_phone_call_graph(
        inputs=Inputs,
        data_path="../../datasets/phone-calls/",
        keyspace_name="phone_calls",
    )