diff --git a/README.md b/README.md
index ce8f991..e27753d 100644
--- a/README.md
+++ b/README.md
@@ -163,6 +163,9 @@ As a result, connections become slower than an ordinary connection.
 Once you start **Tor2Tor**, give it at least 2 minutes tops to query the specified onion url and extract links from it.
 
 If you want to work around this, you can always just use a cloud shell service.
+
+# Screenshots
+There's a dedicated repository of onion screenshots captured with **Tor2Tor** at [Tor2Tor-DB](https://github.com/rly0nheart/tor2tor-db)
 
 ## CI/CD Workflow 🌊
 ### Docker Image Building 🐳
diff --git a/pyproject.toml b/pyproject.toml
index 5699365..c589278 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,7 @@ packages = ["tor2tor"]
 
 [project]
 name = "tor2tor"
-version = "0.7.1"
+version = "0.8.0"
 description = "Capture screenshots of onion services on an onion service."
 readme = "README.md"
 requires-python = ">=3.9"
@@ -35,5 +35,5 @@ documentation = "https://github.com/rly0nheart/tor2tor/blob/latest/README.md"
 repository = "https://github.com/rly0nheart/tor2tor"
 
 [project.scripts]
-t2t = "tor2tor.tor2tor:start"
-tor2tor = "tor2tor.tor2tor:start"
+t2t = "tor2tor.tor2tor:start_capturing"
+tor2tor = "tor2tor.tor2tor:start_capturing"
diff --git a/tor2tor/__init__.py b/tor2tor/__init__.py
index ab0c4d8..d3a2dac 100644
--- a/tor2tor/__init__.py
+++ b/tor2tor/__init__.py
@@ -1,3 +1,3 @@
 __author__ = "Richard Mwewa"
 __about__ = "https://about.me/rly0nheart"
-__version__ = "0.7.1"
+__version__ = "0.8.0"
diff --git a/tor2tor/coreutils.py b/tor2tor/coreutils.py
index c30b3f3..4f819f3 100644
--- a/tor2tor/coreutils.py
+++ b/tor2tor/coreutils.py
@@ -1,4 +1,5 @@
 import os
+import re
 import time
 import random
 import logging
@@ -19,24 +20,6 @@
 HOME_DIRECTORY = os.path.expanduser("~")
 
 
-def show_banner():
-    """
-    Prints a random banner from a list of 2 banners.
-    """
-    banners = [
-        f"""
-β”Œβ”¬β”β”Œβ”€β”β”¬β”€β”β”Œβ”€β”β”Œβ”¬β”β”Œβ”€β”β”¬β”€β”
- β”‚ β”‚ β”‚β”œβ”¬β”˜β”Œβ”€β”˜ β”‚ β”‚ β”‚β”œβ”¬β”˜
- β”΄ β””β”€β”˜β”΄β””β”€β””β”€β”˜ β”΄ β””β”€β”˜β”΄β””β”€ {__version__}""",
-        f"""
-┌┬┐┌─┐┌┬┐
- β”‚ β”Œβ”€β”˜ β”‚
- β”΄ β””β”€β”˜ β”΄ {__version__}""",
-    ]
-
-    print(random.choice(banners))
-
-
 def usage():
     return """
     tor2tor http://example.onion
@@ -45,27 +28,6 @@ def usage():
     """
 
 
-def check_updates():
-    """
-    Checks the program's updates by comparing the current program version tag with the remote version tag from GitHub.
-    """
-    response = requests.get(
-        "https://api.github.com/repos/rly0nheart/tor2tor/releases/latest"
-    ).json()
-    remote_version = response.get("tag_name")
-
-    if remote_version != __version__:
-        log.info(
-            f"Tor2Tor version {remote_version} published at {response.get('published_at')} "
-            f"is available. Run the 'update.sh' "
-            f"script (for local installation) or re-pull the image (for docker container) "
-            f"with 'docker pull rly0nheart/tor2tor' to get the updates. "
-        )
-        release_notes = Markdown(response.get("body"))
-        print(release_notes)
-        print("\n")
-
-
 def create_parser() -> argparse.ArgumentParser:
     parser = argparse.ArgumentParser(
         description=f"tor2tor - by {__author__} | {__about__}",
@@ -108,6 +70,24 @@ def create_parser() -> argparse.ArgumentParser:
     return parser
 
 
+def show_banner():
+    """
+    Prints a random banner from a list of 2 banners.
+    """
+    banners = [
+        f"""
+β”Œβ”¬β”β”Œβ”€β”β”¬β”€β”β”Œβ”€β”β”Œβ”¬β”β”Œβ”€β”β”¬β”€β”
+ β”‚ β”‚ β”‚β”œβ”¬β”˜β”Œβ”€β”˜ β”‚ β”‚ β”‚β”œβ”¬β”˜
+ β”΄ β””β”€β”˜β”΄β””β”€β””β”€β”˜ β”΄ β””β”€β”˜β”΄β””β”€ {__version__}""",
+        f"""
+┌┬┐┌─┐┌┬┐
+ β”‚ β”Œβ”€β”˜ β”‚
+ β”΄ β””β”€β”˜ β”΄ {__version__}""",
+    ]
+
+    print(random.choice(banners))
+
+
 def set_loglevel(debug_mode: bool) -> logging.getLogger:
     """
     Configure and return a logging object with the specified log level.
@@ -117,9 +97,9 @@ def set_loglevel(debug_mode: bool) -> logging.getLogger:
     """
     logging.basicConfig(
         level="NOTSET" if debug_mode else "INFO",
-        format="%(message)s",
+        format="%(levelname)s %(message)s",
         handlers=[
-            RichHandler(markup=True, log_time_format="%I:%M:%S%p", show_level=False)
+            RichHandler(markup=True, log_time_format="%H:%M:%S", show_level=False)
         ],
     )
     return logging.getLogger("Tor2Tor")
@@ -137,6 +117,40 @@ def add_http_to_link(link: str) -> str:
     return link
 
 
+def is_valid_onion(url: str) -> bool:
+    """
+    Uses a regex pattern to determine whether a given url is an onion service or not.
+
+    :param url: The url to check.
+    :return: True if the url matches the pattern. False if it doesn't.
+
+    Regex Explanation
+    -----------------
+    - ^ - Asserts the start of a string.
+    - (http://|https://)? - Matches HTTP or HTTPS protocol in the string (optional).
+    - [a-z2-7]{55} - Matches exactly 55 characters, where each can be a lowercase letter or a digit from 2 to 7.
+    - d\\.onion - Matches the letter "d" (the 56th and final character of a v3 onion address) followed by .onion.
+    - (/|$) - Matches either a forward slash or the end of the string.
+    """
+    if re.search(r"^(http://|https://)?[a-z2-7]{55}d\.onion(/|$)", url):
+        return True
+    else:
+        return False
+
+
+def has_desktop_environment() -> bool:
+    """
+    Checks if the current system has a desktop environment.
+
+    :return: True if system has a desktop environment. False if it doesn't.
+
+    Note
+    ----
+    This is not completely reliable (open for improvements).
+    """
+    return "DISPLAY" in os.environ
+
+
 def create_table(table_headers: list, table_title: str = "") -> Table:
     """
     Creates a rich table with the given column headers.
@@ -209,6 +223,27 @@ def get_file_info(filename: str) -> tuple:
     return file_size, created_time
 
 
+def check_updates():
+    """
+    Checks the program's updates by comparing the current program version tag with the remote version tag from GitHub.
+    """
+    response = requests.get(
+        "https://api.github.com/repos/rly0nheart/tor2tor/releases/latest"
+    ).json()
+    remote_version = response.get("tag_name")
+
+    if remote_version != __version__:
+        log.info(
+            f"Tor2Tor version {remote_version} published at {response.get('published_at')} "
+            f"is available. Run the 'update.sh' "
+            f"script (for local installation) or re-pull the image (for docker container) "
+            f"with 'docker pull rly0nheart/tor2tor' to get the updates. "
+        )
+        release_notes = Markdown(response.get("body"))
+        print(release_notes)
+        print("\n")
+
+
 def tor_service(command: str):
     """
     Starts/Stops the Tor service based on the provided command.
diff --git a/tor2tor/tor2tor.py b/tor2tor/tor2tor.py
index ba254cf..c1ee734 100644
--- a/tor2tor/tor2tor.py
+++ b/tor2tor/tor2tor.py
@@ -22,9 +22,11 @@
     create_table,
     check_updates,
     get_file_info,
+    is_valid_onion,
     HOME_DIRECTORY,
     add_http_to_link,
     construct_output_name,
+    has_desktop_environment,
     convert_timestamp_to_utc,
 )
 
@@ -45,8 +47,15 @@ def firefox_options() -> Options:
     :returns: A Selenium WebDriver Options object with preset configurations.
""" options = Options() - if args.headless: + + # If system has a desktop environment, make headless optional. Otherwise, make it compulsory. + if has_desktop_environment(): + if args.headless: + options.add_argument("--headless") + else: + log.info("Running headless as default (no desktop environment found)...") options.add_argument("--headless") + options.set_preference("network.proxy.type", 1) options.set_preference("network.proxy.socks", "127.0.0.1") options.set_preference("network.proxy.socks_port", 9050) @@ -230,7 +239,7 @@ def get_onions_on_page(onion_url: str) -> list: ----------------- - `https?`: Matches either 'http' or 'https'. - `://`: Matches the '://' that follows the protocol. - - `\S+`: Matches one or more non-whitespace characters. + - `\\S+`: Matches one or more non-whitespace characters. """ # Initialize an empty list to store valid URLs @@ -303,79 +312,87 @@ def onion_summary_tables( return captured_onions_table, skipped_onions_table -def start(): +def start_capturing(): """ Main entrypoint to start the web scraping process and capture screenshots of onion websites. """ firefox_pool = None # Initialize to None, so it's accessible in the 'finally' block start_time = datetime.now() # Record the start time for performance measurement - try: - tor_service(command="start") # Start the Tor service. + target_onion_url = args.onion - show_banner() - log.info(f"Starting πŸ§…Tor2Tor {__version__} {start_time}...") + if is_valid_onion(url=target_onion_url): + try: + tor_service(command="start") # Start the Tor service. - check_updates() - path_finder( - url=args.onion - ) # Create a directory with the onion link as the name. + show_banner() + log.info(f"Starting πŸ§…Tor2Tor {__version__} {start_time}...") - # Fetch onion URLs from the provided URL - onions = get_onions_on_page(onion_url=add_http_to_link(link=args.onion)) + check_updates() + path_finder( + url=target_onion_url + ) # Create a directory with the onion link as the name. - firefox_pool = open_firefox_pool(pool_size=args.pool) + # Fetch onion URLs from the provided URL + onions = get_onions_on_page( + onion_url=add_http_to_link(link=target_onion_url) + ) - # Create a table where capture screenshots will be displayed - screenshots_table = create_table( - table_title="Screenshots", - table_headers=["#", "filename", "size (bytes)", "created"], - ) + firefox_pool = open_firefox_pool(pool_size=args.pool) + + # Create a table where capture screenshots will be displayed + screenshots_table = create_table( + table_title="Screenshots", + table_headers=["#", "filename", "size (bytes)", "created"], + ) - # Initialize Queue and add tasks - queue = Queue() + # Initialize Queue and add tasks + queue = Queue() - for onion_index, onion in enumerate(onions, start=1): - queue.put((onion_index, onion)) - if onion_index == args.limit: - # If onion index is equal to the limit set in -l/--limit, break the loop. - break + for onion_index, onion in enumerate(onions, start=1): + queue.put((onion_index, onion)) + if onion_index == args.limit: + # If onion index is equal to the limit set in -l/--limit, break the loop. 
+                    break
 
-        # Initialize threads
-        threads = []
-        for _ in range(args.threads):  # create 3 (default) worker threads
-            t = Thread(target=worker, args=(queue, screenshots_table, firefox_pool))
-            t.start()
-            threads.append(t)
+            # Initialize threads
+            threads = []
+            for _ in range(args.threads):  # create 3 (default) worker threads
+                t = Thread(target=worker, args=(queue, screenshots_table, firefox_pool))
+                t.start()
+                threads.append(t)
 
-        # Wait for all threads to finish
-        for thread in threads:
-            thread.join()
+            # Wait for all threads to finish
+            for thread in threads:
+                thread.join()
 
-        log.info("DONE!\n")
+            log.info("DONE!\n")
 
-        # Print table showing captured screenshots
-        print(screenshots_table)
-        print("\n")
+            # Print table showing captured screenshots
+            print(screenshots_table)
+            print("\n")
 
-        # Print the summary tables for captured and skipped onions
-        captured_onions, skipped_onions = onion_summary_tables(
-            captured_onions=list(captured_onions_queue.queue),
-            skipped_onions=list(skipped_onions_queue.queue),
-        )
-        log.info(f"{len(captured_onions_queue.queue)} onions captured.")
-        print(captured_onions)
-
-        log.info(f"{len(skipped_onions_queue.queue)} onions skipped.")
-        print(skipped_onions)
-
-    except KeyboardInterrupt:
-        log.warning(f"User Interruption detected ([yellow]Ctrl+C[/])")
-        exit()
-    except Exception as e:
-        log.error(f"An error occurred: [red]{e}[/]")
-        exit()
-    finally:
-        if firefox_pool is not None:
-            close_firefox_pool(pool=firefox_pool)
-        tor_service(command="stop")
-        log.info(f"Stopped in {datetime.now() - start_time} seconds.")
+            # Print the summary tables for captured and skipped onions
+            captured_onions, skipped_onions = onion_summary_tables(
+                captured_onions=list(captured_onions_queue.queue),
+                skipped_onions=list(skipped_onions_queue.queue),
+            )
+            log.info(f"{len(captured_onions_queue.queue)} onions captured.")
+            print(captured_onions)
+
+            log.info(f"{len(skipped_onions_queue.queue)} onions skipped.")
+            print(skipped_onions)
+
+        except KeyboardInterrupt:
+            log.warning(f"User Interruption detected ([yellow]Ctrl+C[/])")
+            exit()
+        except Exception as e:
+            log.error(f"An error occurred: [red]{e}[/]")
+            exit()
+        finally:
+            if firefox_pool is not None:
+                close_firefox_pool(pool=firefox_pool)
+
+            tor_service(command="stop")
+            log.info(f"Stopped in {datetime.now() - start_time} seconds.")
+    else:
+        log.warning(f"{target_onion_url} does not seem to be a valid onion.")
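As a quick sanity check (not part of the patch itself), the onion-address pattern introduced in tor2tor/coreutils.py can be exercised on its own. The address below is a dummy value, 55 "a" characters plus the mandatory trailing "d" of a version 3 onion address, and not a real service:

```python
import re

# Same pattern as the new is_valid_onion() helper in tor2tor/coreutils.py.
ONION_PATTERN = r"^(http://|https://)?[a-z2-7]{55}d\.onion(/|$)"

dummy_onion = "http://" + "a" * 55 + "d.onion"  # well-formed but fictitious v3 address
clearnet_url = "https://example.com"

print(bool(re.search(ONION_PATTERN, dummy_onion)))   # True
print(bool(re.search(ONION_PATTERN, clearnet_url)))  # False
```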
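For readers skimming the start_capturing() hunk, the queue-and-worker layout it keeps from the old start() is easier to see in isolation. This is a stripped-down sketch only; fake_capture() and the hard-coded task list are placeholders, not part of tor2tor:

```python
from queue import Empty, Queue
from threading import Thread

def fake_capture(task):
    # Placeholder for the real worker step that drives Firefox and saves a screenshot.
    index, onion = task
    print(f"capturing #{index}: {onion}")

def worker(queue: Queue):
    # Drain the shared queue until it is empty, then exit.
    while True:
        try:
            task = queue.get(block=False)
        except Empty:
            break
        fake_capture(task)
        queue.task_done()

queue = Queue()
for index, onion in enumerate(["one.onion", "two.onion", "three.onion"], start=1):
    queue.put((index, onion))

threads = [Thread(target=worker, args=(queue,)) for _ in range(3)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```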