From 5129c14df97f438b87035e56f9ce2ca8953f94cb Mon Sep 17 00:00:00 2001
From: Santiago Ramirez
Date: Tue, 19 Sep 2023 17:29:53 +0000
Subject: [PATCH] Formatted

---
 bot/helpers/GoogleImageScraper.py | 185 ++++++++++++++++++------------
 bot/helpers/main.py               |  64 ++++++-----
 2 files changed, 149 insertions(+), 100 deletions(-)

diff --git a/bot/helpers/GoogleImageScraper.py b/bot/helpers/GoogleImageScraper.py
index 80134a8..1d8f6fa 100644
--- a/bot/helpers/GoogleImageScraper.py
+++ b/bot/helpers/GoogleImageScraper.py
@@ -1,10 +1,3 @@
-# -*- coding: utf-8 -*-
-"""
-Created on Sat Jul 18 13:01:02 2020
-
-@author: OHyic
-"""
-#import selenium drivers
 from selenium import webdriver
 from selenium.webdriver.chrome.options import Options
 from selenium.webdriver.common.by import By
@@ -12,7 +5,7 @@
 from selenium.webdriver.support import expected_conditions as EC
 from selenium.common.exceptions import NoSuchElementException
 
-#import helper libraries
+# import helper libraries
 import time
 import urllib.request
 from urllib.parse import urlparse
@@ -22,69 +15,89 @@
 from PIL import Image
 import re
 
-#custom patch libraries
+# custom patch libraries
 import patch
 
-class GoogleImageScraper():
-    def __init__(self, webdriver_path, image_path, search_key="cat", number_of_images=1, headless=True, min_resolution=(0, 0), max_resolution=(1920, 1080), max_missed=10):
-        #check parameter types
+
+class GoogleImageScraper:
+    def __init__(
+        self,
+        webdriver_path,
+        image_path,
+        search_key="cat",
+        number_of_images=1,
+        headless=True,
+        min_resolution=(0, 0),
+        max_resolution=(1920, 1080),
+        max_missed=10,
+    ):
+        # check parameter types
         image_path = os.path.join(image_path, search_key)
-        if (type(number_of_images)!=int):
+        if type(number_of_images) != int:
             print("[Error] Number of images must be an integer value.")
             return
         if not os.path.exists(image_path):
             print("[INFO] Image path not found. Creating a new folder.")
             os.makedirs(image_path)
-
-        #check if chromedriver is installed
-        if (not os.path.isfile(webdriver_path)):
+
+        # check if chromedriver is installed
+        if not os.path.isfile(webdriver_path):
             is_patched = patch.download_lastest_chromedriver()
-            if (not is_patched):
-                exit("[ERR] Please update the chromedriver.exe in the webdriver folder according to your chrome version:https://chromedriver.chromium.org/downloads")
+            if not is_patched:
+                exit(
+                    "[ERR] Please update the chromedriver.exe in the webdriver folder according to your chrome version: https://chromedriver.chromium.org/downloads"
+                )
 
         for i in range(1):
             try:
-                #try going to www.google.com
+                # try going to www.google.com
                 options = Options()
-                if(headless):
-                    options.add_argument('--headless')
+                if headless:
+                    options.add_argument("--headless")
                 driver = webdriver.Chrome(webdriver_path, chrome_options=options)
-                driver.set_window_size(1400,1050)
+                driver.set_window_size(1400, 1050)
                 driver.get("https://www.google.com")
                 try:
-                    WebDriverWait(driver, 5).until(EC.element_to_be_clickable((By.ID, "W0wltc"))).click()
+                    WebDriverWait(driver, 5).until(
+                        EC.element_to_be_clickable((By.ID, "W0wltc"))
+                    ).click()
                 except Exception as e:
                     continue
            except Exception as e:
-                #update chromedriver
-                pattern = '(\d+\.\d+\.\d+\.\d+)'
+                # update chromedriver
+                pattern = r"(\d+\.\d+\.\d+\.\d+)"
                 version = list(set(re.findall(pattern, str(e))))[0]
                 is_patched = patch.download_lastest_chromedriver(version)
-                if (not is_patched):
-                    exit("[ERR] Please update the chromedriver.exe in the webdriver folder according to your chrome version:https://chromedriver.chromium.org/downloads")
+                if not is_patched:
+                    exit(
+                        "[ERR] Please update the chromedriver.exe in the webdriver folder according to your chrome version: https://chromedriver.chromium.org/downloads"
+                    )
 
         self.driver = driver
         self.search_key = search_key
         self.number_of_images = number_of_images
         self.webdriver_path = webdriver_path
         self.image_path = image_path
-        self.url = "https://www.google.com/search?q=%s&source=lnms&tbm=isch&sa=X&ved=2ahUKEwie44_AnqLpAhUhBWMBHUFGD90Q_AUoAXoECBUQAw&biw=1920&bih=947"%(search_key)
-        self.headless=headless
+        self.url = (
+            "https://www.google.com/search?q=%s&source=lnms&tbm=isch&sa=X&ved=2ahUKEwie44_AnqLpAhUhBWMBHUFGD90Q_AUoAXoECBUQAw&biw=1920&bih=947"
+            % (search_key)
+        )
+        self.headless = headless
         self.min_resolution = min_resolution
         self.max_resolution = max_resolution
         self.max_missed = max_missed
 
     def find_image_urls(self):
         """
         This function searches for and returns a list of image URLs based on the search key.
         Example:
             google_image_scraper = GoogleImageScraper("webdriver_path","image_path","search_key",number_of_photos)
             image_urls = google_image_scraper.find_image_urls()
         """
         print("[INFO] Gathering image links")
         self.driver.get(self.url)
-        image_urls=[]
+        image_urls = []
         count = 0
         missed_count = 0
         indx_1 = 0
@@ -94,13 +107,17 @@ def find_image_urls(self):
         while self.number_of_images > count and missed_count < self.max_missed:
             if indx_2 > 0:
                 try:
-                    imgurl = self.driver.find_element(By.XPATH, search_string%(indx_1,indx_2+1))
+                    imgurl = self.driver.find_element(
+                        By.XPATH, search_string % (indx_1, indx_2 + 1)
+                    )
                     imgurl.click()
                     indx_2 = indx_2 + 1
                     missed_count = 0
                 except Exception:
                     try:
-                        imgurl = self.driver.find_element(By.XPATH, search_string%(indx_1+1,1))
+                        imgurl = self.driver.find_element(
+                            By.XPATH, search_string % (indx_1 + 1, 1)
+                        )
                         imgurl.click()
                         indx_2 = 1
                         indx_1 = indx_1 + 1
@@ -109,58 +126,69 @@ def find_image_urls(self):
                         missed_count = missed_count + 1
             else:
                 try:
-                    imgurl = self.driver.find_element(By.XPATH, search_string%(indx_1+1))
+                    imgurl = self.driver.find_element(
+                        By.XPATH, search_string % (indx_1 + 1)
+                    )
                     imgurl.click()
                     missed_count = 0
-                    indx_1 = indx_1 + 1
+                    indx_1 = indx_1 + 1
                 except Exception:
                     try:
-                        imgurl = self.driver.find_element(By.XPATH, '//*[@id="islrg"]/div[1]/div[%s]/div[%s]/a[1]/div[1]/img'%(indx_1,indx_2+1))
+                        imgurl = self.driver.find_element(
+                            By.XPATH,
+                            '//*[@id="islrg"]/div[1]/div[%s]/div[%s]/a[1]/div[1]/img'
+                            % (indx_1, indx_2 + 1),
+                        )
                         imgurl.click()
                         missed_count = 0
                         indx_2 = indx_2 + 1
-                        search_string = '//*[@id="islrg"]/div[1]/div[%s]/div[%s]/a[1]/div[1]/img'
+                        search_string = (
+                            '//*[@id="islrg"]/div[1]/div[%s]/div[%s]/a[1]/div[1]/img'
+                        )
                     except Exception:
                         indx_1 = indx_1 + 1
                         missed_count = missed_count + 1
-
+
             try:
-                #select image from the popup
+                # select image from the popup
                 time.sleep(1)
-                class_names = ["n3VNCb","iPVvYb","r48jcc","pT0Scc"]
-                images = [self.driver.find_elements(By.CLASS_NAME, class_name) for class_name in class_names if len(self.driver.find_elements(By.CLASS_NAME, class_name)) != 0 ][0]
+                class_names = ["n3VNCb", "iPVvYb", "r48jcc", "pT0Scc"]
+                images = [
+                    self.driver.find_elements(By.CLASS_NAME, class_name)
+                    for class_name in class_names
+                    if len(self.driver.find_elements(By.CLASS_NAME, class_name)) != 0
+                ][0]
                 for image in images:
-                    #only download images that starts with http
+                    # only download images whose src link starts with http
                     src_link = image.get_attribute("src")
-                    if(("http" in src_link) and (not "encrypted" in src_link)):
-                        print(
-                            f"[INFO] {self.search_key} \t #{count} \t {src_link}")
+                    if ("http" in src_link) and (not "encrypted" in src_link):
+                        print(f"[INFO] {self.search_key} \t #{count} \t {src_link}")
                         image_urls.append(src_link)
-                        count +=1
+                        count += 1
                         break
             except Exception:
                 print("[INFO] Unable to get link")
 
             try:
-                #scroll page to load next image
-                if(count%3==0):
-                    self.driver.execute_script("window.scrollTo(0, "+str(indx_1*60)+");")
-                element = self.driver.find_element(By.CLASS_NAME,"mye4qd")
+                # scroll page to load next image
+                if count % 3 == 0:
+                    self.driver.execute_script(
+                        "window.scrollTo(0, " + str(indx_1 * 60) + ");"
+                    )
+                element = self.driver.find_element(By.CLASS_NAME, "mye4qd")
                 element.click()
                 print("[INFO] Loading next page")
                 time.sleep(3)
             except Exception:
                 time.sleep(1)
-
-
         self.driver.quit()
         print("[INFO] Google search ended")
         return image_urls
 
-    def save_images(self,image_urls, keep_filenames):
+    def save_images(self, image_urls, keep_filenames):
         print(keep_filenames)
-        #save images into file directory
+        # save images into file directory
         """
         This function takes in a list of image URLs and saves them into the given image path/directory.
         Example:
             google_image_scraper = GoogleImageScraper("webdriver_path","image_path","search_key",number_of_photos)
             image_urls = google_image_scraper.find_image_urls()
             google_image_scraper.save_images(image_urls)
         """
 
@@ -170,40 +198,55 @@ def save_images(self,image_urls, keep_filenames):
         print("[INFO] Saving image, please wait...")
-        for indx,image_url in enumerate(image_urls):
+        for indx, image_url in enumerate(image_urls):
             try:
-                print("[INFO] Image url:%s"%(image_url))
-                search_string = ''.join(e for e in self.search_key if e.isalnum())
-                image = requests.get(image_url,timeout=5)
+                print("[INFO] Image url:%s" % (image_url))
+                search_string = "".join(e for e in self.search_key if e.isalnum())
+                image = requests.get(image_url, timeout=5)
                 if image.status_code == 200:
                     with Image.open(io.BytesIO(image.content)) as image_from_web:
                         try:
-                            if (keep_filenames):
-                                #extact filename without extension from URL
+                            if keep_filenames:
+                                # extract filename without extension from URL
                                 o = urlparse(image_url)
                                 image_url = o.scheme + "://" + o.netloc + o.path
                                 name = os.path.splitext(os.path.basename(image_url))[0]
-                                #join filename and extension
-                                filename = "%s.%s"%(name,image_from_web.format.lower())
+                                # join filename and extension
+                                filename = "%s.%s" % (
+                                    name,
+                                    image_from_web.format.lower(),
+                                )
                             else:
-                                filename = "%s%s.%s"%(search_string,str(indx),image_from_web.format.lower())
+                                filename = "%s%s.%s" % (
+                                    search_string,
+                                    str(indx),
+                                    image_from_web.format.lower(),
+                                )
                             image_path = os.path.join(self.image_path, filename)
                             print(
-                                f"[INFO] {self.search_key} \t {indx} \t Image saved at: {image_path}")
+                                f"[INFO] {self.search_key} \t {indx} \t Image saved at: {image_path}"
+                            )
                             image_from_web.save(image_path)
                         except OSError:
-                            rgb_im = image_from_web.convert('RGB')
+                            rgb_im = image_from_web.convert("RGB")
                             rgb_im.save(image_path)
                         image_resolution = image_from_web.size
                         if image_resolution != None:
-                            if image_resolution[0]<self.min_resolution[0] or image_resolution[1]<self.min_resolution[1] or image_resolution[0]>self.max_resolution[0] or image_resolution[1]>self.max_resolution[1]:
+                            if (
+                                image_resolution[0] < self.min_resolution[0]
+                                or image_resolution[1] < self.min_resolution[1]
+                                or image_resolution[0] > self.max_resolution[0]
+                                or image_resolution[1] > self.max_resolution[1]
+                            ):
                                 image_from_web.close()
                                 os.remove(image_path)
                         image_from_web.close()
             except Exception as e:
-                print("[ERROR] Download failed: ",e)
+                print("[ERROR] Download failed: ", e)
                 pass
         print("--------------------------------------------------")
-        print("[INFO] Downloads completed. Please note that some photos were not downloaded as they were not in the correct format (e.g. jpg, jpeg, png)")
+        print(
+            "[INFO] Downloads completed. Please note that some photos were not downloaded as they were not in the correct format (e.g. jpg, jpeg, png)"
+        )
diff --git a/bot/helpers/main.py b/bot/helpers/main.py
index 9ef3385..eea1eac 100644
--- a/bot/helpers/main.py
+++ b/bot/helpers/main.py
@@ -6,39 +6,45 @@
 
 def worker_thread(search_key):
     image_scraper = GoogleImageScraper(
-        webdriver_path,
-        image_path,
-        search_key,
-        number_of_images,
-        headless,
-        min_resolution,
-        max_resolution,
-        max_missed)
+        webdriver_path,
+        image_path,
+        search_key,
+        number_of_images,
+        headless,
+        min_resolution,
+        max_resolution,
+        max_missed,
+    )
     image_urls = image_scraper.find_image_urls()
     image_scraper.save_images(image_urls, keep_filenames)
-    #Release resources
+    # Release resources
     del image_scraper
 
+
 if __name__ == "__main__":
-    #Define file path
-    webdriver_path = os.path.normpath(os.path.join(os.getcwd(), 'webdriver', webdriver_executable()))
-    image_path = os.path.normpath(os.path.join(os.getcwd(), 'photos'))
-
-    #Add new search key into array ["cat","t-shirt","apple","orange","pear","fish"]
-    search_keys = list(set(["cat","t-shirt"]))
-
-    #Parameters
-    number_of_images = 5 # Desired number of images
-    headless = True # True = No Chrome GUI
-    min_resolution = (0, 0) # Minimum desired image resolution
-    max_resolution = (9999, 9999) # Maximum desired image resolution
-    max_missed = 10 # Max number of failed images before exit
-    number_of_workers = 1 # Number of "workers" used
-    keep_filenames = False # Keep original URL image filenames
-
-    #Run each search_key in a separate thread
-    #Automatically waits for all threads to finish
-    #Removes duplicate strings from search_keys
-    with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
+    # Define file path
+    webdriver_path = os.path.normpath(
+        os.path.join(os.getcwd(), "webdriver", webdriver_executable())
+    )
+    image_path = os.path.normpath(os.path.join(os.getcwd(), "photos"))
+
+    # Add new search keys to the array, e.g. ["cat","t-shirt","apple","orange","pear","fish"]
+    search_keys = list(set(["cat", "t-shirt"]))
+
+    # Parameters
+    number_of_images = 5  # Desired number of images
+    headless = True  # True = No Chrome GUI
+    min_resolution = (0, 0)  # Minimum desired image resolution
+    max_resolution = (9999, 9999)  # Maximum desired image resolution
+    max_missed = 10  # Max number of failed images before exit
+    number_of_workers = 1  # Number of "workers" used
+    keep_filenames = False  # Keep original URL image filenames
+
+    # Run each search_key in a separate thread
+    # Automatically waits for all threads to finish
+    # Removes duplicate strings from search_keys
+    with concurrent.futures.ThreadPoolExecutor(
+        max_workers=number_of_workers
+    ) as executor:
        executor.map(worker_thread, search_keys)
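---

Usage note (reviewer sketch, not part of the patch): the reformatting above leaves the public API unchanged, so the scraper is still driven exactly as in bot/helpers/main.py. A minimal single-threaded sketch, assuming the import path used by main.py and a chromedriver binary under ./webdriver; all parameter values below are illustrative:

    from GoogleImageScraper import GoogleImageScraper

    # Illustrative values; webdriver_path must point at a chromedriver
    # binary matching the installed Chrome version, or the constructor
    # falls back to patch.download_lastest_chromedriver().
    scraper = GoogleImageScraper(
        webdriver_path="webdriver/chromedriver",
        image_path="photos",
        search_key="cat",
        number_of_images=5,
        headless=True,
        min_resolution=(0, 0),
        max_resolution=(9999, 9999),
        max_missed=10,
    )
    image_urls = scraper.find_image_urls()  # drives Google Images via Selenium, then quits the driver
    scraper.save_images(image_urls, keep_filenames=False)  # downloads and drops out-of-range resolutions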