From e33192a42b95954016eb32b0556dccc12121582d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20B=C3=A5ngens?= Date: Sat, 9 Nov 2024 04:47:50 +0100 Subject: [PATCH] refactor all web scraping --- CMakeLists.txt | 9 + database/.gitignore | 3 +- .../{data_factorie.py => data_factory.py} | 0 database/get_trle.sh | 29 + database/get_trle_by_id_range.py | 72 ++ database/https.py | 5 +- database/ideas.txt | 3 + database/index_main.py | 20 +- database/index_query.py | 14 +- database/index_scrape.py | 355 ------- database/sanitize_downloads.py | 142 +++ database/scrape.py | 959 ++++++++++++++++++ database/tombll_add_data.py | 521 ++++++---- database/tombll_get_data.py | 314 +----- 14 files changed, 1552 insertions(+), 894 deletions(-) rename database/{data_factorie.py => data_factory.py} (100%) create mode 100755 database/get_trle.sh create mode 100644 database/get_trle_by_id_range.py delete mode 100644 database/index_scrape.py create mode 100644 database/sanitize_downloads.py create mode 100644 database/scrape.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 8db510f..234c6e8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,8 +16,17 @@ if(POLICY CMP0167) cmake_policy(SET CMP0167 NEW) endif() find_package(Boost REQUIRED COMPONENTS system filesystem) + find_package(OpenSSL REQUIRED) +if(NOT EXISTS "${CMAKE_SOURCE_DIR}/libs/miniz/CMakeLists.txt") + message(STATUS "Submodule 'libs/miniz' not found. Initializing submodules...") + execute_process( + COMMAND git submodule update --init --recursive + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} + ) +endif() + add_subdirectory(libs/miniz) # suppress ZERO_CHECK dont think its needed diff --git a/database/.gitignore b/database/.gitignore index 6ae1d91..9a7ea09 100644 --- a/database/.gitignore +++ b/database/.gitignore @@ -1,4 +1,5 @@ data.json file_info.json +trle __pycache__ - +trle.tar.gz diff --git a/database/data_factorie.py b/database/data_factory.py similarity index 100% rename from database/data_factorie.py rename to database/data_factory.py diff --git a/database/get_trle.sh b/database/get_trle.sh new file mode 100755 index 0000000..0e0c06e --- /dev/null +++ b/database/get_trle.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# Check if megadl or mega-get is installed +if command -v megadl &> /dev/null; then + downloader="megadl" +elif command -v mega-get &> /dev/null; then + downloader="mega-get" +else + echo "Neither megatools (megadl) nor megacmd (mega-get) is installed." + echo "Please install one of them to proceed." + exit 1 +fi + +# Define the download link +url="https://mega.nz/file/xXkV3JqJ#1Ejtd9enidYYpV3FRLO5KSzcUg7-_Jg-vNi66RKo8aI" + +# Download the file using the available tool +echo "Using $downloader to download the file..." +$downloader "$url" + +# Verify the checksum (assuming you want to compare it to the expected checksum) +echo "Verifying checksum..." +echo "29e7e89bc11ebe77eafbd1c78ca3f1a7 trle.tar.gz" | md5sum -c - + +# Extract the tar.gz file +echo "Extracting the archive..." +tar xzf trle.tar.gz + +echo "Download and extraction complete." diff --git a/database/get_trle_by_id_range.py b/database/get_trle_by_id_range.py new file mode 100644 index 0000000..ff40ff1 --- /dev/null +++ b/database/get_trle_by_id_range.py @@ -0,0 +1,72 @@ +""" +This script retrieves all TRLE levels by a specified ID range and saves the data as JSON files. 
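+Each level is fetched with a 5-second pause between requests (see the
+time.sleep(5) call at the bottom of this script) to avoid rate limiting,
+so large ID ranges take a while to complete.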
+Usage: python3 get_trle_by_id_range.py FROM_ID TO_ID +""" + +import sys +import time +import json + +import scrape +import data_factory + +def safe_string_to_int(id_str): + """Converts a string to an integer with error checking. + + Args: + s (str): The string to convert. + + Returns: + int: The converted integer if valid. + + Exits: + Exits with status code 1 if the input string is not a valid integer. + """ + try: + return int(id_str) + except ValueError: + print("Error: The provided string is not a valid integer.") + sys.exit(1) + + +def trle_by_id(trle_id): + """Fetches TRLE level data by ID and saves it as a JSON file. + + Args: + trle_id (int): The ID of the TRLE level to fetch. + """ + data = data_factory.make_trle_tombll_data() + soup = scrape.get_soup(f"https://www.trle.net/sc/levelfeatures.php?lid={trle_id}") + scrape.get_trle_level(soup, data) + if data['title']: + with open(f'trle/{trle_id}.json', mode='w', encoding='utf-8') as json_file: + json.dump(data, json_file) + + +if __name__ == '__main__': + + if len(sys.argv) != 3: + print("Usage: python3 get_trle_by_id_range.py FROM_ID TO_ID") + sys.exit(1) + + print("Please use get_trle.sh; I provide this as part of the open source license.") + if input("Continue? (y/n): ").lower() != 'y': + sys.exit(1) + + # Convert arguments to integers with validation + from_id = safe_string_to_int(sys.argv[1]) + to_id = safe_string_to_int(sys.argv[2]) + + if from_id == to_id: + trle_by_id(from_id) + sys.exit(0) + + # Ensure from_id is less than to_id by swapping if necessary + if from_id > to_id: + from_id, to_id = to_id, from_id # Tuple for cleaner swapping... python... + + # Fetch and save data for each level ID in the specified range + for level_id in range(from_id, to_id + 1): # Include to_id in range + print(f"Getting TRLE level by ID: {level_id}") + trle_by_id(level_id) + time.sleep(5) # To avoid rate-limiting by adding delay between requests diff --git a/database/https.py b/database/https.py index aded656..a910cfd 100644 --- a/database/https.py +++ b/database/https.py @@ -13,7 +13,8 @@ from tqdm import tqdm import get_leaf_cert -import data_factorie +import data_factory + class AcquireLock: """ Create a TCP socket to ensure a single instance. @@ -318,7 +319,7 @@ def download_file(self, url): """ curl = pycurl.Curl() temp_cert_path = None - zip_file = data_factorie.make_zip_file() # Initialize the zip_file dictionary + zip_file = data_factory.make_zip_file() # Initialize the zip_file dictionary try: # Get file size for the progress bar diff --git a/database/ideas.txt b/database/ideas.txt index 0d92a36..3b40c60 100644 --- a/database/ideas.txt +++ b/database/ideas.txt @@ -252,3 +252,6 @@ will focus on creating an index database of around 500 MB, rather than replicati the entire TRLE database, which could exceed 20 GB. Additional data, such as levels of specific interest to users, can be cached or downloaded manually, within a reasonable limit of around 2 GB. 
+ +This was a special walkthrough the script cant handle +https://www.trle.net/sc/Levelwalk.php?lid=864 diff --git a/database/index_main.py b/database/index_main.py index a5d4dfb..abb06dc 100644 --- a/database/index_main.py +++ b/database/index_main.py @@ -3,13 +3,13 @@ import os import time import index_view -import index_scrape +import scrape import index_query import make_index_database def test_trle(): """Browse TRLE data""" - index_view.print_trle_page(index_scrape.get_trle_page(0, True)) + index_view.print_trle_page(scrape.get_trle_page(0, True)) offset = 0 while True: user_input = input("Press Enter for the next page (or type 'q' to quit: ") @@ -17,12 +17,12 @@ def test_trle(): print("Exiting...") break offset += 20 - index_view.print_trle_page(index_scrape.get_trle_page(offset, True)) + index_view.print_trle_page(scrape.get_trle_page(offset, True)) def test_trcustoms(): """Browse Trcustom data""" - page = index_scrape.get_trcustoms_page(1, True) + page = scrape.get_trcustoms_page(1, True) index_view.print_trcustoms_page(page) offset = 1 while True: @@ -31,7 +31,7 @@ def test_trcustoms(): print("Exiting...") break offset += 1 - page = index_scrape.get_trcustoms_page(offset, True) + page = scrape.get_trcustoms_page(offset, True) index_view.print_trcustoms_page(page) @@ -58,7 +58,7 @@ def test_trcustoms_pic_local(): while True: page = index_query.get_trcustoms_page_local(offset, True) levels = page['levels'] - covers = index_scrape.get_cover_list(levels) + covers = scrape.get_trcustoms_cover_list(levels, True) index_view.display_menu(levels, covers) for file in covers: try: @@ -89,7 +89,7 @@ def test_insert_trle_book(): method that is much slower but will work and accurate""" # Get the first page to determine the total number of records - page = index_scrape.get_trle_page(0) + page = scrape.get_trle_page(0) total_records = page['records_total'] # Insert the first page of data @@ -101,7 +101,7 @@ def test_insert_trle_book(): offset = 20 while offset < total_records: # Fetch the next page of data - page = index_scrape.get_trle_page(offset) + page = scrape.get_trle_page(offset) index_query.insert_trle_page(page) # Increment offset by 20 for the next batch @@ -118,7 +118,7 @@ def test_insert_trle_book(): def test_insert_trcustoms_book(): """Get index data""" # Get the first page to determine the total number of records - page = index_scrape.get_trcustoms_page(1) + page = scrape.get_trcustoms_page(1) total_pages = page['total_pages'] # Insert the first page of data @@ -130,7 +130,7 @@ def test_insert_trcustoms_book(): page_number = 2 while page_number <= total_pages: # Fetch the next page of data - page = index_scrape.get_trcustoms_page(page_number) + page = scrape.get_trcustoms_page(page_number) index_query.insert_trcustoms_page(page) print(f"Page number:{page_number} of {total_pages}") diff --git a/database/index_query.py b/database/index_query.py index 1782330..11520dc 100644 --- a/database/index_query.py +++ b/database/index_query.py @@ -2,7 +2,7 @@ import sys import sqlite3 -import data_factorie +import data_factory os.chdir(os.path.dirname(os.path.abspath(__file__))) @@ -373,7 +373,7 @@ def get_trle_level_local_by_id(trle_id): records = [] for record in result: - level = data_factorie.make_trle_level_data() + level = data_factory.make_trle_level_data() level['trle_id'] = record[0] level['author'] = record[1] level['title'] = record[2] @@ -423,7 +423,7 @@ def get_trcustoms_level_local_by_id(trcustoms_id): """, (trcustoms_id, ), cursor ) - level = 
data_factorie.make_trcustoms_level_data() + level = data_factory.make_trcustoms_level_data() level['trcustoms_id'] = result[0][0] level['authors'] = result[0][1].split(',') if result[0][1] else [] level['title'] = result[0][2] @@ -454,7 +454,7 @@ def get_trle_page_local(offset, sortCreatedFirst=False): if offset > rec: sys.exit(1) - page = data_factorie.make_trle_page_data() + page = data_factory.make_trle_page_data() page['offset'] = offset page['records_total'] = rec @@ -482,7 +482,7 @@ def get_trle_page_local(offset, sortCreatedFirst=False): ) # Process result to format the output as needed for row in result: - level = data_factorie.make_trle_level_data() + level = data_factory.make_trle_level_data() level['trle_id'] = row[0] level['author'] = row[1] level['title'] = row[2] @@ -511,7 +511,7 @@ def get_trcustoms_page_local(page_number, sortCreatedFirst=False): cursor )[0][0] - page = data_factorie.make_trcustoms_page_data() + page = data_factory.make_trcustoms_page_data() total = (rec + 19) // 20 if page_number > total: sys.exit(1) @@ -550,7 +550,7 @@ def get_trcustoms_page_local(page_number, sortCreatedFirst=False): ) # Process result to format the output as needed for row in result: - level = data_factorie.make_trcustoms_level_data() + level = data_factory.make_trcustoms_level_data() level['trcustoms_id'] = row[0] level['authors'] = row[1].split(',') if row[1] else [] level['title'] = row[2] diff --git a/database/index_scrape.py b/database/index_scrape.py deleted file mode 100644 index 7f5b91a..0000000 --- a/database/index_scrape.py +++ /dev/null @@ -1,355 +0,0 @@ -"""Scraping of all data; level info, cover images and https keys""" -import sys -import re -import os -import hashlib -import uuid -import time -import logging -import tempfile -from io import BytesIO -from urllib.parse import urlparse, urlencode, parse_qs -from datetime import datetime -from bs4 import BeautifulSoup, Tag -from PIL import Image - -import data_factorie -import https - -# Set up logging -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s') -logging.getLogger("requests").setLevel(logging.DEBUG) - - -def trle_page_table(table): - """filter out data from the TRLE level table result""" - levels = [] - - # Mapping index to level data fields - field_mapping = { - 0: ('author', lambda cell: cell.get_text(strip=True)), - 5: ('trle_id', 'title', lambda cell: ( - cell.find('a', href=True)['href'].split('lid=')[-1] \ - if cell.find('a', href=True) else None, cell.get_text(strip=True) - ) - ), - 6: ('difficulty', lambda cell: cell.get_text(strip=True)), - 7: ('duration', lambda cell: cell.get_text(strip=True)), - 8: ('class', lambda cell: cell.get_text(strip=True)), - 10: ('type', lambda cell: cell.get_text(strip=True)), - 13: ('release', lambda cell: convert_to_iso(cell.get_text(strip=True))) - } - - for row in table[1:]: - cells = row.find_all('td') - level = data_factorie.make_trle_level_data() - - for idx, cell in enumerate(cells): - if idx in field_mapping: - if idx == 5: - level['trle_id'], level['title'] = field_mapping[idx][2](cell) - else: - field_name, extractor = field_mapping[idx] - level[field_name] = extractor(cell) - - levels.append(level) - - return levels - - -def get_trle_page(offset, sort_created_first=False): - """Scrape one TRLE page where the offset starts from the earliest date.""" - params = { - "atype": "", - "author": "", - "level": "", - "class": "", - "type": "", - "difficulty": "", - "durationclass": "", - "rating": "", - "sortidx": 8, - "sorttype": 2 if 
sort_created_first else 1, - "idx": "" if offset == 0 else str(offset) - } - query_string = urlencode(params) - url = f"https://www.trle.net/pFind.php?{query_string}" - response = https.get(url, 'text/html') - if not response: - sys.exit(1) - soup = BeautifulSoup(response, 'html.parser') - page = data_factorie.make_trle_page_data() - page['offset'] = offset - - # Find total records - span = soup.find('span', class_='navText') - if span: - page['records_total'] = int(span.text.strip().split()[0]) - else: - print("Total records not found") - sys.exit(1) - - # Find data table - table = soup.find('table', class_='FindTable') - if not isinstance(table, Tag): - print("Data table not found") - sys.exit(1) - - page['levels'] = trle_page_table(table.find_all('tr')) - - return page - - -def get_trcustoms_page(page_number, sort_created_first=False): - """Scrape one trcustoms page where the offset starts from the earliest date.""" - host = "https://trcustoms.org/api/levels/" - if sort_created_first: - sort="-created" - else: - sort="created" - params = { - "sort": sort, - "is_approved": 1, - "page": "" if page_number == 0 else str(page_number) - } - query_string = urlencode(params) - url = f"{host}?{query_string}" - data = https.get(url, 'application/json') - if not isinstance(data, dict): - logging.error("Data type error, expected dict got %s", type(data)) - sys.exit(1) - - page = data_factorie.make_trcustoms_page_data() - page['current_page'] = data.get('current_page') - page['total_pages'] = data.get('last_page') - page['records_total'] = data.get('total_count') - - results = data.get('results') - if not isinstance(results, list): - logging.error("Data type error, expected list got %s", type(results)) - sys.exit(1) - - for item in results: - repacked_data = data_factorie.make_trcustoms_level_data() - for author in item['authors']: - repacked_data['authors'].append(author['username']) - for tag in item['tags']: - repacked_data['tags'].append(tag['name']) - for genre in item['genres']: - repacked_data['genres'].append(genre['name']) - repacked_data['release'] = convert_to_iso(item['created']) - repacked_data['cover'] = item['cover']['url'] - repacked_data['cover_md5sum'] = item['cover']['md5sum'] - repacked_data['trcustoms_id'] = item['id'] - repacked_data['title'] = item['name'] - repacked_data['type'] = item['engine']['name'] - repacked_data['difficulty'] = item['difficulty'].get('name', None) \ - if item['difficulty'] else None - repacked_data['duration'] = item['duration'].get('name', None) \ - if item['duration'] else None - page['levels'].append(repacked_data) - return page - - -def get_trle_cover(trle_id): - """Fetch TRLE level picture by id""" - if not trle_id.isdigit(): - print("Invalid ID number.") - sys.exit(1) - url = f"https://www.trle.net/screens/{trle_id}.jpg" - - response = https.get(url, 'image/jpeg') - return cover_resize_to_webp(response) - - -def is_valid_uuid(value): - """Validate uuid format""" - try: - uuid_obj = uuid.UUID(value, version=4) - return str(uuid_obj) == value - except ValueError: - return False - - -def calculate_md5(data): - """Calculate the MD5 checksum of the given data.""" - md5_hash = hashlib.md5(usedforsecurity=False) - md5_hash.update(data) - return md5_hash.hexdigest() - - -def get_cover_list(levels): - """Get a list picture data ready to use""" - base_url = "https://data.trcustoms.org/media/level_images/" - level_list = [] - - for level in levels: - file = level['cover'].replace(base_url, "") - - filename, ext = os.path.splitext(file) - ext = ext[1:] # 
remove dot - - if ext.lower() in ('jpg', 'jpeg', 'png'): - level_list.append(get_trcustoms_cover(filename, level['cover_md5sum'], ext)) - else: - print(f"Skipping level {level['title']}, invalid file format: {ext}") - sys.exit(1) - - return level_list - -def get_trcustoms_cover(image_uuid, md5sum, image_format): - """Getting pictures from internet and displaying on the terminal""" - if not is_valid_uuid(image_uuid): - print("Invalid image UUID.") - sys.exit(1) - if image_format.lower() not in ["jpg", "jpeg", "png"]: - print("Invalid image format.") - sys.exit(1) - - url = f"https://data.trcustoms.org/media/level_images/{image_uuid}.{image_format}" - if image_format.lower() == "jpg": - image_format = "jpeg" - response = https.get(url, f"image/{image_format}") - - # Check if the MD5 sum matches - downloaded_md5sum = calculate_md5(response) - if downloaded_md5sum != md5sum: - print(f"MD5 mismatch: Expected {md5sum}, got {downloaded_md5sum}") - sys.exit(1) - - # Save the image to a temporary file - with tempfile.NamedTemporaryFile(delete=False, suffix=".webp") as temp_image_file: - temp_image_file.write(cover_resize_to_webp(response)) - return temp_image_file.name - - -def cover_resize_to_webp(input_img): - """webp is the default we use here with 320x240 max resolution""" - img = Image.open(BytesIO(input_img)) - - # Convert to terminal character size - img = img.resize((320, 240)) - webp_image = BytesIO() - - # Convert the image to .webp format - img.save(webp_image, format='WEBP') - - # Get the image data as bytes - return webp_image.getvalue() - - -def convert_to_iso(date_str): - """Convert date string from various formats to ISO-8601 (YYYY-MM-DD) format.""" - - # Try to parse '01-Jan-2024' format - try: - return datetime.strptime(date_str, '%d-%b-%Y').strftime('%Y-%m-%d') - except ValueError: - pass - - # Try to parse '2024-09-24T15:12:19.212984Z' ISO format with time and microseconds - try: - return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%fZ').strftime('%Y-%m-%d') - except ValueError: - pass - - # Try to parse '1999-08-29T00:00:00Z' ISO format without microseconds - try: - return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d') - except ValueError: - pass - - raise ValueError(f"Unsupported date format: {date_str}") - - -def get_key_list(html): - """scrape keys and key status here - we cant depend on local keys from package manger that might be incomplete""" - - soup = BeautifulSoup(html, 'html.parser') - # Find the table containing the keys - table = soup.find_all('table')[2] # Adjust index if necessary - - # Iterate over the rows (skipping the header row) - ids = [] - for row in table.find_all('tr')[1:]: - key_column = row.find_all('td')[0] # Get the first column - key_striped = key_column.text.strip() # Extract the key text - print(f"Key: {key_striped}") - ids.append(key_striped) - - return ids - - -def trcustoms_key_list(): - """Get list of utf-8 public key for Trcustoms""" - key_list = https.get("https://crt.sh/?q=trcustoms.org&exclude=expired", 'text/html') - validated = get_key_list(key_list) - - public_key_list = [] - for key in validated: - time.sleep(5) - public_key_list.append(get_key(key)) - - return public_key_list - - -def trle_key_list(): - """Get list of utf-8 public key for TRLE""" - resp = https.get("https://crt.sh/?q=www.trle.net&exclude=expired", 'text/html') - key_list = get_key_list(resp) - - public_key_list = [] - for key in key_list: - time.sleep(5) - public_key_list.append(get_key(key)) - - return public_key_list - - -def 
get_key(id_number): - """Get the certificate from crt""" - # Input validation - if not id_number.isdigit(): - print("Invalid ID number.") - sys.exit(1) - html = https.get(f"https://crt.sh/?id={id_number}", 'text/html') - - # Create a BeautifulSoup object - soup = BeautifulSoup(html, 'html.parser') - body_tag = soup.find("body") - if not isinstance(body_tag, Tag): - logging.error("Data type error, expected Tag got %s", type(body_tag)) - sys.exit(1) - - td_text_tag = body_tag.find("td", class_="text") - if not isinstance(td_text_tag, Tag): - logging.error("Data type error, expected Tag got %s", type(td_text_tag)) - sys.exit(1) - - a_tag = td_text_tag.find('a', text=re.compile(r'Serial')) - if not isinstance(a_tag, Tag): - logging.error("Data type error, expected Tag got %s", type(a_tag)) - sys.exit(1) - - href = a_tag['href'] - if not isinstance(href, str): - logging.error("Data type error, expected str got %s", type(href)) - sys.exit(1) - - # Parse the query string to get the 'serial' parameter - query_params = parse_qs(urlparse(href).query) - serial_number = query_params.get('serial', [None])[0] - if not serial_number: - logging.error("Serial number not found") - sys.exit(1) - - # Normalize serial by stripping leading zeros - serial_number = serial_number.lstrip('0') - - if not serial_number: - print("Serial Number tag not found.") - sys.exit(1) - - print("Serial Number:", serial_number) - diff --git a/database/sanitize_downloads.py b/database/sanitize_downloads.py new file mode 100644 index 0000000..efb3973 --- /dev/null +++ b/database/sanitize_downloads.py @@ -0,0 +1,142 @@ +""" +This script gose thure all TRLE levels by a specified ID range and sanitize the downloads. +Usage: python3 sanitize_downloads.py FROM_ID TO_ID +""" +import os +import sys +import json +import re + +def new_input(data, file_path): + """Take new input""" + zip_file = data['zip_files'][0] + print(zip_file) + if input("Do you want to remove the file? y/n: ") == 'y': + os.remove(file_path) + print(f"{file_path} has been removed.") + return + + zip_file['name'] = input("New name: ") + zip_file['size'] = float(input("New size: ")) + zip_file['md5'] = input("New md5: ") + zip_file['url'] = input("New url: ") + with open(file_path, mode='w', encoding='utf-8') as json_file: + json.dump(data, json_file) + + +def sanitize(data, file_path): + """ + Validates the 'zip_file' data from the given dictionary. + + This function checks: + 1. That the 'name' attribute exists and ends with ".zip". + 2. That the 'size' attribute is a float, greater than 2, and has exactly two decimal places. + 3. That the 'md5' attribute is a valid 32-character hexadecimal MD5 hash. + 4. That the 'url' attribute matches one of the allowed URL patterns. + + Args: + data (dict): The input dictionary containing 'zip_files' data. + + Exits: + Exits the program with status 1 if any validation fails. 
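+
+    Note:
+        file_path is the path of the JSON file on disk; it is passed through to
+        new_input() so corrected values (or a file removal) can be applied.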
+ """ + + # Extract the first zip file data + zip_file = data['zip_files'][0] + + # Validate attributes + if not zip_file.get('name'): + print("The 'name' attribute is missing.") + new_input(data, file_path) + return + + if not isinstance(zip_file.get('size'), float): + print("The 'size' attribute is missing.") + new_input(data, file_path) + return + + if not zip_file.get('md5'): + print("The 'md5' attribute is missing.") + new_input(data, file_path) + return + + if not zip_file.get('url'): + print("The 'url' attribute is missing.") + new_input(data, file_path) + return + + # Validate name end with ".zip" + if not zip_file['name'].endswith(".zip") or "$" in zip_file['name']: + print(f"The file {zip_file['name']} is not a .zip file.") + new_input(data, file_path) + return + + # Validate 'size' attribute - must be a float, >2, and have exactly two decimal places + if zip_file['size'] <= 2: + print("The 'size' attribute is smaller then 2 MiB") + new_input(data, file_path) + return + + # Validate 'md5' attribute - must be a valid MD5 hash (32 hexadecimal characters) + if not re.fullmatch(r"^[a-fA-F0-9]{32}$", zip_file.get('md5', '')): + print("The 'md5' attribute is not a valid 32-character hexadecimal MD5 hash.") + new_input(data, file_path) + return + + # Validate 'url' attribute - must match one of the expected patterns + pattern1 = r"^https://trcustoms\.org/api/level_files/\d+/download$" + pattern2 = r"^https://www\.trle\.net/levels/levels/\d{4}/\d{4}/[a-zA-Z0-9%-_\.$]+\.zip$" + + if not re.match(pattern1, zip_file.get('url', '')) \ + and not re.match(pattern2, zip_file.get('url', '')): + print("The 'url' attribute does not match any of the expected patterns.") + new_input(data, file_path) + return + + +def safe_string_to_int(id_str): + """Make sure the input is an int""" + try: + return int(id_str) + except ValueError: + print("Error: The provided string is not a valid integer.") + sys.exit(1) + + +def trle_by_id(trle_id): + """Open File by id""" + file_path = f"trle/{trle_id}.json" + + if os.path.exists(file_path): + print(f"File {file_path} exists.") + else: + print(f"File {file_path} does not exist.") + return + + with open(file_path, "r", encoding='utf-8') as file: + data = json.load(file) + + sanitize(data, file_path) + + +if __name__ == '__main__': + if len(sys.argv) != 3: + print("Usage: python3 get_trle_by_id_range.py FROM_ID TO_ID") + sys.exit(1) + + # Convert arguments to integers with validation + from_id = safe_string_to_int(sys.argv[1]) + to_id = safe_string_to_int(sys.argv[2]) + + if from_id == to_id: + trle_by_id(from_id) + sys.exit(0) + + # Ensure from_id is less than to_id by swapping if necessary + if from_id > to_id: + from_id, to_id = to_id, from_id + + # Fetch and save data for each level ID in the specified range + for level_id in range(from_id, to_id + 1): + print(f"Getting TRLE level by ID: {level_id}") + trle_by_id(level_id) diff --git a/database/scrape.py b/database/scrape.py new file mode 100644 index 0000000..8c5881b --- /dev/null +++ b/database/scrape.py @@ -0,0 +1,959 @@ +"""Scraping of all data; level info, cover images and https keys""" +import sys +import re +import os +import hashlib +import uuid +import logging +import tempfile +from io import BytesIO +from urllib.parse import urlparse, urlencode, parse_qs +from datetime import datetime +from bs4 import BeautifulSoup, Tag +from PIL import Image + +import data_factory +import https + +# Set up logging +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s') 
+logging.getLogger("requests").setLevel(logging.DEBUG) + +############################################################################### +# Some basic URL converters +############################################################################### + +def trle_search_parser(url): + """ + Prepares a URL for level title searches on TRLE by encoding special characters. + + Note: This function should generally be avoided in favor of searching the local + database, as it may not fully cover all cases or include recent updates. + + Args: + url (str): The original URL string to be formatted for TRLE search compatibility. + + Returns: + str: The formatted URL with specific characters + replaced to match TRLE encoding requirements. + """ + return url.replace(" ", r"+").replace(":", r"%3A") \ + .replace("!", r"%21").replace("#", r"%23") \ + .replace("/", r"%2F").replace("&", r"%26") \ + .replace("'", r"%5C%27") + + +def url_postfix(url): + """ + Extracts the file extension from a URL without the leading dot. + + Args: + url (str): The URL to extract the file extension from. + + Returns: + str: The file extension without the leading dot, + or an empty string if no extension is present. + """ + return os.path.splitext(os.path.basename(urlparse(url).path))[1][1:] + + +def validate_url(url): + """Check if a URL belongs to 'trle.net' or 'trcustoms.org'. + + Args: + url (str): The URL to validate. + + Returns: + str or None: The URL if it belongs to an accepted domain, otherwise None. + """ + if url_domain(url) in {"trle.net", "trcustoms.org"}: + return url + return None + +def url_domain(url): + """Parse and validate a URL, ensuring it is HTTPS and from specific domains. + + This function verifies that the URL: + 1. Has both a valid scheme and network location (netloc). + 2. Uses the HTTPS scheme. + 3. Belongs to either 'trle.net' or 'trcustoms.org'. + + Args: + url (str): The URL to check. + + Returns: + str: The domain name if valid. + + Exits: + Logs an error and terminates the program if validation fails. + """ + # Parse URL and check if scheme and netloc are present. + parsed_url = urlparse(url) + if not all([parsed_url.scheme, parsed_url.netloc]): + logging.error("Invalid URL format: Scheme or netloc is missing.") + sys.exit(1) + + # Verify the URL uses HTTPS. + if parsed_url.scheme != "https": + logging.error("Invalid URL: Only HTTPS URLs are allowed.") + sys.exit(1) + + # Confirm the domain is allowed. + if parsed_url.netloc.endswith("trle.net"): + return "trle.net" + if parsed_url.netloc.endswith("trcustoms.org"): + return "trcustoms.org" + + logging.error("Invalid URL: URL must belong to 'trle.net' or 'trcustoms.org'.") + sys.exit(1) + + +def trle_url_to_int(url): + """ + Converts a TRLE level URL into its corresponding integer level ID. + + This function processes URLs from the TRLE website that contain a level ID + as a query parameter (lid). The following URL formats are usually used: + + 1. Level features page: + https://www.trle.net/sc/levelfeatures.php?lid= + + 2. Level download page: + https://www.trle.net/scadm/trle_dl.php?lid= + + 3. Level walkthrough page: + https://www.trle.net/sc/Levelwalk.php?lid= + + Args: + url (str): The URL string to be processed. + + Returns: + int or None: The level ID as an integer if extraction is successful, + otherwise None. + """ + try: + # Parse the URL and extract the 'lid' query parameter. 
+ lid_value = int(parse_qs(urlparse(url).query).get('lid', [None])[0]) + return lid_value + except (TypeError, ValueError): + # Return None if the 'lid' could not be converted to an integer. + return None + + +def is_valid_uuid(value): + """Validate uuid format""" + try: + uuid_obj = uuid.UUID(value, version=4) + return str(uuid_obj) == value + except ValueError: + return False + + +############################################################################### +# raw data converters +############################################################################### + +def calculate_md5(data): + """Calculate the MD5 checksum of the given data.""" + md5_hash = hashlib.md5(usedforsecurity=False) + md5_hash.update(data) + return md5_hash.hexdigest() + + +def cover_to_tempfile(data): + """Save the image to a temporary file.""" + with tempfile.NamedTemporaryFile(delete=False, suffix=".webp") as temp_image_file: + temp_image_file.write(data) + return temp_image_file.name + + +def cover_resize_or_convert_to_webp(input_img, x=None, y=None): + """Resize and/or convert image to .webp format.""" + img = Image.open(BytesIO(input_img)) + + if x is None and y is None: + original_x, original_y = img.size + if x is None: + x = original_x + if y is None: + y = original_y + img = img.resize((x, y)) + + webp_image = BytesIO() + + # Convert the image to .webp format + img.save(webp_image, format='WEBP') + + # Get the image data as bytes + return webp_image.getvalue() + + +def get_trle_cover_by_id(trle_id): + """ + Fetch the TRLE level cover image by level ID. + + This function takes a TRLE level ID, validates it, and uses it to + construct the filename for the level's cover image. It then retrieves + the image using `get_trle_cover`. + + Args: + trle_id (str): The level ID as a string, which should be numeric. + + Returns: + The cover image data returned by `get_trle_cover`. + + Exits: + Prints an error and exits if `trle_id` is not a valid numeric string. + """ + # Validate that the trle_id is numeric. + if not trle_id.isdigit(): + print("Invalid ID number.") + sys.exit(1) + + # Construct the image filename (e.g., '1234.jpg') and fetch the cover image. + return get_trle_cover(f"{trle_id}.jpg") + + +def normalize_level_name(name): + """ + Normalizes a level name string for creating consistent + zip file names and enabling lenient searches. + + This function removes spaces and special characters to create a simplified version + of the level name suitable for use in file naming conventions, particularly for + zip files, and to standardize level names in searches. + + Args: + name (str): The level name to be processed. + + Returns: + str: A normalized level name with spaces and special characters removed. 
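+
+    Example (hypothetical title, for illustration only):
+        "Lara's Home: Part 2!" -> "LarasHomePart2"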
+ """ + return name.replace(" ", r"").replace("'", r"").replace("-", r"") \ + .replace(":", r"").replace("!", r"") \ + .replace("#", r"").replace("/", r"").replace("&", r"") + + +def convert_to_iso(date_str): + """Convert date string from various formats to ISO-8601 (YYYY-MM-DD) format.""" + + # Try to parse '01-Jan-2024' format + try: + return datetime.strptime(date_str, '%d-%b-%Y').strftime('%Y-%m-%d') + except ValueError: + pass + + # Try to parse '2024-09-24T15:12:19.212984Z' ISO format with time and microseconds + try: + return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%fZ').strftime('%Y-%m-%d') + except ValueError: + pass + + # Try to parse '1999-08-29T00:00:00Z' ISO format without microseconds + try: + return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d') + except ValueError: + pass + + raise ValueError(f"Unsupported date format: {date_str}") + + +############################################################################### +# Getters functions that return data from the sites with get response +############################################################################### + +def get_soup(url): + """ + Retrieves and parses the HTML content from a URL using BeautifulSoup. + + Args: + url (str): The URL of the webpage to fetch and parse. + + Returns: + BeautifulSoup: A BeautifulSoup object representing the parsed HTML content. + """ + if validate_url(url) == None: + print(f"{url} had wrong domain") + sys.exit(1) + return BeautifulSoup(https.get(validate_url(url), 'text/html'), 'html.parser') + + +def get_image(url): + """ + Fetches an image from a URL, handling both JPEG and PNG formats. + + Args: + url (str): The URL of the image file. + + Returns: + bytes: The image content in bytes, based on the file format. + + Raises: + SystemExit: If the file format is unsupported. + """ + if validate_url(url) == None: + print(f"{url} had wrong domain") + sys.exit(1) + ext = url_postfix(url).lower() + if ext in ('jpg', 'jpeg'): + return get_jpg(url) + if ext == 'png': + return get_png(url) + print(f"Invalid file format: {ext}") + sys.exit(1) + + +def get_jpg(url): + """ + Fetches a JPEG image from a URL. + + Args: + url (str): The URL of the JPEG image file. + + Returns: + bytes: The JPEG image content in bytes. + """ + if validate_url(url) == None: + print(f"{url} had wrong domain") + sys.exit(1) + return https.get(validate_url(url), 'image/jpeg') + + +def get_png(url): + """ + Fetches a PNG image from a URL. + + Args: + url (str): The URL of the PNG image file. + + Returns: + bytes: The PNG image content in bytes. + """ + if validate_url(url) == None: + print(f"{url} had wrong domain") + sys.exit(1) + return https.get(validate_url(url), 'image/png') + + +def get_json(url): + """ + Fetches JSON data from a URL. + + Args: + url (str): The URL of the JSON resource. + + Returns: + dict: The JSON data parsed into a Python dictionary. + """ + if validate_url(url) == None: + print(f"{url} had wrong domain") + sys.exit(1) + return https.get(validate_url(url), 'application/json') + + +def get_zip(url): + """ + Fetches a ZIP file from a URL and returns it in dictionary format. + + Args: + url (str): The URL of the ZIP file. + + Returns: + dict: The ZIP file content in a dictionary format, if applicable. 
+ """ + if validate_url(url) == None: + print(f"{url} had wrong domain") + sys.exit(1) + return https.get(validate_url(url), 'application/zip') + +############################################################################### +# This section handles page scraping for level data from TRCustoms and TRLE. +# +# The TRCustoms site is reliably supported and works flawlessly with this scraping logic. +# For TRLE, page scraping is implemented primarily for viewing the site’s content. +# +# Known Issues with TRLE Scraping: +# - **Sorting Bug**: TRLE has a known bug in its record sorting mechanism, which occasionally +# causes certain records to be omitted from page offsets. This issue is particularly +# problematic with levels that have multiple authors. +# - **Duplicate Records**: Duplicate entries may appear on TRLE due to sorting inconsistencies. +# - **Random Sorting Errors**: The sorting errors are unpredictable, often changing daily without +# any levels being added or modified. These errors can persist for a few hours (typically 3-5) +# before resolving on their own, only to reappear with different records affected the next day. +# +# Due to these limitations, TRLE scraping should be treated as approximate and may require +# periodic manual checks to confirm the accuracy of fetched data. +# +# get_trcustoms_page(page_number, False) +# get_trle_page(offset, False) +############################################################################### + +def get_trle_page(offset, sort_created_first=False): + """Scrape one TRLE page where the offset starts from the earliest date.""" + params = { + "atype": "", + "author": "", + "level": "", + "class": "", + "type": "", + "difficulty": "", + "durationclass": "", + "rating": "", + "sortidx": 8, + "sorttype": 2 if sort_created_first else 1, + "idx": "" if offset == 0 else str(offset) + } + query_string = urlencode(params) + soup = get_soup(f"https://www.trle.net/pFind.php?{query_string}") + page = data_factory.make_trle_page_data() + page['offset'] = offset + + # Find total records + span = soup.find('span', class_='navText') + if span: + page['records_total'] = int(span.text.strip().split()[0]) + else: + print("Total records not found") + sys.exit(1) + + # Find data table + table = soup.find('table', class_='FindTable') + if not isinstance(table, Tag): + print("Data table not found") + sys.exit(1) + + page['levels'] = trle_page_table(table.find_all('tr')) + + return page + + +def trle_page_table(table): + """filter out data from the TRLE level table result""" + levels = [] + + # Mapping index to level data fields + field_mapping = { + 0: ('author', lambda cell: cell.get_text(strip=True)), + 5: ('trle_id', 'title', lambda cell: ( + cell.find('a', href=True)['href'].split('lid=')[-1] \ + if cell.find('a', href=True) else None, cell.get_text(strip=True) + ) + ), + 6: ('difficulty', lambda cell: cell.get_text(strip=True)), + 7: ('duration', lambda cell: cell.get_text(strip=True)), + 8: ('class', lambda cell: cell.get_text(strip=True)), + 10: ('type', lambda cell: cell.get_text(strip=True)), + 13: ('release', lambda cell: convert_to_iso(cell.get_text(strip=True))) + } + + for row in table[1:]: + cells = row.find_all('td') + level = data_factory.make_trle_level_data() + + for idx, cell in enumerate(cells): + if idx in field_mapping: + if idx == 5: + level['trle_id'], level['title'] = field_mapping[idx][2](cell) + else: + field_name, extractor = field_mapping[idx] + level[field_name] = extractor(cell) + + levels.append(level) + + return levels + + +def 
get_trcustoms_page(page_number, sort_created_first=False): + """Scrape one trcustoms page where the offset starts from the earliest date.""" + host = "https://trcustoms.org/api/levels/" + if sort_created_first: + sort="-created" + else: + sort="created" + params = { + "sort": sort, + "is_approved": 1, + "page": "" if page_number == 0 else str(page_number) + } + query_string = urlencode(params) + data = get_json(f"{host}?{query_string}") + page = data_factory.make_trcustoms_page_data() + page['current_page'] = data.get('current_page') + page['total_pages'] = data.get('last_page') + page['records_total'] = data.get('total_count') + results = data.get('results') + + for item in results: + repacked_data = data_factory.make_trcustoms_level_data() + for author in item['authors']: + repacked_data['authors'].append(author['username']) + for tag in item['tags']: + repacked_data['tags'].append(tag['name']) + for genre in item['genres']: + repacked_data['genres'].append(genre['name']) + repacked_data['release'] = convert_to_iso(item['created']) + repacked_data['cover'] = item['cover']['url'] + repacked_data['cover_md5sum'] = item['cover']['md5sum'] + repacked_data['trcustoms_id'] = item['id'] + repacked_data['title'] = item['name'] + repacked_data['type'] = item['engine']['name'] + repacked_data['difficulty'] = item['difficulty'].get('name', None) \ + if item['difficulty'] else None + repacked_data['duration'] = item['duration'].get('name', None) \ + if item['duration'] else None + page['levels'].append(repacked_data) + return page + + +############################################################################### +# Image Fetching and Processing for TRLE and TRCustoms Levels +# ------------------------------------------------------------ +# This module provides functions to retrieve, validate, and process cover images +# for levels hosted on TRLE and TRCustoms. These images are processed for display +# or storage, ensuring consistent format and data integrity. +# +# Key Functions: +# - `get_trle_cover`: Retrieves a single cover image from TRLE using the level ID, +# resizing and converting it to `.webp` format for compatibility. +# - `get_trle_cover_list`: Retrieves a list of cover images from TRLE based on +# provided URLs, processing both primary and large cover images if available. +# - `get_trcustoms_cover`: Retrieves and validates a TRCustoms image using its UUID +# and MD5 checksum to ensure data accuracy, with options for temporary file storage. +# - `get_trcustoms_cover_list`: Processes a list of TRCustoms images, validating each +# with MD5 checksums and preparing them for immediate use or temporary storage. +# +# Assumptions: +# - HTTPS requests are pre-configured with appropriate headers. +# - Images are fetched in formats specified in the URL, typically JPG or PNG. +# - The `cover_resize_or_convert_to_webp` function is used to standardize image formats. +# +# These functions facilitate smooth image retrieval, data integrity checks, and +# format conversions, suitable for terminal display or further processing. +############################################################################### + +def get_trle_cover_list(screen, large_screens=None, want_tempfile=False): + """ + Retrieve and prepare a list of cover images from TRLE. + + This function processes a primary cover image (`screen`) and additional + larger cover images (`large_screens`), retrieves each by its URL, and stores + the processed image data in a list. Optionally, it saves images as temporary files. 
+ + Args: + screen (str): URL of the primary TRLE cover image. + large_screens (list, optional): A list of URLs for additional larger TRLE cover images. + want_tempfile (bool): If True, stores images in temporary files instead of + returning raw image data. + + Returns: + list: A list containing processed image data (or temporary file paths if + `want_tempfile` is True) for the `screen` image followed by any additional + images in `large_screens`. + """ + # Define the base URL to simplify URL processing. + base_url = "https://www.trle.net/screens/" + + # Initialize an empty list to store the processed cover images. + level_list = [] + + # Process the `screen` image if it is provided. + if screen is not None: + # Remove the base URL part to get the relative path, then fetch the cover image. + level_list.append(get_trle_cover(screen.replace(base_url, ""), want_tempfile)) + + # Process the `large_screens` images if the list is provided. + if large_screens is not None: + for cover in large_screens: + # Remove the base URL part to get the relative path, then fetch the cover image. + level_list.append(get_trle_cover(cover.replace(base_url, ""), want_tempfile)) + + # Return the list of processed images or temporary file paths. + return level_list + + +def get_trle_cover(level, want_tempfile=False): + """Retrieve and process the cover image for a TRLE level. + + This function constructs a URL to a TRLE level's cover image and then + retrieves the image for processing. The image URL should correspond + to images hosted on 'https://www.trle.net/screens/' and is expected + in the form of a filename (e.g., "3175.jpg"). + + Example inputs for `level`: + - "3175.jpg" (single image file) + - "large/3175a.jpg" (image in the 'large' directory) + - "large/3175b.jpg", "large/3175c.jpg", etc. (for additional images) + + Args: + level (str): The file path or name of the image to retrieve, + relative to the TRLE server's screen directory. + + Returns: + Processed image data in `.webp` format, as generated by + `cover_resize_or_convert_to_webp`. + + Exits: + Logs an error and exits if `level` is not provided as a valid string. + """ + # Check if the level string is provided and valid. + if not level: + logging.error("Invalid input: A valid image filename (e.g., '3175.jpg') is required.") + sys.exit(1) + + # Construct the full URL for the cover image on TRLE.net. + image_url = "https://www.trle.net/screens/" + level + + # Fetch and process the image, converting it to webp format. + response = cover_resize_or_convert_to_webp(get_jpg(image_url)) + + # If a temporary file is requested, save the image to a temp file and return its path. + if want_tempfile: + return cover_to_tempfile(response) + + # Otherwise, return the raw image data. + return response + + +def get_trcustoms_cover_list(levels, want_tempfile=False): + """Get a list of picture data ready to use from Trcustoms""" + base_url = "https://data.trcustoms.org/media/level_images/" + level_list = [] + + for level in levels: + file = level['cover'].replace(base_url, "") + level_list.append(get_trcustoms_cover(file, level['cover_md5sum'], want_tempfile)) + + return level_list + + +def get_trcustoms_cover(image, md5sum, want_tempfile=False): + """ + Retrieve and validate an image from TRCustoms. + + This function downloads an image file from the TRCustoms server, + validates the image by checking its MD5 checksum, and optionally + stores it in a temporary file if specified. + + Args: + image (str): The image filename, including the format (e.g., 'uuid.png'). 
+ md5sum (str): The expected MD5 checksum of the image to ensure data integrity. + want_tempfile (bool): If True, saves the image to a temporary file. + + Returns: + The downloaded image data if want_tempfile is False, + or the path to a temporary file containing the image if True. + + Exits: + Prints an error and exits if the image UUID is invalid or the MD5 checksum fails. + """ + # Extract the image file format (e.g., 'png' or 'jpg'). + image_format = url_postfix(image) + + # Remove the file extension from the image name to obtain the UUID. + image_uuid = image.replace('.' + image_format, "") + + # Validate that the extracted image UUID is in a proper format. + if not is_valid_uuid(image_uuid): + print(f"Invalid image UUID '{image_uuid}'") + sys.exit(1) + + # Construct the full URL for the image on the TRCustoms server. + url = f"https://data.trcustoms.org/media/level_images/{image_uuid}.{image_format}" + + # Send a request to download the image from the URL. + response = get_image(url) + + # Calculate the MD5 checksum of the downloaded image. + downloaded_md5sum = calculate_md5(response) + + # Verify the downloaded image's MD5 checksum against the expected checksum. + if downloaded_md5sum != md5sum: + print(f"MD5 mismatch: Expected {md5sum}, got {downloaded_md5sum}") + sys.exit(1) + + # If a temporary file is requested, save the image to a temp file and return its path. + if want_tempfile: + return cover_to_tempfile(response) + + # Otherwise, return the raw image data. + return response + + +# ID Scraping for TRLE and TRCustoms Levels +# The TRLE and TRCustoms websites allow level data retrieval by using level IDs +# in their URLs: TRCustoms: "https://trcustoms.org/levels/" or +# "https://trcustoms.org/api/levels/" (for JSON data). +# This ID-based approach lets us build a structured index of levels for TRLE +# directly from level IDs. Note: Creating this index is slow, requiring one HTTP +# request per level, rather than a bulk request (e.g., 1 request per 20 levels). +# The final TRLE database will store details, walkthroughs, and download files, +# integrating data from both TRLE and TRCustoms. 
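+#
+# A minimal sketch of that ID-based flow (it mirrors get_trle_by_id_range.py;
+# the level ID 3573 below is only an example value):
+#
+#     data = data_factory.make_trle_tombll_data()
+#     soup = get_soup("https://www.trle.net/sc/levelfeatures.php?lid=3573")
+#     get_trle_level(soup, data)  # fills the dict from the level features page
+#
+# The matching TRCustoms record, when one exists, is available as JSON from
+# https://trcustoms.org/api/levels/<id> via get_json().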
+############################################################################### + +def get_trle_walkthrough(level_soup): + """Finds the walkthrough link on the TRLE page.""" + walkthrough_link = level_soup.find('a', string='Walkthrough') + if walkthrough_link: + # Constructs the walkthrough URL + url = 'https://www.trle.net/sc/' + walkthrough_link['href'] + print(url) + else: + logging.info("Walkthrough not found" ) + return "" + + # Retrieves the walkthrough content by loading the walkthrough URL + soup = get_soup(url) + iframe_tag = soup.find('iframe') + if not iframe_tag or not isinstance(iframe_tag, Tag): + sys.exit(1) + + # Extracts the source URL from the iframe + iframe_src = iframe_tag['src'] + if not iframe_src or not isinstance(iframe_src, str): + sys.exit(1) + + # Fetches the walkthrough content from the extracted URL + url = "https://www.trle.net" + iframe_src + response = https.get(url, 'text/html') + if response: + return response + return None + + +def get_trle_zip_file(soup): + """Locates the download link for the ZIP file on the TRLE page and return it.""" + download_link = soup.find('a', string='Download') + if download_link: + return https.get(download_link['href'], 'application/zip') + logging.error("Error fetching download url") + sys.exit(1) + + +def get_trle_authors(soup): + """Extracts the authors list.""" + # Find the first with class "medGText" + first_td = soup.find('td', class_='medGText') + + # Find all anchor tags within the first + author_links = first_td.find_all('a') + + # Filter the anchor tags manually based on href attribute + author_names = [] + for link in author_links: + href = link.get('href') + if href and href.startswith('/sc/authorfeatures.php?aid='): + author_names.append(link.text) + return author_names + + +def get_trle_type(soup): + """Extracts the level type.""" + return soup.find('td', string='file type:').find_next('td').get_text(strip=True) or "" + + +def get_trle_class(soup): + """Extracts the level class.""" + return soup.find('td', string='class:').find_next('td').get_text(strip=True) or "" + + +def get_trle_release(soup): + """Extracts the release date.""" + return soup.find('td', string='release date:').find_next('td').get_text(strip=True) or "" + + +def get_trle_difficulty(soup): + """Extracts the level difficulty.""" + difficulty_td = soup.find('td', string='difficulty:') + if difficulty_td: + next_td = difficulty_td.find_next('td') + if next_td: + return next_td.get_text(strip=True) + return "" + + +def get_trle_duration(soup): + """Extracts the level duration.""" + duration_td = soup.find('td', string='duration:') + if duration_td: + next_td = duration_td.find_next('td') + if next_td: + return next_td.get_text(strip=True) + return "" + + +def get_trle_body(soup): + """Extracts the main level description body.""" + specific_tags = soup.find_all('td', class_='medGText', align='left', valign='top') + return str(specific_tags[1]) if len(specific_tags) >= 2 else "" + + +def get_trle_large_screens(soup): + """Extracts the large screens URLs to the right of page.""" + onmouseover_links = soup.find_all(lambda tag: tag.name == 'a' and 'onmouseover' in tag.attrs) + return [link['href'] for link in onmouseover_links] + + +def get_trle_screen(soup): + """Extracts the main cover image URL.""" + image_tag = soup.find('img', class_='border') + return 'https://www.trle.net' + image_tag['src'] + + +def get_trle_title(soup): + """Extracts title at the to of the page.""" + title_span = soup.find('span', class_='subHeader') + if title_span: + title 
= title_span.get_text(strip=True) + br_tag = title_span.find('br') + if br_tag: + return title_span.contents[0].strip() + return title + logging.error("Failed to retrieve trle title") + sys.exit(1) + + +def get_trle_level(soup, data): + """Calls all the other soup extracts for TRLE.""" + data['title'] = get_trle_title(soup) + if not data['title']: + logging.info("This was an empty page") + return + data['authors'] = get_trle_authors(soup) + data['type'] = get_trle_type(soup) + data['class'] = get_trle_class(soup) + data['release'] = get_trle_release(soup) + data['difficulty'] = get_trle_difficulty(soup) + data['duration'] = get_trle_duration(soup) + data['screen'] = get_trle_screen(soup) + data['large_screens'] = get_trle_large_screens(soup) + data['zip_files'] = [get_trle_zip_file(soup)] + data['body'] = get_trle_body(soup) + data['walkthrough'] = get_trle_walkthrough(soup) + + +def get_trcustoms_level(url, data): + """Gets the main json and also looks for corresponding TRLE""" + if "api" not in url: + parts = url.split("/") + url = f"{parts[0]}//{parts[2]}/api/{'/'.join(parts[3:])}" + trcustom_level = https.get(url, 'application/json') + + title = trcustom_level['name'] + title = trle_search_parser(title) + # Look out for + ' & ! + trle_url = get_trle_index(title) # need to match this with simple words, no &#!...etc + trle_soup = get_soup(trle_url) + data['title'] = get_trle_title(trle_soup) + data['authors'] = get_trle_authors(trle_soup) + data['type'] = get_trle_type(trle_soup) + data['class'] = get_trle_class(trle_soup) + data['release'] = get_trle_release(trle_soup) + data['difficulty'] = get_trle_difficulty(trle_soup) + data['duration'] = get_trle_duration(trle_soup) + data['screen'] = get_trle_screen(trle_soup) + data['large_screens'] = get_trle_large_screens(trle_soup) + data['zip_files'] = [get_trle_zip_file(trle_soup)] + data['body'] = get_trle_body(trle_soup) + data['walkthrough'] = get_trle_walkthrough(trle_soup) + data['tags'] = [genre['name'] for genre in trcustom_level['tags']] + data['genres'] = [genre['name'] for genre in trcustom_level['genres']] + + for file_data in trcustom_level['files']: + zip_file = https.get(file_data['url'], 'application/zip') + + name = normalize_level_name(trcustom_level['name']) + authors = "" + for author in trcustom_level['authors']: + if authors != "": + authors = authors +"-" + authors = authors + author['username'] + + if file_data['version'] == 1: + version = "" + else: + version = f"-V{file_data['version']}" + zip_file['name'] = f"{file_data['id']}-{name}{version}-{authors}.zip" + + zip_file['url'] = file_data['url'] + zip_file['release'] = file_data['created'] + zip_file['version'] = file_data['version'] + data['zip_files'].append(zip_file) + + data['trle_id'] = trle_url_to_int(trle_url) + data['trcustoms_id'] = trcustom_level['id'] + return "" + + +# Consider implementing a separate function for TRLE title matching, +# using an exact word matching parser specific to TRLE. +# The TRLE matching mechanism requires an *exact match* for each word in the title. +# For example, searching for "EVIL" will not match a level titled "#E.V.I.L". +# Instead, we would need to search with "e.v.i.l" as each segment (word) in the +# title must be precisely matched. 
+# +# To achieve this, each word in the search term is separated by "+" signs: +# For instance: +# - "some:name" would be split into "some:+name" +# - "some - name" would become "some+-+name" +# +# This could potentially be handled locally with our own database, +# which would allow us to index levels by title and perform exact word searches. +# Implementing exact word matching locally would allow us to find titles +# like "some - name" by searching simply for "some+-" without relying on TRLE's +# search constraints. + + +def get_trle_index(title): + """ + Searches for a level on trle.net by its title and returns the URL + of the selected level's details page. + + Parameters: + title (str): The title of the level to search for. + + Returns: + str: URL of the selected level's details page on trle.net. + If multiple results are found, prompts the user to choose one. + If exactly one result is found, returns that URL directly. + If no results are found, the function logs an error and exits. + + Behavior: + - Constructs the search URL with the provided title. + - Uses `get_soup` to fetch and parse the HTML content + of the search results page. + - Finds all anchor (``) tags with `href` attributes + containing `/sc/levelfeatures.php?lid=`. + - Displays a list of search results if multiple results are + found and prompts the user to choose one. + - If only one result is found, returns that URL immediately. + - If no result is found, logs an error and terminates the program. + + Example: + >>> get_trle_index("Some Level Name") + "https://www.trle.net/sc/levelfeatures.php?lid=1234" + """ + url = "https://www.trle.net/pFind.php?atype=1&author=&level=" + trle_search_parser(title) + + # Get the parsed HTML soup of the search page + soup = get_soup(url) + + # Find all tags with href containing '/sc/levelfeatures.php?lid=' + anchor_tags = soup.find_all('a', href=re.compile(r'/sc/levelfeatures\.php\?lid=')) + + # Loop through each anchor tag and print the href and text for user selection + for i, tag in enumerate(anchor_tags, start=1): + print(f"{i}) {tag.text}, Href: {tag['href']}") + + anchor_tags_len = len(anchor_tags) + + # Handle cases based on the number of search results + if anchor_tags_len > 1: + number_input = int(input("Pick A Number From The List: ")) + if 1 <= number_input <= anchor_tags_len: + return "https://www.trle.net" + anchor_tags[number_input-1]['href'] + elif anchor_tags_len == 1: + return "https://www.trle.net" + anchor_tags[0]['href'] + + # Log an error if no results are found and exit + logging.error("trcustoms.org only not implemented") + sys.exit(1) diff --git a/database/tombll_add_data.py b/database/tombll_add_data.py index 8c78a90..855c7e3 100644 --- a/database/tombll_add_data.py +++ b/database/tombll_add_data.py @@ -6,333 +6,408 @@ import sqlite3 import json import logging -from PIL import Image -from io import BytesIO -import https +import scrape +def get_tombll_json(path): + """Load and parse a JSON file from a specified path. -# Set up logging -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s') -logging.getLogger("requests").setLevel(logging.DEBUG) -logging.getLogger("urllib3").setLevel(logging.DEBUG) + Attempts to open and read a JSON file, parsing its content into a dictionary. + Handles errors for file not found, JSON decoding issues, and other I/O problems. + Args: + path (str): The path to the JSON file. -def get_tombll_json(path): + Returns: + dict: Parsed content of the JSON file. 
+ + Exits: + Logs an error and exits the program if the file cannot be read or parsed. + """ try: + # Open the file with UTF-8 encoding with open(path, mode='r', encoding='utf-8') as json_file: try: + # Parse and return JSON content return json.load(json_file) except json.JSONDecodeError as json_error: - print(f"Error decoding JSON from file {path}: {json_error}") + # Log and exit if JSON content is invalid + logging.error("Error decoding JSON from file '%s': %s", path, json_error) sys.exit(1) except FileNotFoundError: - print(f"File not found: {path}") + # Log and exit if file is not found + logging.error("File not found: '%s'", path) sys.exit(1) except IOError as file_error: - print(f"IO error occurred while opening file {path}: {file_error}") + # Log and exit if any other I/O error occurs + logging.error("I/O error occurred while opening file '%s': %s", path, file_error) sys.exit(1) -def connect_database(): - return sqlite3.connect(os.path.dirname(os.path.abspath(__file__)) + '/tombll.db') - - -def commit_database(): - CONNECTION.commit() +def query_return_id(query, params, con): + """Execute a SQL query and return an ID. + If the query is an INSERT, this function returns the last inserted row ID. + For other queries, it fetches and returns the first integer result, if it exists and is + non-negative. -def disconnect_database(): - CONNECTION.close() + Args: + query (str): SQL query to execute. + params (tuple): Parameters for the query. + con (sqlite3.Connection): SQLite database connection. + Returns: + int or None: The ID from the query result, or None if not found. -def query_return_id(query, params): - cursor = CONNECTION.cursor() + Exits: + Logs an error and exits if a database error occurs. + """ + cursor = con.cursor() try: + # Execute the query with provided parameters cursor.execute(query, params) + + # Check if it's an INSERT query to return the last inserted row ID if query.strip().upper().startswith("INSERT"): return cursor.lastrowid - else: - result = cursor.fetchone() - if result and isinstance(result[0], int) and result[0] >= 0: - return result[0] - return None + + # For non-INSERT queries, fetch and validate the first result + result = cursor.fetchone() + if result and isinstance(result[0], int) and result[0] >= 0: + return result[0] + + return None # Return None if no valid ID is found + except sqlite3.DatabaseError as db_error: - print(f"Database error occurred: {db_error}") + # Log the database error and exit + logging.error("Database error occurred: %s", db_error) sys.exit(1) -def query_run(query, params): - cursor = CONNECTION.cursor() +def query_run(query, params, con): + """Execute a SQL query with the provided parameters. + + Args: + query (str): The SQL query to execute. + params (tuple): Parameters to substitute in the SQL query. + con (sqlite3.Connection): SQLite database connection. + + Exits: + Logs an error and exits if a database error occurs. + """ + cursor = con.cursor() try: + # Execute the query with provided parameters cursor.execute(query, params) except sqlite3.DatabaseError as db_error: - print(f"Database error occurred: {db_error}") + # Log the database error and exit the program + logging.error("Database error occurred: %s", db_error) sys.exit(1) - #except sqlite3.Error as insert_error: - # logging.error("SQLite error: %s", insert_error) def make_empty_null(value): - """ Turn empty string or unspecified size into None """ - if value == "": - return None - if value == 0.0: + """Convert empty strings or zero float values to None. 
+ + Args: + value: The value to check, which can be a string, float, or other types. + + Returns: + None if the value is an empty string or exactly 0.0; otherwise, returns the original value. + """ + if value in ("", 0.0): return None return value -def add_authors_to_database(authors_array, level_id): +def add_authors_to_database(authors_array, level_id, con): + """Add authors to the database, linking them with a specific level. + + This function checks if each author in `authors_array` already exists in the database. + If not, it inserts the author and creates an entry in the AuthorList table + to link the author to the specified level. + + Args: + authors_array (list of str): List of author names. + level_id (int): The ID of the level to link authors with. + con (sqlite3.Connection): SQLite database connection. + """ for author in authors_array: + # SQL queries for selecting, inserting, and linking authors query_select_id = "SELECT AuthorID FROM Author WHERE value = ?" query_insert = "INSERT INTO Author (value) VALUES (?)" query_insert_middle = "INSERT INTO AuthorList (authorID, levelID) VALUES (?, ?)" - author_id = query_return_id(query_select_id, (author,)) + + # Try to get the existing author ID; if none, insert a new author and get its ID + author_id = query_return_id(query_select_id, (author,), con) if author_id is None: - author_id = query_return_id(query_insert, (author,)) - query_run(query_insert_middle, (author_id, level_id)) + author_id = query_return_id(query_insert, (author,), con) + # Link the author with the level in AuthorList table + query_run(query_insert_middle, (author_id, level_id), con) -def add_genres_to_database(genres_array, level_id): + +def add_genres_to_database(genres_array, level_id, con): + """Add genres to the database, linking them with a specific level. + + This function checks if each genre in `genres_array` already exists in the database. + If not, it inserts the genre and creates an entry in the GenreList table + to link the genre to the specified level. + + Args: + genres_array (list of str): List of genre names. + level_id (int): The ID of the level to link genres with. + con (sqlite3.Connection): SQLite database connection. + """ for genre in genres_array: + # SQL queries for selecting, inserting, and linking genres query_select_id = "SELECT GenreID FROM Genre WHERE value = ?" query_insert = "INSERT INTO Genre (value) VALUES (?)" query_insert_middle = "INSERT INTO GenreList (genreID, levelID) VALUES (?, ?)" - genre_id = query_return_id(query_select_id, (genre,)) + + # Try to get the existing genre ID; if none, insert a new genre and get its ID + genre_id = query_return_id(query_select_id, (genre,), con) if genre_id is None: - genre_id = query_return_id(query_insert, (genre,)) - query_run(query_insert_middle, (genre_id, level_id)) + genre_id = query_return_id(query_insert, (genre,), con) + + # Link the genre with the level in GenreList table + query_run(query_insert_middle, (genre_id, level_id), con) -def add_tags_to_database(tags_array, level_id): +def add_tags_to_database(tags_array, level_id, con): + """Add tags to the database, linking them with a specific level. + + This function checks if each tag in `tags_array` already exists in the database. + If not, it inserts the tag and creates an entry in the TagList table + to link the tag to the specified level. + + Args: + tags_array (list of str): List of tag names. + level_id (int): The ID of the level to link tags with. + con (sqlite3.Connection): SQLite database connection. 
+ """ for tag in tags_array: + # SQL queries for selecting, inserting, and linking tags query_select_id = "SELECT TagID FROM Tag WHERE value = ?" query_insert = "INSERT INTO Tag (value) VALUES (?)" query_insert_middle = "INSERT INTO TagList (tagID, levelID) VALUES (?, ?)" - tag_id = query_return_id(query_select_id, (tag,)) + + # Try to get the existing tag ID; if not found, insert a new tag and get its ID + tag_id = query_return_id(query_select_id, (tag,), con) if tag_id is None: - tag_id = query_return_id(query_insert, (tag,)) - query_run(query_insert_middle, (tag_id, level_id)) + tag_id = query_return_id(query_insert, (tag,), con) + + # Link the tag with the level in TagList table + query_run(query_insert_middle, (tag_id, level_id), con) -def add_zip_files_to_database(zip_files_array, level_id): +def add_zip_files_to_database(zip_files_array, level_id, con): + """Add ZIP files to the database, linking them with a specific level. + + This function inserts each ZIP file in `zip_files_array` into the Zip table + if the file's attributes are provided, setting any empty or missing values to `None`. + It then links the inserted ZIP file to the specified level in the ZipList table. + + Args: + zip_files_array (list of dict): List of ZIP file details, each represented as a dictionary + with keys 'name', 'size', 'md5', 'url', 'version', 'release'. + level_id (int): The ID of the level to link ZIP files with. + con (sqlite3.Connection): SQLite database connection. + """ for zip_file in zip_files_array: - query_insert = "INSERT INTO Zip (name, size, md5sum, url, version, release)"\ - "VALUES (?,?,?,?,?,?)" - insert_arg =( + # SQL queries for inserting ZIP file data and linking ZIP files to a level + query_insert = ( + "INSERT INTO Zip (name, size, md5sum, url, version, release) " + "VALUES (?, ?, ?, ?, ?, ?)" + ) + + # Prepare arguments for inserting the ZIP file, converting empty values to None + insert_args = ( make_empty_null(zip_file.get('name')), make_empty_null(zip_file.get('size')), make_empty_null(zip_file.get('md5')), make_empty_null(zip_file.get('url')), - make_empty_null(zip_file.get('release')), - make_empty_null(zip_file.get('version')) + make_empty_null(zip_file.get('version')), + make_empty_null(zip_file.get('release')) ) + + # Insert the ZIP file and get its ID + zip_id = query_return_id(query_insert, insert_args, con) + + # Link the ZIP file to the level in ZipList table query_insert_middle = "INSERT INTO ZipList (zipID, levelID) VALUES (?, ?)" - middle_arg =( - query_return_id(query_insert, insert_arg), - level_id - ) - query_run(query_insert_middle, middle_arg) + middle_args = (zip_id, level_id) + query_run(query_insert_middle, middle_args, con) + +def add_screen_to_database(screen, level_id, con): + """Add a screen image to the database and link it to a specific level. -def add_screen_to_database(screen, level_id): + This function checks if the screen URL points to the TRLE.net screens URL path, + then downloads the corresponding image as a .webp file, and inserts it into the + Picture table. It also creates an association with the specified level in the Screens table. + + Args: + screen (str): URL of the screen image. + level_id (int): The ID of the level to link the screen image with. + con (sqlite3.Connection): SQLite database connection. 
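+
+    Example:
+        A minimal usage sketch; the screen URL and level ID below are
+        illustrative placeholders, not values taken from the database.
+
+        >>> import sqlite3
+        >>> con = sqlite3.connect('tombll.db')
+        >>> add_screen_to_database('https://www.trle.net/screens/3573.jpg', 1, con)
+        >>> con.commit()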
+ """ + # Ensure the screen URL matches the TRLE.net screens directory if screen.startswith("https://www.trle.net/screens/"): - response = https.get(screen, 'image/jpeg') + # Fetch the .webp image data for the screen + webp_image_data = scrape.get_trle_cover(screen.replace("https://www.trle.net/screens/", "")) - # Open the image and convert it to .webp format - img = Image.open(BytesIO(response)) - webp_image = BytesIO() + # Insert the .webp image data into the Picture table and retrieve its ID + query_insert_picture = "INSERT INTO Picture (data) VALUES (?)" + picture_id = query_return_id(query_insert_picture, (webp_image_data,), con) - # Convert the image to .webp - img.save(webp_image, format='WEBP') + # Link the inserted picture to the specified level in the Screens table + query_insert_screen = "INSERT INTO Screens (pictureID, levelID) VALUES (?, ?)" + query_run(query_insert_screen, (picture_id, level_id), con) - # Get the image data as bytes - webp_image_data = webp_image.getvalue() - # Insert the .webp image into the database - query_insert = "INSERT INTO Picture (data) VALUES (?)" - query_insert_middle = "INSERT INTO Screens (pictureID, levelID) VALUES (?, ?)" +def add_screens_to_database(large_screens_array, level_id, con): + """Add multiple screen images to the database and associate them with a specific level. - query_run(query_insert_middle, ( - query_return_id(query_insert, (webp_image_data,)), - level_id - )) + This function iterates over an array of screen URLs, adding each to the database and linking it + to the specified level using the `add_screen_to_database` helper function. + Args: + large_screens_array (list): List of URLs of screen images. + level_id (int): The ID of the level to associate the screen images with. + con (sqlite3.Connection): SQLite database connection. + """ + # Iterate over each screen URL in the provided array and add it to the database + for screen in large_screens_array: + add_screen_to_database(screen, level_id, con) -""" -def add_screen_to_database(screen, level_id): - if screen.startswith("https://www.trle.net/screens/"): - response = requests.get(screen, verify=CERT, timeout=5) - query_insert = "INSERT INTO Picture (data) VALUES (?)" - query_insert_middle = "INSERT INTO Screens (pictureID, levelID) VALUES (?, ?)" - query_run(query_insert_middle,( - query_return_id(query_insert, (response.content, )), - level_id - )) -""" +def add_level_to_database(data, con): + """Insert a level entry into the Level table and return its ID. -def add_screens_to_database(large_screens_array, level_id): - for screen in large_screens_array: - add_screen_to_database(screen, level_id) + This function inserts a new level into the database, using information provided in the + `data` dictionary. It retrieves or creates an associated `infoID` using the + `add_info_to_database` helper function, then logs and returns the `level_id` of + the inserted level. + Args: + data (dict): A dictionary containing level data, including 'body' and 'walkthrough' fields. + con (sqlite3.Connection): SQLite database connection. -def add_level_to_database(): + Returns: + int: The ID of the newly inserted level. 
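+
+    Example:
+        A minimal usage sketch, assuming `con` is an open sqlite3.Connection
+        to tombll.db; the dictionary values are illustrative placeholders and
+        would normally come from data_factory.make_trle_tombll_data() after
+        being filled in by the scrape module.
+
+        >>> level = {'title': 'Example Level', 'release': '2024-01-01',
+        ...          'difficulty': 'medium', 'duration': 'medium',
+        ...          'type': 'TR4', 'class': 'Egypt',
+        ...          'body': '<td>Level description.</td>', 'walkthrough': '',
+        ...          'trle_id': None, 'trcustoms_id': None}
+        >>> level_id = add_level_to_database(level, con)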
+ """ + # SQL query to insert a new level record query = "INSERT INTO Level (body, walkthrough, infoID) VALUES (?, ?, ?)" - level_id = query_return_id(query, (DATA.get('body'), DATA.get('walkthrough'), add_info_to_database())) + + # Prepare arguments for the insertion, including the infoID from `add_info_to_database` + arg = ( + data.get('body'), # Level body content + data.get('walkthrough'), # Level walkthrough content + add_info_to_database(data, con) # Retrieve or create infoID + ) + + # Execute the query and get the ID of the inserted level + level_id = query_return_id(query, arg, con) + + # Log the current level ID for debugging or tracking purposes logging.info("Current tombll level_id: %s", level_id) + return level_id -def add_info_to_database(): - info_difficulty = DATA.get('difficulty') - if info_difficulty != "": +def add_info_to_database(data, con): + """Insert or retrieve IDs for level information attributes and add an Info record. + + This function retrieves or sets IDs for various level attributes (difficulty, duration, + type, class) and uses them to insert a new record into the Info table. + + Args: + data (dict): A dictionary containing level information attributes. + con (sqlite3.Connection): SQLite database connection. + + Returns: + int: The ID of the newly inserted or existing Info record. + """ + # Retrieve or assign InfoDifficultyID, or set to None if not specified + info_difficulty = data.get('difficulty') + info_difficulty_id = None + if info_difficulty: query = "SELECT InfoDifficultyID FROM InfoDifficulty WHERE value = ?" - info_difficulty_id = query_return_id(query, (info_difficulty,)) - else: - info_difficulty_id = None + info_difficulty_id = query_return_id(query, (info_difficulty,), con) - info_duration = DATA.get('duration') - if info_duration != "": + # Retrieve or assign InfoDurationID, or set to None if not specified + info_duration = data.get('duration') + info_duration_id = None + if info_duration: query = "SELECT InfoDurationID FROM InfoDuration WHERE value = ?" - info_duration_id = query_return_id(query, (info_duration,)) - else: - info_duration_id = None + info_duration_id = query_return_id(query, (info_duration,), con) - info_type = DATA.get('type') - if info_type == "": - info_type = None + # Retrieve or assign InfoTypeID, allowing None if not specified + info_type = data.get('type') or None query = "SELECT InfoTypeID FROM InfoType WHERE value = ?" - info_type_id = query_return_id(query, (info_type,)) + info_type_id = query_return_id(query, (info_type,), con) - info_class = DATA.get('class') - if info_class == "": - info_class = None + # Retrieve or assign InfoClassID, allowing None if not specified + info_class = data.get('class') or None query = "SELECT InfoClassID FROM InfoClass WHERE value = ?" 
- info_class_id = query_return_id(query, (info_class,)) + info_class_id = query_return_id(query, (info_class,), con) - query = "INSERT INTO Info (title, release, difficulty, duration,"\ - " type, class, trleID, trcustomsID) VALUES (?,?,?,?,?,?,?,?)" + # Insert a new Info record with the retrieved or default IDs + query = ( + "INSERT INTO Info (title, release, difficulty, duration, type, class, trleID, trcustomsID) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)" + ) arg = ( - DATA.get('title'), - DATA.get('release'), - info_difficulty_id, - info_duration_id, - info_type_id, - info_class_id, - DATA.get('trle_id'), - DATA.get('trcustoms_id') + data.get('title'), # Title of the level + data.get('release'), # Release date + info_difficulty_id, # Retrieved or default difficulty ID + info_duration_id, # Retrieved or default duration ID + info_type_id, # Retrieved or default type ID + info_class_id, # Retrieved or default class ID + data.get('trle_id'), # TRLE ID if available + data.get('trcustoms_id') # TRCustoms ID if available ) - return query_return_id(query, arg) + return query_return_id(query, arg, con) -def add_tombll_json_to_database(): - level_id = add_level_to_database() - add_authors_to_database(DATA.get('authors'), level_id) - add_genres_to_database(DATA.get('genres'), level_id) - add_tags_to_database(DATA.get('tags'), level_id) - add_zip_files_to_database(DATA.get('zip_files'), level_id) - add_screen_to_database(DATA.get('screen'), level_id) - add_screens_to_database(DATA.get('large_screens'), level_id) +def add_tombll_json_to_database(data, con): + """Insert level data and related details into the database. + This function inserts a level record into the database and subsequently + adds associated authors, genres, tags, zip files, screen, and large screens. -if __name__ == "__main__": - if len(sys.argv) != 2: - logging.error("Usage: python3 addData.py FILE.json") - sys.exit(1) - DATA = get_tombll_json(sys.argv[1]) - CONNECTION = connect_database() - add_tombll_json_to_database() - commit_database() - disconnect_database() + Args: + data (dict): A dictionary containing level data and related details. + con (sqlite3.Connection): SQLite database connection. 
+ """ + # Insert level information and obtain the generated level ID + level_id = add_level_to_database(data, con) + # Add related information only if corresponding data is present + if data.get('authors'): + add_authors_to_database(data['authors'], level_id, con) + if data.get('genres'): + add_genres_to_database(data['genres'], level_id, con) + if data.get('tags'): + add_tags_to_database(data['tags'], level_id, con) + if data.get('zip_files'): + add_zip_files_to_database(data['zip_files'], level_id, con) -""" -def download_file(url, cert, file_name): - response = requests.get(url, stream=True, verify=cert, timeout=5) - response.raise_for_status() - - total_size = int(response.headers.get('content-length', 0)) - block_size = 1024 # 1 Kilobyte - wrote = 0 - - with open(file_name, 'wb') as file: - for data in tqdm(response.iter_content(block_size),\ - total=(total_size // block_size + 1),\ - unit='KB', unit_scale=True): - wrote += len(data) - file.write(data) - - if total_size not in (0 ,wrote): - logging.error("ERROR, something went wrong with the download") - else: - logging.info("Downloaded %s successfully", file_name) -""" -""" -def add_level_file_list_to_database(data): - with open(zip_name, 'wb') as zip_file: - zip_file.write(zip_content) - - with zipfile.ZipFile(zip_name, 'r') as zip_ref: - zip_ref.extractall('extracted_files') - - for root, dirs, files in os.walk('extracted_files'): - for file in files: - file_path = os.path.join(root, file) - relative_path = os.path.relpath(file_path, 'extracted_files') - with open(file_path, 'rb') as f: - file_content = f.read() - file_md5 = hashlib.md5(file_content, usedforsecurity=False).hexdigest() - - c.execute("SELECT FileID FROM Files WHERE md5sum = ? AND path = ?", \ - (file_md5, relative_path)) - existing_file = c.fetchone() - - if existing_file: - file_id = existing_file[0] - logging.info("File with md5sum %s and path %s" - " already exists. Using existing FileID: %s", - file_md5, - relative_path, - file_id - ) - else: - c.execute("INSERT INTO Files (md5sum, path) VALUES (?, ?)", \ - (file_md5, relative_path)) - file_id = c.lastrowid - logging.info("Inserted new file with md5sum %s. New FileID: %s", file_md5, file_id) + # Single screen image, checked for None in the add_screen_to_database function + add_screen_to_database(data.get('screen'), level_id, con) - try: - c.execute("SELECT 1 FROM LevelFileList WHERE fileID = ? AND levelID = ?", \ - (file_id, level_id)) - existing_combination = c.fetchone() - - if not existing_combination: - c.execute("INSERT INTO LevelFileList (fileID, levelID) VALUES (?, ?)", \ - (file_id, level_id)) - else: - logging.info("Combination of FileID %s and LevelID %s" - " already exists in LevelFileList. 
Skipping insertion.", - file_id, - level_id - ) - - except sqlite3.IntegrityError as e: - logging.error("Uniqueness violation in LevelFileList: %s", e) - logging.error("FileID: %s, LevelID: %s", file_id, level_id) - - - shutil.rmtree('extracted_files') - os.remove(zip_name) -""" -""" - response = requests.get(zip_url, verify=CERT, timeout=5) - zip_content = response.content + # Add large screens if they are provided + if data.get('large_screens'): + add_screens_to_database(data['large_screens'], level_id, con) - md5_hash = hashlib.md5(zip_content, usedforsecurity=False).hexdigest() - zip_md5 = data.get('zipFileMd5') - if md5_hash != zip_md5: - logging.error("MD5 checksum does not match") + +if __name__ == "__main__": + if len(sys.argv) != 2: + logging.error("Usage: python3 addData.py FILE.json") sys.exit(1) -""" + DATA = get_tombll_json(sys.argv[1]) + CON = sqlite3.connect(os.path.dirname(os.path.abspath(__file__)) + '/tombll.db') + add_tombll_json_to_database(DATA, CON) + CON.commit() + CON.close() diff --git a/database/tombll_get_data.py b/database/tombll_get_data.py index f4f9a0e..ac4973b 100644 --- a/database/tombll_get_data.py +++ b/database/tombll_get_data.py @@ -1,310 +1,32 @@ """ Grab raw data from trle.net/TRCustoms.org and put it in a data.json file """ -import re import sys import json import logging -from urllib.parse import urlparse, parse_qs -from bs4 import BeautifulSoup, Tag -import https -import data_factorie +import scrape +import data_factory # Set up logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s') logging.getLogger("requests").setLevel(logging.DEBUG) - -def validate_url(url_input): - """Set some constraints to the URL""" - parsed_url = urlparse(url_input) - - if not all([parsed_url.scheme, parsed_url.netloc]): - logging.error("Invalid URL format.") - sys.exit(1) - - if parsed_url.scheme != "https": - logging.error("Only HTTPS URLs are allowed.") +if __name__ == '__main__': + if len(sys.argv) != 2: + logging.error("Usage: python3 getData.py URL") sys.exit(1) - - if parsed_url.netloc.endswith("trle.net"): - return "trle.net" - - if parsed_url.netloc.endswith("trcustoms.org"): - return "trcustoms.org" - - logging.error("URL must belong to the domain 'trle.net' or 'trcustoms.org'.") - sys.exit(1) - - -def get_soup(url): - response = https.get(url, 'text/html') - if response: - return BeautifulSoup(response, 'html.parser') - return "" - - -def get_trle_walkthrough(level_soup): - walkthrough_link = level_soup.find('a', string='Walkthrough') - if walkthrough_link: - url = 'https://www.trle.net/sc/' + walkthrough_link['href'] else: - logging.info("Walkthrough not found" ) - return "" - - soup = get_soup(url) - iframe_tag = soup.find('iframe') - if not iframe_tag or not isinstance(iframe_tag, Tag): - sys.exit(1) - - iframe_src = iframe_tag['src'] - if not iframe_src or not isinstance(iframe_src, str): - sys.exit(1) - - url = "https://www.trle.net" + iframe_src - response = https.get(url, 'text/html') - if response: - return response - return None - - -def get_trle_zip_file(soup): - download_link = soup.find('a', string='Download') - if download_link: - return https.get(download_link['href'], 'application/zip') - logging.error("Error fetching download url") - sys.exit(1) - - -def get_trle_authors(soup): - # Find the first with class "medGText" - first_td = soup.find('td', class_='medGText') - - # Find all anchor tags within the first - author_links = first_td.find_all('a') - - # Filter the anchor tags manually based on href 
attribute - author_names = [] - for link in author_links: - href = link.get('href') - if href and href.startswith('/sc/authorfeatures.php?aid='): - author_names.append(link.text) - return author_names - - -def get_trle_type(soup): - return soup.find('td', string='file type:').find_next('td').get_text(strip=True) or "" - - -def get_trle_class(soup): - return soup.find('td', string='class:').find_next('td').get_text(strip=True) or "" - - -def get_trle_release(soup): - return soup.find('td', string='release date:').find_next('td').get_text(strip=True) or "" - - -def get_trle_difficulty(soup): - difficulty_td = soup.find('td', string='difficulty:') - if difficulty_td: - next_td = difficulty_td.find_next('td') - if next_td: - return next_td.get_text(strip=True) - return "" - - -def get_trle_duration(soup): - duration_td = soup.find('td', string='duration:') - if duration_td: - next_td = duration_td.find_next('td') - if next_td: - return next_td.get_text(strip=True) - return "" - - -def get_trle_body(soup): - specific_tags = soup.find_all('td', class_='medGText', align='left', valign='top') - return str(specific_tags[1]) if len(specific_tags) >= 2 else "" - - -def get_trle_large_screens(soup): - onmouseover_links = soup.find_all(lambda tag: tag.name == 'a' and 'onmouseover' in tag.attrs) - return [link['href'] for link in onmouseover_links] - - -def get_trle_screen(soup): - image_tag = soup.find('img', class_='border') - return 'https://www.trle.net' + image_tag['src'] - - -def get_trle_title(soup): - title_span = soup.find('span', class_='subHeader') - if title_span: - title = title_span.get_text(strip=True) - br_tag = title_span.find('br') - if br_tag: - return title_span.contents[0].strip() - return title - logging.error("Failed to retrieve trle title") - sys.exit(1) - - -def get_trle_level(soup, data): - data['title'] = get_trle_title(soup) - data['authors'] = get_trle_authors(soup) - data['type'] = get_trle_type(soup) - data['class'] = get_trle_class(soup) - data['release'] = get_trle_release(soup) - data['difficulty'] = get_trle_difficulty(soup) - data['duration'] = get_trle_duration(soup) - data['screen'] = get_trle_screen(soup) - data['large_screens'] = get_trle_large_screens(soup) - data['zip_files'] = [get_trle_zip_file(soup)] - data['body'] = get_trle_body(soup) - data['walkthrough'] = get_trle_walkthrough(soup) - - -def trle_search_post_parser(url): - return url.replace(" ", r"+").replace(":", r"%3A") \ - .replace("!", r"%21").replace("#", r"%21") \ - .replace("/", r"%2F").replace("&", r"%26") - - -def trle_url_to_int(url): - try: - lid_value = int(parse_qs(urlparse(url).query).get('lid', [None])[0]) - return lid_value - except (TypeError, ValueError): - return None - - -def get_trcustoms_level(url, data): - if "api" not in url: - parts = url.split("/") - url = f"{parts[0]}//{parts[2]}/api/{'/'.join(parts[3:])}" - trcustom_level = https.get(url, 'application/json') - - title = trcustom_level['name'] - title = trle_search_post_parser(title) - # Look out for + ' & ! 
- trle_url = get_trle_index(title) - trle_soup = get_soup(trle_url) - data['title'] = get_trle_title(trle_soup) - data['authors'] = get_trle_authors(trle_soup) - data['type'] = get_trle_type(trle_soup) - data['class'] = get_trle_class(trle_soup) - data['release'] = get_trle_release(trle_soup) - data['difficulty'] = get_trle_difficulty(trle_soup) - data['duration'] = get_trle_duration(trle_soup) - data['screen'] = get_trle_screen(trle_soup) - data['large_screens'] = get_trle_large_screens(trle_soup) - data['zip_files'] = [get_trle_zip_file(trle_soup)] - data['body'] = get_trle_body(trle_soup) - data['walkthrough'] = get_trle_walkthrough(trle_soup) - data['tags'] = [genre['name'] for genre in trcustom_level['tags']] - data['genres'] = [genre['name'] for genre in trcustom_level['genres']] - - for file_data in trcustom_level['files']: - zip_file = https.get(file_data['url'], 'application/zip') - - name = trcustom_level['name'].replace(" ", r"") \ - .replace("'", r"").replace("-", r"") \ - .replace(":", r"").replace("!", r"") \ - .replace("#", r"").replace("/", r"").replace("&", r"") - authors = "" - for author in trcustom_level['authors']: - if authors != "": - authors = authors +"-" - authors = authors + author['username'] - - if file_data['version'] == 1: - version = "" - else: - version = f"-V{file_data['version']}" - zip_file['name'] = f"{file_data['id']}-{name}{version}-{authors}.zip" - - zip_file['url'] = file_data['url'] - zip_file['release'] = file_data['created'] - zip_file['version'] = file_data['version'] - data['zip_files'].append(zip_file) - data['trle_id'] = trle_url_to_int(trle_url) - data['trcustoms_id'] = trcustom_level['id'] - return "" - -def get_trle_index(title): - url = "https://www.trle.net/pFind.php?atype=1&author=&level=" + title - print(url) - response = https.get(url, 'application/zip') - - soup = BeautifulSoup(response, 'html.parser') - # Find all tags where href contains '/sc/levelfeatures.php?lid=' - anchor_tags = soup.find_all('a', href=re.compile(r'/sc/levelfeatures\.php\?lid=')) - # Loop through each anchor tag and print the href and text - i = 0 - for tag in anchor_tags: - i = i+1 - print(f"{i}) {tag.text}, Href: {tag['href']}") - anchor_tags_len = len(anchor_tags) - if anchor_tags_len > 1: - number_input = int(input("Pick A Number From The List: ")) - if 1 <= number_input <= anchor_tags_len: - return "https://www.trle.net" + anchor_tags[number_input-1]['href'] - if anchor_tags_len == 1: - return "https://www.trle.net" + anchor_tags[0]['href'] - logging.error("trcustoms.org only not implemented") - sys.exit(1) - -#"https://trcustoms.org/levels/"#num -# with json api -#"https://trcustoms.org/api/levels/"#num -#search trle url - - - - - -# Replace custom tags with HTML spans and apply classes -# https://raw.githubusercontent.com/rr-/TRCustoms/develop/frontend/src/components/markdown-composer/MarkdownButtons/index.tsx -def custom_markdown_parser(text): - text = re.sub(r'\[o\](.*?)\[/o\]', r'\1', text) # blue text for objects - text = re.sub(r'\[s\](.*?)\[/s\]', r'\1', text) # secret styling - text = re.sub(r'\[p\](.*?)\[/p\]', r'\1', text) # pickup styling - text = re.sub(r'\[e\](.*?)\[/e\]', r'\1', text) # enemy styling - text = re.sub(r'\[t\](.*?)\[/t\]', r'\1', text) # trap styling - text = re.sub(r'\[center\](.*?)\[/center\]', r'
<center>\1</center>
', text) # center align - - return text - -# Example usage -#description = """[center]**Tomb Raider: Pandora's Box**[/center] -#[s]Secret text[/s] [o]Object text[/o] [p]Pickup text[/p]""" - -#parsed_description = custom_markdown_parser(description) -#print(parsed_description) - - - - -if __name__ == '__main__': - try: - if len(sys.argv) != 2: - logging.error("Usage: python3 getData.py URL") - sys.exit(1) - else: - URL = sys.argv[1] - HOST = validate_url(URL) - DATA = data_factorie.make_trle_tombll_data() - if HOST == "trle.net": - SOUP = get_soup(URL) - get_trle_level(SOUP, DATA) - with open('data.json', mode='w', encoding='utf-8') as json_file: - json.dump(DATA, json_file) - if HOST == "trcustoms.org": - get_trcustoms_level(URL, DATA) - with open('data.json', mode='w', encoding='utf-8') as json_file: - json.dump(DATA, json_file) - finally: - # OS should release the port immediately when the program closes "normally" - # like with sys.exit(1) but doing it here is better - https.release_lock() # https locks automatically + URL = sys.argv[1] + HOST = scrape.url_domain(URL) + DATA = data_factory.make_trle_tombll_data() + if HOST == "trle.net": + DATA = data_factory.make_trle_tombll_data() + SOUP = scrape.get_soup(URL) + scrape.get_trle_level(SOUP, DATA) + with open('data.json', mode='w', encoding='utf-8') as json_file: + json.dump(DATA, json_file) + if HOST == "trcustoms.org": + scrape.get_trcustoms_level(URL, DATA) + with open('data.json', mode='w', encoding='utf-8') as json_file: + json.dump(DATA, json_file)
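
As a usage sketch of the refactored modules (not part of the patch itself; the
file layout is an assumption): once per-level JSON files have been fetched into
a local trle/ directory, they could be imported into tombll.db in one batch
with something like the following, run from the database/ directory.

    import glob
    import sqlite3

    import tombll_add_data

    # Assumes tombll.db and the trle/ directory of downloaded JSON files
    # sit in the current working directory.
    con = sqlite3.connect('tombll.db')
    try:
        for path in sorted(glob.glob('trle/*.json')):
            data = tombll_add_data.get_tombll_json(path)
            tombll_add_data.add_tombll_json_to_database(data, con)
        con.commit()
    finally:
        con.close()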