-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtagged_document.py
292 lines (202 loc) · 9.36 KB
/
tagged_document.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
#!/usr/bin/env python
import git
import re
import copy
import logging
from six import StringIO
import textwrap
import os
from source_document import WORKSPACE_REF
class TaggedDocument(object):
    """A file in a git work tree whose contents contain tagged regions.

    Parsed versions of the file are cached in ``self.versions``, keyed by
    git ref. The copy currently on disk is parsed eagerly and stored under
    ``WORKSPACE_REF``; other revisions are loaded on demand through
    ``document[revision]``.
    """

    @staticmethod
    def find(repo, extensions):
        """Collect a TaggedDocument for every matching file in the work tree.

        Args:
            repo: The ``git.Repo`` whose working directory is searched.
            extensions: List of file extensions, without the leading dot.

        Returns:
            A list of TaggedDocument objects. When nothing matches, the list
            is empty and a warning is logged.
        """
        assert isinstance(repo, git.Repo)
        assert isinstance(extensions, list)

        # str.endswith accepts a tuple, so each file is tested (and added)
        # at most once even if several listed extensions match it.
        suffixes = tuple("." + extension for extension in extensions)
        starting_dir = repo.working_dir
        documents = []

        for (path, dirs, files) in os.walk(starting_dir):
            # Prune in place so os.walk never descends into skipped
            # directories. Only names *below* the work tree are tested, so
            # the location of the checkout itself (e.g. /home/harold/repo)
            # cannot cause every file to be skipped.
            # NOTE(review): this is a substring match, so names such as
            # ".github" or "folder" are skipped as well - confirm whether an
            # exact match on ".git" / "old" is what is wanted.
            dirs[:] = [
                name for name in dirs
                if ".git" not in name and "old" not in name
            ]

            for filename in files:
                if not filename.endswith(suffixes):
                    continue
                file_path = os.path.join(path, filename)
                path_relative_to_repo = os.path.relpath(file_path, starting_dir)
                # No check is made that the file is tracked by git; every
                # matching file on disk is used.
                logging.debug("Adding %s", path_relative_to_repo)
                documents.append(TaggedDocument(repo, path_relative_to_repo))

        if not documents:
            logging.warning("No tagged documents were found.")
        return documents

    def __init__(self, repo, path):
        """Load the on-disk copy of ``path`` as the workspace version.

        Args:
            repo: The ``git.Repo`` that contains the file.
            path: Path of the file relative to the repo's working directory.
        """
        assert isinstance(repo, git.Repo)
        assert isinstance(path, str)

        # Paths are stored with forward slashes, whatever the OS separator.
        self.path = path.replace(os.sep, "/")
        self.versions = {}  # maps git refs to TaggedDocumentVersion objects
        self.repo = repo

        # Add the current-on-disk version. The file is read as raw bytes,
        # the same form __getitem__ receives from git, so the result does
        # not depend on the platform's default text encoding.
        path_on_disk = os.path.join(repo.working_dir, path)
        with open(path_on_disk, "rb") as file_on_disk:
            data_on_disk = file_on_disk.read()
        self.versions[WORKSPACE_REF] = TaggedDocumentVersion(self.path, data_on_disk, WORKSPACE_REF)

    def __getitem__(self, revision):
        """Gets the version of this document at a specified revision (ie commit number, tag or other ref).

        The parsed version is cached, so each revision is read from git at
        most once.

        Returns:
            The TaggedDocumentVersion for ``revision``, or None when looking
            the path up in that revision's tree raises KeyError (the file
            does not exist there). Any other error raised while resolving
            the revision is not caught and propagates to the caller.
        """
        assert isinstance(revision, str)

        version = self.versions.get(revision)
        if version is None:
            try:
                # Only the tree lookup is guarded, so a KeyError raised
                # anywhere else is not silently turned into None.
                data = self.repo.tree(revision)[self.path].data_stream.read()
            except KeyError:
                return None
            version = TaggedDocumentVersion(self.path, data, revision)
            self.versions[revision] = version

        assert isinstance(version, TaggedDocumentVersion)
        return version
class TaggedDocumentVersion(object):
    """A specific version of a tagged document.

    Attributes:
        path: Path of the document this version belongs to.
        data: The document text with all carriage returns removed.
        version: The ref (or workspace marker) this content was read from.
        lines: TaggedLine objects for every line that lies inside at least
            one tagged region.
    """

    # Region markers look like "// BEGIN tag" or "# END tag"; the keyword is
    # matched case-insensitively. Compiled once for the whole class instead
    # of once per document.
    _BEGIN_RE = re.compile(r"\s*(\/\/|\#)\s*BEGIN\s+([^\s]+)", flags=re.IGNORECASE)
    _END_RE = re.compile(r"\s*(\/\/|\#)\s*END\s+([^\s]+)", flags=re.IGNORECASE)

    def __init__(self, path, data, version):
        """Parse ``data`` (native text, or UTF-8 encoded bytes) into tagged lines."""
        self.path = path
        self.version = version
        self.data = self._as_text(data).replace("\r", "")
        self.lines = []
        self.parse_lines(self.data)
        logging.debug("Loaded %s (%i lines)", self.path, len(self.lines))

    @staticmethod
    def _as_text(data):
        """Return ``data`` as the interpreter's native ``str`` type."""
        # On Python 3, bytes and str are distinct, so bytes are decoded
        # here once. On Python 2, bytes *is* str and the data is returned
        # untouched, exactly as before.
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode("utf-8")
        return data

    @staticmethod
    def _character_count(text):
        """Return the length of ``text`` in characters rather than bytes."""
        if isinstance(text, bytes):
            text = text.decode("utf-8")
        return len(text)

    @property
    def tags(self):
        """The set of all tags that appear on any line of this document."""
        tags = set()
        for line in self.lines:
            assert isinstance(line, TaggedLine)
            tags.update(line.tags)
        return tags

    def query(self, query_string):
        """Given a query string, returns the lines of text that match the specified query.

        A line is selected when its innermost (last) tag is one of the
        query's isolating tags, or when it carries at least one included tag
        and none of the excluded tags.

        Returns:
            The selected lines joined with newlines and dedented, or None
            when no line matches.
        """
        assert isinstance(query_string, str)
        query = TagQuery(query_string)

        # Build the lookup sets once rather than once per line.
        isolate = set(query.isolate)
        include = set(query.include)
        exclude = set(query.exclude)

        snippet_contents = []
        for line in self.lines:
            assert isinstance(line, TaggedLine)
            line_tags = set(line.tags)
            if not isolate.isdisjoint(line.tags[-1:]):
                # Its LAST tag is one of the isolating tags.
                snippet_contents.append(line.text)
            elif not line_tags.isdisjoint(include) and line_tags.isdisjoint(exclude):
                # It has tags that we want, and none of the tags we don't.
                snippet_contents.append(line.text)

        if not snippet_contents:
            return None
        return textwrap.dedent("\n".join(snippet_contents))

    def parse_lines(self, data):
        """Scan ``data`` and append a TaggedLine for every line inside a region.

        BEGIN/END marker lines and lines containing "//-", "/*-" or "-*/"
        are never stored. Each stored line records a copy of the tags that
        were open when it was read, in the order they were entered, and its
        zero-based index within ``data``.
        """
        assert isinstance(data, str)

        current_tags = []
        for (line_number, line_text) in enumerate(data.split("\n")):
            # If this line contains "//-", "/*-" or "-*/", it's a comment
            # that should not be included in rendered snippets.
            if "/*-" in line_text or "-*/" in line_text or "//-" in line_text:
                continue

            # If we entered a tag, add it to the list
            begin_match = self._BEGIN_RE.search(line_text)
            if begin_match:
                tag = begin_match.group(2)
                if tag in current_tags:
                    logging.warning("{0}:{1}: \"{2}\" was entered twice without exiting it".format(self.path, line_number, tag))
                else:
                    current_tags.append(tag)
                continue

            # If we left a tag, remove it
            end_match = self._END_RE.search(line_text)
            if end_match:
                tag = end_match.group(2)
                if tag not in current_tags:
                    logging.warning("{0}:{1}: \"{2}\" was exited, but had not yet been entered".format(self.path, line_number, tag))
                else:
                    current_tags.remove(tag)
                continue

            # If it's neither, and we're inside any tagged region,
            # add it to the list of tagged lines
            if current_tags:
                self.lines.append(TaggedLine(self.path, line_number, line_text, list(current_tags)))

    def lines_over_limit(self, limit):
        """Return the tagged lines whose text is longer than ``limit`` characters.

        Length is counted in Unicode characters, not in encoded bytes.
        """
        return [
            line for line in self.lines
            if self._character_count(line.text) > limit
        ]
class TaggedLine(object):
    """One line of a document together with the tags that apply to it."""

    def __init__(self, source_name, line_number, text, tags):
        """Store the line after checking the type of every argument.

        Args:
            source_name: Name of the document the line comes from (str).
            line_number: Position of the line within that document (int).
            text: The content of the line (str).
            tags: Non-empty list of the tags attached to the line.
        """
        # Checked in declaration order, so the first wrongly-typed argument
        # is the one that trips the assertion.
        expected_types = (
            (source_name, str),
            (line_number, int),
            (text, str),
            (tags, list),
        )
        for value, expected_type in expected_types:
            assert isinstance(value, expected_type)
        assert tags, "Expected a non-empty list of tags when creating a TaggedLine"

        self.source_name, self.line_number = source_name, line_number
        self.text, self.tags = text, tags
# Identifiers for the four categories of tags a query string can name.
INCLUDE_TAGS = 0  # tags listed before any keyword
EXCLUDE_TAGS = 1  # tags listed after "except"
HIGHLIGHT_TAGS = 2  # tags listed after "highlighting"
ISOLATE_TAGS = 3  # tags listed after "isolating"
class TagQuery(object):
    """Represents a query for a specific set of tags.

    A query string is a whitespace-separated list of tags. Tags are
    collected into ``include`` until one of the keywords "except",
    "highlighting" or "isolating" (matched case-insensitively) is met;
    the tags that follow a keyword go into ``exclude``, ``highlight`` or
    ``isolate`` respectively.

    Attributes:
        query_string: The query exactly as it was given.
        ref: The ref passed to the constructor ("HEAD" by default).
        include, exclude, highlight, isolate: Lists of tag names, in the
            order they appear in the query.
    """

    def __init__(self, query_string, ref="HEAD"):
        """Parse ``query_string`` into the four tag lists."""
        assert isinstance(query_string, str)

        self.query_string = query_string
        self.ref = ref

        # The specific tags this query deals with
        self.include = []
        self.exclude = []
        self.highlight = []
        self.isolate = []

        # Each keyword redirects the tags that follow it into another list.
        keyword_targets = {
            "except": self.exclude,
            "highlighting": self.highlight,
            "isolating": self.isolate,
        }

        target = self.include
        # split() with no argument discards empty tokens, so an empty query
        # or repeated/leading/trailing whitespace never yields a "" tag.
        for token in query_string.split():
            keyword_target = keyword_targets.get(token.lower())
            # Compare with None: the target lists start out empty (falsy).
            if keyword_target is not None:
                target = keyword_target
            else:
                target.append(token)

        logging.debug("Query includes tags %s", self.include)

    @property
    def as_filename(self):
        """File name for this query: the query with spaces replaced by
        underscores, prefixed with the ref unless the ref is "HEAD"."""
        base_name = self.query_string.replace(" ", "_")
        if self.ref == "HEAD":
            return "{}.txt".format(base_name)
        return "{}_{}.txt".format(self.ref, base_name)

    @property
    def all_referenced_tags(self):
        """The set of every tag named anywhere in the query."""
        return set(self.include) | set(self.exclude) | set(self.highlight) | set(self.isolate)