From 1f27a504717c216852a5b0c156e7a4cd35a7846e Mon Sep 17 00:00:00 2001
From: Glenn Jocher
Date: Sun, 9 Jun 2024 03:53:13 +0200
Subject: [PATCH] Refactor code for speed and clarity

---
 beautiful_scraper.py |  17 +--
 bing_scraper.py      | 342 +++++++++++++++++--------------------------
 clean_images.py      |   9 +-
 3 files changed, 148 insertions(+), 220 deletions(-)

diff --git a/beautiful_scraper.py b/beautiful_scraper.py
index a286573a..8f114175 100644
--- a/beautiful_scraper.py
+++ b/beautiful_scraper.py
@@ -14,7 +14,7 @@ def download_uri(uri, dir="./"):
 
 def download_baidu(word):
     """Downloads images from Baidu based on a search word, saving them with a specific naming convention."""
-    url = "https://image.baidu.com/search/flip?tn=baiduimage&ie=utf-8&word=" + word + "&ct=201326592&v=flip"
+    url = f"https://image.baidu.com/search/flip?tn=baiduimage&ie=utf-8&word={word}&ct=201326592&v=flip"
 
     pic_url = re.findall('"objURL":"(.*?)",', requests.get(url).text, re.S)
     i = 0
@@ -26,10 +26,9 @@ def download_baidu(word):
             print("exception")
             continue
 
-        string = "pictures" + word + "_" + str(i) + ".jpg"
-        fp = open(string, "wb")
-        fp.write(pic.content)
-        fp.close()
+        string = f"pictures{word}_{i}.jpg"
+        with open(string, "wb") as fp:
+            fp.write(pic.content)
         i += 1
 
 
@@ -37,13 +36,13 @@ def download_google(word):
     """Downloads images from Bing for a given search word by scraping image links and using curl to download."""
     # url = 'https://www.google.com/search?q=' + word + '&client=opera&hs=cTQ&source=lnms&tbm=isch&sa=X&ved=0ahUKEwig3LOx4PzKAhWGFywKHZyZAAgQ_AUIBygB&biw=1920&bih=982'
-    url = "https://www.bing.com/images/search?q=" + word
+    url = f"https://www.bing.com/images/search?q={word}"
     soup = BeautifulSoup(requests.get(url).text, "html.parser")
     links = soup.find_all("a", {"class": "thumb"})
 
     for link in links:
         link = link.get("href")
-        s = "curl -s -L -o '%s' '%s'" % (link.split("/")[-1], link)
+        s = f"""curl -s -L -o '{link.split("/")[-1]}' '{link}'"""
         os.system(s)
 
 
@@ -60,7 +59,7 @@ def get_html():
         link = url + link.get("href")
         f = dir + link.split("/")[-1]
         if not os.path.exists(f):
-            s = "curl -s -L -o '%s' '%s'" % (f, link)
+            s = f"curl -s -L -o '{f}' '{link}'"
             os.system(s)
 
 
@@ -75,7 +74,7 @@ def organize_folders():
         link = url + link.get("href")
         f = dir + link.split("/")[-1]
         if not os.path.exists(f):
-            s = "curl -s -L -o '%s' '%s'" % (f, link)
+            s = f"curl -s -L -o '{f}' '{link}'"
             os.system(s)
 
 
diff --git a/bing_scraper.py b/bing_scraper.py
index 5b670604..7770ea12 100644
--- a/bing_scraper.py
+++ b/bing_scraper.py
@@ -78,13 +78,11 @@ def user_input():
     config_file_check = config.parse_known_args()
     object_check = vars(config_file_check[0])
 
+    records = []
     if object_check["config_file"] != "":
-        records = []
         json_file = json.load(open(config_file_check[0].config_file))
-        for record in range(0, len(json_file["Records"])):
-            arguments = {}
-            for i in args_list:
-                arguments[i] = None
+        for record in range(len(json_file["Records"])):
+            arguments = {i: None for i in args_list}
             for key, value in json_file["Records"][record].items():
                 arguments[key] = value
             records.append(arguments)
@@ -375,11 +373,10 @@ def user_input():
     # args.chromedriver = './chromedriver'
 
     if args.search:  # construct url
-        args.url = "https://www.bing.com/images/search?q=%s" % args.search.replace(" ", "%20")
+        args.url = f'https://www.bing.com/images/search?q={args.search.replace(" ", "%20")}'
        args.image_directory = args.search.replace(" ", "_")
 
     arguments = vars(args)
-    records = []
     records.append(arguments)
     return records
 
@@ -393,14 +390,12 @@ def __init__(self):
     def download_page(self, url):
         """Downloads raw page content from URL using custom User-Agent; returns string."""
         try:
-            headers = {}
-            headers["User-Agent"] = (
-                "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
-            )
+            headers = {
+                "User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
+            }
             req = urllib.request.Request(url, headers=headers)
             resp = urllib.request.urlopen(req)
-            respData = str(resp.read())
-            return respData
+            return str(resp.read())
         except Exception:
             print(
                 "Could not open URL. Please check your internet connection and/or ssl settings \n"
@@ -439,10 +434,10 @@ def download_extended_page(self, url, chromedriver):
         element = browser.find_element(By.TAG_NAME, "body")
         pbar = tqdm(enumerate(range(30)), desc="Downloading HTML...", total=30)  # progress bar
         for _ in pbar:
-            try:  # click 'see more' button if found
+            try:
                 # browser.find_element_by_id("smb").click()  # google images 'see more' button
                 browser.find_element(By.CLASS_NAME, "btn_seemore").click()  # bing images 'see more' button
-            except:
+            except Exception:
                 pass
             pbar.desc = "Downloading HTML... %d elements" % len(browser.page_source)  # page source
             element.send_keys(Keys.PAGE_DOWN)
@@ -467,31 +462,28 @@ def repair(self, brokenjson):
     def get_next_tab(self, s):
         """Parses HTML to find and return the next tab's URL, label, and end content position."""
         start_line = s.find('class="dtviD"')
-        if start_line == -1:  # If no links are found then give an error!
-            end_quote = 0
-            link = "no_tabs"
-            return link, "", end_quote
+        if start_line == -1:
+            return "no_tabs", "", 0
+        start_line = s.find('class="dtviD"')
+        start_content = s.find('href="', start_line + 1)
+        end_content = s.find('">', start_content + 1)
+        url_item = f"https://www.google.com{s[start_content + 6:end_content]}"
+        url_item = url_item.replace("&amp;", "&")
+
+        start_line_2 = s.find('class="dtviD"')
+        s = s.replace("&amp;", "&")
+        start_content_2 = s.find(":", start_line_2 + 1)
+        end_content_2 = s.find("&usg=", start_content_2 + 1)
+        url_item_name = str(s[start_content_2 + 1 : end_content_2])
+
+        chars = url_item_name.find(",g_1:")
+        chars_end = url_item_name.find(":", chars + 6)
+        if chars_end == -1:
+            updated_item_name = (url_item_name[chars + 5 :]).replace("+", " ")
         else:
-            start_line = s.find('class="dtviD"')
-            start_content = s.find('href="', start_line + 1)
-            end_content = s.find('">', start_content + 1)
-            url_item = "https://www.google.com" + str(s[start_content + 6 : end_content])
-            url_item = url_item.replace("&amp;", "&")
-
-            start_line_2 = s.find('class="dtviD"')
-            s = s.replace("&amp;", "&")
-            start_content_2 = s.find(":", start_line_2 + 1)
-            end_content_2 = s.find("&usg=", start_content_2 + 1)
-            url_item_name = str(s[start_content_2 + 1 : end_content_2])
-
-            chars = url_item_name.find(",g_1:")
-            chars_end = url_item_name.find(":", chars + 6)
-            if chars_end == -1:
-                updated_item_name = (url_item_name[chars + 5 :]).replace("+", " ")
-            else:
-                updated_item_name = (url_item_name[chars + 5 : chars_end]).replace("+", " ")
+            updated_item_name = (url_item_name[chars + 5 : chars_end]).replace("+", " ")
 
-            return url_item, updated_item_name, end_content
+        return url_item, updated_item_name, end_content
 
     # Getting all links with the help of '_images_get_next_image'
     def get_all_tabs(self, page):
@@ -503,13 +495,11 @@ def get_all_tabs(self, page):
             item, item_name, end_content = self.get_next_tab(page)
             if item == "no_tabs":
                 break
-            else:
-                if len(item_name) > 100 or item_name == "background-color":
-                    break
-                else:
-                    tabs[item_name] = item  # Append all the links in the list named 'Links'
-                    time.sleep(0.1)  # Timer could be used to slow down the request for image downloads
-                page = page[end_content:]
+            if len(item_name) > 100 or item_name == "background-color":
+                break
+            tabs[item_name] = item  # Append all the links in the list named 'Links'
+            time.sleep(0.1)  # Timer could be used to slow down the request for image downloads
+            page = page[end_content:]
         return tabs
 
     # Format the object in readable format
@@ -519,10 +509,11 @@ def format_object(self, object):
         """
         if "?" in object["murl"]:
             object["murl"] = object["murl"].split("?")[0]
-        formatted_object = {}
-        formatted_object["image_format"] = object["murl"].split(".")[-1]
-        formatted_object["image_height"] = False
-        formatted_object["image_width"] = False
+        formatted_object = {
+            "image_format": object["murl"].split(".")[-1],
+            "image_height": False,
+            "image_width": False,
+        }
         formatted_object["image_link"] = object["murl"].replace(" ", "+")
         formatted_object["image_description"] = object["desc"]
         formatted_object["image_host"] = object["purl"]
@@ -542,7 +533,6 @@ def single_image(self, image_url):
         except OSError as e:
             if e.errno != 17:
                 raise
-            pass
         req = Request(
             url,
             headers={
@@ -560,15 +550,14 @@ def single_image(self, image_url):
         # if ".jpg" in image_name or ".gif" in image_name or ".png" in image_name or ".bmp" in image_name or ".svg"
         # in image_name or ".webp" in image_name or ".ico" in image_name:
         if any(map(lambda extension: extension in image_name, extensions)):
-            file_name = main_directory + "/" + image_name
+            file_name = f"{main_directory}/{image_name}"
         else:
-            file_name = main_directory + "/" + image_name + ".jpg"
-            image_name = image_name + ".jpg"
+            file_name = f"{main_directory}/{image_name}.jpg"
+            image_name = f"{image_name}.jpg"
 
         try:
-            output_file = open(file_name, "wb")
-            output_file.write(data)
-            output_file.close()
+            with open(file_name, "wb") as output_file:
+                output_file.write(data)
         except IOError as e:
             raise e
         except OSError as e:
@@ -579,12 +568,10 @@ def single_image(self, image_url):
     def similar_images(self, similar_images):
         """Finds images similar to the input URL by performing a Google reverse image search."""
         try:
-            searchUrl = "https://www.google.com/searchbyimage?site=search&sa=X&image_url=" + similar_images
-            headers = {}
-            headers["User-Agent"] = (
-                "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
-            )
-
+            searchUrl = f"https://www.google.com/searchbyimage?site=search&sa=X&image_url={similar_images}"
+            headers = {
+                "User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
+            }
             req1 = urllib.request.Request(searchUrl, headers=headers)
             resp1 = urllib.request.urlopen(req1)
             content = str(resp1.read())
@@ -592,14 +579,13 @@ def similar_images(self, similar_images):
             l2 = content.find("&", l1)
             urll = content[l1:l2]
 
-            newurl = "https://www.google.com/search?tbs=sbi:" + urll + "&site=search&sa=X"
+            newurl = f"https://www.google.com/search?tbs=sbi:{urll}&site=search&sa=X"
             req2 = urllib.request.Request(newurl, headers=headers)
             resp2 = urllib.request.urlopen(req2)
 
             l3 = content.find("/search?sa=X&amp;q=")
             l4 = content.find(";", l3 + 19)
-            urll2 = content[l3 + 19 : l4]
-            return urll2
-        except:
+            return content[l3 + 19 : l4]
+        except Exception:
             return "Could not connect to Google Images endpoint"
 
     # Building URL parameters
@@ -650,7 +636,7 @@ def build_url_parameters(self, arguments):
 
         if arguments["exact_size"]:
             size_array = [x.strip() for x in arguments["exact_size"].split(",")]
-            exact_size = ",isz:ex,iszw:" + str(size_array[0]) + ",iszh:" + str(size_array[1])
+            exact_size = f",isz:ex,iszw:{size_array[0]},iszh:{size_array[1]}"
         else:
             exact_size = ""
 
@@ -741,26 +727,19 @@ def build_url_parameters(self, arguments):
                 },
             ],
         }
-        for key, value in params.items():
+        for value in params.values():
             if value[0] is not None:
                 ext_param = value[1][value[0]]
                 # counter will tell if it is first param added or not
-                if counter == 0:
-                    # add it to the built url
-                    built_url = built_url + ext_param
-                    counter += 1
-                else:
-                    built_url = built_url + "," + ext_param
-                    counter += 1
-        built_url = lang_url + built_url + exact_size + time_range
-        return built_url
+                built_url = built_url + ext_param if counter == 0 else f"{built_url},{ext_param}"
+                counter += 1
+        return lang_url + built_url + exact_size + time_range
 
     # building main search URL
     def build_search_url(self, search_term, params, url, similar_images, specific_site, safe_search):
         """Constructs a Google search URL based on input parameters such as search term, image specificity, and safe
         search settings.
         """
-        safe_search_string = "&safe=active"
         # check the args and choose the URL
         if url:
             url = url
@@ -793,6 +772,7 @@ def build_search_url(self, search_term, params, url, similar_images, specific_si
 
         # safe search check
         if safe_search:
+            safe_search_string = "&safe=active"
             url = url + safe_search_string
 
         return url
@@ -814,18 +794,10 @@ def keywords_from_file(self, file_name):
         """Extracts keywords from a .txt or .csv file, ignoring empty lines; returns a list of keywords."""
         search_keyword = []
         with codecs.open(file_name, "r", encoding="utf-8-sig") as f:
-            if ".csv" in file_name:
-                for line in f:
-                    if line in ["\n", "\r\n"]:
-                        pass
-                    else:
-                        search_keyword.append(line.replace("\n", "").replace("\r", ""))
-            elif ".txt" in file_name:
-                for line in f:
-                    if line in ["\n", "\r\n"]:
-                        pass
-                    else:
-                        search_keyword.append(line.replace("\n", "").replace("\r", ""))
+            if ".csv" in file_name or ".txt" in file_name:
+                search_keyword.extend(
+                    line.replace("\n", "").replace("\r", "") for line in f if line not in ["\n", "\r\n"]
+                )
             else:
                 print("Invalid file type: Valid file types are either .txt or .csv \n" "exiting...")
                 sys.exit()
@@ -838,19 +810,13 @@ def create_directories(self, main_directory, dir_name):
         try:
             if not os.path.exists(main_directory):
                 os.makedirs(main_directory)
                 time.sleep(0.2)
-                path = dir_name
-                sub_directory = os.path.join(main_directory, path)
-                if not os.path.exists(sub_directory):
-                    os.makedirs(sub_directory)
-            else:
-                path = dir_name
-                sub_directory = os.path.join(main_directory, path)
-                if not os.path.exists(sub_directory):
-                    os.makedirs(sub_directory)
+            path = dir_name
+            sub_directory = os.path.join(main_directory, path)
+            if not os.path.exists(sub_directory):
+                os.makedirs(sub_directory)
         except OSError as e:
             if e.errno != 17:
                 raise
-            pass
         return
 
     # Download Images
@@ -878,9 +844,8 @@ def download_image(
             download_message = "%s %s" % (image_url, download_message)
             return "success", download_message, None, image_url
 
-        if ignore_urls:
-            if any(url in image_url for url in ignore_urls.split(",")):
-                return "fail", "Image ignored due to 'ignore url' parameter", None, image_url
+        if ignore_urls and any(url in image_url for url in ignore_urls.split(",")):
+            return "fail", "Image ignored due to 'ignore url' parameter", None, image_url
 
         try:
             req = Request(
@@ -891,11 +856,7 @@ def download_image(
             )
 
             try:  # timeout time to download an image
-                if socket_timeout:
-                    timeout = float(socket_timeout)
-                else:
-                    timeout = 10
-
+                timeout = float(socket_timeout) if socket_timeout else 10
                 response = urlopen(req, None, timeout)
                 data = response.read()
                 response.close()
@@ -903,47 +864,40 @@ def download_image(
             extensions = [".jpg", ".jpeg", ".gif", ".png", ".bmp", ".svg", ".webp", ".ico"]
             # keep everything after the last '/'
             image_name = str(image_url[(image_url.rfind("/")) + 1 :])
-            if format:
-                if not image_format or image_format != format:
-                    download_status = "fail"
-                    download_message = "Wrong image format returned. Skipping..."
-                    return_image_name = ""
-                    absolute_path = ""
-                    download_message = "%s %s" % (image_url, download_message)
-                    return download_status, download_message, return_image_name, absolute_path
-
-            if image_format == "" or not image_format or "." + image_format not in extensions:
+            if format and (not image_format or image_format != format):
+                download_status = "fail"
+                download_message = "Wrong image format returned. Skipping..."
+                return_image_name = ""
+                absolute_path = ""
+                download_message = "%s %s" % (image_url, download_message)
+                return download_status, download_message, return_image_name, absolute_path
+
+            if image_format == "" or not image_format or f".{image_format}" not in extensions:
                 download_status = "fail"
                 download_message = "Invalid or missing image format. Skipping..."
                 return_image_name = ""
                 absolute_path = ""
                 download_message = "%s %s" % (image_url, download_message)
                 return download_status, download_message, return_image_name, absolute_path
-            elif image_name.lower().find("." + image_format) < 0:
-                image_name = image_name + "." + image_format
+            elif image_name.lower().find(f".{image_format}") < 0:
+                image_name = f"{image_name}.{image_format}"
             else:
-                image_name = image_name[: image_name.lower().find("." + image_format) + (len(image_format) + 1)]
+                image_name = image_name[: image_name.lower().find(f".{image_format}") + (len(image_format) + 1)]
 
             # prefix name in image
-            if prefix:
-                prefix = prefix + " "
-            else:
-                prefix = ""
-
+            prefix = f"{prefix} " if prefix else ""
             if no_numbering:
-                path = main_directory + "/" + dir_name + "/" + prefix + image_name
+                path = f"{main_directory}/{dir_name}/{prefix}{image_name}"
             else:
-                path = main_directory + "/" + dir_name + "/" + prefix + str(count) + "." + image_name
+                path = f"{main_directory}/{dir_name}/{prefix}{count}.{image_name}"
 
             try:
-                output_file = open(path, "wb")
-                output_file.write(data)
-                output_file.close()
+                with open(path, "wb") as output_file:
+                    output_file.write(data)
                 if save_source:
                     list_path = main_directory + "/" + save_source + ".txt"
-                    list_file = open(list_path, "a")
-                    list_file.write(path + "\t" + img_src + "\n")
-                    list_file.close()
+                    with open(list_path, "a") as list_file:
+                        list_file.write(path + "\t" + img_src + "\n")
                 absolute_path = os.path.abspath(path)
             except OSError as e:
                 download_status = "fail"
@@ -957,9 +911,8 @@ def download_image(
             return_image_name = prefix + str(count) + "." + image_name
 
             # image size parameter
-            if not silent_mode:
-                if print_size:
-                    print("Image Size: " + str(self.file_size(path)))
+            if not silent_mode and print_size:
+                print("Image Size: " + str(self.file_size(path)))
 
         except UnicodeEncodeError as e:
             download_status = "fail"
@@ -1017,35 +970,32 @@ def _get_next_item(self, s):
         found.
         """
         start_line = s.find("imgpt")
-        if start_line == -1:  # If no links are found then give an error!
-            end_quote = 0
-            link = "no_links"
-            return link, end_quote
-        else:
-            start_line = s.find('class="imgpt"')
-            start_object = s.find('m="{', start_line)
-            end_object = s.find('}"', start_object)
-            object_raw = str(s[(start_object + 3) : (end_object + 1)])
-
-            # remove escape characters with python 3.4+
-            try:
-                object_decode = bytes(html.unescape(object_raw), "utf-8").decode("unicode_escape")
-                final_object = json.loads(object_decode)
-            except:
-                final_object = ""
+        if start_line == -1:
+            return "no_links", 0
+        start_line = s.find('class="imgpt"')
+        start_object = s.find('m="{', start_line)
+        end_object = s.find('}"', start_object)
+        object_raw = str(s[(start_object + 3) : (end_object + 1)])
+
+        # remove escape characters with python 3.4+
+        try:
+            object_decode = bytes(html.unescape(object_raw), "utf-8").decode("unicode_escape")
+            final_object = json.loads(object_decode)
+        except Exception:
+            final_object = ""
 
-            return final_object, end_object
+        return final_object, end_object
 
     # Getting all links with the help of '_images_get_next_image'
     def _get_all_items(self, page, main_directory, dir_name, limit, arguments):
         """Fetches and formats items from a page up to a specified limit, applying optional metadata and offset
         arguments.
         """
-        items = []
         abs_path = []
         errorCount = 0
         i = 0
         count = 1
+        items = []
         while count < limit + 1:
             object, end_content = self._get_next_item(page)
             if object == "no_links":
@@ -1058,9 +1008,8 @@ def _get_all_items(self, page, main_directory, dir_name, limit, arguments):
             else:
                 # format the item for readability
                 object = self.format_object(object)
-                if arguments["metadata"]:
-                    if not arguments["silent_mode"]:
-                        print("\nImage Metadata: " + str(object))
+                if arguments["metadata"] and not arguments["silent_mode"]:
+                    print("\nImage Metadata: " + str(object))
 
                 # download the images
                 download_status, download_message, return_image_name, absolute_path = self.download_image(
@@ -1099,11 +1048,7 @@ def _get_all_items(self, page, main_directory, dir_name, limit, arguments):
             i += 1
         if count < limit:
             print(
-                "Unfortunately all "
-                + str(limit - count)
-                + " could not be downloaded because some images were not downloadable. "
-                + str(count - 1)
-                + " is all we got for this search filter!"
+                f"Unfortunately all {limit - count} could not be downloaded because some images were not downloadable. {count - 1} is all we got for this search filter!"
             )
         return items, errorCount, abs_path
 
@@ -1112,45 +1057,36 @@ def download(self, arguments):
         """Downloads images/videos based on arguments; returns paths and error count, supporting bulk and CLI input."""
         paths_agg = {}
         # for input coming from other python files
-        if __name__ != "__main__":
-            # if the calling file contains config_file param
-            if "config_file" in arguments:
-                records = []
-                json_file = json.load(open(arguments["config_file"]))
-                for record in range(0, len(json_file["Records"])):
-                    arguments = {}
-                    for i in args_list:
-                        arguments[i] = None
-                    for key, value in json_file["Records"][record].items():
-                        arguments[key] = value
-                    records.append(arguments)
-                total_errors = 0
-                for rec in records:
-                    paths, errors = self.download_executor(rec)
-                    for i in paths:
-                        paths_agg[i] = paths[i]
-                    if not arguments["silent_mode"]:
-                        if arguments["print_paths"]:
-                            print(paths.encode("raw_unicode_escape").decode("utf-8"))
-                    total_errors = total_errors + errors
-                return paths_agg, total_errors
-            # if the calling file contains params directly
-            else:
-                paths, errors = self.download_executor(arguments)
+        if __name__ == "__main__":
+            paths, errors = self.download_executor(arguments)
+            for i in paths:
+                paths_agg[i] = paths[i]
+            if not arguments["silent_mode"] and arguments["print_paths"]:
+                print(str(paths).encode("raw_unicode_escape").decode("utf-8"))
+        elif "config_file" in arguments:
+            records = []
+            json_file = json.load(open(arguments["config_file"]))
+            for record in range(len(json_file["Records"])):
+                arguments = {i: None for i in args_list}
+                for key, value in json_file["Records"][record].items():
+                    arguments[key] = value
+                records.append(arguments)
+            total_errors = 0
+            for rec in records:
+                paths, errors = self.download_executor(rec)
                 for i in paths:
                     paths_agg[i] = paths[i]
-                if not arguments["silent_mode"]:
-                    if arguments["print_paths"]:
-                        print(paths.encode("raw_unicode_escape").decode("utf-8"))
-                return paths_agg, errors
-        # for input coming from CLI
+                if not arguments["silent_mode"] and arguments["print_paths"]:
+                    print(str(paths).encode("raw_unicode_escape").decode("utf-8"))
+                total_errors += errors
+            return paths_agg, total_errors
         else:
             paths, errors = self.download_executor(arguments)
             for i in paths:
                 paths_agg[i] = paths[i]
-            if not arguments["silent_mode"]:
-                if arguments["print_paths"]:
-                    print(paths.encode("raw_unicode_escape").decode("utf-8"))
+            if not arguments["silent_mode"] and arguments["print_paths"]:
+                print(str(paths).encode("raw_unicode_escape").decode("utf-8"))
+            return paths_agg, errors
         return paths_agg, errors
 
     def download_executor(self, arguments):
@@ -1192,11 +1128,7 @@ def download_executor(self, arguments):
             prefix_keywords = [""]
 
         # Setting limit on number of images to be downloaded
-        if arguments["limit"]:
-            limit = int(arguments["limit"])
-        else:
-            limit = 100
-
+        limit = int(arguments["limit"]) if arguments["limit"] else 100
         if arguments["url"]:
             current_time = str(datetime.datetime.now()).split(".")[0]
             search_keyword = [current_time.replace(":", "_")]
@@ -1283,9 +1215,8 @@ def download_executor(self, arguments):
                     else:
                         raw_html = self.download_extended_page(url, arguments["chromedriver"])
 
-                    if not arguments["silent_mode"]:
-                        if arguments["download"]:
-                            print("Downloading images...")
+                    if not arguments["silent_mode"] and arguments["download"]:
+                        print("Downloading images...")
                     items, errorCount, abs_path = self._get_all_items(
                         raw_html, main_directory, dir_name, limit, arguments
                     )  # get all image items and download images
@@ -1329,11 +1260,10 @@ def main():
     total_errors = 0
     t0 = time.time()  # start the timer
     for arguments in records:
+        response = googleimagesdownload()
         if arguments["single_image"]:  # Download Single Image using a URL
-            response = googleimagesdownload()
             response.single_image(arguments["single_image"])
         else:  # or download multiple images based on keywords/keyphrase search
-            response = googleimagesdownload()
             paths, errors = response.download(arguments)  # wrapping response in a variable just for consistency
             total_errors = total_errors + errors
 
diff --git a/clean_images.py b/clean_images.py
index 0fa30f08..ea47dc5f 100644
--- a/clean_images.py
+++ b/clean_images.py
@@ -11,7 +11,7 @@
     # Remove bad suffixes
     suffix = f.split(".")[-1]
     if suffix in ["gif", "svg"]:
-        print("Removing %s" % f)
+        print(f"Removing {f}")
         os.remove(f)
         continue
 
@@ -22,11 +22,10 @@
         # Downsize to max_wh if necessary
         r = max_wh / max(img.shape)  # ratio
         if r < 1:  # resize
-            print("Resizing %s" % f)
+            print(f"Resizing {f}")
             img = transform.resize(img, (round(img.shape[0] * r), round(img.shape[1] * r)))
             io.imsave(f, img.astype(np.uint8))
 
-    # Remove corrupted
-    except:
-        print("Removing corrupted %s" % f)
+    except Exception:
+        print(f"Removing corrupted {f}")
         os.remove(f)