-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
240 lines (203 loc) · 7.9 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
from flask import Flask
from flask import Flask, flash, redirect, render_template, request, session, abort
from wtforms import Form, TextField, TextAreaField, validators, StringField, SubmitField, SelectField
import os
from sqlalchemy.orm import sessionmaker
from tabledef import *
from dropdown_tables import *
from elasticsearch import Elasticsearch
import os
'''
This class contains the main functionality of the web app. It contains the logic and the routing done for the web pages.
'''
global admin_logged_on # need a more secure way of having admin logon
dropdowns = create_engine('sqlite:///dropdowns.db', echo=True)
DEBUG = True
app = Flask(__name__)
app.config.from_object(__name__)
app.config['SECRET_KEY'] = '7d441f27d441f27567d441f2b6176a'
es = Elasticsearch([{'host': 'localhost', 'port': 10000}])
class ReusableForm(Form):
'''
This class binds a form to a session. It also populates the dropdown menus by extracting information from the sqlite database that contains the
tables.
'''
Session = sessionmaker(bind=dropdowns)
a = Session()
# run query to obtain all results
ped_opts_q = a.query(Drop_ped.ped)
fab_opts_q = a.query(Drop_fab.fab)
post_opts_q = a.query(Drop_post.post)
test_opts_q = a.query(Drop_test.test)
# convert them to str
ped_opts = convert_str(ped_opts_q)
fab_opts = convert_str(fab_opts_q)
post_opts = convert_str(post_opts_q)
test_opts = convert_str(test_opts_q)
# the method used to populate the dropdown names needs tuples
ped_opts_tuple = []
fab_opts_tuple = []
post_opts_tuple = []
test_opts_tuple =[]
# filling in the tuples
for x in ped_opts:
temp = (x, x)
ped_opts_tuple.append(temp)
for x in fab_opts:
temp = (x, x)
fab_opts_tuple.append(temp)
for x in post_opts:
temp = (x, x)
post_opts_tuple.append(temp)
for x in test_opts:
temp = (x, x)
test_opts_tuple.append(temp)
# filling in the web form with the dropdown options and establishing the searchbar field
pedigree = SelectField('Pedigree:', choices=ped_opts_tuple)
fabrication = SelectField('Fabrication:', choices=fab_opts_tuple)
post_processing = SelectField('Post-Processing:', choices=post_opts_tuple)
testing = SelectField('Testing:', choices=test_opts_tuple)
search = StringField('')
def reformat(item):
'''
Due to how the names of the items are stroed on the elastic search engine, we go ahead reformat them to look cleaner.
'''
temp = item.replace("_", " ")
temp = temp.split(" ")
result = ""
for word in temp:
result += word[0].upper() + word[1::] + " "
return result
def run_query(search):
'''
Pretty self explanitory. Uses the elastic search format to run query on the database.
'''
results = []
search_command = {
"query": {
"bool": {
"should": [
{ "query_string": { "query": search.data['search']}},
{ "match": { "pedigree": search.data['pedigree'] }},
{ "match": { "fabrication": search.data['fabrication']}},
{ "match": { "post_processing": search.data['post_processing']}},
{ "match": { "testing": search.data['testing']}}
]
}
}
}
results = es.search(index="database", body=search_command)
doc_results = []
doc_id = []
if results:
#filters_applied = search.data['pedigree'] + " " + search.data['fabrication'] + " " + search.data['post_processing'] + " " + search.data['testing']
#flash("You Searched for: " + search.data['search'] + " with filters: " + filters_applied)
# print(results)
if results['hits']['total'] == 0:
flash("No Results")
else:
display_results = ''
flash("Results:")
for hit in results['hits']['hits']:
doc_id.append('Document id: ' + hit['_id'])
for item in hit['_source']:
display_results += item + ": " + hit['_source'][item] + '\n'
# flash(str(display_results))
# print(str(display_results))
doc_results.append(display_results)
return zip(doc_id, doc_results)
@app.route('/', methods=['GET', 'POST'])
def admin():
'''
Detects if an admin user requested a log on, if not, it returns you to the login page, else it sends you to the admin home page.
'''
if not session.get('logged_in'):
return home()
else:
return redirect('http://localhost:4000/admin_home')
@app.route('/admin_home', methods=['GET', 'POST'])
def admin_home():
'''
The admin home page. It loads the admin options with the normal search form as well.
'''
if not session.get('logged_in') or not admin_logged_on:
return home()
else:
search = ReusableForm(request.form) # uses reusable form object declared above in this file
if request.method == 'POST':
return search_results_admin(search)
return render_template('admin_home.html', form=search) # you can pass a form to the method render_template to be used in the web app
@app.route('/search', methods=['GET', 'POST'])
def search():
if not session.get('logged_in'):
return home()
else:
search = ReusableForm(request.form)
if request.method == 'POST':
return search_results(search)
return render_template('front_page.html', form=search)
@app.route('/results_admin')
def search_results_admin(search):
if admin_logged_on:
my_results = run_query(search)
return render_template('admin_home.html', form=search, results=my_results)
else:
return home()
@app.route('/Document', methods=['GET', 'POST'])
def display_document():
doc_id = request.args.get('type')
doc_id = doc_id.strip("Document id: ")
# runs query to match individual document in order to dsiplay it on its own document page.
search = {
"query": {
"match": {
"_id": doc_id
}
}
}
results = es.search(index='database', body=search)
items = []
display_results = []
for hit in results['hits']['hits']:
for item in hit['_source']:
display_results.append(hit['_source'][item] + '\n')
items.append(reformat(item))
return render_template('/document.html', results=zip(items, display_results), doc_id=doc_id)
@app.route('/results')
def search_results(search):
my_results = run_query(search)
return render_template('front_page.html', form=search, results=my_results)
@app.route('/', methods=['GET', 'POST'])
def home():
if not session.get('logged_in'):
return render_template('login.html')
else:
return redirect('http://localhost:4000/search')
@app.route('/login', methods=['POST'])
def login():
global admin_logged_on
POST_USERNAME = str(request.form['username'])
POST_PASSWORD = str(request.form['password'])
Session = sessionmaker(bind=dropdowns)
s = Session()
query = s.query(User).filter(User.username.in_([POST_USERNAME]), User.password.in_([POST_PASSWORD]))
result = query.first()
if result and result.admin:
session['logged_in'] = True
admin_logged_on = True
return admin()
elif result:
session['logged_in'] = True
admin_logged_on = False
else:
admin_logged_on = False
return home()
@app.route("/logout")
def logout():
global admin_logged_on
admin_logged_on = False
session['logged_in'] = False
return home()
if __name__ == "__main__":
app.secret_key = os.urandom(12)
app.run(debug=True,host='0.0.0.0', port=4000)