Sourcery refactored master branch #1

Open · wants to merge 1 commit into master

2 changes: 1 addition & 1 deletion strip-harbor/sync/MailerSendNewApiClientAPI.py
@@ -21,7 +21,7 @@ def __init__(
self.mailersend_api_key = env("MAILERSEND_API_KEY")
if not self.mailersend_api_key:
self.mailersend_api_key=mailersend_api_key
self.headers_auth = "Bearer {}".format(self.mailersend_api_key)
self.headers_auth = f"Bearer {self.mailersend_api_key}"
self.headers_default = {
"Content-Type": "application/json",
"X-Requested-With": "XMLHttpRequest",
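
The change above swaps str.format() for an f-string; for a simple substitution like this the two produce identical output, and f-strings (available since Python 3.6) are usually easier to read. A minimal standalone sketch, with an invented key value for illustration:

    # Both lines build the same Authorization header value.
    api_key = "example-api-key"  # illustrative only, not a real key
    via_format = "Bearer {}".format(api_key)
    via_fstring = f"Bearer {api_key}"
    assert via_format == via_fstring
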
38 changes: 26 additions & 12 deletions strip-harbor/sync/harbor.py
@@ -11,7 +11,7 @@
harbor_endpoint = 'https://' + env('HARBOR_HOST') + '/api/v2.0'
harbor_projects_path = '/projects/'+env('HARBOR_PROJECT_ID')
harbor_project_path = '/project/'+env('HARBOR_PROJECT_ID')
harbor_robots_path = harbor_projects_path + '/robots'
harbor_robots_path = f'{harbor_projects_path}/robots'

def harbor_get(path):
return requests.get(
@@ -20,7 +20,7 @@ def harbor_get(path):
headers={'Accept':'application/json'})

def harbor_post(path, data):
log.warning("UserName --> %s" % env('HARBOR_USERNAME'))
log.warning(f"UserName --> {env('HARBOR_USERNAME')}")
return requests.post(
harbor_endpoint + path,
data=data,
@@ -34,16 +34,28 @@ def get_robot_accounts_for_project():

def create_robot_account_for_project(account_name,email,customer_name):
account = harbor_post(
harbor_robots_path,
json.dumps({
'name':account_name,
'expires_at': int((datetime.datetime.now() + datetime.timedelta(days=30)).timestamp()),
harbor_robots_path,
json.dumps(
{
'name': account_name,
'expires_at': int(
(
datetime.datetime.now() + datetime.timedelta(days=30)
).timestamp()
),
'access': [
{'resource':harbor_project_path+'/repository','action':'pull'},
{'resource':harbor_project_path+'/helm-chart-version','action':'read'}
],
})
)
{
'resource': f'{harbor_project_path}/repository',
'action': 'pull',
},
{
'resource': f'{harbor_project_path}/helm-chart-version',
'action': 'read',
},
],
}
),
)

account=account.json()
print(account)
@@ -61,7 +73,9 @@ def customer_email_to_harbor_username(email):

def create_harbor_user_from_customer(customer_email,strip_id,customer_name):
if not customer_email:
raise ValueError("Couldn't create a harbor user for customer %s - the record doesn't have the email set" % (strip_id))
raise ValueError(
f"Couldn't create a harbor user for customer {strip_id} - the record doesn't have the email set"
)
return create_robot_account_for_project(customer_email_to_harbor_username(customer_email),customer_email,customer_name)

def provision_harbor_permissions_for_customer(customer):
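
Most of the harbor.py hunks are the same .format()/%-to-f-string conversion plus a reflow of the json.dumps payload. One nuance worth noting for the log.warning call: the logging module can also do the interpolation itself, which defers string building until the record is actually emitted. A small illustration, separate from this repository's code, with an invented value standing in for env('HARBOR_USERNAME'):

    import logging

    log = logging.getLogger(__name__)
    username = "alice"  # illustrative value

    # All three calls log the same text; the last passes the value as an
    # argument, so logging only formats it if the message is emitted.
    log.warning("UserName --> %s" % username)
    log.warning(f"UserName --> {username}")
    log.warning("UserName --> %s", username)
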
2 changes: 1 addition & 1 deletion strip-harbor/sync/views.py
@@ -47,6 +47,6 @@ def webhook_handler(request):
handle_deleted_subscription(deleted_subscription)

else:
print('Unhandled event type {}'.format(event.type))
print(f'Unhandled event type {event.type}')

return HttpResponse(status=200)
Changes in an additional file (filename not shown)
@@ -40,10 +40,7 @@ def __init__(self, path):
# library paths so PyPy is correctly supported.
purelib = get_python_lib(plat_specific=False, prefix=path)
platlib = get_python_lib(plat_specific=True, prefix=path)
if purelib == platlib:
self.lib_dirs = [purelib]
else:
self.lib_dirs = [purelib, platlib]
self.lib_dirs = [purelib] if purelib == platlib else [purelib, platlib]


class BuildEnvironment:
@@ -115,8 +112,7 @@ def __enter__(self):
}

path = self._bin_dirs[:]
old_path = self._save_env['PATH']
if old_path:
if old_path := self._save_env['PATH']:
path.extend(old_path.split(os.pathsep))

pythonpath = [self._site_dir]
@@ -184,8 +180,7 @@ def install_requirements(
args.extend(('--' + format_control.replace('_', '-'),
','.join(sorted(formats or {':none:'}))))

index_urls = finder.index_urls
if index_urls:
if index_urls := finder.index_urls:
args.extend(['-i', index_urls[0]])
for extra_index in index_urls[1:]:
args.extend(['--extra-index-url', extra_index])
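
Two patterns drive this file's hunks: a conditional expression replaces the four-line if/else assignment of lib_dirs, and the walrus operator (:=, added in Python 3.8) folds an assignment into the if test so the temporary name is bound only where it is used. A self-contained sketch with made-up paths and environment values:

    import os

    # Conditional expression: one assignment instead of an if/else block.
    purelib = "/venv/lib/python3.10/site-packages"    # illustrative paths
    platlib = "/venv/lib64/python3.10/site-packages"
    lib_dirs = [purelib] if purelib == platlib else [purelib, platlib]

    # Walrus operator (Python 3.8+): bind and test in a single expression.
    saved_env = {"PATH": "/usr/local/bin:/usr/bin"}   # illustrative snapshot
    path = ["/venv/bin"]
    if old_path := saved_env["PATH"]:
        path.extend(old_path.split(os.pathsep))

Worth keeping in mind: any module that uses := no longer parses on Python 3.7 or earlier, so this refactor implicitly raises the minimum interpreter version for the file.
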
Changes in an additional file (filename not shown)
@@ -83,12 +83,7 @@ def _get_cache_path_parts(self, link):
# difference for our use case here.
hashed = _hash_dict(key_parts)

# We want to nest the directories some to prevent having a ton of top
# level directories where we might run out of sub directories on some
# FS.
parts = [hashed[:2], hashed[2:4], hashed[4:6], hashed[6:]]

return parts
return [hashed[:2], hashed[2:4], hashed[4:6], hashed[6:]]

def _get_candidates(self, link, canonical_package_name):
# type: (Link, str) -> List[Any]
@@ -109,8 +104,7 @@ def _get_candidates(self, link, canonical_package_name):
candidates = []
path = self.get_path_for_link(link)
if os.path.isdir(path):
for candidate in os.listdir(path):
candidates.append((candidate, path))
candidates.extend((candidate, path) for candidate in os.listdir(path))
return candidates

def get_path_for_link(self, link):
@@ -259,9 +253,7 @@ def get(
):
# type: (...) -> Link
cache_entry = self.get_cache_entry(link, package_name, supported_tags)
if cache_entry is None:
return link
return cache_entry.link
return link if cache_entry is None else cache_entry.link

def get_cache_entry(
self,
@@ -287,7 +279,4 @@ def get_cache_entry(
package_name=package_name,
supported_tags=supported_tags,
)
if retval is not link:
return CacheEntry(retval, persistent=False)

return None
return CacheEntry(retval, persistent=False) if retval is not link else None
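
This file's hunks use two further simplifications: list.extend with a generator expression instead of an append loop, and conditional expressions in place of if/else return pairs. A short sketch, using the current directory and plain placeholder objects rather than the Link/CacheEntry types used above:

    import os

    # extend() consumes the generator directly; no explicit loop needed.
    path = "."  # any existing directory works for the illustration
    candidates = []
    if os.path.isdir(path):
        candidates.extend((candidate, path) for candidate in os.listdir(path))

    # Conditional expression instead of two return statements.
    def resolve(cache_entry, link):
        return link if cache_entry is None else cache_entry.link
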
Changes in an additional file (filename not shown)
@@ -33,12 +33,7 @@ def autocomplete():
subcommands = list(commands_dict)
options = []

# subcommand
subcommand_name = None # type: Optional[str]
for word in cwords:
if word in subcommands:
subcommand_name = word
break
subcommand_name = next((word for word in cwords if word in subcommands), None)
# subcommand options
if subcommand_name is not None:
# special case: 'help' subcommand has no options
@@ -50,13 +45,12 @@
not current.startswith('-')
)
if should_list_installed:
installed = []
lc = current.lower()
for dist in get_installed_distributions(local_only=True):
if dist.key.startswith(lc) and dist.key not in cwords[1:]:
installed.append(dist.key)
# if there are no dists installed, fall back to option completion
if installed:
if installed := [
dist.key
for dist in get_installed_distributions(local_only=True)
if dist.key.startswith(lc) and dist.key not in cwords[1:]
]:
for dist in installed:
print(dist)
sys.exit(1)
@@ -65,27 +59,25 @@

for opt in subcommand.parser.option_list_all:
if opt.help != optparse.SUPPRESS_HELP:
for opt_str in opt._long_opts + opt._short_opts:
options.append((opt_str, opt.nargs))

options.extend(
(opt_str, opt.nargs)
for opt_str in opt._long_opts + opt._short_opts
)
# filter out previously specified options from available options
prev_opts = [x.split('=')[0] for x in cwords[1:cword - 1]]
options = [(x, v) for (x, v) in options if x not in prev_opts]
# filter options by current input
options = [(k, v) for k, v in options if k.startswith(current)]
# get completion type given cwords and available subcommand options
completion_type = get_path_completion_type(
cwords, cword, subcommand.parser.option_list_all,
)
# get completion files and directories if ``completion_type`` is
# ``<file>``, ``<dir>`` or ``<path>``
if completion_type:
if completion_type := get_path_completion_type(
cwords,
cword,
subcommand.parser.option_list_all,
):
paths = auto_complete_paths(current, completion_type)
options = [(path, 0) for path in paths]
for option in options:
opt_label = option[0]
# append '=' to options which require args
if option[1] and option[0][:2] == "--":
if option[1] and opt_label[:2] == "--":
opt_label += '='
print(opt_label)
else:
Expand All @@ -98,13 +90,11 @@ def autocomplete():
for opt in flattened_opts:
if opt.help != optparse.SUPPRESS_HELP:
subcommands += opt._long_opts + opt._short_opts
else:
# get completion type given cwords and all available options
completion_type = get_path_completion_type(cwords, cword,
flattened_opts)
if completion_type:
subcommands = list(auto_complete_paths(current,
completion_type))
elif completion_type := get_path_completion_type(
cwords, cword, flattened_opts
):
subcommands = list(auto_complete_paths(current,
completion_type))

print(' '.join([x for x in subcommands if x.startswith(current)]))
sys.exit(1)
@@ -98,13 +90,11 @@ def autocomplete():
if opt.help == optparse.SUPPRESS_HELP:
continue
for o in str(opt).split('/'):
if cwords[cword - 2].split('=')[0] == o:
if not opt.metavar or any(
x in ('path', 'file', 'dir')
for x in opt.metavar.split('/')):
return opt.metavar
if cwords[cword - 2].split('=')[0] == o and (
not opt.metavar
or any(
x in ('path', 'file', 'dir')
for x in opt.metavar.split('/')
)
):
return opt.metavar
return None


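
The most compact change in this file replaces the for/break search for the first recognised subcommand with next() over a generator expression, using None as the default when nothing matches. A standalone sketch with invented word lists:

    subcommands = ["install", "download", "uninstall"]     # illustrative
    cwords = ["pip", "--verbose", "install", "requests"]   # illustrative

    # Returns the first word that is a known subcommand, or None.
    subcommand_name = next((word for word in cwords if word in subcommands), None)
    print(subcommand_name)  # prints: install
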
Changes in an additional file (filename not shown)
@@ -143,13 +143,15 @@ def _main(self, args):
if options.exists_action:
os.environ['PIP_EXISTS_ACTION'] = ' '.join(options.exists_action)

if options.require_venv and not self.ignore_require_venv:
# If a venv is required check if it can really be found
if not running_under_virtualenv():
logger.critical(
'Could not find an activated virtualenv (required).'
)
sys.exit(VIRTUALENV_NOT_FOUND)
if (
options.require_venv
and not self.ignore_require_venv
and not running_under_virtualenv()
):
logger.critical(
'Could not find an activated virtualenv (required).'
)
sys.exit(VIRTUALENV_NOT_FOUND)

if options.cache_dir:
options.cache_dir = normalize_path(options.cache_dir)
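
Here the nested if around the virtualenv check is merged into a single condition joined with and; the body ran only when every clause held, so flattening it does not change behaviour. A schematic sketch with placeholder flags rather than the options object used above:

    def check_nested(require_venv, ignore_require_venv, in_venv):
        if require_venv and not ignore_require_venv:
            if not in_venv:
                return "missing virtualenv"
        return "ok"

    def check_flat(require_venv, ignore_require_venv, in_venv):
        # Identical outcome for every combination of the three flags.
        if require_venv and not ignore_require_venv and not in_venv:
            return "missing virtualenv"
        return "ok"
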
Changes in an additional file (filename not shown)
@@ -120,12 +120,11 @@ def check_dist_restriction(options, check_target=False):
":none:)."
)

if check_target:
if dist_restriction_set and not options.target_dir:
raise CommandError(
"Can not use any platform or abi specific options unless "
"installing via '--target'"
)
if check_target and dist_restriction_set and not options.target_dir:
raise CommandError(
"Can not use any platform or abi specific options unless "
"installing via '--target'"
)


def _path_option_check(option, opt, value):
@@ -614,16 +613,13 @@ def add_target_python_options(cmd_opts):


def make_target_python(options):
# type: (Values) -> TargetPython
target_python = TargetPython(
return TargetPython(
platforms=options.platforms,
py_version_info=options.python_version,
abis=options.abis,
implementation=options.implementation,
)

return target_python


def prefer_binary():
# type: () -> Option
@@ -826,12 +822,13 @@ def _handle_merge_hash(option, opt_str, value, parser):
try:
algo, digest = value.split(':', 1)
except ValueError:
parser.error('Arguments to {} must be a hash name ' # noqa
'followed by a value, like --hash=sha256:'
'abcde...'.format(opt_str))
parser.error(
f'Arguments to {opt_str} must be a hash name followed by a value, like --hash=sha256:abcde...'
)
if algo not in STRONG_HASHES:
parser.error('Allowed hash algorithms for {} are {}.'.format( # noqa
opt_str, ', '.join(STRONG_HASHES)))
parser.error(
f"Allowed hash algorithms for {opt_str} are {', '.join(STRONG_HASHES)}."
)
parser.values.hashes.setdefault(algo, []).append(digest)


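
The final hunks fold ', '.join(...) calls into f-strings. One detail to keep in mind when doing this: before Python 3.12, the expression inside an f-string cannot reuse the string's own quote character, which is why the rewritten message uses double quotes around the string and single quotes inside the braces. A small sketch with an invented list standing in for the STRONG_HASHES constant referenced above:

    STRONG_HASHES = ["sha256", "sha384", "sha512"]  # illustrative list
    opt_str = "--hash"

    # Outer double quotes, inner single quotes: valid on Python 3.6+.
    message = f"Allowed hash algorithms for {opt_str} are {', '.join(STRONG_HASHES)}."
    print(message)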