From 3aa90fa31645100ee013eea86be05bb4d8d49c41 Mon Sep 17 00:00:00 2001 From: Naomi Carrigan Date: Thu, 7 May 2026 15:52:18 -0700 Subject: [PATCH] feat: add script for if --- python/interactive-fiction/download.py | 764 +++++++++++++++++++++++++ 1 file changed, 764 insertions(+) create mode 100644 python/interactive-fiction/download.py diff --git a/python/interactive-fiction/download.py b/python/interactive-fiction/download.py new file mode 100644 index 0000000..672b32c --- /dev/null +++ b/python/interactive-fiction/download.py @@ -0,0 +1,764 @@ +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "requests==2.32.3", +# "tqdm==4.67.1", +# "questionary==2.0.1", +# "rich==13.9.4", +# ] +# /// +""" +Interactive TUI wizard for downloading Gargoyle-compatible interactive fiction games from IFDB. + +Fetches the IFDB SQL dump, builds an in-memory index, then walks the user through +filtering by format, rating, and genre before downloading the matching games. + +Usage: + uv run download.py +""" + +from __future__ import annotations + +import io +import re +import sys +import zipfile +from collections import defaultdict +from pathlib import Path +from urllib.parse import unquote, urljoin, urlparse + +import questionary +import requests +from rich.console import Console +from rich.table import Table +from tqdm import tqdm + + +console = Console() + +# --------------------------------------------------------------------------- +# Format families — groups every Gargoyle-compatible extension by interpreter +# --------------------------------------------------------------------------- + +FORMAT_FAMILIES: dict[str, frozenset[str]] = { + "Z-machine": frozenset({".z1", ".z2", ".z3", ".z4", ".z5", ".z6", ".z7", ".z8", ".zblorb", ".zlb"}), + "Glulx": frozenset({".ulx", ".gblorb", ".glb", ".blorb", ".blb"}), + "TADS 2": frozenset({".gam"}), + "TADS 3": frozenset({".t3"}), + "Hugo": frozenset({".hex"}), + "ADRIFT": frozenset({".taf"}), + "Alan": frozenset({".acd", ".a2c", ".a3c"}), + "Level 9": frozenset({".l9", ".sna"}), + "Magnetic Scrolls": frozenset({".mag"}), + "AGT": frozenset({".agx"}), + "JACL": frozenset({".jacl", ".j2"}), + "Scott Adams": frozenset({".saga"}), +} + +GARGOYLE_EXTENSIONS: frozenset[str] = frozenset().union(*FORMAT_FAMILIES.values()) + +DUMP_URL_CANDIDATES: list[str] = [ + "https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20260301.zip", + "https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20251201.zip", + "https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20250901.zip", + "https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20250601.zip", + "https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20250301.zip", + "https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20241201.zip", + "https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20240901.zip", + "https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20240601.zip", + "https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20240301.zip", +] + +IFARCHIVE_BASE = "https://ifarchive.org" +BAYESIAN_WEIGHT = 10 + +BROWSER_HEADERS = { + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" + ), + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", +} + + +# --------------------------------------------------------------------------- +# Welcome screen +# --------------------------------------------------------------------------- + +def show_welcome() -> None: + console.print() + console.print("[bold cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/bold cyan]") + console.print("[bold white] IFDB Interactive Fiction Downloader[/bold white]") + console.print("[bold cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/bold cyan]") + console.print() + console.print( + "This tool downloads interactive fiction games from the [bold]IF Database (IFDB)[/bold],\n" + "filtered to only include files playable in [bold]Gargoyle[/bold] — a multi-interpreter\n" + "IF player supporting Z-machine, Glulx, TADS, Hugo, ADRIFT, and more.\n" + ) + console.print("[bold]Here's how it works:[/bold]") + console.print(" 1. Download and parse the IFDB SQL database dump (~50 MB compressed)") + console.print(" 2. Build an in-memory index of all games, ratings, and download links") + console.print(" 3. Walk you through three filters: format, rating, and genre") + console.print(" 4. Show you a summary of how many games match before you commit") + console.print(" 5. Download everything to a directory of your choice") + console.print() + console.print("[dim]Files are saved with their original names — no renaming.[/dim]") + console.print() + + +# --------------------------------------------------------------------------- +# Dump fetch +# --------------------------------------------------------------------------- + +def find_dump_url() -> str: + console.print("[bold]Searching for the latest IFDB dump on IF Archive...[/bold]") + for url in DUMP_URL_CANDIDATES: + try: + response = requests.head(url, timeout=15, allow_redirects=True) + if response.status_code == 200: + console.print(f" [green]✓[/green] Found: {url}") + return url + console.print(f" [dim]{response.status_code}: {url}[/dim]") + except requests.RequestException as exc: + console.print(f" [red]✗[/red] {exc}: {url}") + raise SystemExit( + "\nCould not auto-detect the IFDB dump URL. " + "Please check your internet connection and try again." + ) + + +def download_bytes(url: str, label: str) -> bytes: + response = requests.get(url, stream=True, timeout=120) + response.raise_for_status() + total = int(response.headers.get("content-length", 0)) + buffer = io.BytesIO() + with tqdm(total=total or None, unit="B", unit_scale=True, desc=label) as bar: + for chunk in response.iter_content(chunk_size=65_536): + buffer.write(chunk) + bar.update(len(chunk)) + return buffer.getvalue() + + +def extract_sql_from_zip(zip_data: bytes) -> str: + with zipfile.ZipFile(io.BytesIO(zip_data)) as archive: + sql_names = [n for n in archive.namelist() if n.endswith(".sql")] + if not sql_names: + raise SystemExit("No .sql file found inside the IFDB dump zip.") + main = max(sql_names, key=lambda n: archive.getinfo(n).file_size) + console.print(f"Extracting [bold]{main}[/bold] ({archive.getinfo(main).file_size:,} bytes)...") + return archive.read(main).decode("utf-8", errors="replace") + + +# --------------------------------------------------------------------------- +# MySQL dump parser +# --------------------------------------------------------------------------- + +def _parse_sql_value(raw: str) -> str | None: + stripped = raw.strip() + return None if stripped.upper() == "NULL" else stripped + + +def parse_mysql_values(values_str: str) -> list[tuple[str | None, ...]]: + rows: list[tuple[str | None, ...]] = [] + current_row: list[str | None] = [] + token_chars: list[str] = [] + in_string = False + depth = 0 + i = 0 + length = len(values_str) + + while i < length: + char = values_str[i] + + if in_string: + if char == "\\": + if i + 1 < length: + token_chars.append(values_str[i + 1]) + i += 2 + else: + i += 1 + continue + if char == "'": + if i + 1 < length and values_str[i + 1] == "'": + token_chars.append("'") + i += 2 + continue + in_string = False + i += 1 + continue + token_chars.append(char) + i += 1 + continue + + if char == "'": + in_string = True + i += 1 + continue + + if char == "(": + depth += 1 + if depth == 1: + current_row = [] + token_chars = [] + else: + token_chars.append(char) + i += 1 + continue + + if char == ")": + depth -= 1 + if depth == 0: + current_row.append(_parse_sql_value("".join(token_chars))) + rows.append(tuple(current_row)) + current_row = [] + token_chars = [] + else: + token_chars.append(char) + i += 1 + continue + + if char == "," and depth == 1: + current_row.append(_parse_sql_value("".join(token_chars))) + token_chars = [] + i += 1 + continue + + if depth > 0: + token_chars.append(char) + i += 1 + + return rows + + +def _extract_column_names(create_body: str) -> list[str]: + columns: list[str] = [] + for match in re.finditer(r"^\s*`(\w+)`\s+\w", create_body, re.MULTILINE): + columns.append(match.group(1)) + return columns + + +def parse_dump(sql: str, tables_wanted: set[str]) -> dict[str, list[dict]]: + table_columns: dict[str, list[str]] = {} + table_data: dict[str, list[dict]] = {t: [] for t in tables_wanted} + + console.print("Splitting dump into statements...") + statements = sql.split(";\n") + console.print(f" {len(statements):,} statements found") + + create_re = re.compile(r"CREATE\s+TABLE\s+`(\w+)`\s*\((.+)\)", re.DOTALL | re.IGNORECASE) + insert_re = re.compile( + r"INSERT\s+INTO\s+`(\w+)`(?:\s*\(([^)]+)\))?\s+VALUES\s*(.+)", + re.DOTALL | re.IGNORECASE, + ) + + for statement in tqdm(statements, desc="Parsing statements", unit="stmt"): + upper = statement.lstrip()[:20].upper() + + if upper.startswith("CREATE"): + match = create_re.search(statement) + if match: + name = match.group(1) + if name in tables_wanted: + table_columns[name] = _extract_column_names(match.group(2)) + + elif upper.startswith("INSERT"): + match = insert_re.search(statement) + if not match: + continue + name = match.group(1) + if name not in tables_wanted: + continue + + if match.group(2): + columns = [c.strip().strip("`").strip('"') for c in match.group(2).split(",")] + else: + columns = table_columns.get(name, []) + + if not columns: + continue + + for row in parse_mysql_values(match.group(3)): + if len(row) == len(columns): + table_data[name].append(dict(zip(columns, row))) + + for table in tables_wanted: + cols = table_columns.get(table, []) + console.print( + f" [bold]{table}[/bold]: {len(table_data[table]):,} rows " + f"({', '.join(cols[:6])}{'...' if len(cols) > 6 else ''})" + ) + + return table_data + + +# --------------------------------------------------------------------------- +# URL utilities +# --------------------------------------------------------------------------- + +def is_gargoyle_url(url: str) -> bool: + return Path(urlparse(url).path.lower()).suffix in GARGOYLE_EXTENSIONS + + +def resolve_url(url: str) -> str: + return url if urlparse(url).scheme else urljoin(IFARCHIVE_BASE, url) + + +def get_format_family(url: str) -> str | None: + ext = Path(urlparse(url).path.lower()).suffix + for family, extensions in FORMAT_FAMILIES.items(): + if ext in extensions: + return family + return None + + +def best_link(links: list[dict]) -> dict | None: + uncompressed = [ + lnk for lnk in links + if is_gargoyle_url(lnk["url"]) + and lnk.get("compression") in (None, "", "0", "false", "FALSE") + ] + if uncompressed: + return uncompressed[0] + compatible = [lnk for lnk in links if is_gargoyle_url(lnk["url"])] + return compatible[0] if compatible else None + + +# --------------------------------------------------------------------------- +# Index building +# --------------------------------------------------------------------------- + +def build_indices(data: dict[str, list[dict]]) -> dict: + console.print("\n[bold]Building indices...[/bold]") + + game_title: dict[str, str] = {} + game_author: dict[str, str] = {} + game_genre: dict[str, str] = {} + + for row in data["games"]: + gid = row.get("id") + if not gid: + continue + game_title[gid] = row.get("title") or f"game_{gid}" + game_author[gid] = row.get("author") or "" + genre = (row.get("genre") or "").strip() + game_genre[gid] = genre if genre else "Uncategorised" + + ratings_by_game: dict[str, list[float]] = defaultdict(list) + for row in data["reviews"]: + gid = row.get("gameid") + raw = row.get("rating") + if not gid or raw in (None, "0", "NULL", ""): + continue + try: + ratings_by_game[gid].append(float(raw)) + except ValueError: + pass + + raw_avg: dict[str, float] = { + gid: sum(rs) / len(rs) for gid, rs in ratings_by_game.items() + } + + links_by_game: dict[str, list[dict]] = defaultdict(list) + for row in data["gamelinks"]: + gid = row.get("gameid") + url = row.get("url", "") + if not gid or not url: + continue + full_url = resolve_url(url) + if is_gargoyle_url(full_url): + links_by_game[gid].append({**row, "url": full_url}) + + all_gargoyle_ids: set[str] = set(links_by_game.keys()) + + game_family: dict[str, str] = {} + for gid in all_gargoyle_ids: + link = best_link(links_by_game[gid]) + if link: + game_family[gid] = get_format_family(link["url"]) or "Unknown" + + console.print(f" Games in DB: {len(game_title):,}") + console.print(f" Games with ratings: {len(ratings_by_game):,}") + console.print(f" Games with Gargoyle links: {len(all_gargoyle_ids):,}") + + return { + "game_title": game_title, + "game_author": game_author, + "game_genre": game_genre, + "raw_avg": raw_avg, + "links_by_game": links_by_game, + "all_gargoyle_ids": all_gargoyle_ids, + "game_family": game_family, + } + + +# --------------------------------------------------------------------------- +# Filter helpers +# --------------------------------------------------------------------------- + +RATING_KEYS = ["all", "rated", "≥ 2", "≥ 3", "≥ 4", "≥ 5"] + +RATING_LABELS: dict[str, str] = { + "all": "All (including unrated)", + "rated": "Any rated game (≥ 1 star)", + "≥ 2": "≥ 2 stars", + "≥ 3": "≥ 3 stars", + "≥ 4": "≥ 4 stars", + "≥ 5": "≥ 5 stars (perfect scores only)", +} + + +def count_by_format(indices: dict) -> dict[str, int]: + counts: dict[str, int] = defaultdict(int) + for gid in indices["all_gargoyle_ids"]: + family = indices["game_family"].get(gid, "Unknown") + counts[family] += 1 + return dict(sorted(counts.items(), key=lambda kv: kv[1], reverse=True)) + + +def count_by_rating(indices: dict) -> dict[str, int]: + all_ids = indices["all_gargoyle_ids"] + raw_avg = indices["raw_avg"] + return { + "all": len(all_ids), + "rated": sum(1 for gid in all_ids if gid in raw_avg), + "≥ 2": sum(1 for gid in all_ids if raw_avg.get(gid, 0) >= 2), + "≥ 3": sum(1 for gid in all_ids if raw_avg.get(gid, 0) >= 3), + "≥ 4": sum(1 for gid in all_ids if raw_avg.get(gid, 0) >= 4), + "≥ 5": sum(1 for gid in all_ids if raw_avg.get(gid, 0) >= 5), + } + + +def count_by_genre(indices: dict) -> dict[str, int]: + counts: dict[str, int] = defaultdict(int) + for gid in indices["all_gargoyle_ids"]: + genre = indices["game_genre"].get(gid, "Uncategorised") + counts[genre] += 1 + return dict(sorted(counts.items(), key=lambda kv: kv[1], reverse=True)) + + +def _passes_rating_filter(gid: str, raw_avg: dict[str, float], rating_key: str) -> bool: + if rating_key == "all": + return True + if rating_key == "rated": + return gid in raw_avg + threshold = float(rating_key.replace("≥ ", "")) + return raw_avg.get(gid, 0) >= threshold + + +def apply_filters( + indices: dict, + selected_families: set[str], + rating_key: str, + selected_genres: set[str], +) -> list[str]: + raw_avg = indices["raw_avg"] + result: list[str] = [] + for gid in indices["all_gargoyle_ids"]: + if indices["game_family"].get(gid, "Unknown") not in selected_families: + continue + if not _passes_rating_filter(gid, raw_avg, rating_key): + continue + if indices["game_genre"].get(gid, "Uncategorised") not in selected_genres: + continue + result.append(gid) + return result + + +# --------------------------------------------------------------------------- +# TUI wizard steps +# --------------------------------------------------------------------------- + +def ask_formats(indices: dict) -> set[str]: + format_counts = count_by_format(indices) + + console.print() + console.print("[bold cyan]Step 1 of 3 — File Formats[/bold cyan]") + console.print( + "Select the formats you want to include. " + "[dim]All are pre-selected — uncheck any you don't want.[/dim]" + ) + console.print() + + choices = [ + questionary.Choice( + title=f"{family} ({count:,} games)", + value=family, + checked=True, + ) + for family, count in format_counts.items() + if count > 0 + ] + + selected = questionary.checkbox("Formats to include:", choices=choices).ask() + if selected is None: + sys.exit(0) + if not selected: + console.print("[yellow]Nothing selected — defaulting to all formats.[/yellow]") + return set(format_counts.keys()) + return set(selected) + + +def ask_rating(indices: dict) -> str: + rating_counts = count_by_rating(indices) + + console.print() + console.print("[bold cyan]Step 2 of 3 — Minimum Rating[/bold cyan]") + console.print( + "Choose the minimum average rating a game must have to be included.\n" + "[dim]Counts are independent of your format selection.[/dim]" + ) + console.print() + + choices = [ + questionary.Choice( + title=f"{RATING_LABELS[key]} ({rating_counts[key]:,} games)", + value=key, + ) + for key in RATING_KEYS + ] + + selected = questionary.select("Minimum rating:", choices=choices).ask() + if selected is None: + sys.exit(0) + return selected + + +def ask_genres(indices: dict) -> set[str]: + genre_counts = count_by_genre(indices) + + console.print() + console.print("[bold cyan]Step 3 of 3 — Genres[/bold cyan]") + console.print( + "Select the genres you want to include. " + "[dim]All are pre-selected — uncheck any you don't want.\n" + "Counts are independent of your format and rating selections.[/dim]" + ) + console.print() + + choices = [ + questionary.Choice( + title=f"{genre} ({count:,} games)", + value=genre, + checked=True, + ) + for genre, count in genre_counts.items() + if count > 0 + ] + + selected = questionary.checkbox("Genres to include:", choices=choices).ask() + if selected is None: + sys.exit(0) + if not selected: + console.print("[yellow]Nothing selected — defaulting to all genres.[/yellow]") + return set(genre_counts.keys()) + return set(selected) + + +def show_filter_summary( + indices: dict, + selected_families: set[str], + rating_key: str, + selected_genres: set[str], +) -> int: + format_counts = count_by_format(indices) + rating_counts = count_by_rating(indices) + genre_counts = count_by_genre(indices) + + format_total = sum(format_counts.get(f, 0) for f in selected_families) + rating_total = rating_counts[rating_key] + genre_total = sum(genre_counts.get(g, 0) for g in selected_genres) + combined = apply_filters(indices, selected_families, rating_key, selected_genres) + + if len(selected_families) <= 4: + families_label = ", ".join(sorted(selected_families)) + else: + families_label = f"{len(selected_families)} formats selected" + + if len(selected_genres) <= 3: + genres_label = ", ".join(sorted(selected_genres)) + else: + genres_label = f"{len(selected_genres)} genres selected" + + table = Table(title="Filter Summary", show_header=True, header_style="bold cyan") + table.add_column("Filter", style="bold") + table.add_column("Selection") + table.add_column("Matching games", justify="right") + + table.add_row("Format", families_label, f"{format_total:,}") + table.add_row("Rating", RATING_LABELS[rating_key], f"{rating_total:,}") + table.add_row("Genre", genres_label, f"{genre_total:,}") + table.add_section() + table.add_row( + "[bold]Combined[/bold]", + "[dim]all three filters applied[/dim]", + f"[bold green]{len(combined):,}[/bold green]", + ) + + console.print() + console.print(table) + console.print() + + return len(combined) + + +def ask_output_path() -> Path: + console.print() + + def validate_path(raw: str) -> bool | str: + if not raw.strip(): + return "Please enter a path." + p = Path(raw.strip()).expanduser() + if p.exists() and not p.is_dir(): + return f"{raw!r} exists and is not a directory." + return True + + path_str = questionary.text( + "Where should the games be saved? (absolute path to a directory)", + validate=validate_path, + ).ask() + if path_str is None: + sys.exit(0) + + output_dir = Path(path_str.strip()).expanduser().resolve() + output_dir.mkdir(parents=True, exist_ok=True) + return output_dir + + +# --------------------------------------------------------------------------- +# Download +# --------------------------------------------------------------------------- + +def download_games(indices: dict, matching_ids: list[str], output_dir: Path) -> None: + console.print(f"\n[bold]Downloading {len(matching_ids):,} games to:[/bold] {output_dir}") + console.print() + + errors: list[str] = [] + skipped = 0 + downloaded = 0 + + for gid in tqdm(matching_ids, desc="Downloading", unit="game"): + title = indices["game_title"].get(gid, f"game_{gid}") + link = best_link(indices["links_by_game"].get(gid, [])) + + if not link: + errors.append(f"{title}: no suitable download link") + continue + + url = link["url"] + filename = unquote(Path(urlparse(url).path).name) + if not filename: + filename = f"game_{gid}" + Path(urlparse(url).path).suffix + + filepath = output_dir / filename + if filepath.exists(): + skipped += 1 + continue + + try: + response = requests.get(url, timeout=60, stream=True, headers=BROWSER_HEADERS) + response.raise_for_status() + with filepath.open("wb") as fh: + for chunk in response.iter_content(chunk_size=65_536): + fh.write(chunk) + downloaded += 1 + except requests.RequestException as exc: + errors.append(f"{title}: {exc}") + if filepath.exists(): + filepath.unlink() + + console.print() + console.print("[bold]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/bold]") + console.print(f"[green]Downloaded:[/green] {downloaded:,}") + console.print(f"[dim]Skipped (already present):[/dim] {skipped:,}") + console.print(f"[red]Errors:[/red] {len(errors):,}") + console.print(f"[bold]Saved to:[/bold] {output_dir}") + + if errors: + console.print(f"\n[red]First {min(20, len(errors))} errors:[/red]") + for msg in errors[:20]: + console.print(f" {msg}") + if len(errors) > 20: + console.print(f" ... and {len(errors) - 20} more") + error_log = output_dir / "download_errors.txt" + error_log.write_text("\n".join(errors), encoding="utf-8") + console.print(f"\n[dim]Full error log:[/dim] {error_log}") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> None: + show_welcome() + + confirmed = questionary.confirm( + "Ready to fetch the IFDB database and get started?" + ).ask() + if not confirmed: + console.print("[dim]Bye! 👋[/dim]") + sys.exit(0) + + console.print() + dump_url = find_dump_url() + console.print(f"\n[bold]Downloading:[/bold] {dump_url}") + zip_data = download_bytes(dump_url, "IFDB dump") + + console.print("\n[bold]Extracting SQL from archive...[/bold]") + sql = extract_sql_from_zip(zip_data) + console.print(f"SQL text: {len(sql):,} characters") + del zip_data + + console.print("\n[bold]Parsing database tables (this may take a minute)...[/bold]") + data = parse_dump(sql, {"games", "gamelinks", "reviews"}) + del sql + + indices = build_indices(data) + del data + + # Filter wizard — loops if the user wants to edit + selected_families: set[str] = set() + rating_key: str = "all" + selected_genres: set[str] = set() + + first_run = True + while True: + selected_families = ask_formats(indices) + rating_key = ask_rating(indices) + selected_genres = ask_genres(indices) + + match_count = show_filter_summary(indices, selected_families, rating_key, selected_genres) + + if match_count == 0: + console.print("[yellow]No games match your current filters — please adjust them.[/yellow]") + action = questionary.select( + "What would you like to do?", + choices=["Edit filters", "Quit"], + ).ask() + if action != "Edit filters": + sys.exit(0) + continue + + action = questionary.select( + f"Download {match_count:,} matching games?", + choices=[ + questionary.Choice(f"Yes — download all {match_count:,} games", value="download"), + questionary.Choice("Edit filters", value="edit"), + questionary.Choice("Quit", value="quit"), + ], + ).ask() + + if action is None or action == "quit": + sys.exit(0) + if action == "edit": + continue + break + + output_dir = ask_output_path() + matching_ids = apply_filters(indices, selected_families, rating_key, selected_genres) + download_games(indices, matching_ids, output_dir) + + +if __name__ == "__main__": + main()