11"""Utilities for manipulating git."""
22
3+ import copy
4+ import dataclasses
35import filecmp
46import fnmatch
57import io
1012import shutil
1113import subprocess
1214import sys
13- from typing import List , Sequence , Union
15+ from typing import Dict , List , Optional , Sequence , Union
1416
1517import git
1618
@@ -107,7 +109,24 @@ def get_gitattributes_file(repo):
107109 return os .path .join (repo .working_dir , ".gitattributes" )
108110
109111
110- def read_gitattributes (gitattributes_file ):
@dataclasses.dataclass
class GitAttributes:
    """A single entry (line) of a .gitattributes file.

    Parameters
    ----------
    pattern:
        The path pattern the entry applies to.
    attributes:
        Mapping from attribute name to its value. A value of ``None`` means
        the attribute was given without ``=value`` (e.g. ``binary``).
    raw:
        The original text of the line, when the entry was parsed from a file.
        When present it is written back verbatim so existing formatting is
        preserved.
    """

    pattern: str
    attributes: Dict[str, Optional[str]]
    raw: Optional[str] = None

    def __str__(self):
        """Render the entry as a .gitattributes line."""
        # Compare against None (not truthiness) so that a parsed blank line,
        # whose raw text is "", is written back as "" rather than " ".
        if self.raw is not None:
            return self.raw
        parts = [self.pattern]
        # Only a value of None means "no value"; an empty-string value must
        # still be rendered as `key=`.
        parts.extend(
            k if v is None else f"{k}={v}" for k, v in self.attributes.items()
        )
        # Joining the parts avoids a trailing space when there are no attributes.
        return " ".join(parts)

    def __eq__(self, o):
        """Compare entries by pattern and attributes.

        The raw text is only compared when both entries carry non-empty raw
        text, so a hand-built entry compares equal to a parsed one.
        """
        if not isinstance(o, GitAttributes):
            # Let Python fall back to its default handling instead of raising
            # AttributeError on unrelated types.
            return NotImplemented
        raw_eq = self.raw == o.raw if self.raw and o.raw else True
        return self.pattern == o.pattern and self.attributes == o.attributes and raw_eq
128+
129+ def read_gitattributes (gitattributes_file ) -> List [GitAttributes ]:
111130 """
112131 Read contents of this repo's .gitattributes file
113132
@@ -123,14 +142,30 @@ def read_gitattributes(gitattributes_file):
123142 """
124143 if os .path .exists (gitattributes_file ):
125144 with open (gitattributes_file , "r" ) as f :
126- return [line .rstrip ("\n " ) for line in f ]
145+ return [parse_gitattributes ( line .rstrip ("\n " ) ) for line in f ]
127146 else :
128147 return []
129148
130149
def parse_gitattributes(gitattributes: str) -> GitAttributes:
    """Parse one line of a .gitattributes file.

    Parameters
    ----------
    gitattributes:
        A single line from a .gitattributes file (without the newline).

    Returns
    -------
    GitAttributes
        The parsed entry. The original line is kept as its raw text. Blank
        lines and comment lines (first non-blank character is ``#``) produce
        an entry with an empty pattern and no attributes.
    """
    # TODO: Fix for escaped patterns
    # Split on any run of whitespace so repeated spaces, tabs and leading
    # whitespace do not produce empty fields.
    fields = gitattributes.split()
    if not fields or fields[0].startswith("#"):
        return GitAttributes("", {}, gitattributes)
    pattern, *attributes = fields
    attrs = {}
    # Overwrite as we go to get the LAST attribute behavior
    for attribute in attributes:
        # TODO: Update to handle unsetting attribute, etc.
        # partition splits on the first "=" only, so values that contain "="
        # are kept intact instead of raising on unpacking.
        key, sep, value = attribute.partition("=")
        attrs[key] = value if sep else None
    return GitAttributes(pattern, attrs, gitattributes)
164+
165+
131166@file_or_name (gitattributes_file = "w" )
132167def write_gitattributes (
133- gitattributes_file : Union [str , io .FileIO ], attributes : List [str ]
168+ gitattributes_file : Union [str , io .FileIO ], attributes : List [GitAttributes ]
134169):
135170 """
136171 Write list of attributes to this repo's .gitattributes file
@@ -143,60 +178,136 @@ def write_gitattributes(
143178 attributes:
144179 Attributes to write to .gitattributes
145180 """
146- gitattributes_file .write ("\n " .join (attributes ))
181+ gitattributes_file .write ("\n " .join (map ( str , attributes ) ))
147182 # End file with newline.
148183 gitattributes_file .write ("\n " )
149184
150185
151- def add_theta_to_gitattributes (gitattributes : List [str ], path : str ) -> str :
152- """Add a filter=theta that covers file_name.
153-
154- Parameters
155- ----------
156- gitattributes: A list of the lines from the gitattribute files.
157- path: The path to the model we are adding a filter to.
158-
159- Returns
160- -------
161- List[str]
162- The lines to write to the new gitattribute file with a (possibly) new
163- filter=theta added that covers the given file.
164- """
165- pattern_found = False
166- new_gitattributes = []
167- for line in gitattributes :
168- # TODO(bdlester): Revisit this regex to see if it when the pattern
169- # is escaped due to having spaces in it.
170- match = re .match (r"^\s*(?P<pattern>[^\s]+)\s+(?P<attributes>.*)$" , line )
171- if match :
172- # If there is already a pattern that covers the file, add the filter
173- # to that.
174- if fnmatch .fnmatchcase (path , match .group ("pattern" )):
175- pattern_found = True
176- if not "filter=theta" in match .group ("attributes" ):
177- line = f"{ line .rstrip ()} filter=theta"
178- if not "merge=theta" in match .group ("attributes" ):
179- line = f"{ line .rstrip ()} merge=theta"
180- if not "diff=theta" in match .group ("attributes" ):
181- line = f"{ line .rstrip ()} diff=theta"
182- new_gitattributes .append (line )
183- # If we don't find a matching pattern, add a new line that covers just this
184- # specific file.
185- if not pattern_found :
186- new_gitattributes .append (f"{ path } filter=theta merge=theta diff=theta" )
187- return new_gitattributes
def add_theta_to_gitattributes(
    gitattributes: List[GitAttributes], path: str
) -> List[GitAttributes]:
    """Ensure `path` has the git attributes git-theta needs.

    Parameters
    ----------
    gitattributes:
        The parsed entries of the .gitattributes file. A new entry may be
        appended to this list in place.
    path:
        The path to the model that git-theta should handle.

    Returns
    -------
    List[GitAttributes]
        The same list that was passed in. It is unchanged when the last entry
        matching `path` already sets filter, merge and diff to theta;
        otherwise a new entry for `path` is appended.

    Raises
    ------
    ValueError
        If the last entry matching `path` sets filter, merge or diff to
        something other than theta.
    """
    theta_keys = ("filter", "merge", "diff")
    # Scan from the end and stop at the first entry whose pattern matches.
    active = None
    for entry in reversed(gitattributes):
        if fnmatch.fnmatchcase(path, entry.pattern):
            active = entry
            break

    if active:
        existing = active.attributes
        # Nothing to do when every theta attribute is already in place.
        if all(existing.get(key) == "theta" for key in theta_keys):
            return gitattributes
        # Refuse to override an attribute that is set to something else.
        conflicts = [
            key for key in theta_keys if key in existing and existing[key] != "theta"
        ]
        if conflicts:
            raise ValueError(
                f"Git Attributes used by git-theta are already set for {path}. "
                f"Found filter={active.attributes.get('filter')}, "
                f"diff={active.attributes.get('diff')}, "
                f"merge={active.attributes.get('merge')}."
            )
        # Carry over the other attributes the matching entry set.
        merged = copy.deepcopy(existing)
    else:
        merged = {}

    merged.update(dict.fromkeys(theta_keys, "theta"))
    gitattributes.append(GitAttributes(path, merged))
    return gitattributes
226+
227+
228+ # def add_theta_to_gitattributes(gitattributes: List[str], path: str) -> str:
229+ # """Add a filter=theta that covers file_name.
230+
231+ # If there is a pattern that covers the current file that applies the git-theta
232+ # attribute, no new pattern is added. If there is a pattern that covers the
233+ # current file and sets attributes used for git-theta an error is raised. If
234+ # there is a pattern that sets non-overlapping attributes they are copied into
235+ # a new path-specific pattern. If there is no match, a new path-specific
236+ # pattern is always created.
237+
238+ # Parameters
239+ # ----------
240+ # gitattributes: A list of the lines from the gitattribute files.
241+ # path: The path to the model we are adding a filter to.
242+
243+ # Returns
244+ # -------
245+ # List[str]
246+ # The lines to write to the new gitattribute file with a (possibly) new
247+ # filter=theta added that covers the given file.
248+ # """
249+ # new_gitattributes = []
250+ # theta_attributes = "filter=theta merge=theta diff=theta"
251+ # previous_attributes = None
252+ # previous_match = None
253+ # # Find the *last* attribute line that matches the path.
254+ # for i, line in enumerate(gitattributes):
255+ # # TODO(bdlester): Revisit this regex to see if it when the pattern
256+ # # is escaped due to having spaces in it.
257+ # if (m := re.match(r"^\s*(?P<pattern>[^\s]+)\s+(?P<attributes>.*)$", line)):
258+ # # Record if there is already a pattern that covers the file.
259+ # # Note: The *last* gitattribute line is the one that is used when
260+ # # multiple lines match a file.
261+ # # TODO(brianlester): Some patterns like [:space:] aren't handled by
262+ # # this function. Find a replacement that does?
263+ # if fnmatch.fnmatchcase(path, m.group("pattern")):
264+ # previous_attributes = m.group("attributes")
265+ # previous_match = i
266+ # new_gitattributes.append(line)
267+ # if previous_attributes:
268+ # # If the previous match set overlapping attributes, error out.
269+ # for attr in ("filter", "diff", "merge"):
270+ # if attr not in previous_attributes:
271+ # continue
272+ # if attr in previous_attributes and not is_attribute_active(previous_attributes, attr, "theta"):
273+ # raise ValueError()
274+ # # Add the new attributes
275+ # else:
276+ # new_attributes[previous_match] = f"{path} {previous_attributes.strip()} {theta_attributes}"
277+ # else:
278+ # new_attributes.append(f"{path} {theta_attributes}")
279+ # return new_gitattributes
188280
189281
def get_gitattributes_tracked_patterns(gitattributes_file):
    """List the patterns in a .gitattributes file that are handled by git-theta.

    Parameters
    ----------
    gitattributes_file:
        Path to the .gitattributes file to read.

    Returns
    -------
    List[str]
        The pattern of every entry whose filter, diff and merge attributes
        are all set to theta, in file order.
    """
    gitattributes = read_gitattributes(gitattributes_file)
    theta_keys = ("filter", "diff", "merge")
    # TODO: Correctly handle patterns with escaped spaces in them
    # The per-key check must be nested inside the condition: a trailing
    # `for` clause would bind the key only after the `if` had already used it.
    return [
        entry.pattern
        for entry in gitattributes
        if all(entry.attributes.get(key) == "theta" for key in theta_keys)
    ]
198294
199295
def is_theta_tracked(path: str, gitattributes: List[GitAttributes]) -> bool:
    """Report whether `.gitattributes` marks `path` as tracked by git-theta.

    Entries are examined from last to first and only the first entry whose
    pattern matches `path` is consulted. The path counts as tracked only when
    that entry sets filter, diff and merge to theta. If no entry matches, the
    path is not tracked.
    """
    for entry in reversed(gitattributes):
        if not fnmatch.fnmatchcase(path, entry.pattern):
            continue
        # Only this entry decides the answer; earlier entries are not consulted.
        settings = entry.attributes
        for key in ("filter", "diff", "merge"):
            if settings.get(key) != "theta":
                return False
        return True
    return False
309+
310+
200311def add_file (f , repo ):
201312 """
202313 Add file to git staging area
0 commit comments