generated from nhcarrigan/template
3aa90fa316
CI / dependency-pin-check-typescript (push) Successful in 4s
CI / dependency-pin-check-python (push) Successful in 4s
Security Scan and Upload / Security & DefectDojo Upload (push) Successful in 1m7s
CI / typescript (push) Failing after 4m48s
CI / python (push) Failing after 4m54s
765 lines
26 KiB
Python
765 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
# /// script
|
|
# requires-python = ">=3.11"
|
|
# dependencies = [
|
|
# "requests==2.32.3",
|
|
# "tqdm==4.67.1",
|
|
# "questionary==2.0.1",
|
|
# "rich==13.9.4",
|
|
# ]
|
|
# ///
|
|
"""
|
|
Interactive TUI wizard for downloading Gargoyle-compatible interactive fiction games from IFDB.
|
|
|
|
Fetches the IFDB SQL dump, builds an in-memory index, then walks the user through
|
|
filtering by format, rating, and genre before downloading the matching games.
|
|
|
|
Usage:
|
|
uv run download.py
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import re
|
|
import sys
|
|
import zipfile
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from urllib.parse import unquote, urljoin, urlparse
|
|
|
|
import questionary
|
|
import requests
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from tqdm import tqdm
|
|
|
|
|
|
# Shared rich console; the functions in this module print their status
# output through it.
console = Console()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Format families — groups every Gargoyle-compatible extension by interpreter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Maps each interpreter family name to the lower-case file extensions
# (leading dot included) that are treated as belonging to that family.
FORMAT_FAMILIES: dict[str, frozenset[str]] = {
    "Z-machine": frozenset(
        {f".z{version}" for version in range(1, 9)} | {".zblorb", ".zlb"}
    ),
    "Glulx": frozenset((".ulx", ".gblorb", ".glb", ".blorb", ".blb")),
    "TADS 2": frozenset((".gam",)),
    "TADS 3": frozenset((".t3",)),
    "Hugo": frozenset((".hex",)),
    "ADRIFT": frozenset((".taf",)),
    "Alan": frozenset((".acd", ".a2c", ".a3c")),
    "Level 9": frozenset((".l9", ".sna")),
    "Magnetic Scrolls": frozenset((".mag",)),
    "AGT": frozenset((".agx",)),
    "JACL": frozenset((".jacl", ".j2")),
    "Scott Adams": frozenset((".saga",)),
}

# Flat union of every extension listed in FORMAT_FAMILIES.
GARGOYLE_EXTENSIONS: frozenset[str] = frozenset(
    extension
    for family_extensions in FORMAT_FAMILIES.values()
    for extension in family_extensions
)
|
|
|
|
# Root of the IF Archive; relative links from the dump are joined onto it.
IFARCHIVE_BASE = "https://ifarchive.org"

# Candidate dump archives, newest date stamp first. find_dump_url() probes
# them in this order and uses the first one that answers 200.
DUMP_URL_CANDIDATES: list[str] = [
    f"{IFARCHIVE_BASE}/if-archive/info/ifdb/ifdb-archive-{stamp}.zip"
    for stamp in (
        "20260301",
        "20251201",
        "20250901",
        "20250601",
        "20250301",
        "20241201",
        "20240901",
        "20240601",
        "20240301",
    )
]

# NOTE(review): not referenced anywhere in the code visible in this file.
BAYESIAN_WEIGHT = 10

# Browser-like request headers sent with each game download request.
BROWSER_HEADERS = {
    "User-Agent": " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36",
            "(KHTML, like Gecko)",
            "Chrome/124.0.0.0",
            "Safari/537.36",
        )
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Welcome screen
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def show_welcome() -> None:
    """Print the banner, a short description of the tool and its five steps."""
    rule = "[bold cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/bold cyan]"
    intro = (
        "This tool downloads interactive fiction games from the [bold]IF Database (IFDB)[/bold],\n"
        "filtered to only include files playable in [bold]Gargoyle[/bold] — a multi-interpreter\n"
        "IF player supporting Z-machine, Glulx, TADS, Hugo, ADRIFT, and more.\n"
    )
    steps = (
        " 1. Download and parse the IFDB SQL database dump (~50 MB compressed)",
        " 2. Build an in-memory index of all games, ratings, and download links",
        " 3. Walk you through three filters: format, rating, and genre",
        " 4. Show you a summary of how many games match before you commit",
        " 5. Download everything to a directory of your choice",
    )

    # Banner.
    console.print()
    console.print(rule)
    console.print("[bold white] IFDB Interactive Fiction Downloader[/bold white]")
    console.print(rule)
    console.print()

    # Description followed by the numbered walkthrough.
    console.print(intro)
    console.print("[bold]Here's how it works:[/bold]")
    for step in steps:
        console.print(step)

    console.print()
    console.print("[dim]Files are saved with their original names — no renaming.[/dim]")
    console.print()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dump fetch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def find_dump_url() -> str:
    """Return the first URL in DUMP_URL_CANDIDATES that answers HEAD with 200.

    Each probe's outcome is printed to the console.

    Raises:
        SystemExit: if no candidate responds with status 200.
    """
    console.print("[bold]Searching for the latest IFDB dump on IF Archive...[/bold]")

    for candidate in DUMP_URL_CANDIDATES:
        try:
            probe = requests.head(candidate, timeout=15, allow_redirects=True)
        except requests.RequestException as exc:
            # A failed probe is reported and the next candidate is tried.
            console.print(f" [red]✗[/red] {exc}: {candidate}")
            continue

        if probe.status_code == 200:
            console.print(f" [green]✓[/green] Found: {candidate}")
            return candidate
        console.print(f" [dim]{probe.status_code}: {candidate}[/dim]")

    raise SystemExit(
        "\nCould not auto-detect the IFDB dump URL. "
        "Please check your internet connection and try again."
    )
|
|
|
|
|
|
def download_bytes(url: str, label: str) -> bytes:
    """Download *url* fully into memory while showing a tqdm progress bar.

    Args:
        url: Address to fetch with a streaming GET request.
        label: Description shown next to the progress bar.

    Returns:
        The complete response body.

    Raises:
        requests.RequestException: on connection errors or a non-success
            HTTP status (via ``raise_for_status``).
    """
    # The context manager releases the connection even when the transfer
    # fails part-way; a streamed response is not closed automatically.
    with requests.get(url, stream=True, timeout=120) as response:
        response.raise_for_status()

        # A missing or malformed Content-Length only costs us the bar total.
        try:
            total = int(response.headers.get("content-length", 0))
        except ValueError:
            total = 0

        buffer = io.BytesIO()
        with tqdm(total=total or None, unit="B", unit_scale=True, desc=label) as bar:
            for chunk in response.iter_content(chunk_size=65_536):
                buffer.write(chunk)
                bar.update(len(chunk))

    return buffer.getvalue()
|
|
|
|
|
|
def extract_sql_from_zip(zip_data: bytes) -> str:
    """Return the text of the largest ``.sql`` member of an in-memory zip.

    Args:
        zip_data: Raw bytes of the zip archive.

    Returns:
        The member decoded as UTF-8, with undecodable bytes replaced.

    Raises:
        SystemExit: if the archive has no member whose name ends in ``.sql``.
    """
    with zipfile.ZipFile(io.BytesIO(zip_data)) as archive:
        # Uncompressed size of every .sql member, keyed by member name.
        sql_sizes = {
            member: archive.getinfo(member).file_size
            for member in archive.namelist()
            if member.endswith(".sql")
        }
        if not sql_sizes:
            raise SystemExit("No .sql file found inside the IFDB dump zip.")

        main = max(sql_sizes, key=sql_sizes.get)
        console.print(f"Extracting [bold]{main}[/bold] ({sql_sizes[main]:,} bytes)...")
        raw = archive.read(main)
        return raw.decode("utf-8", errors="replace")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MySQL dump parser
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _parse_sql_value(raw: str) -> str | None:
|
|
stripped = raw.strip()
|
|
return None if stripped.upper() == "NULL" else stripped
|
|
|
|
|
|
def parse_mysql_values(values_str: str) -> list[tuple[str | None, ...]]:
|
|
rows: list[tuple[str | None, ...]] = []
|
|
current_row: list[str | None] = []
|
|
token_chars: list[str] = []
|
|
in_string = False
|
|
depth = 0
|
|
i = 0
|
|
length = len(values_str)
|
|
|
|
while i < length:
|
|
char = values_str[i]
|
|
|
|
if in_string:
|
|
if char == "\\":
|
|
if i + 1 < length:
|
|
token_chars.append(values_str[i + 1])
|
|
i += 2
|
|
else:
|
|
i += 1
|
|
continue
|
|
if char == "'":
|
|
if i + 1 < length and values_str[i + 1] == "'":
|
|
token_chars.append("'")
|
|
i += 2
|
|
continue
|
|
in_string = False
|
|
i += 1
|
|
continue
|
|
token_chars.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
if char == "'":
|
|
in_string = True
|
|
i += 1
|
|
continue
|
|
|
|
if char == "(":
|
|
depth += 1
|
|
if depth == 1:
|
|
current_row = []
|
|
token_chars = []
|
|
else:
|
|
token_chars.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
if char == ")":
|
|
depth -= 1
|
|
if depth == 0:
|
|
current_row.append(_parse_sql_value("".join(token_chars)))
|
|
rows.append(tuple(current_row))
|
|
current_row = []
|
|
token_chars = []
|
|
else:
|
|
token_chars.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
if char == "," and depth == 1:
|
|
current_row.append(_parse_sql_value("".join(token_chars)))
|
|
token_chars = []
|
|
i += 1
|
|
continue
|
|
|
|
if depth > 0:
|
|
token_chars.append(char)
|
|
i += 1
|
|
|
|
return rows
|
|
|
|
|
|
def _extract_column_names(create_body: str) -> list[str]:
|
|
columns: list[str] = []
|
|
for match in re.finditer(r"^\s*`(\w+)`\s+\w", create_body, re.MULTILINE):
|
|
columns.append(match.group(1))
|
|
return columns
|
|
|
|
|
|
def parse_dump(sql: str, tables_wanted: set[str]) -> dict[str, list[dict]]:
    """Collect the rows of selected tables from a MySQL dump.

    The dump is split on ``";\\n"``. CREATE TABLE statements supply column
    names; INSERT statements supply rows. Rows whose field count differs
    from the column count are dropped.

    Args:
        sql: Full text of the dump.
        tables_wanted: Names of the tables to keep.

    Returns:
        A mapping from each wanted table name to its rows, where every row
        is a dict keyed by column name.
    """
    schema: dict[str, list[str]] = {}
    table_data: dict[str, list[dict]] = {wanted: [] for wanted in tables_wanted}

    console.print("Splitting dump into statements...")
    statements = sql.split(";\n")
    console.print(f" {len(statements):,} statements found")

    create_re = re.compile(r"CREATE\s+TABLE\s+`(\w+)`\s*\((.+)\)", re.DOTALL | re.IGNORECASE)
    insert_re = re.compile(
        r"INSERT\s+INTO\s+`(\w+)`(?:\s*\(([^)]+)\))?\s+VALUES\s*(.+)",
        re.DOTALL | re.IGNORECASE,
    )

    for statement in tqdm(statements, desc="Parsing statements", unit="stmt"):
        # Only the statement's first characters are needed to classify it.
        head = statement.lstrip()[:20].upper()

        if head.startswith("CREATE"):
            created = create_re.search(statement)
            if created and created.group(1) in tables_wanted:
                schema[created.group(1)] = _extract_column_names(created.group(2))
            continue

        if not head.startswith("INSERT"):
            continue

        inserted = insert_re.search(statement)
        if not inserted:
            continue
        name, explicit_columns, values = inserted.groups()
        if name not in tables_wanted:
            continue

        # Prefer the column list written in the INSERT itself; otherwise
        # fall back to the columns recorded from CREATE TABLE.
        if explicit_columns:
            columns = [
                piece.strip().strip("`").strip('"')
                for piece in explicit_columns.split(",")
            ]
        else:
            columns = schema.get(name, [])
        if not columns:
            continue

        width = len(columns)
        table_data[name].extend(
            dict(zip(columns, row))
            for row in parse_mysql_values(values)
            if len(row) == width
        )

    for table in tables_wanted:
        cols = schema.get(table, [])
        preview = ", ".join(cols[:6])
        ellipsis = "..." if len(cols) > 6 else ""
        console.print(
            f" [bold]{table}[/bold]: {len(table_data[table]):,} rows "
            f"({preview}{ellipsis})"
        )

    return table_data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# URL utilities
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_gargoyle_url(url: str) -> bool:
    """Return True when the URL path's lower-cased suffix is in GARGOYLE_EXTENSIONS."""
    url_path = urlparse(url).path.lower()
    extension = Path(url_path).suffix
    return extension in GARGOYLE_EXTENSIONS
|
|
|
|
|
|
def resolve_url(url: str) -> str:
    """Return *url* unchanged if it has a scheme, else joined onto IFARCHIVE_BASE."""
    if urlparse(url).scheme:
        return url
    return urljoin(IFARCHIVE_BASE, url)
|
|
|
|
|
|
def get_format_family(url: str) -> str | None:
    """Return the FORMAT_FAMILIES key whose extensions include the URL's suffix.

    The suffix is taken from the lower-cased URL path. Returns None when no
    family lists it.
    """
    ext = Path(urlparse(url).path.lower()).suffix
    matches = (
        family
        for family, extensions in FORMAT_FAMILIES.items()
        if ext in extensions
    )
    return next(matches, None)
|
|
|
|
|
|
def best_link(links: list[dict]) -> dict | None:
    """Pick the preferred Gargoyle-compatible link from *links*.

    The first compatible link whose ``compression`` value is empty or false
    wins; failing that, the first compatible link of any kind. Returns None
    when no link is compatible.
    """
    no_compression = (None, "", "0", "false", "FALSE")

    compatible = [candidate for candidate in links if is_gargoyle_url(candidate["url"])]
    for candidate in compatible:
        if candidate.get("compression") in no_compression:
            return candidate
    if compatible:
        return compatible[0]
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Index building
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_indices(data: dict[str, list[dict]]) -> dict:
    """Build the lookup tables used by the filter wizard and the downloader.

    Args:
        data: Parsed rows keyed by table name; the ``games``, ``reviews``
            and ``gamelinks`` entries are read.

    Returns:
        A dict with these keys:
            ``game_title``, ``game_author``, ``game_genre``: game id to text.
            ``raw_avg``: game id to the plain mean of its ratings.
            ``links_by_game``: game id to its Gargoyle-compatible link rows,
                with ``url`` replaced by the resolved URL.
            ``all_gargoyle_ids``: ids that have at least one such link.
            ``game_family``: game id to the format family of its best link.
    """
    console.print("\n[bold]Building indices...[/bold]")

    game_title: dict[str, str] = {}
    game_author: dict[str, str] = {}
    game_genre: dict[str, str] = {}

    for row in data["games"]:
        gid = row.get("id")
        if not gid:
            continue
        game_title[gid] = row.get("title") or f"game_{gid}"
        game_author[gid] = row.get("author") or ""
        game_genre[gid] = (row.get("genre") or "").strip() or "Uncategorised"

    ratings_by_game: dict[str, list[float]] = defaultdict(list)
    for row in data["reviews"]:
        gid = row.get("gameid")
        raw = row.get("rating")
        if not gid or raw in (None, "0", "NULL", ""):
            continue
        # Convert before touching the defaultdict: looking the key up first
        # would leave an empty list behind when conversion fails, and the
        # average below would then divide by zero.
        try:
            rating = float(raw)
        except ValueError:
            continue
        ratings_by_game[gid].append(rating)

    raw_avg: dict[str, float] = {
        gid: sum(ratings) / len(ratings) for gid, ratings in ratings_by_game.items()
    }

    links_by_game: dict[str, list[dict]] = defaultdict(list)
    for row in data["gamelinks"]:
        gid = row.get("gameid")
        url = row.get("url", "")
        if not gid or not url:
            continue
        full_url = resolve_url(url)
        if is_gargoyle_url(full_url):
            links_by_game[gid].append({**row, "url": full_url})

    all_gargoyle_ids: set[str] = set(links_by_game.keys())

    game_family: dict[str, str] = {}
    for gid in all_gargoyle_ids:
        link = best_link(links_by_game[gid])
        if link:
            game_family[gid] = get_format_family(link["url"]) or "Unknown"

    console.print(f" Games in DB: {len(game_title):,}")
    console.print(f" Games with ratings: {len(ratings_by_game):,}")
    console.print(f" Games with Gargoyle links: {len(all_gargoyle_ids):,}")

    return {
        "game_title": game_title,
        "game_author": game_author,
        "game_genre": game_genre,
        "raw_avg": raw_avg,
        "links_by_game": links_by_game,
        "all_gargoyle_ids": all_gargoyle_ids,
        "game_family": game_family,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Filter helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Display text for each rating filter, listed in the order the choices are shown.
RATING_LABELS: dict[str, str] = dict(
    [
        ("all", "All (including unrated)"),
        ("rated", "Any rated game (≥ 1 star)"),
        ("≥ 2", "≥ 2 stars"),
        ("≥ 3", "≥ 3 stars"),
        ("≥ 4", "≥ 4 stars"),
        ("≥ 5", "≥ 5 stars (perfect scores only)"),
    ]
)

# Filter keys in display order (the keys of RATING_LABELS).
RATING_KEYS = list(RATING_LABELS)
|
|
|
|
|
|
def count_by_format(indices: dict) -> dict[str, int]:
    """Count Gargoyle-compatible games per format family, largest count first.

    Games without an entry in ``indices["game_family"]`` are counted under
    "Unknown".
    """
    tally: dict[str, int] = {}
    for gid in indices["all_gargoyle_ids"]:
        family = indices["game_family"].get(gid, "Unknown")
        tally[family] = tally.get(family, 0) + 1

    ranked = sorted(tally.items(), key=lambda item: item[1], reverse=True)
    return dict(ranked)
|
|
|
|
|
|
def count_by_rating(indices: dict) -> dict[str, int]:
    """Count Gargoyle-compatible games for each rating filter key.

    Returns a dict with the keys "all", "rated" and "≥ 2" through "≥ 5".
    Threshold counts compare the game's average rating, with unrated games
    treated as 0.
    """
    all_ids = indices["all_gargoyle_ids"]
    raw_avg = indices["raw_avg"]

    def at_least(stars: int) -> int:
        """Number of games whose average is at least *stars*."""
        return sum(1 for gid in all_ids if raw_avg.get(gid, 0) >= stars)

    counts: dict[str, int] = {
        "all": len(all_ids),
        "rated": sum(1 for gid in all_ids if gid in raw_avg),
    }
    for stars in (2, 3, 4, 5):
        counts[f"≥ {stars}"] = at_least(stars)
    return counts
|
|
|
|
|
|
def count_by_genre(indices: dict) -> dict[str, int]:
    """Count Gargoyle-compatible games per genre, largest count first.

    Games without an entry in ``indices["game_genre"]`` are counted under
    "Uncategorised".
    """
    tally: dict[str, int] = {}
    for gid in indices["all_gargoyle_ids"]:
        genre = indices["game_genre"].get(gid, "Uncategorised")
        tally[genre] = tally.get(genre, 0) + 1

    ranked = sorted(tally.items(), key=lambda item: item[1], reverse=True)
    return dict(ranked)
|
|
|
|
|
|
def _passes_rating_filter(gid: str, raw_avg: dict[str, float], rating_key: str) -> bool:
|
|
if rating_key == "all":
|
|
return True
|
|
if rating_key == "rated":
|
|
return gid in raw_avg
|
|
threshold = float(rating_key.replace("≥ ", ""))
|
|
return raw_avg.get(gid, 0) >= threshold
|
|
|
|
|
|
def apply_filters(
    indices: dict,
    selected_families: set[str],
    rating_key: str,
    selected_genres: set[str],
) -> list[str]:
    """Return the ids of games that pass the format, rating and genre filters.

    Games missing from the family or genre index are treated as "Unknown"
    and "Uncategorised" respectively.
    """
    raw_avg = indices["raw_avg"]
    return [
        gid
        for gid in indices["all_gargoyle_ids"]
        if indices["game_family"].get(gid, "Unknown") in selected_families
        and _passes_rating_filter(gid, raw_avg, rating_key)
        and indices["game_genre"].get(gid, "Uncategorised") in selected_genres
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TUI wizard steps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def ask_formats(indices: dict) -> set[str]:
    """Wizard step 1: ask which format families to include.

    Every family starts checked. Exits the program when the prompt returns
    None; returns all families when the user unchecks everything.
    """
    format_counts = count_by_format(indices)

    console.print()
    console.print("[bold cyan]Step 1 of 3 — File Formats[/bold cyan]")
    console.print(
        "Select the formats you want to include. "
        "[dim]All are pre-selected — uncheck any you don't want.[/dim]"
    )
    console.print()

    choices = []
    for family, count in format_counts.items():
        if count > 0:
            label = f"{family} ({count:,} games)"
            choices.append(questionary.Choice(title=label, value=family, checked=True))

    selected = questionary.checkbox("Formats to include:", choices=choices).ask()
    if selected is None:
        sys.exit(0)
    if selected:
        return set(selected)

    console.print("[yellow]Nothing selected — defaulting to all formats.[/yellow]")
    return set(format_counts.keys())
|
|
|
|
|
|
def ask_rating(indices: dict) -> str:
    """Wizard step 2: ask for the minimum rating and return its RATING_KEYS key.

    Exits the program when the prompt returns None.
    """
    rating_counts = count_by_rating(indices)

    console.print()
    console.print("[bold cyan]Step 2 of 3 — Minimum Rating[/bold cyan]")
    console.print(
        "Choose the minimum average rating a game must have to be included.\n"
        "[dim]Counts are independent of your format selection.[/dim]"
    )
    console.print()

    choices = []
    for key in RATING_KEYS:
        label = f"{RATING_LABELS[key]} ({rating_counts[key]:,} games)"
        choices.append(questionary.Choice(title=label, value=key))

    selected = questionary.select("Minimum rating:", choices=choices).ask()
    if selected is None:
        sys.exit(0)
    return selected
|
|
|
|
|
|
def ask_genres(indices: dict) -> set[str]:
    """Wizard step 3: ask which genres to include.

    Every genre starts checked. Exits the program when the prompt returns
    None; returns all genres when the user unchecks everything.
    """
    genre_counts = count_by_genre(indices)

    console.print()
    console.print("[bold cyan]Step 3 of 3 — Genres[/bold cyan]")
    console.print(
        "Select the genres you want to include. "
        "[dim]All are pre-selected — uncheck any you don't want.\n"
        "Counts are independent of your format and rating selections.[/dim]"
    )
    console.print()

    choices = []
    for genre, count in genre_counts.items():
        if count > 0:
            label = f"{genre} ({count:,} games)"
            choices.append(questionary.Choice(title=label, value=genre, checked=True))

    selected = questionary.checkbox("Genres to include:", choices=choices).ask()
    if selected is None:
        sys.exit(0)
    if selected:
        return set(selected)

    console.print("[yellow]Nothing selected — defaulting to all genres.[/yellow]")
    return set(genre_counts.keys())
|
|
|
|
|
|
def show_filter_summary(
    indices: dict,
    selected_families: set[str],
    rating_key: str,
    selected_genres: set[str],
) -> int:
    """Print a table of per-filter and combined match counts.

    The first three rows show how many games each filter matches on its
    own; the last row shows how many pass all three together.

    Returns:
        The number of games that pass all three filters.
    """
    format_counts = count_by_format(indices)
    rating_counts = count_by_rating(indices)
    genre_counts = count_by_genre(indices)

    format_total = sum(format_counts.get(family, 0) for family in selected_families)
    rating_total = rating_counts[rating_key]
    genre_total = sum(genre_counts.get(genre, 0) for genre in selected_genres)
    combined_count = len(
        apply_filters(indices, selected_families, rating_key, selected_genres)
    )

    # Short selections are listed by name, long ones are summarised.
    families_label = (
        ", ".join(sorted(selected_families))
        if len(selected_families) <= 4
        else f"{len(selected_families)} formats selected"
    )
    genres_label = (
        ", ".join(sorted(selected_genres))
        if len(selected_genres) <= 3
        else f"{len(selected_genres)} genres selected"
    )

    table = Table(title="Filter Summary", show_header=True, header_style="bold cyan")
    table.add_column("Filter", style="bold")
    table.add_column("Selection")
    table.add_column("Matching games", justify="right")

    per_filter_rows = (
        ("Format", families_label, format_total),
        ("Rating", RATING_LABELS[rating_key], rating_total),
        ("Genre", genres_label, genre_total),
    )
    for filter_name, selection, total in per_filter_rows:
        table.add_row(filter_name, selection, f"{total:,}")
    table.add_section()
    table.add_row(
        "[bold]Combined[/bold]",
        "[dim]all three filters applied[/dim]",
        f"[bold green]{combined_count:,}[/bold green]",
    )

    console.print()
    console.print(table)
    console.print()

    return combined_count
|
|
|
|
|
|
def ask_output_path() -> Path:
    """Ask for the download directory, create it if needed and return it.

    The answer is whitespace-stripped, ``~``-expanded and resolved. Exits
    the program when the prompt returns None.
    """
    console.print()

    def validate_path(raw: str) -> bool | str:
        """Reject blank input and paths that exist but are not directories."""
        cleaned = raw.strip()
        if not cleaned:
            return "Please enter a path."
        target = Path(cleaned).expanduser()
        if target.exists() and not target.is_dir():
            return f"{raw!r} exists and is not a directory."
        return True

    prompt = questionary.text(
        "Where should the games be saved? (absolute path to a directory)",
        validate=validate_path,
    )
    answer = prompt.ask()
    if answer is None:
        sys.exit(0)

    output_dir = Path(answer.strip()).expanduser().resolve()
    output_dir.mkdir(parents=True, exist_ok=True)
    return output_dir
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Download
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _filename_from_url(url: str, gid: str) -> str:
    """Return a bare file name for *url* that cannot leave the output directory."""
    # Decode first and take the basename second, so an encoded separator
    # such as "%2F" cannot smuggle directory components into the name.
    decoded_path = unquote(urlparse(url).path)
    name = Path(decoded_path).name
    if name in ("", ".", ".."):
        name = f"game_{gid}" + Path(decoded_path).suffix
    return name


def download_games(indices: dict, matching_ids: list[str], output_dir: Path) -> None:
    """Download the best link of every game in *matching_ids* into *output_dir*.

    Files that already exist are skipped. Each download is written to a
    ``.part`` file and renamed only once complete, so an interrupted
    transfer is never mistaken for a finished one on the next run. Failures
    are collected, summarised on the console and written to
    ``download_errors.txt`` in *output_dir*.

    Args:
        indices: Lookup tables; ``game_title`` and ``links_by_game`` are read.
        matching_ids: Ids of the games to download.
        output_dir: Existing directory that receives the files.
    """
    console.print(f"\n[bold]Downloading {len(matching_ids):,} games to:[/bold] {output_dir}")
    console.print()

    errors: list[str] = []
    skipped = 0
    downloaded = 0

    for gid in tqdm(matching_ids, desc="Downloading", unit="game"):
        title = indices["game_title"].get(gid, f"game_{gid}")
        link = best_link(indices["links_by_game"].get(gid, []))

        if not link:
            errors.append(f"{title}: no suitable download link")
            continue

        url = link["url"]
        filepath = output_dir / _filename_from_url(url, gid)
        if filepath.exists():
            skipped += 1
            continue

        partial = filepath.with_name(filepath.name + ".part")
        try:
            # The context manager releases the connection after every game.
            with requests.get(
                url, timeout=60, stream=True, headers=BROWSER_HEADERS
            ) as response:
                response.raise_for_status()
                with partial.open("wb") as fh:
                    for chunk in response.iter_content(chunk_size=65_536):
                        fh.write(chunk)
            partial.replace(filepath)
        except (requests.RequestException, OSError) as exc:
            # Network and disk errors fail this game only, not the whole run.
            errors.append(f"{title}: {exc}")
            partial.unlink(missing_ok=True)
        else:
            downloaded += 1

    console.print()
    console.print("[bold]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/bold]")
    console.print(f"[green]Downloaded:[/green] {downloaded:,}")
    console.print(f"[dim]Skipped (already present):[/dim] {skipped:,}")
    console.print(f"[red]Errors:[/red] {len(errors):,}")
    console.print(f"[bold]Saved to:[/bold] {output_dir}")

    if errors:
        console.print(f"\n[red]First {min(20, len(errors))} errors:[/red]")
        for msg in errors[:20]:
            console.print(f" {msg}")
        if len(errors) > 20:
            console.print(f" ... and {len(errors) - 20} more")
        error_log = output_dir / "download_errors.txt"
        error_log.write_text("\n".join(errors), encoding="utf-8")
        console.print(f"\n[dim]Full error log:[/dim] {error_log}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> None:
    """Run the wizard: fetch the dump, index it, collect filters, download.

    Exits with status 0 whenever the user declines, cancels a prompt or
    chooses to quit.
    """
    show_welcome()

    confirmed = questionary.confirm(
        "Ready to fetch the IFDB database and get started?"
    ).ask()
    if not confirmed:
        console.print("[dim]Bye! 👋[/dim]")
        sys.exit(0)

    console.print()
    dump_url = find_dump_url()
    console.print(f"\n[bold]Downloading:[/bold] {dump_url}")
    zip_data = download_bytes(dump_url, "IFDB dump")

    console.print("\n[bold]Extracting SQL from archive...[/bold]")
    sql = extract_sql_from_zip(zip_data)
    console.print(f"SQL text: {len(sql):,} characters")
    # Each intermediate is dropped as soon as the next stage has consumed it.
    del zip_data

    console.print("\n[bold]Parsing database tables (this may take a minute)...[/bold]")
    data = parse_dump(sql, {"games", "gamelinks", "reviews"})
    del sql

    indices = build_indices(data)
    del data

    # Filter wizard — loops if the user wants to edit
    selected_families: set[str] = set()
    rating_key: str = "all"
    selected_genres: set[str] = set()

    while True:
        selected_families = ask_formats(indices)
        rating_key = ask_rating(indices)
        selected_genres = ask_genres(indices)

        match_count = show_filter_summary(indices, selected_families, rating_key, selected_genres)

        if match_count == 0:
            console.print("[yellow]No games match your current filters — please adjust them.[/yellow]")
            action = questionary.select(
                "What would you like to do?",
                choices=["Edit filters", "Quit"],
            ).ask()
            # A cancelled prompt returns None, which also quits here.
            if action != "Edit filters":
                sys.exit(0)
            continue

        action = questionary.select(
            f"Download {match_count:,} matching games?",
            choices=[
                questionary.Choice(f"Yes — download all {match_count:,} games", value="download"),
                questionary.Choice("Edit filters", value="edit"),
                questionary.Choice("Quit", value="quit"),
            ],
        ).ask()

        if action is None or action == "quit":
            sys.exit(0)
        if action == "edit":
            continue
        break

    output_dir = ask_output_path()
    matching_ids = apply_filters(indices, selected_families, rating_key, selected_genres)
    download_games(indices, matching_ids, output_dir)
|
|
|
|
|
|
# Run the wizard only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|