Skip to content

Commit 6098f1a

Browse files
committed
Implemented repository watcher
1 parent 2c24746 commit 6098f1a

11 files changed

+573
-58
lines changed

centraldogma/base_client.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ def request(self, method: str, path: str, **kwargs) -> Union[Response]:
4242
def _set_request_headers(self, method: str, **kwargs) -> Dict:
4343
default_headers = self.patch_headers if method == "patch" else self.headers
4444
headers = kwargs.get("headers")
45-
kwargs["headers"] = dict(list(default_headers.items()) + list(headers.items())) if headers else default_headers
45+
kwargs["headers"] = (
46+
dict(list(default_headers.items()) + list(headers.items()))
47+
if headers
48+
else default_headers
49+
)
4650
return kwargs
4751

4852
def _httpx_request(self, method: str, url: str, **kwargs) -> Response:

centraldogma/content_service.py

+37-24
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,20 @@
3030
from centraldogma.exceptions import CentralDogmaException
3131
from centraldogma.query import Query, QueryType
3232

33-
T = TypeVar('T')
33+
T = TypeVar("T")
3434

3535

3636
class ContentService:
3737
def __init__(self, client: BaseClient):
3838
self.client = client
3939

4040
def get_files(
41-
self,
42-
project_name: str,
43-
repo_name: str,
44-
path_pattern: Optional[str],
45-
revision: Optional[int],
46-
include_content: bool = False,
41+
self,
42+
project_name: str,
43+
repo_name: str,
44+
path_pattern: Optional[str],
45+
revision: Optional[int],
46+
include_content: bool = False,
4747
) -> List[Content]:
4848
params = {"revision": revision} if revision else None
4949
path = f"/projects/{project_name}/repos/{repo_name}/"
@@ -78,11 +78,11 @@ def get_file(
7878
return Content.from_dict(resp.json())
7979

8080
def push(
81-
self,
82-
project_name: str,
83-
repo_name: str,
84-
commit: Commit,
85-
changes: List[Change],
81+
self,
82+
project_name: str,
83+
repo_name: str,
84+
commit: Commit,
85+
changes: List[Change],
8686
) -> PushResult:
8787
params = {
8888
"commitMessage": asdict(commit),
@@ -95,13 +95,19 @@ def push(
9595
json: object = resp.json()
9696
return PushResult.from_dict(json)
9797

98-
def watch_repository(self, project_name: str, repo_name: str, last_known_revision: Revision, path_pattern: str,
99-
timeout_millis: int) -> Optional[Revision]:
98+
def watch_repository(
99+
self,
100+
project_name: str,
101+
repo_name: str,
102+
last_known_revision: Revision,
103+
path_pattern: str,
104+
timeout_millis: int,
105+
) -> Optional[Revision]:
100106
path = f"/projects/{project_name}/repos/{repo_name}/contents"
101107
if path_pattern[0] != "/":
102108
path += "/**/"
103109

104-
if path_pattern in ' ':
110+
if path_pattern in " ":
105111
path_pattern = path_pattern.replace(" ", "%20")
106112
path += path_pattern
107113

@@ -115,8 +121,14 @@ def watch_repository(self, project_name: str, repo_name: str, last_known_revisio
115121
# TODO(ikhoon): Handle excepitons after https://github.com/line/centraldogma-python/pull/11/ is merged.
116122
pass
117123

118-
def watch_file(self, project_name: str, repo_name: str, last_known_revision: Revision, query: Query[T],
119-
timeout_millis) -> Optional[Entry[T]]:
124+
def watch_file(
125+
self,
126+
project_name: str,
127+
repo_name: str,
128+
last_known_revision: Revision,
129+
query: Query[T],
130+
timeout_millis,
131+
) -> Optional[Entry[T]]:
120132
path = f"/projects/{project_name}/repos/{repo_name}/contents/{query.path}"
121133
if query.query_type == QueryType.JSON_PATH:
122134
queries = [f"jsonpath={quote(expr)}" for expr in query.expressions]
@@ -143,7 +155,8 @@ def _to_entry(revision: Revision, json: Any, query_type: QueryType) -> Entry:
143155
elif query_type == QueryType.IDENTITY or query_type == QueryType.JSON_PATH:
144156
if received_entry_type != EntryType.JSON:
145157
raise CentralDogmaException(
146-
f"invalid entry type. entry type: {received_entry_type} (expected: {query_type})")
158+
f"invalid entry type. entry type: {received_entry_type} (expected: {query_type})"
159+
)
147160

148161
return Entry.json(revision, entry_path, content)
149162
else: # query_type == QueryType.IDENTITY
@@ -155,16 +168,16 @@ def _to_entry(revision: Revision, json: Any, query_type: QueryType) -> Entry:
155168
return Entry.directory(revision, entry_path)
156169

157170
def _watch(
158-
self,
159-
last_known_revision: Revision,
160-
timeout_millis: int,
161-
path: str) -> Response:
171+
self, last_known_revision: Revision, timeout_millis: int, path: str
172+
) -> Response:
162173
normalized_timeout = (timeout_millis + 999) // 1000
163174
headers = {
164175
"if-none-match": f"{last_known_revision.major}",
165-
"prefer": f"wait={normalized_timeout}"
176+
"prefer": f"wait={normalized_timeout}",
166177
}
167-
return self.client.request("get", path, headers=headers, timeout=normalized_timeout)
178+
return self.client.request(
179+
"get", path, headers=headers, timeout=normalized_timeout
180+
)
168181

169182
def _change_dict(self, data):
170183
return {

centraldogma/data/entry.py

+8-5
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,10 @@ class EntryType(Enum):
2727
DIRECTORY = auto()
2828

2929

30-
T = TypeVar('T')
30+
T = TypeVar("T")
3131

3232

3333
class Entry(Generic[T]):
34-
3534
@staticmethod
3635
def text(revision: Revision, path: str, content: str) -> Entry[str]:
3736
return Entry(revision, path, EntryType.TEXT, content)
@@ -46,7 +45,9 @@ def json(revision: Revision, path: str, content: Any) -> Entry[Any]:
4645
def directory(revision: Revision, path: str) -> Entry[None]:
4746
return Entry(revision, path, EntryType.DIRECTORY, None)
4847

49-
def __init__(self, revision: Revision, path: str, entry_type: EntryType, content: T):
48+
def __init__(
49+
self, revision: Revision, path: str, entry_type: EntryType, content: T
50+
):
5051
self.revision = revision
5152
self.path = path
5253
self.entry_type = entry_type
@@ -59,7 +60,9 @@ def has_content(self) -> bool:
5960
@property
6061
def content(self) -> T:
6162
if self._content is None:
62-
raise EntryNoContentException(f"{self.path} (type: {self.entry_type}, revision: {self.revision.major})")
63+
raise EntryNoContentException(
64+
f"{self.path} (type: {self.entry_type}, revision: {self.revision.major})"
65+
)
6366

6467
return self._content
6568

@@ -70,7 +73,7 @@ def content_as_text(self) -> str:
7073
if self.entry_type == EntryType.TEXT:
7174
self._content_as_text = self.content
7275
elif self.entry_type == EntryType.DIRECTORY:
73-
self._content_as_text = ''
76+
self._content_as_text = ""
7477
else:
7578
self._content_as_text = json.dumps(self.content)
7679

centraldogma/data/revision.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,24 @@
1111
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1212
# License for the specific language governing permissions and limitations
1313
# under the License.
14+
from __future__ import annotations
15+
1416
from dataclasses import dataclass
1517

1618

1719
@dataclass
1820
class Revision:
19-
major: int
21+
def __init__(self, major):
22+
self.major = major
23+
24+
@staticmethod
25+
def init() -> Revision:
26+
return _INIT
27+
28+
@staticmethod
29+
def head() -> Revision:
30+
return _HEAD
31+
32+
33+
_INIT = Revision(1)
34+
_HEAD = Revision(-1)

centraldogma/dogma.py

+55-18
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# License for the specific language governing permissions and limitations
1313
# under the License.
1414
import os
15-
from typing import List, Optional, TypeVar, Generic
15+
from typing import List, Optional, TypeVar, Generic, Callable
1616

1717
from centraldogma.base_client import BaseClient
1818
from centraldogma.content_service import ContentService
@@ -30,11 +30,15 @@
3030
from centraldogma.project_service import ProjectService
3131
from centraldogma.query import Query
3232
from centraldogma.repository_service import RepositoryService
33+
from centraldogma.watcher import Watcher
3334

34-
T = TypeVar('T')
35+
T = TypeVar("T")
36+
U = TypeVar("U")
3537

38+
_DEFAULT_WATCH_TIMEOUT_MILLIS = 1 * 60 * 1000 # 1 minute
3639

37-
class Dogma(Generic[T]):
40+
41+
class Dogma:
3842
DEFAULT_BASE_URL = "http://localhost:36462"
3943
DEFAULT_TOKEN = "anonymous"
4044

@@ -187,27 +191,60 @@ def push(
187191
return self.content_service.push(project_name, repo_name, commit, changes)
188192

189193
def watch_repository(
190-
self,
191-
project_name: str,
192-
repo_name: str,
193-
last_known_revision: Revision,
194-
path_pattern: str,
195-
timeout_millis: int) -> Optional[Revision]:
194+
self,
195+
project_name: str,
196+
repo_name: str,
197+
last_known_revision: Revision,
198+
path_pattern: str,
199+
timeout_millis: int = _DEFAULT_WATCH_TIMEOUT_MILLIS,
200+
) -> Optional[Revision]:
196201
"""
197202
TODO(ikhoon): TBU
198203
"""
199-
return self.content_service.watch_repository(project_name, repo_name, last_known_revision,
200-
path_pattern, timeout_millis)
204+
return self.content_service.watch_repository(
205+
project_name, repo_name, last_known_revision, path_pattern, timeout_millis
206+
)
201207

202208
def watch_file(
203-
self,
204-
project_name: str,
205-
repo_name: str,
206-
last_known_revision: Revision,
207-
query: Query[T],
208-
timeout_millis: int) -> Optional[Entry[T]]:
209+
self,
210+
project_name: str,
211+
repo_name: str,
212+
last_known_revision: Revision,
213+
query: Query[T],
214+
timeout_millis: int = _DEFAULT_WATCH_TIMEOUT_MILLIS,
215+
) -> Optional[Entry[T]]:
209216
"""
210217
TODO(ikhoon): TBU
211218
:rtype: object
212219
"""
213-
return self.content_service.watch_file(project_name, repo_name, last_known_revision, query, timeout_millis)
220+
return self.content_service.watch_file(
221+
project_name, repo_name, last_known_revision, query, timeout_millis
222+
)
223+
224+
def repository_watcher(
225+
self,
226+
project_name: str,
227+
repo_name: str,
228+
path_pattern: str,
229+
function: Callable[[Revision], T] = lambda x: x,
230+
) -> Watcher[T]:
231+
from centraldogma.repository_watcher import RepositoryWatcher
232+
233+
watcher = RepositoryWatcher(
234+
self, project_name, repo_name, path_pattern, function
235+
)
236+
watcher.start()
237+
return watcher
238+
239+
def file_watcher(
240+
self,
241+
project_name: str,
242+
repo_name: str,
243+
query: Query[T],
244+
function: Callable[[T], U] = lambda x: x,
245+
) -> Watcher[U]:
246+
from centraldogma.repository_watcher import FileWatcher
247+
248+
watcher = FileWatcher(self, project_name, repo_name, query, function)
249+
watcher.start()
250+
return watcher

centraldogma/exceptions.py

+1
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,5 @@ class UnknownException(CentralDogmaException):
4949

5050
class EntryNoContentException(CentralDogmaException):
5151
"""A CentralDogmaException that is raised when attempted to retrieve the content from a directory entry."""
52+
5253
pass

centraldogma/query.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,10 @@ class QueryType(Enum):
2424
JSON_PATH = auto()
2525

2626

27-
T = TypeVar('T')
27+
T = TypeVar("T")
2828

2929

3030
class Query(Generic[T]):
31-
3231
@staticmethod
3332
def identity(path: str) -> Query[str]:
3433
return Query(path=path, query_type=QueryType.IDENTITY, expressions=[])

0 commit comments

Comments
 (0)