import logging
import os
from datetime import datetime
from typing import List, Dict

from atlassian import Confluence
from atlassian.errors import ApiError
from requests import HTTPError

from data_source.api.base_data_source import (BaseDataSource, ConfigField, HTMLInputType, Location,
                                               BaseDataSourceConfig)
from data_source.api.basic_document import BasicDocument, DocumentType
from data_source.api.exception import InvalidDataSourceConfig
from parsers.html import html_to_text
from queues.index_queue import IndexQueue

logger = logging.getLogger(__name__)


class ConfluenceConfig(BaseDataSourceConfig):
    url: str
    token: str
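
# Example of a raw config dict for this data source (values are placeholders):
#   {"url": "https://example.confluence.com", "token": "<personal-access-token>"}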


class ConfluenceDataSource(BaseDataSource):

    @staticmethod
    def get_config_fields() -> List[ConfigField]:
        return [
            ConfigField(label="Confluence URL", name="url", placeholder="https://example.confluence.com"),
            ConfigField(label="Personal Access Token", name="token", input_type=HTMLInputType.PASSWORD)
        ]

    @staticmethod
    def list_spaces(confluence: Confluence, start=0) -> List[Location]:
        # The Confluence connection fails fairly often, so retry a few times before giving up.
        retries = 3
        for i in range(retries):
            try:
                return [Location(label=space['name'], value=space['key'])
                        for space in confluence.get_all_spaces(expand='status', start=start)['results']]
            except Exception as e:
                logger.error(f'Confluence connection failed: {e}')
                if i == retries - 1:
                    raise e

    @staticmethod
    def list_all_spaces(confluence: Confluence) -> List[Location]:
        logger.info('Listing spaces')

        spaces = []
        start = 0
        while True:
            new_spaces = ConfluenceDataSource.list_spaces(confluence=confluence, start=start)
            if len(new_spaces) == 0:
                break

            spaces.extend(new_spaces)
            start += len(new_spaces)

        logger.info(f'Found {len(spaces)} spaces')
        return spaces

    @staticmethod
    async def validate_config(config: Dict) -> None:
        try:
            client = ConfluenceDataSource.confluence_client_from_config(config)
            ConfluenceDataSource.list_spaces(confluence=client)
        except Exception as e:
            raise InvalidDataSourceConfig from e

    @staticmethod
    def confluence_client_from_config(config: Dict) -> Confluence:
        parsed_config = ConfluenceConfig(**config)
        should_verify_ssl = os.environ.get('CONFLUENCE_VERIFY_SSL') is not None
        return Confluence(url=parsed_config.url, token=parsed_config.token, verify_ssl=should_verify_ssl)
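
    # Note: SSL verification is disabled unless the CONFLUENCE_VERIFY_SSL environment
    # variable is set to any value (for example, CONFLUENCE_VERIFY_SSL=1).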

    @staticmethod
    def list_locations(config: Dict) -> List[Location]:
        confluence = ConfluenceDataSource.confluence_client_from_config(config)
        return ConfluenceDataSource.list_all_spaces(confluence=confluence)

    @staticmethod
    def has_prerequisites() -> bool:
        return True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._confluence = ConfluenceDataSource.confluence_client_from_config(self._raw_config)

    def _list_spaces(self) -> List[Location]:
        return ConfluenceDataSource.list_all_spaces(confluence=self._confluence)

    def _feed_new_documents(self) -> None:
        logger.info('Feeding new documents with Confluence')

        spaces = self._config.locations_to_index or self._list_spaces()
        for space in spaces:
            self.add_task_to_queue(self._feed_space_docs, space=space)

    def _feed_space_docs(self, space: Location) -> None:
        logger.info(f'Getting documents from space {space.label} ({space.value})')
        start = 0
        limit = 200  # limit when expanding the version

        while True:
            new_batch = self._confluence.get_all_pages_from_space(space.value, start=start, limit=limit,
                                                                  expand='version')
            len_new_batch = len(new_batch)
            logger.info(f'Got {len_new_batch} documents from space {space.label} (total {start + len_new_batch})')

            for raw_doc in new_batch:
                raw_doc['space_name'] = space.label
                self.add_task_to_queue(self._feed_doc, raw_doc=raw_doc)

            if len_new_batch < limit:
                break

            start += limit

    def _feed_doc(self, raw_doc: Dict):
        last_modified = datetime.strptime(raw_doc['version']['when'], "%Y-%m-%dT%H:%M:%S.%fZ")
        if last_modified < self._last_index_time:
            # The page has not changed since the last indexing run, so there is nothing to do.
            return

        doc_id = raw_doc['id']
        try:
            fetched_raw_page = self._confluence.get_page_by_id(doc_id, expand='body.storage,history')
        except HTTPError as e:
            logger.warning(f'Confluence returned status code {e.response.status_code} '
                           f'for document {doc_id} ({raw_doc["title"]}). Skipping.')
            return
        except ApiError as e:
            logger.warning(f'Unable to access document {doc_id} ({raw_doc["title"]}). '
                           f'Reason: "{e.reason}". Skipping.')
            return

        author = fetched_raw_page['history']['createdBy']['displayName']
        author_image = fetched_raw_page['history']['createdBy']['profilePicture']['path']
        author_image_url = fetched_raw_page['_links']['base'] + author_image

        html_content = fetched_raw_page['body']['storage']['value']
        plain_text = html_to_text(html_content)

        url = fetched_raw_page['_links']['base'] + fetched_raw_page['_links']['webui']

        doc = BasicDocument(title=fetched_raw_page['title'],
                            content=plain_text,
                            author=author,
                            author_image_url=author_image_url,
                            timestamp=last_modified,
                            id=doc_id,
                            data_source_id=self._data_source_id,
                            location=raw_doc['space_name'],
                            url=url,
                            type=DocumentType.DOCUMENT)
        IndexQueue.get_instance().put_single(doc=doc)


# if __name__ == '__main__':
#     import os
#     config = {"url": os.environ['CONFLUENCE_URL'], "token": os.environ['CONFLUENCE_TOKEN']}
#     confluence = ConfluenceDataSource(config=config, data_source_id=0)
#     confluence._feed_new_documents()
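
# A similar commented-out sketch for checking a config before constructing the data source.
# validate_config is async, so it needs an event loop; the env-var names follow the example above:
#
# if __name__ == '__main__':
#     import asyncio
#     config = {"url": os.environ['CONFLUENCE_URL'], "token": os.environ['CONFLUENCE_TOKEN']}
#     asyncio.run(ConfluenceDataSource.validate_config(config))
#     print(ConfluenceDataSource.list_locations(config))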