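"""Integration-test / example harness for lib.downloader.download_site.

The script builds a small dummy site in a temporary directory, serves it with
Python's built-in HTTP server in a child process, then runs several download
scenarios (recursive depth limits, ignore paths, max-age skipping) in separate
processes and checks the downloader's summary and state file for each one.
It exits with status 1 if any check fails.

For reference, the downloader call exercised here looks roughly like this
(argument names are taken from the test configuration below; the real
download_site API may accept more options):

    summary = download_site(
        start_url="http://localhost:<port>/index.html",
        dest_dir="/path/to/dest",
        recursive=True,
        follow_links=True,
        depth_limit=2,
        max_age_hours=0,
    )
"""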
import http.server
import json
import logging
import multiprocessing
import os
import queue  # For queue.Empty exception
import shutil  # For removing temp dir if TemporaryDirectory context manager not used for whole scope
import socketserver
import sys
import tempfile
import time

import requests  # For checking server readiness

# Adjust the Python path to include the parent directory (project root)
# so that 'lib.downloader' can be imported.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from lib.downloader import STATE_FILE_NAME, download_site
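# Shape of the summary returned by download_site, as consumed by the checks below
# (inferred from usage in this script; the real summary may contain more keys):
#   summary["errors"]               -> list of error descriptions (empty on success)
#   summary["successful_downloads"] -> int
#   summary["total_processed_urls"] -> int
#   summary["state_file_path"]      -> path to the per-destination state JSON, in which
#                                      each URL maps to a record whose "status" is one of
#                                      "success", "failed", or "skipped_max_age".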

# Configure logging for the example script
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


# This function needs to be at the top level for multiprocessing to find it.
def run_download_test_process(test_name, downloader_kwargs, result_queue):
    """
    Wrapper to run download_site in a separate process and put its summary on result_queue.
    """
    logger.info(f"--- Running Test in subprocess: {test_name} ---")
    summary = None
    try:
        summary = download_site(**downloader_kwargs)
        logger.info(f"Test {test_name} completed in subprocess.")
    except Exception as e:
        logger.error(f"Error in test {test_name} (subprocess): {e}", exc_info=True)
        # summary will remain None (or be incomplete) if the error happened after its creation
    finally:
        result_queue.put({"test_name": test_name, "summary": summary})


def create_temp_site_files(base_dir):
    """Creates the dummy HTML files in a 'test_site' subdirectory of base_dir.

    Link graph of the dummy site:
        index.html     -> page1.html, sub/page2.html, ignored_page.html, nonexistent.html (404)
        page1.html     -> index.html
        sub/page2.html -> ../index.html, http://neverssl.com (external)
        ignored_page.html has no outgoing links.
    """
    site_dir = os.path.join(base_dir, "test_site")
    os.makedirs(os.path.join(site_dir, "sub"), exist_ok=True)

    with open(os.path.join(site_dir, "index.html"), "w") as f:
        f.write(
            '<h1>Index</h1><a href="page1.html">Page 1</a> <a href="sub/page2.html">Page 2</a> <a href="ignored_page.html">Ignored</a> <a href="nonexistent.html">Non Existent</a>'
        )
    with open(os.path.join(site_dir, "page1.html"), "w") as f:
        f.write('<h1>Page 1</h1><a href="index.html">Index</a>')
    with open(os.path.join(site_dir, "sub", "page2.html"), "w") as f:
        f.write(
            '<h1>Page 2</h1><a href="../index.html">Index Back</a> <a href="http://neverssl.com">External</a>'
        )
    with open(os.path.join(site_dir, "ignored_page.html"), "w") as f:
        f.write("<h1>Ignored Page</h1>")
    logger.info(f"Created dummy site files in {site_dir}")
    return site_dir


# Top-level target function for the HTTP server process
def _http_server_target_function(directory, host, port):
    """Serve 'directory' over HTTP on host:port; runs in a child process and blocks in serve_forever()."""
    import functools

    # Use functools.partial to set the 'directory' argument for SimpleHTTPRequestHandler.
    # This ensures the server serves files from the specified 'directory'.
    Handler = functools.partial(
        http.server.SimpleHTTPRequestHandler, directory=directory
    )

    try:
        with socketserver.TCPServer((host, port), Handler) as httpd:
            logger.info(
                f"HTTP server process (PID {os.getpid()}) started on {host}:{port}, serving {directory}"
            )
            httpd.serve_forever()
    except Exception as e:
        logger.error(
            f"HTTP server process (PID {os.getpid()}) failed: {e}", exc_info=True
        )
        raise


def start_http_server_process(directory, host, port):
    """Starts a simple HTTP server in a separate (daemon) process."""
    server_process = multiprocessing.Process(
        target=_http_server_target_function,
        args=(directory, host, port),
        daemon=True,
    )
    server_process.start()
    logger.info(
        f"HTTP server process (PID: {server_process.pid}) initiated for {directory} on {host}:{port}"
    )
    # Readiness is verified by the caller via check_server_ready() rather than an event.
    return server_process


def find_free_port():
    """Finds an available port on the local machine."""
    # There is a small race window between closing this probe socket and the server
    # process binding the port, but that is acceptable for a local test script.
    with socketserver.TCPServer(
        ("localhost", 0), http.server.BaseHTTPRequestHandler
    ) as s:
        return s.server_address[1]


def check_server_ready(url, retries=10, delay=0.5):
    """Checks if the server is responding to requests."""
    for i in range(retries):
        try:
            response = requests.get(url, timeout=1)
            if response.status_code == 200:
                logger.info(f"Server is ready at {url}")
                return True
        except requests.ConnectionError:
            logger.debug(
                f"Server not ready yet at {url}, attempt {i + 1}/{retries}. Retrying in {delay}s..."
            )
        except requests.Timeout:
            logger.debug(
                f"Server timed out at {url}, attempt {i + 1}/{retries}. Retrying in {delay}s..."
            )
        time.sleep(delay)
    logger.error(f"Server failed to start at {url} after {retries} retries.")
    return False


def main():
    # Using TemporaryDirectory for automatic cleanup
    with tempfile.TemporaryDirectory(prefix="downloader_test_") as temp_base_dir:
        logger.info(f"Created temporary base directory: {temp_base_dir}")

        # 1. Create the dummy website files
        site_root_path = create_temp_site_files(temp_base_dir)  # This is /tmp/xxxx/test_site

        # 2. Start the HTTP server
        host = "localhost"
        port = find_free_port()
        server_process = start_http_server_process(site_root_path, host, port)

        # Server serves from site_root_path, so URLs are relative to that
        test_url_base = f"http://{host}:{port}/"

        # 3. Check if server is ready
        # We check the index.html which is at the root of what's being served
        if not check_server_ready(test_url_base + "index.html"):
            logger.error("Test server failed to become ready. Aborting tests.")
            if server_process.is_alive():
                server_process.terminate()
                server_process.join(timeout=5)
            return

        # 4. Define test parameters
        # Destination for downloaded content will also be inside the temp_base_dir
        download_destination_root = os.path.join(temp_base_dir, "downloaded_content")
        os.makedirs(download_destination_root, exist_ok=True)

        tests_params_config = [
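            # Each entry: (human-readable test name, keyword arguments passed to download_site).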
            (
                "1: Basic recursive download (depth 2)",
                {
                    "start_url": test_url_base + "index.html",
                    "dest_dir": os.path.join(download_destination_root, "test1"),
                    "recursive": True,
                    "follow_links": True,
                    "depth_limit": 2,
                    "max_age_hours": 0,
                },
            ),
            (
                "2: With ignore_paths and max_age (reuse test1 dir)",
                {
                    "start_url": test_url_base + "index.html",
                    "dest_dir": os.path.join(download_destination_root, "test1"),  # Use same dest
                    "recursive": True,
                    "follow_links": True,
                    "depth_limit": 2,
                    "ignore_paths": ["ignored_page.html"],
                    "max_age_hours": 1,  # Should skip files from test1 if downloaded recently
                },
            ),
            (
                "3: Non-recursive (single page)",
                {
                    "start_url": test_url_base + "page1.html",
                    "dest_dir": os.path.join(download_destination_root, "test3"),
                    "recursive": False,  # Effectively depth_limit 0 for the spider
                    "max_age_hours": 0,
                },
            ),
            (
                "4: Depth limit 0 (only start_url)",
                {
                    "start_url": test_url_base + "index.html",
                    "dest_dir": os.path.join(download_destination_root, "test4_depth0"),
                    "recursive": True,  # 'recursive' flag enables depth control
                    "follow_links": True,
                    "depth_limit": 0,  # Spider should only download index.html
                    "max_age_hours": 0,
                },
            ),
            (
                "5: Depth limit 1",
                {
                    "start_url": test_url_base + "index.html",
                    "dest_dir": os.path.join(download_destination_root, "test5_depth1"),
                    "recursive": True,
                    "follow_links": True,
                    "depth_limit": 1,  # index.html and its direct links
                    "max_age_hours": 0,
                },
            ),
        ]

        # 5. Run tests using multiprocessing
        # A queue to get results back from subprocesses
        results_queue = multiprocessing.Queue()
        processes = []
        for test_name, downloader_kwargs in tests_params_config:
            # Ensure dest_dir exists for each test before starting
            os.makedirs(downloader_kwargs["dest_dir"], exist_ok=True)

            p = multiprocessing.Process(
                target=run_download_test_process,
                args=(test_name, downloader_kwargs, results_queue),
            )
            processes.append(p)
            p.start()
            # Run the tests sequentially: Test 2 re-uses Test 1's destination directory and
            # checks for max_age skips, so the subprocesses must not overlap.
            p.join()

        # All processes have been joined above; this loop is just a safety net.
        for p in processes:
            p.join()

        # Collect and print results
        logger.info("\n--- All Test Processes Completed. Results: ---")
        all_tests_passed = True
        results_collected = 0
        failed_tests_details = []  # Store details of failed tests

        # ANSI escape codes for colors
        RED = "\033[91m"
        GREEN = "\033[92m"
        RESET = "\033[0m"

        while results_collected < len(tests_params_config):
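            # Each test subprocess puts exactly one {"test_name": ..., "summary": ...} dict on
            # results_queue; collect one result per configured test, and count a queue timeout
            # as a failed attempt so this loop always terminates.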
            current_test_passed = True
            failure_reason = ""
            try:
                result = results_queue.get(timeout=10)  # Timeout to avoid hanging
                results_collected += 1
                test_name = result["test_name"]
                summary = result["summary"]

                print(f"\nResult for Test: {test_name}")
                if summary:
                    print(f"  Summary: {json.dumps(summary, indent=2)}")
                    # Basic check: if errors array in summary is empty, consider it a pass for now
                    if summary.get("errors"):
                        failure_reason = (
                            f"Reported errors in summary: {summary.get('errors')}"
                        )
                        logger.error(f"  Test '{test_name}' {failure_reason}")
                        current_test_passed = False
                    elif (
                        summary.get("successful_downloads", 0) == 0
                        # Test 4 might have 0 successful if only the start_url is processed,
                        # and Test 2 might have 0 successful if all downloads were skipped.
                        and not (
                            test_name.startswith("4:")
                            and summary.get("total_processed_urls", 0) > 0
                        )
                        and not test_name.startswith("2:")
                    ):
                        # This condition is a bit loose. Specific checks below are more important.
                        # For now, we don't mark as failed here unless other checks also fail.
                        pass

                    # Specific checks for state and re-download
                    if test_name.startswith("1:"):  # After Test 1
                        state_file = summary.get("state_file_path")
                        if state_file and os.path.exists(state_file):
                            with open(state_file, "r") as f:
                                state = json.load(f)
                            expected_success_files = [
                                test_url_base + "index.html",
                                test_url_base + "page1.html",
                                test_url_base + "sub/page2.html",
                            ]
                            actual_success_count = 0
                            for url, data in state.items():
                                if (
                                    url in expected_success_files
                                    and data.get("status") == "success"
                                ):
                                    actual_success_count += 1
                            if actual_success_count >= 3:
                                logger.info(
                                    "  Test 1: State file check PASSED for key successful files."
                                )
                            else:
                                failure_reason = f"State file check FAILED. Expected ~3 successes, got {actual_success_count}. State: {state}"
                                logger.error(f"  Test 1: {failure_reason}")
                                current_test_passed = False
                        else:
                            failure_reason = (
                                "State file not found or summary incomplete."
                            )
                            logger.error(f"  Test 1: {failure_reason}")
                            current_test_passed = False

                    elif test_name.startswith("2:"):  # After Test 2 (re-run on test1 dir)
                        state_file = summary.get("state_file_path")
                        if state_file and os.path.exists(state_file):
                            with open(state_file, "r") as f:
                                state = json.load(f)
                            skipped_count = 0
                            main_files_to_check_skip = [
                                test_url_base + "index.html",
                                test_url_base + "page1.html",
                                test_url_base + "sub/page2.html",
                            ]
                            for url_to_check in main_files_to_check_skip:
                                if (
                                    url_to_check in state
                                    and state[url_to_check].get("status")
                                    == "skipped_max_age"
                                ):
                                    skipped_count += 1

                            if skipped_count >= 3:
                                logger.info(
                                    "  Test 2: Re-download check (skipped_max_age) PASSED for key files."
                                )
                            else:
                                failure_reason = f"Re-download check FAILED. Expected ~3 skips, got {skipped_count}. State: {state}"
                                logger.error(f"  Test 2: {failure_reason}")
                                current_test_passed = False

                            ignored_url = test_url_base + "ignored_page.html"
                            if (
                                ignored_url in state
                                and state[ignored_url].get("status") == "success"
                            ):
                                ignore_fail_reason = "ignored_page.html was downloaded, but should have been ignored."
                                logger.error(f"  Test 2: {ignore_fail_reason}")
                                if not failure_reason:
                                    failure_reason = ignore_fail_reason
                                else:
                                    failure_reason += f"; {ignore_fail_reason}"
                                current_test_passed = False
                        else:
                            failure_reason = (
                                "State file not found or summary incomplete."
                            )
                            logger.error(f"  Test 2: {failure_reason}")
                            current_test_passed = False

                    elif test_name.startswith("4:"):  # Depth 0
                        state_file = summary.get("state_file_path")
                        if state_file and os.path.exists(state_file):
                            with open(state_file, "r") as f:
                                state = json.load(f)
                            index_status = state.get(
                                test_url_base + "index.html", {}
                            ).get("status")
                            # Allow "failed" as well as "success" for depth 0: if the single URL
                            # itself failed, the test is still about *not* crawling further.
                            if len(state) == 1 and index_status in ("success", "failed"):
                                logger.info(
                                    "  Test 4: Depth 0 check PASSED (1 item in state)."
                                )
                            else:
                                failure_reason = f"Depth 0 check FAILED. Expected 1 item processed, got {len(state)}. State: {state}"
                                logger.error(f"  Test 4: {failure_reason}")
                                current_test_passed = False
                        else:
                            failure_reason = (
                                "State file not found or summary incomplete."
                            )
                            logger.error(f"  Test 4: {failure_reason}")
                            current_test_passed = False
                else:
                    failure_reason = (
                        "Did not return a summary (likely failed hard in subprocess)."
                    )
                    logger.error(f"  Test '{test_name}' {failure_reason}")
                    current_test_passed = False

            except queue.Empty:  # Note: queue.Empty, not multiprocessing.queues.Empty
                test_name = f"Unknown Test (result {results_collected + 1} of {len(tests_params_config)})"
                failure_reason = "Queue was empty after waiting, a subprocess might have died without putting a result."
                logger.error(failure_reason)
                current_test_passed = False
                # Do not break here; try to collect any remaining results. Count this attempt
                # as processed so the while loop cannot spin forever on a missing result.
                results_collected += 1

            if not current_test_passed:
                all_tests_passed = False
                failed_tests_details.append(
                    {"name": test_name, "reason": failure_reason}
                )

        # 6. Terminate the server
        logger.info("Terminating HTTP server process...")
        if server_process.is_alive():
            server_process.terminate()
            server_process.join(timeout=5)  # Wait for it to terminate
            if server_process.is_alive():
                logger.warning(
                    "Server process did not terminate gracefully, attempting to kill."
                )
                server_process.kill()  # Force kill if terminate didn't work
                server_process.join(timeout=5)

        if server_process.is_alive():
            logger.error("SERVER PROCESS COULD NOT BE STOPPED.")
        else:
            logger.info("HTTP server process stopped.")

        if failed_tests_details:
            logger.error(f"\n--- {RED}Summary of Failed Tests{RESET} ---")
            for failed_test in failed_tests_details:
                logger.error(f"{RED}  Test: {failed_test['name']}{RESET}")
                logger.error(f"{RED}    Reason: {failed_test['reason']}{RESET}")
            logger.error(f"\n{RED}Some downloader tests FAILED.{RESET}")
            sys.exit(1)  # Exit with error code if tests failed
        else:
            logger.info(
                f"\n{GREEN}All downloader tests PASSED (based on implemented checks).{RESET}"
            )

    # Note: temp_base_dir is removed automatically when the TemporaryDirectory
    # context manager exits above.
    logger.info(
        f"Temporary base directory {temp_base_dir} and its contents have been removed."
    )


if __name__ == "__main__":
    # The entry-point guard is important for multiprocessing on platforms that use the
    # 'spawn' start method (Windows, and macOS since Python 3.8), because child processes
    # re-import this module.
    multiprocessing.freeze_support()  # For PyInstaller/frozen-executable compatibility
    main()