-
Notifications
You must be signed in to change notification settings - Fork 0
/
elastic_interface.py
85 lines (68 loc) · 2.94 KB
/
elastic_interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import datetime
import logging
import uuid
import elasticsearch
import elasticsearch.helpers
class ElasticAPI:
    """Small convenience wrapper around an Elasticsearch client bound to one index.

    Instance attributes (all set in ``__init__``):
        es: the ``elasticsearch.Elasticsearch`` client used for every request.
        es_index: name of the index every method operates on.
        log: logger obtained via ``logging.getLogger('root')``.
    """

    def __init__(self, index=None, host='localhost', port=9200):
        """Create a plain-HTTP client for ``host``:``port`` and remember ``index``."""
        self.es = elasticsearch.Elasticsearch(hosts=[{"host": host, "port": port}], scheme='http')
        self.es_index = index
        self.log = logging.getLogger('root')

    def push(self, data_dict):
        """Index a single document and report whether any shard accepted it.

        A ``timestamp`` field (current UTC time) is added to the document that
        is sent; the caller's ``data_dict`` itself is left untouched.

        Args:
            data_dict: mapping holding the document fields.

        Returns:
            True if the response reports at least one successful shard copy.
        """
        # NOTE: utcnow() yields a naive datetime and is deprecated since
        # Python 3.12; kept so the stored timestamp format does not change.
        ts = datetime.datetime.utcnow()
        # Copy instead of update() so the caller's dict is not mutated.
        body = {**data_dict, 'timestamp': ts}
        res = self.es.index(index=self.es_index, id=str(uuid.uuid1()), body=body, request_timeout=10)
        # With replicas a healthy write succeeds on more than one shard copy,
        # so test for ">= 1" rather than "== 1".
        return res["_shards"]["successful"] >= 1

    def bulk_push(self, generator):
        """Send every document from ``generator`` through the bulk helper.

        Documents are first decorated by ``process_generator``.

        Returns:
            Whatever ``elasticsearch.helpers.bulk`` returns.
        """
        return elasticsearch.helpers.bulk(self.es, actions=self.process_generator(generator))

    def process_generator(self, generator):
        """Lazily turn raw documents into bulk actions.

        Each yielded action is a copy of the input document extended with
        ``timestamp`` (one shared UTC time taken when iteration starts), a
        fresh ``_id`` (uuid1 as a string) and ``_index`` (``self.es_index``).
        The input documents are not modified.
        """
        ts = datetime.datetime.utcnow()
        for data_dict in generator:
            yield {
                **data_dict,
                'timestamp': ts,
                '_id': str(uuid.uuid1()),
                '_index': self.es_index,
            }

    def index_exists(self):
        """Return the client's existence answer for ``self.es_index``.

        Raises:
            ValueError: if the client reports that the index does not exist.
        """
        exists = self.es.indices.exists(index=self.es_index)
        if not exists:
            raise ValueError(f"{self.es_index} does not exist!")
        return exists

    def trip_exists(self, trip_id):
        """Check whether at least one document has the given ``trip_id``.

        Returns:
            True or False depending on whether the term query yields a hit,
            or None if the connection to the server failed or timed out (the
            error is logged).
        """
        query = {"query": {"term": {"trip_id": trip_id}}}
        try:
            hits = elasticsearch.helpers.scan(self.es, index=self.es_index, query=query, request_timeout=5)
            try:
                # One hit is enough; do not pull the entire result set.
                return next(hits, None) is not None
            finally:
                # Close the scan generator here so its cleanup runs inside the
                # surrounding try/except instead of at garbage collection.
                hits.close()
        except (elasticsearch.ConnectionError, elasticsearch.ConnectionTimeout) as ex:
            self.log.exception(ex)
            self.log.error("Connection to ES server failed!")
            return None

    def delete_trip(self, trip_id):
        """Delete every document whose ``trip_id`` term matches.

        Connection errors and timeouts are logged and swallowed; the method
        returns None in either case.
        """
        query = {"query": {"term": {"trip_id": trip_id}}}
        try:
            self.es.delete_by_query(index=self.es_index, body=query)
        except (elasticsearch.ConnectionError, elasticsearch.ConnectionTimeout) as ex:
            self.log.exception(ex)
            self.log.error("Connection to ES server failed!")
if __name__ == "__main__":
    # Small CLI: verify the index exists, then optionally check for and/or
    # delete all documents belonging to one trip.
    import argparse

    argp = argparse.ArgumentParser()
    # Every operation below needs an index, so demand it at parse time rather
    # than passing None on to the client.
    argp.add_argument('--index',
                      required=True,
                      help='name of the index to operate on')
    argp.add_argument('--trip-id',
                      dest='trip_id',
                      default=None)
    argp.add_argument('-ct',
                      dest='check_trip',
                      action='store_true',
                      help='check trip existence')
    argp.add_argument('-dt',
                      dest='delete_trip',
                      action='store_true',
                      help='delete trip')
    args = argp.parse_args()

    # Both trip operations build a term query on trip_id; refuse to run them
    # with a missing id instead of sending a query on None.
    if (args.check_trip or args.delete_trip) and args.trip_id is None:
        argp.error("--trip-id is required with -ct/-dt")

    elastic = ElasticAPI(index=args.index)
    elastic.index_exists()  # raises ValueError when the index is missing

    if args.check_trip:
        trip_exists = elastic.trip_exists(args.trip_id)
        print(f"{args.trip_id} exists in index {args.index}: {trip_exists}")

    if args.delete_trip:
        # NOTE(review): delete_trip logs connection errors instead of raising,
        # so this message is printed even when the request did not go through.
        elastic.delete_trip(args.trip_id)
        print(f"{args.trip_id} deleted from index {args.index}")