train.py
import json
import pandas as pd
from fairsearchdeltr import Deltr
from utils import Logger, elastic_connection, ES_HOST, ES_AUTH, JUDGMENTS_FILE, \
FEATURE_SET_NAME, MODEL_FILE, QUERIES_FILE, FEATURES_FILE, INDEX_NAME
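
# Query template used to have the Elasticsearch LTR plugin log feature values:
# the "terms" filter restricts the search to the judged documents, the "sltr"
# clause scores them against a feature set under the name "logged_featureset",
# and the "ltr_log" extension returns each feature's value as a "log_entry"
# in the response.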
log_query = {
    "size": 1000,
    "query": {
        "bool": {
            "filter": [
                {
                    "terms": {
                        "_id": []
                    }
                },
                {
                    "sltr": {
                        "_name": "logged_featureset",
                        "featureset": "",
                        "params": {
                            "keywords": ""
                        }
                    }
                }
            ]
        }
    },
    "ext": {
        "ltr_log": {
            "log_specs": {
                "name": "log_entry",
                "named_query": "logged_featureset",
                "missing_as_zero": True
            }
        }
    }
}


def log_features(es, query_id: int, query: str, ids: list, feature_set_name: str, index_name: str):
    """
    Logs the feature values for one query against its judged documents.

    :param es: Elasticsearch client
    :param query_id: ID of the query (used for logging only)
    :param query: Query to train on
    :param ids: Document IDs with known judgements for this query
    :param feature_set_name: What feature set to get the score for
    :param index_name: What index to search against
    :return: The matching hits, each carrying its logged feature values
    """
    log_query['query']['bool']['filter'][0]['terms']['_id'] = ids
    log_query['query']['bool']['filter'][1]['sltr']['params']['keywords'] = query
    log_query['query']['bool']['filter'][1]['sltr']['featureset'] = feature_set_name
    Logger.logger.info("*** POST " + str(query_id))
    Logger.logger.info(json.dumps(log_query, indent=2))
    resp = es.search(index=index_name, body=log_query)
    return resp['hits']['hits']
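
# Illustrative shape of one logged hit, as accessed in collect_train_data below
# (a sketch; the feature names and values here are hypothetical and depend on
# the feature set):
#   {"_id": "42",
#    "fields": {"_ltrlog": [{"log_entry": [{"name": "feature_1", "value": 3.1},
#                                          {"name": "feature_2", "value": 0.0}]}]}}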


def collect_train_data(es, queries_file, judgments_file, feature_set_name, index_name, features_file):
    """ Collects the train data from Elasticsearch and writes it out as a CSV features file.
    """
    queries = pd.read_csv(queries_file)
    judgements = pd.read_csv(judgments_file)
    header_id = "query_id,document_id"
    header_feat = None
    header_judgement = "judgement"
    with open(features_file, 'w') as f:
        for q in range(queries.shape[0]):
            q_id = queries.loc[q]['query_id']
            keywords = queries.loc[q]['keywords']
            ids = judgements.loc[judgements['query_id'] == q_id]['document_id'].tolist()
            hits = log_features(es, q_id, keywords, ids, feature_set_name, index_name)
            for doc in hits:
                doc_id = doc['_id']
                log = doc['fields']['_ltrlog'][0]['log_entry']
                # Write the header once, using the feature names from the first logged hit
                if not header_feat:
                    header_feat = ",".join([a["name"] for a in log])
                    f.write("{0},{1},{2}\n".format(header_id, header_feat, header_judgement))
                values = ",".join([str(a["value"]) for a in log])
                # Match the judgement on both query and document, so a document judged
                # under several queries does not pick up the wrong judgement
                judgement = judgements.loc[(judgements['query_id'] == q_id) &
                                           (judgements['document_id'] == doc_id)]['judgement'].iloc[0]
                f.write("{0},{1},{2},{3}\n".format(q_id, doc_id, values, judgement))


def train_model(features_file: str, model_output: str,
                protected_feature_name="1", gamma=1, number_of_iterations=10, learning_rate=0.001,
                lambdaa=0.001, init_var=0.01, standardize=True, log=None):
    """
    Trains the DELTR model with the specified parameters

    :param features_file: The train file with features and judgements
    :param model_output: The file where the model is going to be stored
    :param protected_feature_name: The name of the column in the data that contains the protected attribute
    :param gamma: gamma parameter for the cost calculation in the training phase
                  (recommended to be around 1)
    :param number_of_iterations: number of iterations in gradient descent (optional)
    :param learning_rate: learning rate in gradient descent (optional)
    :param lambdaa: regularization constant (optional)
    :param init_var: range of values for initialization of weights (optional)
    :param standardize: boolean indicating whether the data should be standardized or not (optional)
    :param log: file name where the train log should be stored (optional)
    """
    Logger.logger.info("*** Reading train data ")
    train_data = pd.read_csv(features_file)
    # get the feature names (skip query_id/document_id at the front and judgement at the end)
    feature_names = train_data.columns.tolist()[2:-1]
    # find the index of the protected attribute
    # protected_feature = train_data.columns.tolist().index(protected_feature_name) - 2  # minus 2 for the query and doc id
    # create the Deltr object
    dtr = Deltr(protected_feature_name, gamma, number_of_iterations, learning_rate, lambdaa, init_var, standardize)
    Logger.logger.info("*** Training...")
    model = dtr.train(train_data)
    Logger.logger.info("*** Done training")
    Logger.logger.info("*** Saving model")
    with open(model_output, 'w') as f:
        json.dump(dict(zip(feature_names, model)), f)
    Logger.logger.info("*** Done saving model")
    if log:
        Logger.logger.info("*** Saving log")
        with open(log, 'w') as f:
            json.dump([{"timestamp": l.timestamp,
                        "omega": l.omega.tolist(),
                        "omega_gradient": l.omega_gradient.tolist(),
                        "loss_standard": l.loss_standard,
                        "loss_exposure": l.loss_exposure} for l in dtr.log], f)
        Logger.logger.info("*** Done saving log")


def save_model(script_name, feature_set, model_fname):
    """
    Save the DELTR model in Elasticsearch
    """
    import requests
    from urllib.parse import urljoin
    model_payload = {
        "model": {
            "name": script_name,
            "model": {
                "type": "model/linear",
                "definition": {
                }
            }
        }
    }
    with open(model_fname) as model_file:
        model_content = model_file.read()
    path = "_ltr/_featureset/%s/_createmodel" % feature_set
    full_path = urljoin(ES_HOST, path)
    model_payload['model']['model']['definition'] = model_content
    Logger.logger.info("*** Uploading model")
    Logger.logger.info("POST %s" % full_path)
    head = {'Content-Type': 'application/json'}
    resp = requests.post(full_path, data=json.dumps(model_payload), headers=head, auth=ES_AUTH)
    Logger.logger.info(resp.status_code)
    if not resp.ok:
        Logger.logger.error(resp.text)
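
# Once uploaded, the model can be applied at search time with the LTR plugin's
# "sltr" query, typically inside a rescore block (a sketch; the model name
# matches the script_name used when uploading):
#   {"rescore": {"query": {"rescore_query": {
#       "sltr": {"params": {"keywords": "..."}, "model": "somemodel"}}}}}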


if __name__ == "__main__":
    es = elastic_connection(timeout=1000)
    # Train DELTR and store as linear regression type
    Logger.logger.info("*** Training DELTR ")
    collect_train_data(es, QUERIES_FILE, JUDGMENTS_FILE, FEATURE_SET_NAME, INDEX_NAME, FEATURES_FILE)
    train_model(features_file=FEATURES_FILE, model_output=MODEL_FILE)
    save_model(script_name="somemodel", feature_set=FEATURE_SET_NAME, model_fname=MODEL_FILE)
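
# Run as: python train.py
# This assumes utils.py provides the Elasticsearch connection plus the file
# paths and names imported at the top of this script.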