-
-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathopensearch.py
133 lines (105 loc) · 4 KB
/
opensearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
from typing import Dict, Iterable, List, Union
import opensearchpy
from .interfaces import IndexInterface
class OpenSearchInterface(IndexInterface):
    """``IndexInterface`` implementation backed by an ``opensearchpy.OpenSearch`` client.

    Methods that take an index name fall back to the ``default_index`` given to
    the constructor when the supplied name is empty or not a string.
    """

    def __init__(
        self,
        hosts: List,
        user: str,
        password: str,
        timeout: Union[int, str] = 30,
        default_index: str = "",
    ):
        """Create the underlying OpenSearch client.

        Args:
            hosts: Host specifications forwarded to ``opensearchpy.OpenSearch``.
            user: HTTP basic-auth user name.
            password: HTTP basic-auth password.
            timeout: Server-side operation timeout used by ``create_index``.
                An int is interpreted as seconds; a string (e.g. ``"30s"``) is
                forwarded unchanged.
            default_index: Index used when a method is called without one.
        """
        self._search_engine = opensearchpy.OpenSearch(
            hosts=hosts, http_auth=(user, password)
        )
        self._timeout = timeout
        self._default_index = default_index

    def _operation_timeout(self) -> str:
        """Return the configured timeout as an OpenSearch time-value string."""
        # OpenSearch rejects unit-less time values (other than "0"/"-1") with
        # "unit is missing or unrecognized", so a bare int of seconds gets "s".
        if isinstance(self._timeout, int):
            return str(self._timeout) + "s"
        return self._timeout

    def index_exists(self, index_name: str) -> bool:
        """Return whether ``index_name`` exists on the cluster."""
        return self._search_engine.indices.exists(index=index_name)

    def is_valid_index_name(self, index_name: str) -> bool:
        """Return True when ``index_name`` is a non-empty string."""
        return isinstance(index_name, str) and len(index_name) > 0

    def get_index_name(self, index_name: str) -> str:
        """Resolve the index to use: ``index_name`` if valid, else the default.

        Raises:
            Exception: If ``index_name`` is invalid and no default index is set.
        """
        if self.is_valid_index_name(index_name):
            return index_name
        if self._default_index == "":
            raise Exception("Index name not defined")
        return self._default_index

    def create_index(self, index_name: str = "", body: Union[Dict, None] = None) -> None:
        """Create the index unless it already exists.

        Args:
            index_name: Index to create; defaults to the configured index.
            body: Index settings/mappings; ``None`` sends an empty body.
        """
        index_name = self.get_index_name(index_name)
        if self.index_exists(index_name):
            return
        self._search_engine.indices.create(
            index=index_name,
            # A fresh dict per call avoids the shared mutable-default pitfall.
            body={} if body is None else body,
            timeout=self._operation_timeout(),
        )

    def refresh_index(self, index_name: str = "") -> None:
        """Refresh the index so recent writes become searchable.

        Does nothing when the index does not exist.
        """
        index_name = self.get_index_name(index_name)
        # Only an existing index can be refreshed; skip missing ones.
        if not self.index_exists(index_name):
            return
        self._search_engine.indices.refresh(
            index=index_name,
        )

    def index_document(
        self,
        document: Dict,
        document_id: Union[str, None] = None,
        index: str = "",
        refresh: bool = False,
    ) -> None:
        """Index ``document``, optionally under ``document_id``.

        Args:
            document: Document body.
            document_id: Explicit id; ``None`` lets the server assign one.
            index: Target index; defaults to the configured index.
            refresh: Forwarded as the ``refresh`` parameter of the index call.
        """
        index = self.get_index_name(index)
        self._search_engine.index(
            index=index, body=document, id=document_id, refresh=refresh
        )

    def search(self, query: Dict, index: str = "") -> Dict:
        """Run ``query`` against the index and return the raw response."""
        index = self.get_index_name(index)
        result = self._search_engine.search(index=index, body=query, request_timeout=60)
        return result

    def analyze(self, text: str, field: str, index: str = "") -> Dict:
        """Analyze ``text`` with the analyzer of ``field`` and return the raw response."""
        index = self.get_index_name(index)
        result = self._search_engine.indices.analyze(
            body={"text": text, "field": field}, index=index
        )
        return result

    def paginated_search(
        self, query: Dict, index: str = "", keep_alive: str = "5m"
    ) -> Iterable[Dict]:
        """Yield every non-empty page of ``query`` using the scroll API.

        Each yielded item is the raw response for one page. The scroll context
        is cleared when iteration ends, including when the first page is
        empty, when the caller stops iterating early, or when an error occurs.

        Args:
            query: Search body.
            index: Target index; defaults to the configured index.
            keep_alive: Scroll keep-alive time value forwarded to each request.
        """
        index = self.get_index_name(index)
        result = self._search_engine.search(
            index=index, body=query, scroll=keep_alive, request_timeout=120
        )
        # Always track the most recently received scroll id; that is the one
        # to continue with and the one to clear at the end.
        scroll_id = result.get("_scroll_id")
        try:
            while len(result["hits"]["hits"]) > 0:
                yield result
                result = self._search_engine.scroll(
                    scroll_id=scroll_id, scroll=keep_alive, request_timeout=120
                )
                next_scroll_id = result.get("_scroll_id", scroll_id)
                if next_scroll_id != scroll_id:
                    # Release the superseded id.
                    # NOTE(review): confirm the old id never refers to the still-active context.
                    self._search_engine.clear_scroll(scroll_id=scroll_id)
                scroll_id = next_scroll_id
        finally:
            # Runs on exhaustion, on an empty first page, on early close
            # (GeneratorExit) and on errors, so the context does not linger
            # server-side until keep_alive expires.
            if scroll_id is not None:
                self._search_engine.clear_scroll(scroll_id=scroll_id)
def get_opensearch_host():
    """Return the OpenSearch host from the ``OPENSEARCH_HOST`` environment variable.

    Raises:
        KeyError: If ``OPENSEARCH_HOST`` is not set.
    """
    variable_name = "OPENSEARCH_HOST"
    return os.environ[variable_name]
def get_opensearch_index():
    """Return the default index name from ``OPENSEARCH_INDEX``.

    Raises:
        KeyError: If ``OPENSEARCH_INDEX`` is not set.
    """
    index_name = os.environ["OPENSEARCH_INDEX"]
    return index_name
def get_opensearch_user():
    """Return the OpenSearch basic-auth user from ``OPENSEARCH_USER``.

    Raises:
        KeyError: If ``OPENSEARCH_USER`` is not set.
    """
    environment = os.environ
    return environment["OPENSEARCH_USER"]
def get_opensearch_password():
    """Return the OpenSearch basic-auth password from ``OPENSEARCH_PASSWORD``.

    Raises:
        KeyError: If ``OPENSEARCH_PASSWORD`` is not set.
    """
    key = "OPENSEARCH_PASSWORD"
    secret = os.environ[key]
    return secret
def create_index_interface() -> IndexInterface:
    """Build an ``OpenSearchInterface`` configured from environment variables.

    Reads, in order, the host, default index, user and password through the
    ``get_opensearch_*`` helpers; each of them raises ``KeyError`` when its
    variable is unset.

    Raises:
        Exception: If the host or the default index name is empty.
    """
    host = get_opensearch_host()
    if not (isinstance(host, str) and host):
        raise Exception("Missing index hosts")

    index_name = get_opensearch_index()
    if not (isinstance(index_name, str) and index_name):
        raise Exception("Invalid index name")

    # Positional order is (hosts, user, password); only a single host is used.
    user = get_opensearch_user()
    password = get_opensearch_password()
    return OpenSearchInterface([host], user, password, default_index=index_name)