From b7ca2369c77e8d3a44789564371869a0c5e79bd4 Mon Sep 17 00:00:00 2001 From: queukat <75810528+queukat@users.noreply.github.com> Date: Fri, 4 Oct 2024 19:11:39 +0200 Subject: [PATCH] Update aihawk_job_manager.py --- src/aihawk_job_manager.py | 325 +++++++++++++++++++++++++------------- 1 file changed, 214 insertions(+), 111 deletions(-) diff --git a/src/aihawk_job_manager.py b/src/aihawk_job_manager.py index ef0d87ae..217f92bc 100644 --- a/src/aihawk_job_manager.py +++ b/src/aihawk_job_manager.py @@ -1,9 +1,11 @@ import json import os import random +import threading import time from itertools import product from pathlib import Path +import re from inputimeout import inputimeout, TimeoutOccurred from selenium.common.exceptions import NoSuchElementException @@ -40,8 +42,9 @@ class AIHawkJobManager: def __init__(self, driver): logger.debug("Initializing AIHawkJobManager") self.driver = driver - self.set_old_answers = set() + self.set_old_answers = [] self.easy_applier_component = None + self.job_application_profile = None logger.debug("AIHawkJobManager initialized successfully") def set_parameters(self, parameters): @@ -62,8 +65,13 @@ def set_parameters(self, parameters): self.resume_path = Path(resume_path) if resume_path and Path(resume_path).exists() else None self.output_file_directory = Path(parameters['outputFileDirectory']) self.env_config = EnvironmentKeys() + self.parameters = parameters logger.debug("Parameters set successfully") + def set_job_application_profile(self, job_application_profile): + logger.debug("Setting job application profile in LinkedInJobManager") + self.job_application_profile = job_application_profile + def set_gpt_answerer(self, gpt_answerer): logger.debug("Setting GPT answerer") self.gpt_answerer = gpt_answerer @@ -72,10 +80,56 @@ def set_resume_generator_manager(self, resume_generator_manager): logger.debug("Setting resume generator manager") self.resume_generator_manager = resume_generator_manager + def get_input_with_timeout(self, prompt, timeout_duration): + user_input = [None] + + # Check if code is running in PyCharm + is_pycharm = 'PYCHARM_HOSTED' in os.environ + + if is_pycharm: + # Input with timeout is not supported in PyCharm console + logger.warning("Input with timeout is not supported in PyCharm console. Proceeding without user input.") + return '' + else: + # Use threading to implement timeout + def input_thread(): + user_input[0] = input(prompt).strip().lower() + + thread = threading.Thread(target=input_thread) + thread.daemon = True + thread.start() + thread.join(timeout_duration) + if thread.is_alive(): + logger.debug("Input timed out") + return '' + else: + return user_input[0] + + def wait_or_skip(self, time_left): + """Method for waiting or skipping the sleep time based on user input""" + if time_left > 0: + user_input = self.get_input_with_timeout( + prompt=f"Sleeping for {time_left} seconds. Press 'y' to skip waiting. 
Timeout 60 seconds: ", + timeout_duration=60) + if user_input == 'y': + logger.debug("User chose to skip waiting.") + utils.printyellow("User skipped waiting.") + else: + logger.debug(f"Sleeping for {time_left} seconds as user chose not to skip.") + utils.printyellow(f"Sleeping for {time_left} seconds.") + time.sleep(time_left) + + def start_applying(self): logger.debug("Starting job application process") - self.easy_applier_component = AIHawkEasyApplier(self.driver, self.resume_path, self.set_old_answers, - self.gpt_answerer, self.resume_generator_manager) + self.easy_applier_component = AIHawkEasyApplier( + self.driver, + self.resume_path, + self.set_old_answers, + self.gpt_answerer, + self.resume_generator_manager, + job_application_profile=self.job_application_profile # Pass the job_application_profile here + ) searches = list(product(self.positions, self.locations)) random.shuffle(searches) page_sleep = 0 @@ -99,8 +153,21 @@ def start_applying(self): try: jobs = self.get_jobs_from_page() if not jobs: - logger.debug("No more jobs found on this page. Exiting loop.") - break + # Attempt to find and click the search button + try: + search_button = self.driver.find_element(By.CLASS_NAME, "jobs-search-box__submit-button") + search_button.click() + logger.debug("Clicked the search button to reload jobs.") + time.sleep(random.uniform(1.5, 3.5)) + jobs = self.get_jobs_from_page() + except NoSuchElementException: + logger.warning("Search button not found.") + except Exception as e: + logger.error(f"Error while trying to click the search button: {e}") + + if not jobs: + utils.printyellow("No more jobs found on this page. Exiting loop.") + break except Exception as e: logger.error(f"Failed to retrieve jobs: {e}") break @@ -115,78 +182,48 @@ def start_applying(self): time_left = minimum_page_time - time.time() - # Ask user if they want to skip waiting, with timeout - if time_left > 0: - try: - user_input = inputimeout( - prompt=f"Sleeping for {time_left} seconds. Press 'y' to skip waiting. Timeout 60 seconds : ", - timeout=60).strip().lower() - except TimeoutOccurred: - user_input = '' # No input after timeout - if user_input == 'y': - logger.debug("User chose to skip waiting.") - else: - logger.debug(f"Sleeping for {time_left} seconds as user chose not to skip.") - time.sleep(time_left) + # Use the wait_or_skip function for sleeping + self.wait_or_skip(time_left) minimum_page_time = time.time() + minimum_time if page_sleep % 5 == 0: sleep_time = random.randint(5, 34) - try: - user_input = inputimeout( - prompt=f"Sleeping for {sleep_time / 60} minutes. Press 'y' to skip waiting. Timeout 60 seconds : ", - timeout=60).strip().lower() - except TimeoutOccurred: - user_input = '' # No input after timeout - if user_input == 'y': - logger.debug("User chose to skip waiting.") - else: - logger.debug(f"Sleeping for {sleep_time} seconds.") - time.sleep(sleep_time) + # Use the wait_or_skip function for extended sleep + self.wait_or_skip(sleep_time) page_sleep += 1 except Exception as e: - logger.error(f"Unexpected error during job search: {e}") + logger.error("Unexpected error during job search: %s", e) + utils.printred(f"Unexpected error: {e}") continue time_left = minimum_page_time - time.time() - if time_left > 0: - try: - user_input = inputimeout( - prompt=f"Sleeping for {time_left} seconds. Press 'y' to skip waiting. 
Timeout 60 seconds : ", - timeout=60).strip().lower() - except TimeoutOccurred: - user_input = '' # No input after timeout - if user_input == 'y': - logger.debug("User chose to skip waiting.") - else: - logger.debug(f"Sleeping for {time_left} seconds as user chose not to skip.") - time.sleep(time_left) + # Use the wait_or_skip function again before moving to the next search + self.wait_or_skip(time_left) minimum_page_time = time.time() + minimum_time if page_sleep % 5 == 0: sleep_time = random.randint(50, 90) - try: - user_input = inputimeout( - prompt=f"Sleeping for {sleep_time / 60} minutes. Press 'y' to skip waiting: ", - timeout=60).strip().lower() - except TimeoutOccurred: - user_input = '' # No input after timeout - if user_input == 'y': - logger.debug("User chose to skip waiting.") - else: - logger.debug(f"Sleeping for {sleep_time} seconds.") - time.sleep(sleep_time) + # Use the wait_or_skip function for a longer sleep period + self.wait_or_skip(sleep_time) page_sleep += 1 def get_jobs_from_page(self): try: + try: + no_jobs_element = self.driver.find_element(By.CLASS_NAME, 'jobs-search-no-results-banner') + except NoSuchElementException: + try: - no_jobs_element = self.driver.find_element(By.CLASS_NAME, 'jobs-search-two-pane__no-results-banner--expand') - if 'No matching jobs found' in no_jobs_element.text or 'unfortunately, things aren' in self.driver.page_source.lower(): + no_jobs_element = self.driver.find_element(By.CLASS_NAME, 'jobs-search-two-pane__no-results-banner--expand') + except NoSuchElementException: + no_jobs_element = None + + if no_jobs_element and ('No matching jobs found' in no_jobs_element.text or 'unfortunately, things aren' in self.driver.page_source.lower()): + utils.printyellow("No matching jobs found on this page.") logger.debug("No matching jobs found on this page, skipping.") return [] @@ -196,7 +233,7 @@ def get_jobs_from_page(self): try: job_results = self.driver.find_element(By.CLASS_NAME, "jobs-search-results-list") utils.scroll_slow(self.driver, job_results) - utils.scroll_slow(self.driver, job_results, step=300, reverse=True) + # utils.scroll_slow(self.driver, job_results, step=300, reverse=True) job_list_elements = self.driver.find_elements(By.CLASS_NAME, 'scaffold-layout__list-container')[ 0].find_elements(By.CLASS_NAME, 'jobs-search-results__list-item') @@ -223,10 +260,15 @@ def apply_jobs(self): except NoSuchElementException: pass + job_results = self.driver.find_element(By.CLASS_NAME, "jobs-search-results-list") + # utils.scroll_slow(self.driver, job_results) + # utils.scroll_slow(self.driver, job_results, step=300, reverse=True) + job_list_elements = self.driver.find_elements(By.CLASS_NAME, 'scaffold-layout__list-container')[ 0].find_elements(By.CLASS_NAME, 'jobs-search-results__list-item') if not job_list_elements: + utils.printyellow("No job class elements found on page, moving to next page.") logger.debug("No job class elements found on page, skipping") return @@ -234,45 +276,63 @@ def apply_jobs(self): for job in job_list: - logger.debug(f"Starting applicant for job: {job.title} at {job.company}") - #TODO fix apply threshold - """ + try: + logger.debug(f"Starting applicant count search for job: {job.title} at {job.company}") + + # Find all job insight elements + job_insight_elements = self.driver.find_elements(By.CLASS_NAME, + "job-details-jobs-unified-top-card__job-insight") + logger.debug(f"Found {len(job_insight_elements)} job insight elements") + # Initialize applicants_count as None applicants_count = None # Iterate over each job 
insight element to find the one containing the word "applicant" for element in job_insight_elements: - logger.debug(f"Checking element text: {element.text}") - if "applicant" in element.text.lower(): - # Found an element containing "applicant" - applicants_text = element.text.strip() - logger.debug(f"Applicants text found: {applicants_text}") + positive_text_element = element.find_element(By.XPATH, ".//span[contains(@class, 'tvm__text--positive')]") + applicants_text = positive_text_element.text.strip().lower() + logger.debug(f"Checking element text: {applicants_text}") + + # Look for keywords indicating the presence of applicants count + if "applicant" in applicants_text: + logger.info(f"Applicants text found: {applicants_text}") - # Extract numeric digits from the text (e.g., "70 applicants" -> "70") + # Try to find numeric value in the text, such as "27 applicants" or "over 100 applicants" applicants_count = ''.join(filter(str.isdigit, applicants_text)) - logger.debug(f"Extracted applicants count: {applicants_count}") if applicants_count: - if "over" in applicants_text.lower(): - applicants_count = int(applicants_count) + 1 # Handle "over X applicants" - logger.debug(f"Applicants count adjusted for 'over': {applicants_count}") - else: - applicants_count = int(applicants_count) # Convert the extracted number to an integer - break + applicants_count = int(applicants_count) # Convert the extracted number to an integer + logger.info(f"Extracted numeric applicants count: {applicants_count}") + + # Handle case with "over X applicants" + if "over" in applicants_text: + applicants_count += 1 + logger.info(f"Adjusted applicants count for 'over': {applicants_count}") + + logger.info(f"Final applicants count: {applicants_count}") + else: + logger.warning(f"Applicants count could not be extracted from text: {applicants_text}") + + break # Stop after finding the first valid applicants count element + else: + logger.debug(f"Skipping element as it does not contain 'applicant': {applicants_text}") # Check if applicants_count is valid (not None) before performing comparisons if applicants_count is not None: # Perform the threshold check for applicants count if applicants_count < self.min_applicants or applicants_count > self.max_applicants: + utils.printyellow( + f"Skipping {job.title} at {job.company} due to applicants count: {applicants_count}") logger.debug(f"Skipping {job.title} at {job.company}, applicants count: {applicants_count}") - self.write_to_file(job, "skipped_due_to_applicants") - continue # Skip this job if applicants count is outside the threshold + self.write_to_file(job, "skipped_due_to_applicants", applicants_count=applicants_count) + continue else: logger.debug(f"Applicants count {applicants_count} is within the threshold") else: # If no applicants count was found, log a warning but continue the process logger.warning( - f"Applicants count not found for {job.title} at {job.company}, continuing with application.") + f"Applicants count not found for {job.title} at {job.company}, but continuing with application.") + except NoSuchElementException: # Log a warning if the job insight elements are not found, but do not stop the job application process logger.warning( @@ -286,31 +346,30 @@ def apply_jobs(self): f"Unexpected error during applicants count processing for {job.title} at {job.company}: {e}") # Continue with the job application process regardless of the applicants count check - """ - + logger.debug(f"Continuing with job application for {job.title} at {job.company}") if 
self.is_blacklisted(job.title, job.company, job.link): - logger.debug(f"Job blacklisted: {job.title} at {job.company}") - self.write_to_file(job, "skipped") + logger.debug("Job blacklisted: %s at %s", job.title, job.company) + self.write_to_file(job, "skipped", applicants_count=applicants_count) continue if self.is_already_applied_to_job(job.title, job.company, job.link): - self.write_to_file(job, "skipped") + self.write_to_file(job, "skipped", applicants_count=applicants_count) continue if self.is_already_applied_to_company(job.company): - self.write_to_file(job, "skipped") + self.write_to_file(job, "skipped", applicants_count=applicants_count) continue try: if job.apply_method not in {"Continue", "Applied", "Apply"}: self.easy_applier_component.job_apply(job) - self.write_to_file(job, "success") - logger.debug(f"Applied to job: {job.title} at {job.company}") + self.write_to_file(job, "success", applicants_count=applicants_count) + logger.debug("Applied to job: %s at %s", job.title, job.company) except Exception as e: - logger.error(f"Failed to apply for {job.title} at {job.company}: {e}") - self.write_to_file(job, "failed") + logger.error("Failed to apply for %s at %s: %s", job.title, job.company, e) + self.write_to_file(job, "failed", applicants_count=applicants_count) continue - def write_to_file(self, job, file_name): - logger.debug(f"Writing job application result to file: {file_name}") + def write_to_file(self, job, file_name, applicants_count=None): + logger.debug("Writing job application result to file: %s", file_name) pdf_path = Path(job.pdf_path).resolve() pdf_path = pdf_path.as_uri() data = { @@ -321,30 +380,46 @@ def write_to_file(self, job, file_name): "job_location": job.location, "pdf_path": pdf_path } + + if applicants_count is not None: + data["applicants_count"] = applicants_count + file_path = self.output_file_directory / f"{file_name}.json" + temp_file_path = file_path.with_suffix('.tmp') + if not file_path.exists(): - with open(file_path, 'w', encoding='utf-8') as f: - json.dump([data], f, indent=4) - logger.debug(f"Job data written to new file: {file_name}") + try: + with open(temp_file_path, 'w', encoding='utf-8') as f: + json.dump([data], f, indent=4) + temp_file_path.rename(file_path) + logger.debug("Job data written to new file: %s", file_path) + except Exception as e: + logger.error(f"Failed to write new data to file {file_path}: {e}") else: - with open(file_path, 'r+', encoding='utf-8') as f: - try: - existing_data = json.load(f) - except json.JSONDecodeError: - logger.error(f"JSON decode error in file: {file_path}") - existing_data = [] - existing_data.append(data) - f.seek(0) - json.dump(existing_data, f, indent=4) - f.truncate() - logger.debug(f"Job data appended to existing file: {file_name}") + try: + with open(file_path, 'r+', encoding='utf-8') as f: + try: + existing_data = json.load(f) + except json.JSONDecodeError: + logger.error("JSON decode error in file: %s. 
Creating a backup.", file_path)
+                        file_path.rename(file_path.with_suffix('.bak'))
+                        existing_data = []
+
+                    existing_data.append(data)
+                    f.seek(0)
+                    json.dump(existing_data, f, indent=4)
+                    f.truncate()
+                    logger.debug("Job data appended to existing file: %s", file_path)
+            except Exception as e:
+                logger.error(f"Failed to append data to file {file_path}: {e}")
 
     def get_base_search_url(self, parameters):
         logger.debug("Constructing base search URL")
         url_parts = []
         if parameters['remote']:
             url_parts.append("f_CF=f_WRA")
-        experience_levels = [str(i + 1) for i, (level, v) in enumerate(parameters.get('experience_level', {}).items()) if
+        experience_levels = [str(i + 1) for i, (level, v) in enumerate(parameters.get('experience_level', {}).items())
+                             if
                              v]
         if experience_levels:
             url_parts.append(f"f_E={','.join(experience_levels)}")
@@ -359,14 +434,23 @@ def get_base_search_url(self, parameters):
             "24 hours": "&f_TPR=r86400"
         }
         date_param = next((v for k, v in date_mapping.items() if parameters.get('date', {}).get(k)), "")
-        url_parts.append("f_LF=f_AL")  # Easy Apply
+
+        # Easy Apply filter
+        url_parts.append("f_LF=f_AL")
+
+        # Add sortBy parameter for sorting by date
+        sort_by = parameters.get('sort_by', 'date')  # Default to sorting by date if not specified
+        if sort_by == 'date':
+            url_parts.append("sortBy=DD")
+
         base_url = "&".join(url_parts)
         full_url = f"?{base_url}{date_param}"
-        logger.debug(f"Base search URL constructed: {full_url}")
+
+        logger.debug("Base search URL constructed: %s", full_url)
         return full_url
 
     def next_job_page(self, position, location, job_page):
-        logger.debug(f"Navigating to next job page: {position} in {location}, page {job_page}")
+        logger.debug("Navigating to next job page: %s in %s, page %d", position, location, job_page)
         self.driver.get(
             f"https://www.linkedin.com/jobs/search/{self.base_search_url}&keywords={position}{location}&start={job_page * 25}")
 
@@ -376,10 +460,10 @@ def extract_job_information_from_tile(self, job_tile):
         try:
             print(job_tile.get_attribute('outerHTML'))
             job_title = job_tile.find_element(By.CLASS_NAME, 'job-card-list__title').find_element(By.TAG_NAME, 'strong').text
-            
+
             link = job_tile.find_element(By.CLASS_NAME, 'job-card-list__title').get_attribute('href').split('?')[0]
             company = job_tile.find_element(By.CLASS_NAME, 'job-card-container__primary-description').text
-            logger.debug(f"Job information extracted: {job_title} at {company}")
+            logger.debug("Job information extracted: %s at %s", job_title, company)
         except NoSuchElementException:
             logger.warning("Some job information (title, link, or company) is missing.")
             try:
@@ -396,14 +480,33 @@ def extract_job_information_from_tile(self, job_tile):
 
     def is_blacklisted(self, job_title, company, link):
         logger.debug(f"Checking if job is blacklisted: {job_title} at {company}")
-        job_title_words = job_title.lower().split(' ')
-        title_blacklisted = any(word in job_title_words for word in self.title_blacklist)
-        company_blacklisted = company.strip().lower() in (word.strip().lower() for word in self.company_blacklist)
+
+        job_title_lower = job_title.lower()
+        company_lower = company.strip().lower()
+
+        # Skip the title check when the blacklist is empty
+        if not self.title_blacklist:
+            return False
+
+        # Build a regular expression that matches blacklisted phrases on word boundaries
+        blacklist_pattern = r'\b(' + '|'.join(re.escape(phrase.lower()) for phrase in self.title_blacklist) + r')\b'
+
+        # Check whether the job title contains any blacklisted phrase
+        title_blacklisted = bool(re.search(blacklist_pattern, job_title_lower))
+        logger.debug(f"Title blacklist status: {title_blacklisted}")
+
+        # Check the company blacklist
+        company_blacklisted = company_lower in (word.strip().lower() for word in self.company_blacklist)
+        logger.debug(f"Company blacklist status: {company_blacklisted}")
+
+        # Check whether the link has already been seen
+        link_seen = link in self.seen_jobs
+        logger.debug(f"Link seen status: {link_seen}")
+
+        is_blacklisted = title_blacklisted or company_blacklisted or link_seen
         logger.debug(f"Job blacklisted status: {is_blacklisted}")
-        return title_blacklisted or company_blacklisted or link_seen
+        return is_blacklisted
 
     def is_already_applied_to_job(self, job_title, company, link):
         link_seen = link in self.seen_jobs
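
For reference, the reworked is_blacklisted() above replaces the old split-on-spaces title check with a word-boundary regular expression built from the title blacklist. Below is a minimal, self-contained sketch of that matching logic; the helper name title_is_blacklisted and the sample blacklist entries are illustrative assumptions, not code from this module.

import re

# Hypothetical standalone helper mirroring the word-boundary matching used in is_blacklisted().
def title_is_blacklisted(job_title, title_blacklist):
    """Return True if any blacklisted phrase occurs as a whole word or phrase in the job title."""
    if not title_blacklist:
        return False
    # Escape each phrase and require word boundaries so substrings do not match.
    pattern = r'\b(' + '|'.join(re.escape(phrase.lower()) for phrase in title_blacklist) + r')\b'
    return bool(re.search(pattern, job_title.lower()))

if __name__ == "__main__":
    blacklist = ["java", "senior manager"]  # sample data for illustration only
    print(title_is_blacklisted("Java Developer", blacklist))        # True  (whole-word match)
    print(title_is_blacklisted("JavaScript Developer", blacklist))  # False ('java' is only a substring)
    print(title_is_blacklisted("Senior Manager, Data", blacklist))  # True  (multi-word phrase match)

Unlike the previous check, this form also matches multi-word blacklist phrases and avoids false positives on substrings such as 'java' inside 'javascript'.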