Patch summary (fosslight_util 1.4.11 -> 1.4.16)

* setup.py: bump the package version.
* src/fosslight_util/download.py: replace getopt with argparse, parse the
  ";branch=..." / ";tag=..." parts of the source link and use them as the
  ref to check out, show a progress bar for wget downloads, add an
  "ftp://" download path.
* src/fosslight_util/download_ftp.py (new file): recursive FTP mirroring
  helpers.
* src/fosslight_util/help.py: help text for fosslight_download.
* src/fosslight_util/parsing_yaml.py: make yaml_file optional in
  set_value_switch() and only log unparsed keys when it is given.

Review changes applied to the added lines (all hunk sizes are unchanged):

* change_src_link_to_https(): strip only the trailing ".git".
* parse_src_link(): keep the given link as "url" when it is not a
  git/http(s) link, instead of handing an empty link to the caller.
* cli_download_and_extract(): the ftp branch connects to the host named in
  the link, logs in anonymously, closes the connection, and reports
  failures through success/msg; placeholder debug output removed.
* _make_parent_dir(): no endless recursion for a bare file name.
* _mirror_ftp_dir(): the pattern is matched against the file's base name.
* download_ftp_tree(): the working directory is restored in "finally";
  placeholder debug output removed.

NOTE(review): indentation and blank context lines were rebuilt from the
hunk line counts; the blob ids on the "index" lines are kept as published
and describe the content before the review changes.

diff --git a/setup.py b/setup.py
index a96b784..75de689 100644
--- a/setup.py
+++ b/setup.py
@@ -14,7 +14,7 @@
 if __name__ == "__main__":
     setup(
         name='fosslight_util',
-        version='1.4.11',
+        version='1.4.16',
         package_dir={"": "src"},
         packages=find_packages(where='src'),
         description='FOSSLight Util',
diff --git a/src/fosslight_util/download.py b/src/fosslight_util/download.py
index 291273d..34e87ea 100755
--- a/src/fosslight_util/download.py
+++ b/src/fosslight_util/download.py
@@ -8,19 +8,23 @@
 import tarfile
 import zipfile
 import logging
-import getopt
+import math
+import argparse
 import shutil
 import pygit2 as git
 import bz2
 from datetime import datetime
 from pathlib import Path
-from ._get_downloadable_url import get_downloadable_url
+from fosslight_util._get_downloadable_url import get_downloadable_url
+from fosslight_util.help import print_help_msg_download
+from fosslight_util.download_ftp import download_ftp_tree
 import fosslight_util.constant as constant
 from fosslight_util.set_log import init_log
 import signal
 import time
 import threading
 import platform
+import ftplib
 
 logger = logging.getLogger(constant.LOGGER_NAME)
 compression_extension = {".tar.bz2", ".tar.gz", ".tar.xz", ".tgz", ".tar", ".zip", ".jar", ".bz2"}
@@ -48,37 +52,59 @@ def alarm_handler(signum, frame):
     raise TimeOutException()
 
 
-def print_help_msg():
-    print("* Required : -s link_to_download")
-    print("* Optional : -t target_directory")
-    print("* Optional : -d log_file_directory")
-    sys.exit(0)
+def change_src_link_to_https(src_link):
+    src_link = src_link.replace("git://", "https://")
+    if src_link.endswith(".git"):
+        src_link = src_link[:-len('.git')]
+    return src_link
+
+
+def parse_src_link(src_link):
+    src_info = {"url": src_link}
+    src_link_changed = ""
+    if src_link.startswith("git://") or src_link.startswith("https://") or src_link.startswith("http://"):
+        src_link_split = src_link.split(';')
+        if src_link.startswith("git://github.com/"):
+            src_link_changed = change_src_link_to_https(src_link_split[0])
+        else:
+            src_link_changed = src_link_split[0]
+
+        branch_info = [s for s in src_link_split if s.startswith('branch')]
+        tag_info = [s for s in src_link_split if s.startswith('tag')]
+
+        src_info["url"] = src_link_changed
+        src_info["branch"] = branch_info
+        src_info["tag"] = tag_info
+    return src_info
 
 
 def main():
+    parser = argparse.ArgumentParser(description='FOSSLight Downloader', prog='fosslight_download', add_help=False)
+    parser.add_argument('-h', '--help', help='Print help message', action='store_true', dest='help')
+    parser.add_argument('-s', '--source', help='Source link to download', type=str, dest='source')
+    parser.add_argument('-t', '--target_dir', help='Target directory', type=str, dest='target_dir', default="")
+    parser.add_argument('-d', '--log_dir', help='Directory to save log file', type=str, dest='log_dir', default="")
 
     src_link = ""
     target_dir = os.getcwd()
     log_dir = os.getcwd()
 
     try:
-        argv = sys.argv[1:]
-        opts, args = getopt.getopt(argv, 'hs:t:d:')
-    except getopt.GetoptError:
-        print_help_msg()
-
-    for opt, arg in opts:
-        if opt == "-h":
-            print_help_msg()
-        elif opt == "-s":
-            src_link = arg
-        elif opt == "-t":
-            target_dir = arg
-        elif opt == "-d":
-            log_dir = arg
+        args = parser.parse_args()
+    except SystemExit:
+        sys.exit(0)
+
+    if args.help:
+        print_help_msg_download()
+    if args.source:
+        src_link = args.source
+    if args.target_dir:
+        target_dir = args.target_dir
+    if args.log_dir:
+        log_dir = args.log_dir
 
     if src_link == "":
-        print_help_msg()
+        print_help_msg_download()
     else:
         cli_download_and_extract(src_link, target_dir, log_dir)
 
@@ -98,20 +124,39 @@ def cli_download_and_extract(link, target_dir, log_dir, checkout_to="", compress
             msg = "Need a link to download."
         elif os.path.isfile(target_dir):
             success = False
-            msg = "The target directory exists as a file.:"+target_dir
+            msg = f"The target directory exists as a file.: {target_dir}"
         else:
-            if not download_git_clone(link, target_dir, checkout_to):
-                if os.path.isfile(target_dir):
-                    shutil.rmtree(target_dir)
-
-                success, downloaded_file = download_wget(link, target_dir, compressed_only)
-                if success:
-                    success = extract_compressed_file(downloaded_file, target_dir, True)
+            if link.startswith("ftp://"):
+                # "ftp://host/remote/path" -> server to connect to, remote path to mirror
+                ftp_host, _, ftp_path = link[len("ftp://"):].partition("/")
+                try:
+                    with ftplib.FTP(ftp_host) as ftp:
+                        ftp.login()  # anonymous login; FTP(host) only opens the connection
+                        success = download_ftp_tree(ftp, "/" + ftp_path,
+                                                    target_dir, pattern=None,
+                                                    overwrite=False, guess_by_extension=True)
+                except Exception as e:
+                    success = False
+                    msg = str(e)
+                    logger.error(e)
+            else:
+                src_info = parse_src_link(link)
+                link = src_info.get("url", "")
+                tag = ''.join(src_info.get("tag", "")).split('=')[-1]
+                branch = ''.join(src_info.get("branch", "")).split('=')[-1]
+
+                if not download_git_clone(link, target_dir, checkout_to, tag, branch):
+                    if os.path.isfile(target_dir):
+                        shutil.rmtree(target_dir)
+
+                    success, downloaded_file = download_wget(link, target_dir, compressed_only)
+                    if success:
+                        success = extract_compressed_file(downloaded_file, target_dir, True)
     except Exception as error:
         success = False
         msg = str(error)
 
-    logger.info("* FOSSLight Downloader - Result :"+str(success)+"\n"+msg)
+    logger.info(f"\n* FOSSLight Downloader - Result: {success}\n {msg}")
     return success, msg
 
 
@@ -131,11 +176,24 @@ def get_ref_to_checkout(checkout_to, ref_list):
         ref_to_checkout = next(
             x for x in ref_list if x.endswith(checkout_to))
     except Exception as error:
-        logger.warning("git find ref - failed:"+str(error))
+        logger.warning(f"git find ref - failed: {error}")
     return ref_to_checkout
 
 
-def download_git_clone(git_url, target_dir, checkout_to=""):
+def decide_checkout(checkout_to="", tag="", branch=""):
+    if checkout_to != "":
+        ref_to_checkout = checkout_to
+    else:
+        if branch != "":
+            ref_to_checkout = branch
+        else:
+            ref_to_checkout = tag
+    return ref_to_checkout
+
+
+def download_git_clone(git_url, target_dir, checkout_to="", tag="", branch=""):
+    ref_to_checkout = decide_checkout(checkout_to, tag, branch)
+
     if platform.system() != "Windows":
         signal.signal(signal.SIGALRM, alarm_handler)
         signal.alarm(SIGNAL_TIMEOUT)
@@ -152,21 +210,28 @@ def download_git_clone(git_url, target_dir, checkout_to=""):
         else:
             del alarm
     except Exception as error:
-        logger.warning("git clone - failed:"+str(error))
+        logger.warning(f"git clone - failed: {error}")
         return False
     try:
-        ref_to_checkout = checkout_to
-        if checkout_to != "":
+        if ref_to_checkout != "":
             ref_list = [x for x in repo.references]
-            ref_to_checkout = get_ref_to_checkout(checkout_to, ref_list)
-            logger.info("git checkout :"+ref_to_checkout)
+            ref_to_checkout = get_ref_to_checkout(ref_to_checkout, ref_list)
+            logger.info(f"git checkout: {ref_to_checkout}")
             repo.checkout(ref_to_checkout)
     except Exception as error:
-        logger.warning("git checkout to "+ref_to_checkout +
-                       " - failed:"+str(error))
+        logger.warning(f"git checkout to {ref_to_checkout} - failed: {error}")
     return True
 
 
+def progress_bar(current, total, width):
+    width = 40
+    avail_dots = width - 2
+    shaded_dots = int(math.floor(float(current) / total * avail_dots))
+    percent_bar = '[' + '■'*shaded_dots + ' '*(avail_dots-shaded_dots) + ']'
+    progress = "%d%% %s [%d / %d]" % (current / total * 100, percent_bar, current, total)
+    return progress
+
+
 def download_wget(link, target_dir, compressed_only):
     success = False
     downloaded_file = ""
@@ -194,33 +259,31 @@ def download_wget(link, target_dir, compressed_only):
             if not success:
                 raise Exception('Not supported compression type (link:{0})'.format(link))
 
-        logger.info("wget:"+link)
-        downloaded_file = wget.download(link)
+        logger.info(f"wget: {link}")
+        downloaded_file = wget.download(link, target_dir, progress_bar)
         if platform.system() != "Windows":
             signal.alarm(0)
         else:
             del alarm
 
-        shutil.move(downloaded_file, target_dir)
-        downloaded_file = os.path.join(target_dir, downloaded_file)
         if downloaded_file != "":
             success = True
-            logger.debug("wget - downloaded:"+downloaded_file)
+            logger.debug(f"wget - downloaded: {downloaded_file}")
     except Exception as error:
         success = False
-        logger.warning("wget - failed:"+str(error))
+        logger.warning(f"wget - failed: {error}")
 
     return success, downloaded_file
 
 
 def extract_compressed_dir(src_dir, target_dir, remove_after_extract=True):
-    logger.debug("Extract Dir:"+src_dir)
+    logger.debug(f"Extract Dir: {src_dir}")
     try:
         files_path = [os.path.join(src_dir, x) for x in os.listdir(src_dir)]
         for fname in files_path:
             extract_compressed_file(fname, target_dir, remove_after_extract)
     except Exception as error:
-        logger.debug("Extract files in dir - failed:"+str(error))
+        logger.debug(f"Extract files in dir - failed: {error}")
         return False
     return True
 
@@ -248,15 +311,15 @@ def extract_compressed_file(fname, extract_path, remove_after_extract=True):
                 decompress_bz2(fname, extract_path)
             else:
                 is_compressed_file = False
-                logger.warning("Unsupported file extension:"+fname)
+                logger.warning(f"Unsupported file extension: {fname}")
 
             if remove_after_extract and is_compressed_file:
-                logger.debug("Remove - extracted file :"+fname)
+                logger.debug(f"Remove - extracted file: {fname}")
                 os.remove(fname)
         else:
-            logger.warning("Not a file:"+fname)
+            logger.warning(f"Not a file: {fname}")
     except Exception as error:
-        logger.error("Extract - failed:"+str(error))
+        logger.error(f"Extract - failed: {error}")
         return False
     return True
 
@@ -268,7 +331,7 @@ def decompress_bz2(source_file, dest_path):
         open(os.path.splitext(source_file)[0], 'wb').write(data)  # write a uncompressed file
 
     except Exception as error:
-        logger.error("Decompress bz2 - failed:"+str(error))
+        logger.error(f"Decompress bz2 - failed: {error}")
         return False
     return True
 
@@ -280,7 +343,7 @@ def unzip(source_file, dest_path):
             fzip.extract(filename, dest_path)
         fzip.close()
     except Exception as error:
-        logger.error("Unzip - failed:"+str(error))
+        logger.error(f"Unzip - failed: {error}")
         return False
     return True
 
diff --git a/src/fosslight_util/download_ftp.py b/src/fosslight_util/download_ftp.py
new file mode 100644
index 0000000..f1ff6f3
--- /dev/null
+++ b/src/fosslight_util/download_ftp.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright (c) 2023 LG Electronics Inc.
+# SPDX-License-Identifier: Apache-2.0
+import ftplib
+import os
+import re
+import logging
+import fosslight_util.constant as constant
+from pathlib import Path
+
+"""
+MIT license: 2017 - Jwely
+Example usage:
+``` python
+import ftplib
+ftp = ftplib.FTP(mysite, username, password)
+download_ftp_tree(ftp, remote_dir, local_dir)
+```
+The code above will look for a directory called "remote_dir" on the ftp host, and then duplicate the
+directory and its entire contents into the "local_dir".
+*** Note that if wget is an option, I recommend using that instead ***
+"""
+logger = logging.getLogger(constant.LOGGER_NAME)
+
+
+def _is_ftp_dir(ftp_handle, name, guess_by_extension=True):
+    """ simply determines if an item listed on the ftp server is a valid directory or not """
+
+    # if the name has a "." in the fourth to last position, its probably a file extension
+    # this is MUCH faster than trying to set every file to a working directory, and will work 99% of time.
+    if guess_by_extension is True:
+        if len(name) >= 4:
+            if name[-4] == '.':
+                return False
+
+    original_cwd = ftp_handle.pwd()  # remember the current working directory
+    try:
+        ftp_handle.cwd(name)  # try to set directory to new name
+        ftp_handle.cwd(original_cwd)  # set it back to what it was
+        return True
+
+    except ftplib.error_perm as e:
+        print(e)
+        return False
+
+    except Exception as e:
+        print(e)
+        return False
+
+
+def _make_parent_dir(fpath):
+    """ ensures the parent directory of a filepath exists """
+    dirname = os.path.dirname(fpath)
+    # a bare file name has no parent component, so there is nothing to create
+    if dirname and not os.path.exists(dirname):
+        try:
+            os.makedirs(dirname, exist_ok=True)
+            print("created {0}".format(dirname))
+        except OSError as e:
+            print(e)
+
+
+def _download_ftp_file(ftp_handle, name, dest, overwrite):
+    """ downloads a single file from an ftp server """
+    _make_parent_dir(dest.lstrip("/"))
+    if not os.path.exists(dest) or overwrite is True:
+        try:
+            with open(dest, 'wb') as f:
+                ftp_handle.retrbinary("RETR {0}".format(name), f.write)
+            print("downloaded: {0}".format(dest))
+        except FileNotFoundError:
+            print("FAILED: {0}".format(dest))
+    else:
+        print("already exists: {0}".format(dest))
+
+
+def _file_name_match_patern(pattern, name):
+    """ returns True if filename matches the pattern"""
+    if pattern is None:
+        return True
+    else:
+        return bool(re.match(pattern, name))
+
+
+def _mirror_ftp_dir(ftp_handle, name, overwrite, guess_by_extension, pattern):
+    """ replicates a directory on an ftp server recursively """
+    for item in ftp_handle.nlst(name):
+        if _is_ftp_dir(ftp_handle, item, guess_by_extension):
+            _mirror_ftp_dir(ftp_handle, item, overwrite, guess_by_extension, pattern)
+        else:
+            if _file_name_match_patern(pattern, os.path.basename(item)):
+                _download_ftp_file(ftp_handle, item, item, overwrite)
+            else:
+                # quietly skip the file
+                pass
+
+
+def download_ftp_tree(ftp_handle, path, destination, pattern=None, overwrite=False, guess_by_extension=True):
+    """
+    Downloads an entire directory tree from an ftp server to the local destination
+    :param ftp_handle: an authenticated ftplib.FTP instance
+    :param path: the folder on the ftp server to download
+    :param destination: the local directory to store the copied folder
+    :param pattern: Python regex pattern, only files that match this pattern will be downloaded.
+    :param overwrite: set to True to force re-download of all files, even if they appear to exist already
+    :param guess_by_extension: It takes a while to explicitly check if every item is a directory or a file.
+        if this flag is set to True, it will assume any file ending with a three character extension ".???" is
+        a file and not a directory. Set to False if some folders may have a "." in their names -4th position.
+    """
+    success = True
+    # files are written relative to the working directory, so it is switched for the download
+    original_directory = os.getcwd()  # remember working directory before function is executed
+    try:
+        path = path.lstrip("/")
+        Path(destination).mkdir(parents=True, exist_ok=True)
+        os.chdir(destination)  # change working directory to ftp mirror directory
+
+        logger.debug(f"Mirroring ftp path '{path}' into '{destination}'")
+        _mirror_ftp_dir(
+            ftp_handle,
+            path,
+            pattern=pattern,
+            overwrite=overwrite,
+            guess_by_extension=guess_by_extension)
+    except Exception as ex:
+        success = False
+        logger.info(f"Error downloading ftp tree: {ex}")
+    finally:
+        # reset working directory to what it was before, even when the download failed
+        os.chdir(original_directory)
+
+    return success
diff --git a/src/fosslight_util/help.py b/src/fosslight_util/help.py
index ebd59b9..18ec870 100644
--- a/src/fosslight_util/help.py
+++ b/src/fosslight_util/help.py
@@ -19,6 +19,21 @@
     """
 
 
+_HELP_MESSAGE_DOWNLOAD = """
+    FOSSLight Downloader is a tool to download the package via input URL
+
+    Usage: fosslight_download [option1] [options2]
+    ex) fosslight_download -s http://github.com/fosslight/fosslight -t output_dir -d log_dir
+
+    Required:
+        -s\t\t URL of the package to be downloaded
+
+    Optional:
+        -h\t\t Print help message
+        -t\t\t Output path name
+        -d\t\t Directory name to save the log file"""
+
+
 class PrintHelpMsg():
     message_suffix = ""
 
@@ -41,3 +56,8 @@ def print_package_version(pkg_name, msg="", exitopt=True):
 
     if exitopt:
         sys.exit(0)
+
+
+def print_help_msg_download(exitOpt=True):
+    helpMsg = PrintHelpMsg(_HELP_MESSAGE_DOWNLOAD)
+    helpMsg.print_help_msg(exitOpt)
diff --git a/src/fosslight_util/parsing_yaml.py b/src/fosslight_util/parsing_yaml.py
index 69fc49a..1d3ac9b 100644
--- a/src/fosslight_util/parsing_yaml.py
+++ b/src/fosslight_util/parsing_yaml.py
@@ -92,7 +92,7 @@ def find_sbom_yaml_files(path_to_find):
     return oss_pkg_files
 
 
-def set_value_switch(oss, key, value, yaml_file):
+def set_value_switch(oss, key, value, yaml_file=""):
     if key in ['oss name', 'name']:
         oss.name = value
     elif key in ['oss version', 'version']:
@@ -116,4 +116,5 @@ def set_value_switch(oss, key, value, yaml_file):
     elif key == 'yocto_recipe':
         oss.yocto_recipe = value
     else:
-        _logger.debug(f"file:{yaml_file} - key:{key} cannot be parsed")
+        if yaml_file != "":
+            _logger.debug(f"file:{yaml_file} - key:{key} cannot be parsed")