-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathestools.py
More file actions
53 lines (47 loc) · 1.69 KB
/
estools.py
File metadata and controls
53 lines (47 loc) · 1.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import os
import time
from elasticsearch import Elasticsearch, exceptions as es_exceptions
from elasticsearch import helpers
def get_es_connection():
    """
    Build and return an Elasticsearch client, retrying until one is created.

    When ES_USER, ES_PASS and ES_HOST are all set in the environment, the
    client targets ES_HOST on port 9200 using HTTP basic auth. Otherwise it
    falls back to the unauthenticated host atlas-kibana.mwt2.org on port 9200.

    Any exception raised while building the client is printed, the function
    sleeps for 70 seconds and then tries again. The call therefore blocks
    until a client object exists and never returns None.

    NOTE(review): this only checks that the client object could be built;
    whether that implies a live connection depends on the elasticsearch
    client library -- confirm if an explicit ping is wanted here.
    """
    # Loop rather than recurse: the previous recursive retry discarded the
    # result of the nested call, so callers received None after any failure.
    while True:
        print("make sure we are connected to ES...")
        try:
            if 'ES_USER' in os.environ and 'ES_PASS' in os.environ and 'ES_HOST' in os.environ:
                es_conn = Elasticsearch(
                    [{'host': os.environ['ES_HOST'], 'port': 9200}],
                    http_auth=(os.environ['ES_USER'], os.environ['ES_PASS'])
                )
            else:
                es_conn = Elasticsearch([{'host': 'atlas-kibana.mwt2.org', 'port': 9200}])
            print("connected OK!")
        except es_exceptions.ConnectionError as error:
            print('ConnectionError in get_es_connection: ', error)
        except Exception as e:
            # Deliberately broad: this is a best-effort retry loop, so every
            # failure is reported and followed by another attempt.
            print('Something seriously wrong happened in getting ES connection.', e)
        else:
            return es_conn
        # Back off before the next attempt.
        time.sleep(70)
def bulk_index(data, es_conn=None, thread_name=''):
    """
    Push a batch of actions to Elasticsearch through helpers.bulk.

    data: the actions handed to helpers.bulk unchanged.
    es_conn: client to use; when None, one is obtained from
        get_es_connection().
    thread_name: label printed in front of the insert summary line.

    Returns True when helpers.bulk finished without raising and the summary
    line was printed. Any exception is printed and turned into a False
    return value instead of propagating to the caller.
    """
    connection = get_es_connection() if es_conn is None else es_conn
    try:
        outcome = helpers.bulk(
            connection,
            data,
            raise_on_exception=True,
            request_timeout=120,
        )
        # outcome[0] / outcome[1] are reported as the inserted and error figures.
        print(thread_name, "inserted:", outcome[0], 'errors:', outcome[1])
        return True
    except es_exceptions.ConnectionError as conn_err:
        print('ConnectionError ', conn_err)
    except es_exceptions.TransportError as transport_err:
        print('TransportError ', transport_err)
    except helpers.BulkIndexError as bulk_err:
        print(bulk_err)
    except Exception as unexpected:
        print('Something seriously wrong happened.', unexpected)
    # Reached only when one of the handlers above ran.
    return False