#!/usr/bin/env python3
"""Catch-Up Activity Report.

Generates a markdown report of Discord and GitHub activity since Feb 15, 2026.
Covers Discord messages in team channels (+ threads) and GitHub activity
(PRs opened, issues opened, issue comments, PR comments, PR reviews, commits).
"""

import asyncio
import json
import os
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import quote

import aiohttp

DATA_DIR = Path(__file__).parent.parent.parent / "data"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
DISCORD_API_BASE = "https://discord.com/api/v10"
GITHUB_API_BASE = "https://api.github.com"
GITHUB_ORG = "nhcarrigan-spring-2026-cohort"
CUTOFF = datetime(2026, 2, 15, 0, 0, 0, tzinfo=timezone.utc)
CUTOFF_ISO = CUTOFF.isoformat().replace("+00:00", "Z")
OUTPUT_FILE = "catch_up_report.md"

# Teams that appear in team_assignments.json but are left out of the report.
EXCLUDED_TEAMS = frozenset({"Jade Jasmine"})

# Fallback wait (seconds) when GitHub rate-limits without a Retry-After header,
# and the number of consecutive rate-limit responses tolerated per request.
GITHUB_RATE_LIMIT_WAIT = 60
GITHUB_MAX_RETRIES = 5

TEXT_CHANNEL_IDS: dict[str, str] = {
    "Crimson Dahlia": "1464316744909852682",
    "Rose Camellia": "1464316751268286611",
    "Amber Wisteria": "1464316761410113641",
    "Ivory Orchid": "1464316770889240730",
    "Teal Iris": "1464316776459407448",
    "Peach Gardenia": "1464316785040953543",
    "Violet Carnation": "1464316805261824032",
    "Azure Lotus": "1464316814455472139",
    "Coral Sunflower": "1464316819711066263",
    "Indigo Tulip": "1464316826384072925",
    "Scarlet Hydrangea": "1464316839306985506",
    "Mint Narcissus": "1464316844251807952",
    "Sage Marigold": "1464316850669093040",
}


def team_repo_slug(team_name: str) -> str:
    """Convert a team name to its repository slug."""
    return team_name.lower().replace(" ", "-")


def get_github_token() -> str:
    """Retrieve the GitHub token via the gh CLI.

    Raises:
        subprocess.CalledProcessError: if ``gh auth token`` exits non-zero.
    """
    result = subprocess.run(
        ["gh", "auth", "token"], capture_output=True, text=True, check=True
    )
    return result.stdout.strip()


def _parse_timestamp(raw: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing ``Z`` for UTC."""
    return datetime.fromisoformat(raw.replace("Z", "+00:00"))


def _author_login(item: dict) -> str | None:
    """Return the login of a GitHub item's ``user``, or None when it is null."""
    return (item.get("user") or {}).get("login")


def _md_cell(value: object) -> str:
    """Render a value for a markdown table cell, escaping column separators."""
    return str(value).replace("|", "\\|")


def _team_member_ids(team: dict) -> list[str]:
    """Return a team's leaders followed by its participants."""
    return team["leaders"] + team["participants"]


class ActivityCollector:
    """Collects Discord and GitHub activity for the catch-up report.

    Use as an async context manager; the HTTP session is opened on entry and
    closed on exit.
    """

    def __init__(self, discord_token: str, github_token: str) -> None:
        self.discord_headers = {
            "Authorization": f"Bot {discord_token}",
            "Content-Type": "application/json",
        }
        self.github_headers = {
            "Authorization": f"Bearer {github_token}",
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        self.session: aiohttp.ClientSession | None = None
        # guild ID -> active thread objects, so each guild is listed only once.
        self._guild_active_threads: dict[str, list[dict]] = {}

    async def __aenter__(self) -> "ActivityCollector":
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(
        self, exc_type: object, exc_val: object, exc_tb: object
    ) -> None:
        if self.session:
            await self.session.close()

    # ------------------------------------------------------------------ Discord

    async def _discord_get_json(
        self, url: str, *, announce: bool = True
    ) -> tuple[int, Any]:
        """GET a Discord endpoint, waiting out HTTP 429 responses.

        Returns ``(200, parsed_json)`` on success and ``(status, None)`` for
        any other non-429 status. When ``announce`` is true a message is
        printed before each rate-limit wait.
        """
        while True:
            async with self.session.get(url, headers=self.discord_headers) as response:
                if response.status == 200:
                    return 200, await response.json()
                if response.status != 429:
                    return response.status, None
                retry_after = float((await response.json()).get("retry_after", 1))
            # Sleep only after the response has been released.
            if announce:
                print(f" [Discord] rate limited, waiting {retry_after:.1f}s...")
            await asyncio.sleep(retry_after)

    async def get_discord_username(self, user_id: str) -> str:
        """Fetch a Discord user's display name or username.

        Returns ``"*(unknown)*"`` when the lookup fails or yields no name.
        """
        status, data = await self._discord_get_json(
            f"{DISCORD_API_BASE}/users/{user_id}", announce=False
        )
        if status != 200:
            return "*(unknown)*"
        return data.get("global_name") or data.get("username") or "*(unknown)*"

    async def _get_active_thread_ids(self, channel_id: str) -> list[str]:
        """Return IDs of active threads whose parent is ``channel_id``.

        The per-channel ``/threads/active`` route is not available in API v10,
        so the channel's guild is resolved and the guild-wide active thread
        list is filtered by ``parent_id``.
        """
        status, channel = await self._discord_get_json(
            f"{DISCORD_API_BASE}/channels/{channel_id}"
        )
        guild_id = channel.get("guild_id") if status == 200 else None
        if not guild_id:
            print(
                f" [Discord] channel {channel_id} → HTTP {status}; "
                "active threads skipped"
            )
            return []

        if guild_id not in self._guild_active_threads:
            status, data = await self._discord_get_json(
                f"{DISCORD_API_BASE}/guilds/{guild_id}/threads/active"
            )
            if status != 200:
                print(
                    f" [Discord] guild {guild_id} active threads → HTTP {status}"
                )
                return []
            self._guild_active_threads[guild_id] = data.get("threads", [])

        return [
            thread["id"]
            for thread in self._guild_active_threads[guild_id]
            if thread.get("parent_id") == channel_id
        ]

    async def _get_archived_thread_ids(
        self, channel_id: str, archive_type: str
    ) -> list[str]:
        """Return IDs of all archived threads of one type, following pagination."""
        base_url = (
            f"{DISCORD_API_BASE}/channels/{channel_id}"
            f"/threads/archived/{archive_type}"
        )
        thread_ids: list[str] = []
        before: str | None = None
        while True:
            url = base_url
            if before:
                # The cursor is an ISO timestamp containing '+' and ':'.
                url += f"?before={quote(before, safe='')}"
            status, data = await self._discord_get_json(url)
            if status != 200:
                # Best effort: e.g. private archives may not be readable.
                return thread_ids
            threads = data.get("threads", [])
            thread_ids.extend(thread["id"] for thread in threads)
            if not threads or not data.get("has_more"):
                return thread_ids
            before = (threads[-1].get("thread_metadata") or {}).get(
                "archive_timestamp"
            )
            if not before:
                return thread_ids

    async def _get_discord_thread_ids(self, channel_id: str) -> list[str]:
        """Return IDs of all active and archived threads in a channel."""
        thread_ids = await self._get_active_thread_ids(channel_id)
        for archive_type in ("public", "private"):
            thread_ids.extend(
                await self._get_archived_thread_ids(channel_id, archive_type)
            )
        # Drop duplicates while keeping discovery order.
        return list(dict.fromkeys(thread_ids))

    async def _count_messages_in_channel(
        self, channel_id: str, label: str = ""
    ) -> dict[str, int]:
        """Count messages per Discord user ID since CUTOFF.

        Pages backwards through the channel 100 messages at a time and stops
        at the first message older than CUTOFF. Messages whose author is
        flagged as a bot are not counted.
        """
        counts: dict[str, int] = {}
        before_id: str | None = None
        page = 0
        prefix = f" ({label})" if label else ""

        while True:
            url = f"{DISCORD_API_BASE}/channels/{channel_id}/messages?limit=100"
            if before_id:
                url += f"&before={before_id}"

            status, messages = await self._discord_get_json(url)
            if status != 200:
                print(f" [Discord] channel {channel_id} → HTTP {status}")
                break
            if not messages:
                if page:
                    # Terminate the in-place progress line.
                    print()
                break

            page += 1
            print(
                f" [Discord]{prefix} page {page} — {len(messages)} messages fetched",  # noqa: E501
                end="\r",
            )

            reached_cutoff = False
            for message in messages:
                if _parse_timestamp(message["timestamp"]) < CUTOFF:
                    reached_cutoff = True
                    break
                author = message["author"]
                if author.get("bot", False):
                    continue
                counts[author["id"]] = counts.get(author["id"], 0) + 1

            if reached_cutoff or len(messages) < 100:
                print()
                break

            before_id = messages[-1]["id"]
            await asyncio.sleep(0.5)

        return counts

    async def collect_discord_counts(
        self, team_name: str, channel_id: str, member_ids: list[str]
    ) -> dict[str, int]:
        """Return message counts per member for a team's channel and threads.

        Every ID in ``member_ids`` is present in the result; members with no
        messages map to 0. ``team_name`` is accepted for call-site symmetry
        and is not used.
        """
        print(" [Discord] Scanning main channel...")
        totals: dict[str, int] = await self._count_messages_in_channel(
            channel_id, label="main"
        )

        thread_ids = await self._get_discord_thread_ids(channel_id)
        total_threads = len(thread_ids)
        for i, thread_id in enumerate(thread_ids, start=1):
            print(f" [Discord] Scanning thread {i}/{total_threads}...")
            thread_counts = await self._count_messages_in_channel(
                thread_id, label=f"thread {i}/{total_threads}"
            )
            for user_id, count in thread_counts.items():
                totals[user_id] = totals.get(user_id, 0) + count
            await asyncio.sleep(0.3)

        if total_threads == 0:
            print(" [Discord] No threads found.")

        return {member_id: totals.get(member_id, 0) for member_id in member_ids}

    # ------------------------------------------------------------------- GitHub

    async def _github_get_all_pages(self, url: str, params: dict) -> list[dict]:
        """Fetch all pages from a paginated GitHub REST API endpoint.

        HTTP 404/422 end the fetch silently; other non-200 statuses end it
        with a message. HTTP 403/429 are treated as rate limiting and retried
        at most GITHUB_MAX_RETRIES times in a row, so a permanent 403 cannot
        loop forever. Whatever was collected so far is returned.
        """
        results: list[dict] = []
        page = 1
        retries = 0

        while True:
            paged_params = {**params, "per_page": 100, "page": page}
            async with self.session.get(
                url, headers=self.github_headers, params=paged_params
            ) as response:
                status = response.status
                retry_after = response.headers.get("Retry-After", "")
                data: list[dict] | None = (
                    await response.json() if status == 200 else None
                )

            if status in (404, 422):
                break

            if status in (403, 429):
                retries += 1
                if retries > GITHUB_MAX_RETRIES:
                    print(
                        f" [GitHub] {url} → HTTP {status} "
                        f"after {GITHUB_MAX_RETRIES} retries, giving up"
                    )
                    break
                wait_seconds = (
                    float(retry_after)
                    if retry_after.isdigit()
                    else float(GITHUB_RATE_LIMIT_WAIT)
                )
                print(
                    f" [GitHub] rate limited on {url}, "
                    f"waiting {wait_seconds:.0f}s..."
                )
                await asyncio.sleep(wait_seconds)
                continue

            if status != 200:
                print(f" [GitHub] {url} → HTTP {status}")
                break

            retries = 0
            if not data:
                break
            results.extend(data)
            if len(data) < 100:
                break
            page += 1
            await asyncio.sleep(0.2)

        return results

    @staticmethod
    def _tally_since_cutoff(
        items: list[dict],
        counts: dict[str, dict[str, int]],
        canonical: dict[str, str],
        field: str,
        *,
        timestamp_key: str = "created_at",
        skip_pull_requests: bool = False,
    ) -> None:
        """Increment ``counts[user][field]`` for each item dated on/after CUTOFF.

        Items without a timestamp, without an author, or authored by someone
        outside ``canonical`` (lower-cased login -> tracked username) are
        ignored. With ``skip_pull_requests`` items carrying a ``pull_request``
        key are ignored as well.
        """
        for item in items:
            if skip_pull_requests and "pull_request" in item:
                continue
            raw_timestamp = item.get(timestamp_key)
            if not raw_timestamp or _parse_timestamp(raw_timestamp) < CUTOFF:
                continue
            login = _author_login(item)
            username = canonical.get(login.lower()) if login else None
            if username:
                counts[username][field] += 1

    async def collect_github_counts(
        self, team_name: str, github_usernames: list[str]
    ) -> dict[str, dict[str, int]]:
        """Return activity counts per member for a team's GitHub repository.

        The result maps each non-empty entry of ``github_usernames`` to its
        counters (``prs_opened``, ``issues_opened``, ``issue_comments``,
        ``pr_comments``, ``pr_reviews``, ``commits``). Logins returned by the
        API are matched to usernames case-insensitively.
        """
        repo = f"{GITHUB_ORG}/{team_repo_slug(team_name)}"
        repo_url = f"{GITHUB_API_BASE}/repos/{repo}"
        print(f" [GitHub] repo: {repo}")

        counts: dict[str, dict[str, int]] = {
            username: {
                "prs_opened": 0,
                "issues_opened": 0,
                "issue_comments": 0,
                "pr_comments": 0,
                "pr_reviews": 0,
                "commits": 0,
            }
            for username in github_usernames
            if username
        }

        # Lower-cased login -> first matching tracked username.
        canonical: dict[str, str] = {}
        for username in counts:
            canonical.setdefault(username.lower(), username)

        print(" [GitHub] Fetching PRs...")
        prs = await self._github_get_all_pages(
            f"{repo_url}/pulls",
            {"state": "all", "sort": "created", "direction": "desc"},
        )
        print(f" [GitHub] {len(prs)} PRs fetched — counting opens since cutoff...")
        self._tally_since_cutoff(prs, counts, canonical, "prs_opened")

        print(" [GitHub] Fetching issues...")
        issues = await self._github_get_all_pages(
            f"{repo_url}/issues",
            {
                "state": "all",
                "sort": "created",
                "direction": "desc",
                "since": CUTOFF_ISO,
            },
        )
        print(f" [GitHub] {len(issues)} issues/PRs fetched — counting issue opens...")
        # The issues endpoint also returns pull requests; those are counted above.
        self._tally_since_cutoff(
            issues, counts, canonical, "issues_opened", skip_pull_requests=True
        )

        print(" [GitHub] Fetching issue comments...")
        issue_comments = await self._github_get_all_pages(
            f"{repo_url}/issues/comments",
            {"sort": "created", "direction": "desc", "since": CUTOFF_ISO},
        )
        print(f" [GitHub] {len(issue_comments)} issue comments fetched.")
        self._tally_since_cutoff(issue_comments, counts, canonical, "issue_comments")

        print(" [GitHub] Fetching PR review comments...")
        pr_comments = await self._github_get_all_pages(
            f"{repo_url}/pulls/comments",
            {"sort": "created", "direction": "desc", "since": CUTOFF_ISO},
        )
        print(f" [GitHub] {len(pr_comments)} PR review comments fetched.")
        self._tally_since_cutoff(pr_comments, counts, canonical, "pr_comments")

        # Reviews are fetched for every PR, including ones opened before the
        # cutoff, because a review after the cutoff can target an older PR.
        total_prs = len(prs)
        print(f" [GitHub] Fetching reviews for {total_prs} PRs...")
        for i, pr in enumerate(prs, start=1):
            print(f" [GitHub] PR reviews: {i}/{total_prs}", end="\r")
            reviews = await self._github_get_all_pages(
                f"{repo_url}/pulls/{pr['number']}/reviews", {}
            )
            self._tally_since_cutoff(
                reviews, counts, canonical, "pr_reviews", timestamp_key="submitted_at"
            )
            await asyncio.sleep(0.1)
        if total_prs > 0:
            print()

        total_members = len(counts)
        print(f" [GitHub] Fetching commits for {total_members} members...")
        for i, username in enumerate(counts, start=1):
            print(f" [GitHub] Commits: {i}/{total_members} ({username})", end="\r")
            commits = await self._github_get_all_pages(
                f"{repo_url}/commits",
                {"author": username, "since": CUTOFF_ISO},
            )
            counts[username]["commits"] = len(commits)
            await asyncio.sleep(0.2)
        if total_members > 0:
            print()

        return counts


def build_report(
    team_data: list[dict],
    discord_to_github: dict[str, str],
    discord_usernames: dict[str, str],
    discord_results: dict[str, dict[str, int]],
    github_results: dict[str, dict[str, dict[str, int]]],
) -> str:
    """Build the markdown activity report.

    Emits one table row per member of every team not in EXCLUDED_TEAMS.
    Members without a GitHub username get ``N/A`` in all GitHub columns;
    missing counts default to 0.
    """
    now = datetime.now(timezone.utc)
    lines = [
        "# Catch-Up Activity Report",
        "",
        f"**Period:** {CUTOFF.strftime('%Y-%m-%d %H:%M')} UTC → "
        f"{now.strftime('%Y-%m-%d %H:%M')} UTC",
        "",
        "## Activity by Team",
        "",
        "| Discord ID | Discord Username | GitHub Username | Team | "
        "Discord Messages | PRs Opened | Issues Opened | Issue Comments | "
        "PR Comments | PR Reviews | Commits |",
        "|------------|-----------------|-----------------|------|"
        "-----------------|------------|---------------|----------------|"
        "-------------|------------|---------|",
    ]
    github_fields = (
        "prs_opened",
        "issues_opened",
        "issue_comments",
        "pr_comments",
        "pr_reviews",
        "commits",
    )

    for team in team_data:
        team_name = team["name"]
        if team_name in EXCLUDED_TEAMS:
            continue

        team_discord_counts = discord_results.get(team_name, {})
        team_github_counts = github_results.get(team_name, {})

        for member_id in _team_member_ids(team):
            github_username = discord_to_github.get(member_id, "")
            discord_username = discord_usernames.get(member_id, "*(unknown)*")
            discord_msg_count = team_discord_counts.get(member_id, 0)

            if github_username:
                gh = team_github_counts.get(github_username, {})
                github_cells = [gh.get(field, 0) for field in github_fields]
            else:
                github_cells = ["N/A"] * len(github_fields)

            cells = [
                member_id,
                discord_username,
                github_username or "N/A",
                team_name,
                discord_msg_count,
                *github_cells,
            ]
            # Escape '|' so free-form names cannot split the table row.
            lines.append("| " + " | ".join(_md_cell(cell) for cell in cells) + " |")

    lines.append("")
    lines.append(f"*Generated at {now.strftime('%Y-%m-%d %H:%M:%S')} UTC*")
    return "\n".join(lines)


async def main() -> None:
    """Run the catch-up activity report.

    Loads the team and Discord→GitHub mappings from DATA_DIR, collects
    Discord and GitHub activity for every included team, and writes the
    markdown report to OUTPUT_FILE.
    """
    print("Loading data files...")
    with open(DATA_DIR / "team_assignments.json", encoding="utf-8") as f:
        team_data: list[dict] = json.load(f)
    with open(DATA_DIR / "discord_to_github.json", encoding="utf-8") as f:
        discord_to_github: dict[str, str] = json.load(f)

    print("Getting GitHub token via gh CLI...")
    github_token = get_github_token()

    print(f"\nCollecting activity since {CUTOFF.isoformat()}...\n")

    included_teams = [team for team in team_data if team["name"] not in EXCLUDED_TEAMS]
    discord_results: dict[str, dict[str, int]] = {}
    github_results: dict[str, dict[str, dict[str, int]]] = {}
    discord_usernames: dict[str, str] = {}

    async with ActivityCollector(DISCORD_BOT_TOKEN, github_token) as collector:
        # A member may appear on several teams; look each one up only once.
        unique_member_ids = list(
            dict.fromkeys(
                member_id
                for team in included_teams
                for member_id in _team_member_ids(team)
            )
        )
        total_members = len(unique_member_ids)

        print(f"Fetching Discord usernames for {total_members} members...")
        for i, member_id in enumerate(unique_member_ids, start=1):
            print(f" username {i}/{total_members}...", end="\r")
            discord_usernames[member_id] = await collector.get_discord_username(
                member_id
            )
            await asyncio.sleep(0.3)
        print(f" Done — {total_members} usernames fetched. ")

        for team in included_teams:
            team_name = team["name"]
            print(f"\n=== {team_name} ===")
            member_ids = _team_member_ids(team)

            channel_id = TEXT_CHANNEL_IDS.get(team_name)
            if channel_id is None:
                # Keep going so the team's GitHub activity is still reported.
                print(" [Discord] no channel ID configured for this team, skipping.")
                discord_results[team_name] = {}
            else:
                discord_results[team_name] = await collector.collect_discord_counts(
                    team_name, channel_id, member_ids
                )

            github_usernames_for_team = [
                discord_to_github[mid]
                for mid in member_ids
                if discord_to_github.get(mid)
            ]
            github_results[team_name] = await collector.collect_github_counts(
                team_name, github_usernames_for_team
            )

    print("\nBuilding report...")
    report = build_report(
        team_data,
        discord_to_github,
        discord_usernames,
        discord_results,
        github_results,
    )

    # The report contains non-ASCII characters, so pin the encoding.
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        f.write(report)

    print(f"\n✅ Report saved to {OUTPUT_FILE}")


if __name__ == "__main__":
    asyncio.run(main())