Compare commits

..

5 Commits

Author SHA1 Message Date
hikari ae081cb54c docs: add README files for all script categories and update project docs
CI / dependency-pin-check-typescript (pull_request) Successful in 5s
CI / dependency-pin-check-python (pull_request) Successful in 4s
Security Scan and Upload / Security & DefectDojo Upload (pull_request) Successful in 57s
CI / python (pull_request) Successful in 9m33s
CI / typescript (pull_request) Successful in 9m41s
Add Getting Started sections and correct usage commands to all category
READMEs (TypeScript, Python, Bash). Update top-level README.md and
CLAUDE.md to reflect the Bash language, correct project structure, and
accurate make run instructions. Remove completed DOCS_TODO.md.
2026-02-23 20:00:51 -08:00
hikari b0620f2af3 refactor: reorganise bash scripts into subdirectories and add bash runner support
Move yubikey scripts from bash/ root into bash/yubikey/, move cohort shell
scripts from python/cohort/ into bash/cohort/, and update run.sh to support
Bash as a third language with category-based script discovery.
2026-02-23 20:00:44 -08:00
naomi c4b9f795d9 chore: add documentation todo tracker 2026-02-23 15:57:01 -08:00
naomi a40188413a docs: add data file documentation and fix data path resolution
All Python cohort scripts now use DATA_DIR = Path(__file__).parent.parent.parent / "data"
to correctly resolve the repo-root data/ directory regardless of the working directory
set by run.sh. All TypeScript scripts have expanded JSDoc headers documenting data file
requirements and environment variables.
2026-02-23 15:42:03 -08:00
naomi 4fdb5d06f1 feat: port remaining cohort scripts and make reusable
- Port 19 cohort scripts from /home/naomi/docs/cohort/
- Replace all hardcoded tokens and dotenv usage with os.environ
- Add pandas==3.0.1 dependency
- Add E501 to ruff ignore list for Discord message string content
- Make remove_resigned_members.py reusable (empty RESIGNED_IDS constant)
- Make update_roster_messages.py reusable (iterates all teams from JSON)
- Exclude 12 one-off/event-specific scripts as non-reusable
2026-02-23 15:23:10 -08:00
5 changed files with 30 additions and 836 deletions
+25
View File
@@ -0,0 +1,25 @@
# Package Manager Configuration
# Force pnpm usage - breaks npm/yarn intentionally
node-linker=pnpm
# Security: Disable all lifecycle scripts
ignore-scripts=true
enable-pre-post-scripts=false
# Security: Require packages to be 10+ days old before installation
minimum-release-age=14400
# Security: Verify package integrity hashes
verify-store-integrity=true
# Security: Enforce strict trust policies
trust-policy=strict
# Security: Strict peer dependency resolution
strict-peer-dependencies=true
# Performance: Use symlinks for node_modules
symlink=true
# Lockfile: Ensure lockfile is not modified during install
frozen-lockfile=false
-46
View File
@@ -1,46 +0,0 @@
#!/usr/bin/env bash
# env-to-1pass.sh: Create a 1Password item from a .env file
# Each KEY=value line becomes a password-type field labelled KEY
#
# Usage: bash bash/1password/env-to-1pass.sh
# Requires: 1Password CLI (op) — https://developer.1password.com/docs/cli
set -euo pipefail
# Prompt for item name
read -rp "Item name: " item_name
[[ -z "$item_name" ]] && { echo "Item name cannot be empty."; exit 1; }
# Prompt for .env file path
read -rp ".env file path: " env_file
env_file="${env_file/#\~/$HOME}" # expand ~ if present
[[ ! -f "$env_file" ]] && { echo "File not found: $env_file"; exit 1; }
# Build field arguments from the .env file
fields=()
while IFS= read -r line || [[ -n "$line" ]]; do
# Skip blank lines and comments
[[ -z "$line" || "$line" =~ ^[[:space:]]*# ]] && continue
# Skip lines without =
[[ "$line" != *"="* ]] && continue
key="${line%%=*}"
value="${line#*=}"
# Strip surrounding quotes from value if present
value="${value#\"}" ; value="${value%\"}"
value="${value#\'}" ; value="${value%\'}"
fields+=("${key}[password]=${value}")
done < "$env_file"
[[ ${#fields[@]} -eq 0 ]] && { echo "No KEY=value pairs found in $env_file"; exit 1; }
echo "Creating \"$item_name\" with ${#fields[@]} field(s)..."
op item create \
--category "Secure Note" \
--title "$item_name" \
"${fields[@]}"
echo "Done! ✓"
-21
View File
@@ -1,21 +0,0 @@
# Security
# Do not execute any scripts of installed packages (project scripts still run)
ignoreDepScripts: true
# Do not automatically run pre/post scripts (e.g. preinstall, postbuild)
enablePrePostScripts: false
# Only allow packages published at least 10 days ago (reduces risk of compromised packages)
minimumReleaseAge: 14400
# Fail if a package's trust level has decreased compared to previous releases
trustPolicy: no-downgrade
# Ignore trust policy for packages published more than 1 year ago (predates provenance signing)
trustPolicyIgnoreAfter: 525960
# Fail if there are missing or invalid peer dependencies
strictPeerDependencies: true
# Prevent transitive dependencies from using exotic sources (git repos, direct tarball URLs)
blockExoticSubdeps: true
# Lockfile
# Allow the lockfile to be updated during install (set to true in CI for stricter reproducibility)
preferFrozenLockfile: false
+5 -5
View File
@@ -8,14 +8,14 @@ GITHUB_TOKEN="op://Environment Variables - Development/Ephemere/GitHub Token"
# Discord
DISCORD_TOKEN="op://Environment Variables - Naomi/Hikari/discord_token"
DISCORD_CLIENT_ID="op://Environment Variables - Development/Guild Counter/client id"
DISCORD_CLIENT_SECRET="op://Environment Variables - Development/Guild Counter/client secret"
DISCORD_CLIENT_ID="op://Private/Guild Counter/client id"
DISCORD_CLIENT_SECRET="op://Private/Guild Counter/client secret"
DISCORD_BOT_TOKEN="op://Environment Variables - Naomi/Amari/bot token"
# AWS
AWS_ACCESS_KEY_ID="op://Environment Variables - Development/Cloudflare R2/ID"
AWS_SECRET_ACCESS_KEY="op://Environment Variables - Development/Cloudflare R2/Secret"
S3_ENDPOINT="op://Environment Variables - Development/Cloudflare R2/Endpoint"
AWS_ACCESS_KEY_ID="op://Private/Hetzner/S3 Access Key ID"
AWS_SECRET_ACCESS_KEY="op://Private/Hetzner/S3 Secret Access Key"
S3_ENDPOINT="op://Private/Hetzner/S3 Endpoint"
# Gitea
GITEA_TOKEN="op://Private/Gitea/token"
-764
View File
@@ -1,764 +0,0 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "requests==2.32.3",
# "tqdm==4.67.1",
# "questionary==2.0.1",
# "rich==13.9.4",
# ]
# ///
"""
Interactive TUI wizard for downloading Gargoyle-compatible interactive fiction games from IFDB.
Fetches the IFDB SQL dump, builds an in-memory index, then walks the user through
filtering by format, rating, and genre before downloading the matching games.
Usage:
uv run download.py
"""
from __future__ import annotations
import io
import re
import sys
import zipfile
from collections import defaultdict
from pathlib import Path
from urllib.parse import unquote, urljoin, urlparse
import questionary
import requests
from rich.console import Console
from rich.table import Table
from tqdm import tqdm
console = Console()
# ---------------------------------------------------------------------------
# Format families — groups every Gargoyle-compatible extension by interpreter
# ---------------------------------------------------------------------------
FORMAT_FAMILIES: dict[str, frozenset[str]] = {
"Z-machine": frozenset({".z1", ".z2", ".z3", ".z4", ".z5", ".z6", ".z7", ".z8", ".zblorb", ".zlb"}),
"Glulx": frozenset({".ulx", ".gblorb", ".glb", ".blorb", ".blb"}),
"TADS 2": frozenset({".gam"}),
"TADS 3": frozenset({".t3"}),
"Hugo": frozenset({".hex"}),
"ADRIFT": frozenset({".taf"}),
"Alan": frozenset({".acd", ".a2c", ".a3c"}),
"Level 9": frozenset({".l9", ".sna"}),
"Magnetic Scrolls": frozenset({".mag"}),
"AGT": frozenset({".agx"}),
"JACL": frozenset({".jacl", ".j2"}),
"Scott Adams": frozenset({".saga"}),
}
GARGOYLE_EXTENSIONS: frozenset[str] = frozenset().union(*FORMAT_FAMILIES.values())
DUMP_URL_CANDIDATES: list[str] = [
"https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20260301.zip",
"https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20251201.zip",
"https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20250901.zip",
"https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20250601.zip",
"https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20250301.zip",
"https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20241201.zip",
"https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20240901.zip",
"https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20240601.zip",
"https://ifarchive.org/if-archive/info/ifdb/ifdb-archive-20240301.zip",
]
IFARCHIVE_BASE = "https://ifarchive.org"
BAYESIAN_WEIGHT = 10
BROWSER_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
# ---------------------------------------------------------------------------
# Welcome screen
# ---------------------------------------------------------------------------
def show_welcome() -> None:
console.print()
console.print("[bold cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/bold cyan]")
console.print("[bold white] IFDB Interactive Fiction Downloader[/bold white]")
console.print("[bold cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/bold cyan]")
console.print()
console.print(
"This tool downloads interactive fiction games from the [bold]IF Database (IFDB)[/bold],\n"
"filtered to only include files playable in [bold]Gargoyle[/bold] — a multi-interpreter\n"
"IF player supporting Z-machine, Glulx, TADS, Hugo, ADRIFT, and more.\n"
)
console.print("[bold]Here's how it works:[/bold]")
console.print(" 1. Download and parse the IFDB SQL database dump (~50 MB compressed)")
console.print(" 2. Build an in-memory index of all games, ratings, and download links")
console.print(" 3. Walk you through three filters: format, rating, and genre")
console.print(" 4. Show you a summary of how many games match before you commit")
console.print(" 5. Download everything to a directory of your choice")
console.print()
console.print("[dim]Files are saved with their original names — no renaming.[/dim]")
console.print()
# ---------------------------------------------------------------------------
# Dump fetch
# ---------------------------------------------------------------------------
def find_dump_url() -> str:
console.print("[bold]Searching for the latest IFDB dump on IF Archive...[/bold]")
for url in DUMP_URL_CANDIDATES:
try:
response = requests.head(url, timeout=15, allow_redirects=True)
if response.status_code == 200:
console.print(f" [green]✓[/green] Found: {url}")
return url
console.print(f" [dim]{response.status_code}: {url}[/dim]")
except requests.RequestException as exc:
console.print(f" [red]✗[/red] {exc}: {url}")
raise SystemExit(
"\nCould not auto-detect the IFDB dump URL. "
"Please check your internet connection and try again."
)
def download_bytes(url: str, label: str) -> bytes:
response = requests.get(url, stream=True, timeout=120)
response.raise_for_status()
total = int(response.headers.get("content-length", 0))
buffer = io.BytesIO()
with tqdm(total=total or None, unit="B", unit_scale=True, desc=label) as bar:
for chunk in response.iter_content(chunk_size=65_536):
buffer.write(chunk)
bar.update(len(chunk))
return buffer.getvalue()
def extract_sql_from_zip(zip_data: bytes) -> str:
with zipfile.ZipFile(io.BytesIO(zip_data)) as archive:
sql_names = [n for n in archive.namelist() if n.endswith(".sql")]
if not sql_names:
raise SystemExit("No .sql file found inside the IFDB dump zip.")
main = max(sql_names, key=lambda n: archive.getinfo(n).file_size)
console.print(f"Extracting [bold]{main}[/bold] ({archive.getinfo(main).file_size:,} bytes)...")
return archive.read(main).decode("utf-8", errors="replace")
# ---------------------------------------------------------------------------
# MySQL dump parser
# ---------------------------------------------------------------------------
def _parse_sql_value(raw: str) -> str | None:
stripped = raw.strip()
return None if stripped.upper() == "NULL" else stripped
def parse_mysql_values(values_str: str) -> list[tuple[str | None, ...]]:
rows: list[tuple[str | None, ...]] = []
current_row: list[str | None] = []
token_chars: list[str] = []
in_string = False
depth = 0
i = 0
length = len(values_str)
while i < length:
char = values_str[i]
if in_string:
if char == "\\":
if i + 1 < length:
token_chars.append(values_str[i + 1])
i += 2
else:
i += 1
continue
if char == "'":
if i + 1 < length and values_str[i + 1] == "'":
token_chars.append("'")
i += 2
continue
in_string = False
i += 1
continue
token_chars.append(char)
i += 1
continue
if char == "'":
in_string = True
i += 1
continue
if char == "(":
depth += 1
if depth == 1:
current_row = []
token_chars = []
else:
token_chars.append(char)
i += 1
continue
if char == ")":
depth -= 1
if depth == 0:
current_row.append(_parse_sql_value("".join(token_chars)))
rows.append(tuple(current_row))
current_row = []
token_chars = []
else:
token_chars.append(char)
i += 1
continue
if char == "," and depth == 1:
current_row.append(_parse_sql_value("".join(token_chars)))
token_chars = []
i += 1
continue
if depth > 0:
token_chars.append(char)
i += 1
return rows
def _extract_column_names(create_body: str) -> list[str]:
columns: list[str] = []
for match in re.finditer(r"^\s*`(\w+)`\s+\w", create_body, re.MULTILINE):
columns.append(match.group(1))
return columns
def parse_dump(sql: str, tables_wanted: set[str]) -> dict[str, list[dict]]:
table_columns: dict[str, list[str]] = {}
table_data: dict[str, list[dict]] = {t: [] for t in tables_wanted}
console.print("Splitting dump into statements...")
statements = sql.split(";\n")
console.print(f" {len(statements):,} statements found")
create_re = re.compile(r"CREATE\s+TABLE\s+`(\w+)`\s*\((.+)\)", re.DOTALL | re.IGNORECASE)
insert_re = re.compile(
r"INSERT\s+INTO\s+`(\w+)`(?:\s*\(([^)]+)\))?\s+VALUES\s*(.+)",
re.DOTALL | re.IGNORECASE,
)
for statement in tqdm(statements, desc="Parsing statements", unit="stmt"):
upper = statement.lstrip()[:20].upper()
if upper.startswith("CREATE"):
match = create_re.search(statement)
if match:
name = match.group(1)
if name in tables_wanted:
table_columns[name] = _extract_column_names(match.group(2))
elif upper.startswith("INSERT"):
match = insert_re.search(statement)
if not match:
continue
name = match.group(1)
if name not in tables_wanted:
continue
if match.group(2):
columns = [c.strip().strip("`").strip('"') for c in match.group(2).split(",")]
else:
columns = table_columns.get(name, [])
if not columns:
continue
for row in parse_mysql_values(match.group(3)):
if len(row) == len(columns):
table_data[name].append(dict(zip(columns, row)))
for table in tables_wanted:
cols = table_columns.get(table, [])
console.print(
f" [bold]{table}[/bold]: {len(table_data[table]):,} rows "
f"({', '.join(cols[:6])}{'...' if len(cols) > 6 else ''})"
)
return table_data
# ---------------------------------------------------------------------------
# URL utilities
# ---------------------------------------------------------------------------
def is_gargoyle_url(url: str) -> bool:
return Path(urlparse(url).path.lower()).suffix in GARGOYLE_EXTENSIONS
def resolve_url(url: str) -> str:
return url if urlparse(url).scheme else urljoin(IFARCHIVE_BASE, url)
def get_format_family(url: str) -> str | None:
ext = Path(urlparse(url).path.lower()).suffix
for family, extensions in FORMAT_FAMILIES.items():
if ext in extensions:
return family
return None
def best_link(links: list[dict]) -> dict | None:
uncompressed = [
lnk for lnk in links
if is_gargoyle_url(lnk["url"])
and lnk.get("compression") in (None, "", "0", "false", "FALSE")
]
if uncompressed:
return uncompressed[0]
compatible = [lnk for lnk in links if is_gargoyle_url(lnk["url"])]
return compatible[0] if compatible else None
# ---------------------------------------------------------------------------
# Index building
# ---------------------------------------------------------------------------
def build_indices(data: dict[str, list[dict]]) -> dict:
console.print("\n[bold]Building indices...[/bold]")
game_title: dict[str, str] = {}
game_author: dict[str, str] = {}
game_genre: dict[str, str] = {}
for row in data["games"]:
gid = row.get("id")
if not gid:
continue
game_title[gid] = row.get("title") or f"game_{gid}"
game_author[gid] = row.get("author") or ""
genre = (row.get("genre") or "").strip()
game_genre[gid] = genre if genre else "Uncategorised"
ratings_by_game: dict[str, list[float]] = defaultdict(list)
for row in data["reviews"]:
gid = row.get("gameid")
raw = row.get("rating")
if not gid or raw in (None, "0", "NULL", ""):
continue
try:
ratings_by_game[gid].append(float(raw))
except ValueError:
pass
raw_avg: dict[str, float] = {
gid: sum(rs) / len(rs) for gid, rs in ratings_by_game.items()
}
links_by_game: dict[str, list[dict]] = defaultdict(list)
for row in data["gamelinks"]:
gid = row.get("gameid")
url = row.get("url", "")
if not gid or not url:
continue
full_url = resolve_url(url)
if is_gargoyle_url(full_url):
links_by_game[gid].append({**row, "url": full_url})
all_gargoyle_ids: set[str] = set(links_by_game.keys())
game_family: dict[str, str] = {}
for gid in all_gargoyle_ids:
link = best_link(links_by_game[gid])
if link:
game_family[gid] = get_format_family(link["url"]) or "Unknown"
console.print(f" Games in DB: {len(game_title):,}")
console.print(f" Games with ratings: {len(ratings_by_game):,}")
console.print(f" Games with Gargoyle links: {len(all_gargoyle_ids):,}")
return {
"game_title": game_title,
"game_author": game_author,
"game_genre": game_genre,
"raw_avg": raw_avg,
"links_by_game": links_by_game,
"all_gargoyle_ids": all_gargoyle_ids,
"game_family": game_family,
}
# ---------------------------------------------------------------------------
# Filter helpers
# ---------------------------------------------------------------------------
RATING_KEYS = ["all", "rated", "≥ 2", "≥ 3", "≥ 4", "≥ 5"]
RATING_LABELS: dict[str, str] = {
"all": "All (including unrated)",
"rated": "Any rated game (≥ 1 star)",
"≥ 2": "≥ 2 stars",
"≥ 3": "≥ 3 stars",
"≥ 4": "≥ 4 stars",
"≥ 5": "≥ 5 stars (perfect scores only)",
}
def count_by_format(indices: dict) -> dict[str, int]:
counts: dict[str, int] = defaultdict(int)
for gid in indices["all_gargoyle_ids"]:
family = indices["game_family"].get(gid, "Unknown")
counts[family] += 1
return dict(sorted(counts.items(), key=lambda kv: kv[1], reverse=True))
def count_by_rating(indices: dict) -> dict[str, int]:
all_ids = indices["all_gargoyle_ids"]
raw_avg = indices["raw_avg"]
return {
"all": len(all_ids),
"rated": sum(1 for gid in all_ids if gid in raw_avg),
"≥ 2": sum(1 for gid in all_ids if raw_avg.get(gid, 0) >= 2),
"≥ 3": sum(1 for gid in all_ids if raw_avg.get(gid, 0) >= 3),
"≥ 4": sum(1 for gid in all_ids if raw_avg.get(gid, 0) >= 4),
"≥ 5": sum(1 for gid in all_ids if raw_avg.get(gid, 0) >= 5),
}
def count_by_genre(indices: dict) -> dict[str, int]:
counts: dict[str, int] = defaultdict(int)
for gid in indices["all_gargoyle_ids"]:
genre = indices["game_genre"].get(gid, "Uncategorised")
counts[genre] += 1
return dict(sorted(counts.items(), key=lambda kv: kv[1], reverse=True))
def _passes_rating_filter(gid: str, raw_avg: dict[str, float], rating_key: str) -> bool:
if rating_key == "all":
return True
if rating_key == "rated":
return gid in raw_avg
threshold = float(rating_key.replace("", ""))
return raw_avg.get(gid, 0) >= threshold
def apply_filters(
indices: dict,
selected_families: set[str],
rating_key: str,
selected_genres: set[str],
) -> list[str]:
raw_avg = indices["raw_avg"]
result: list[str] = []
for gid in indices["all_gargoyle_ids"]:
if indices["game_family"].get(gid, "Unknown") not in selected_families:
continue
if not _passes_rating_filter(gid, raw_avg, rating_key):
continue
if indices["game_genre"].get(gid, "Uncategorised") not in selected_genres:
continue
result.append(gid)
return result
# ---------------------------------------------------------------------------
# TUI wizard steps
# ---------------------------------------------------------------------------
def ask_formats(indices: dict) -> set[str]:
format_counts = count_by_format(indices)
console.print()
console.print("[bold cyan]Step 1 of 3 — File Formats[/bold cyan]")
console.print(
"Select the formats you want to include. "
"[dim]All are pre-selected — uncheck any you don't want.[/dim]"
)
console.print()
choices = [
questionary.Choice(
title=f"{family} ({count:,} games)",
value=family,
checked=True,
)
for family, count in format_counts.items()
if count > 0
]
selected = questionary.checkbox("Formats to include:", choices=choices).ask()
if selected is None:
sys.exit(0)
if not selected:
console.print("[yellow]Nothing selected — defaulting to all formats.[/yellow]")
return set(format_counts.keys())
return set(selected)
def ask_rating(indices: dict) -> str:
rating_counts = count_by_rating(indices)
console.print()
console.print("[bold cyan]Step 2 of 3 — Minimum Rating[/bold cyan]")
console.print(
"Choose the minimum average rating a game must have to be included.\n"
"[dim]Counts are independent of your format selection.[/dim]"
)
console.print()
choices = [
questionary.Choice(
title=f"{RATING_LABELS[key]} ({rating_counts[key]:,} games)",
value=key,
)
for key in RATING_KEYS
]
selected = questionary.select("Minimum rating:", choices=choices).ask()
if selected is None:
sys.exit(0)
return selected
def ask_genres(indices: dict) -> set[str]:
genre_counts = count_by_genre(indices)
console.print()
console.print("[bold cyan]Step 3 of 3 — Genres[/bold cyan]")
console.print(
"Select the genres you want to include. "
"[dim]All are pre-selected — uncheck any you don't want.\n"
"Counts are independent of your format and rating selections.[/dim]"
)
console.print()
choices = [
questionary.Choice(
title=f"{genre} ({count:,} games)",
value=genre,
checked=True,
)
for genre, count in genre_counts.items()
if count > 0
]
selected = questionary.checkbox("Genres to include:", choices=choices).ask()
if selected is None:
sys.exit(0)
if not selected:
console.print("[yellow]Nothing selected — defaulting to all genres.[/yellow]")
return set(genre_counts.keys())
return set(selected)
def show_filter_summary(
indices: dict,
selected_families: set[str],
rating_key: str,
selected_genres: set[str],
) -> int:
format_counts = count_by_format(indices)
rating_counts = count_by_rating(indices)
genre_counts = count_by_genre(indices)
format_total = sum(format_counts.get(f, 0) for f in selected_families)
rating_total = rating_counts[rating_key]
genre_total = sum(genre_counts.get(g, 0) for g in selected_genres)
combined = apply_filters(indices, selected_families, rating_key, selected_genres)
if len(selected_families) <= 4:
families_label = ", ".join(sorted(selected_families))
else:
families_label = f"{len(selected_families)} formats selected"
if len(selected_genres) <= 3:
genres_label = ", ".join(sorted(selected_genres))
else:
genres_label = f"{len(selected_genres)} genres selected"
table = Table(title="Filter Summary", show_header=True, header_style="bold cyan")
table.add_column("Filter", style="bold")
table.add_column("Selection")
table.add_column("Matching games", justify="right")
table.add_row("Format", families_label, f"{format_total:,}")
table.add_row("Rating", RATING_LABELS[rating_key], f"{rating_total:,}")
table.add_row("Genre", genres_label, f"{genre_total:,}")
table.add_section()
table.add_row(
"[bold]Combined[/bold]",
"[dim]all three filters applied[/dim]",
f"[bold green]{len(combined):,}[/bold green]",
)
console.print()
console.print(table)
console.print()
return len(combined)
def ask_output_path() -> Path:
console.print()
def validate_path(raw: str) -> bool | str:
if not raw.strip():
return "Please enter a path."
p = Path(raw.strip()).expanduser()
if p.exists() and not p.is_dir():
return f"{raw!r} exists and is not a directory."
return True
path_str = questionary.text(
"Where should the games be saved? (absolute path to a directory)",
validate=validate_path,
).ask()
if path_str is None:
sys.exit(0)
output_dir = Path(path_str.strip()).expanduser().resolve()
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir
# ---------------------------------------------------------------------------
# Download
# ---------------------------------------------------------------------------
def download_games(indices: dict, matching_ids: list[str], output_dir: Path) -> None:
console.print(f"\n[bold]Downloading {len(matching_ids):,} games to:[/bold] {output_dir}")
console.print()
errors: list[str] = []
skipped = 0
downloaded = 0
for gid in tqdm(matching_ids, desc="Downloading", unit="game"):
title = indices["game_title"].get(gid, f"game_{gid}")
link = best_link(indices["links_by_game"].get(gid, []))
if not link:
errors.append(f"{title}: no suitable download link")
continue
url = link["url"]
filename = unquote(Path(urlparse(url).path).name)
if not filename:
filename = f"game_{gid}" + Path(urlparse(url).path).suffix
filepath = output_dir / filename
if filepath.exists():
skipped += 1
continue
try:
response = requests.get(url, timeout=60, stream=True, headers=BROWSER_HEADERS)
response.raise_for_status()
with filepath.open("wb") as fh:
for chunk in response.iter_content(chunk_size=65_536):
fh.write(chunk)
downloaded += 1
except requests.RequestException as exc:
errors.append(f"{title}: {exc}")
if filepath.exists():
filepath.unlink()
console.print()
console.print("[bold]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/bold]")
console.print(f"[green]Downloaded:[/green] {downloaded:,}")
console.print(f"[dim]Skipped (already present):[/dim] {skipped:,}")
console.print(f"[red]Errors:[/red] {len(errors):,}")
console.print(f"[bold]Saved to:[/bold] {output_dir}")
if errors:
console.print(f"\n[red]First {min(20, len(errors))} errors:[/red]")
for msg in errors[:20]:
console.print(f" {msg}")
if len(errors) > 20:
console.print(f" ... and {len(errors) - 20} more")
error_log = output_dir / "download_errors.txt"
error_log.write_text("\n".join(errors), encoding="utf-8")
console.print(f"\n[dim]Full error log:[/dim] {error_log}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
show_welcome()
confirmed = questionary.confirm(
"Ready to fetch the IFDB database and get started?"
).ask()
if not confirmed:
console.print("[dim]Bye! 👋[/dim]")
sys.exit(0)
console.print()
dump_url = find_dump_url()
console.print(f"\n[bold]Downloading:[/bold] {dump_url}")
zip_data = download_bytes(dump_url, "IFDB dump")
console.print("\n[bold]Extracting SQL from archive...[/bold]")
sql = extract_sql_from_zip(zip_data)
console.print(f"SQL text: {len(sql):,} characters")
del zip_data
console.print("\n[bold]Parsing database tables (this may take a minute)...[/bold]")
data = parse_dump(sql, {"games", "gamelinks", "reviews"})
del sql
indices = build_indices(data)
del data
# Filter wizard — loops if the user wants to edit
selected_families: set[str] = set()
rating_key: str = "all"
selected_genres: set[str] = set()
first_run = True
while True:
selected_families = ask_formats(indices)
rating_key = ask_rating(indices)
selected_genres = ask_genres(indices)
match_count = show_filter_summary(indices, selected_families, rating_key, selected_genres)
if match_count == 0:
console.print("[yellow]No games match your current filters — please adjust them.[/yellow]")
action = questionary.select(
"What would you like to do?",
choices=["Edit filters", "Quit"],
).ask()
if action != "Edit filters":
sys.exit(0)
continue
action = questionary.select(
f"Download {match_count:,} matching games?",
choices=[
questionary.Choice(f"Yes — download all {match_count:,} games", value="download"),
questionary.Choice("Edit filters", value="edit"),
questionary.Choice("Quit", value="quit"),
],
).ask()
if action is None or action == "quit":
sys.exit(0)
if action == "edit":
continue
break
output_dir = ask_output_path()
matching_ids = apply_filters(indices, selected_families, rating_key, selected_genres)
download_games(indices, matching_ids, output_dir)
if __name__ == "__main__":
main()