fix: resolve lint issues for Python and TypeScript
CI / dependency-pin-check-typescript (pull_request) Successful in 4s
CI / dependency-pin-check-python (pull_request) Successful in 4s
Security Scan and Upload / Security & DefectDojo Upload (pull_request) Successful in 1m15s
CI / typescript (pull_request) Successful in 9m40s
CI / python (pull_request) Successful in 9m22s

- Update pyproject.toml to ignore T201 (print statements) and other rules
- Fix quote styles, bare except, set comprehensions in Python scripts
- Rename interactive-runner.ts to interactiveRunner.ts (camelCase)
- Refactor TypeScript to use import.meta.url instead of __dirname
- Add proper JSDoc headers and rename abbreviated variables
This commit is contained in:
2026-01-23 19:46:23 -08:00
committed by Naomi Carrigan
parent 611ca895f8
commit f8598d6ddf
11 changed files with 599 additions and 403 deletions
+58 -49
View File
@@ -2,22 +2,23 @@ import json
import re
from collections import defaultdict
DAYS = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
DAYS = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
UTC_BLOCKS = {
'mornings': (6, 12), # 06:00 - 12:00 UTC
'afternoons': (12, 18), # 12:00 - 18:00 UTC
'evenings': (18, 24), # 18:00 - 00:00 UTC
'nights': (0, 6) # 00:00 - 06:00 UTC
"mornings": (6, 12), # 06:00 - 12:00 UTC
"afternoons": (12, 18), # 12:00 - 18:00 UTC
"evenings": (18, 24), # 18:00 - 00:00 UTC
"nights": (0, 6), # 00:00 - 06:00 UTC
}
def parse_utc_offset(timezone_str: str) -> float:
"""Extract UTC offset from timezone string like 'America/New_York (UTC-5)'"""
match = re.search(r'UTC([+-]?\d+(?::\d+)?)', timezone_str)
match = re.search(r"UTC([+-]?\d+(?::\d+)?)", timezone_str)
if match:
offset_str = match.group(1)
if ':' in offset_str:
parts = offset_str.split(':')
if ":" in offset_str:
parts = offset_str.split(":")
hours = int(parts[0])
minutes = int(parts[1]) if len(parts) > 1 else 0
if hours < 0:
@@ -26,42 +27,45 @@ def parse_utc_offset(timezone_str: str) -> float:
return float(offset_str)
return 0
def parse_time_slots(time_str: str) -> list[tuple[int, int]]:
"""Parse time slots like '17:00-18:00' or '07:00-08:00; 19:00-20:00'"""
slots = []
if not time_str or time_str.lower() in ['n/a', 'na', '']:
if not time_str or time_str.lower() in ["n/a", "na", ""]:
return slots
parts = time_str.split(';')
parts = time_str.split(";")
for part in parts:
part = part.strip()
match = re.search(r'(\d{1,2}):(\d{2})\s*-\s*(\d{1,2}):(\d{2})', part)
match = re.search(r"(\d{1,2}):(\d{2})\s*-\s*(\d{1,2}):(\d{2})", part)
if match:
start_hour = int(match.group(1))
end_hour = int(match.group(3))
slots.append((start_hour, end_hour))
return slots
def local_hour_to_utc(local_hour: int, utc_offset: float) -> int:
"""Convert local hour to UTC hour"""
utc_hour = local_hour - utc_offset
return int(utc_hour) % 24
def get_utc_blocks_for_hour(utc_hour: int) -> list[str]:
"""Determine which UTC block(s) an hour falls into"""
blocks = []
for block_name, (start, end) in UTC_BLOCKS.items():
if block_name == 'nights':
if block_name == "nights":
if utc_hour >= 0 and utc_hour < 6:
blocks.append(block_name)
elif block_name == 'evenings':
elif block_name == "evenings":
if utc_hour >= 18 and utc_hour < 24:
blocks.append(block_name)
else:
if utc_hour >= start and utc_hour < end:
blocks.append(block_name)
elif utc_hour >= start and utc_hour < end:
blocks.append(block_name)
return blocks
def analyze_applicant_availability(timezone_str: str, day_slots: dict) -> dict:
"""Analyze availability for one applicant"""
utc_offset = parse_utc_offset(timezone_str)
@@ -80,42 +84,43 @@ def analyze_applicant_availability(timezone_str: str, day_slots: dict) -> dict:
block_counts[block] += 1
available_blocks = []
for block in ['mornings', 'afternoons', 'evenings', 'nights']:
for block in ["mornings", "afternoons", "evenings", "nights"]:
if block_counts[block] >= 3:
available_blocks.append(block)
return {
'utc_offset': utc_offset,
'timezone': timezone_str,
'available_blocks': available_blocks,
'block_counts': dict(block_counts),
'total_unique_utc_hours': len(all_utc_hours)
"utc_offset": utc_offset,
"timezone": timezone_str,
"available_blocks": available_blocks,
"block_counts": dict(block_counts),
"total_unique_utc_hours": len(all_utc_hours),
}
def parse_table_md() -> list[dict]:
"""Parse table.md and extract availability data"""
with open('table.md', 'r') as f:
with open("table.md") as f:
content = f.read()
lines = content.strip().split('\n')
lines = content.strip().split("\n")
header_idx = None
for i, line in enumerate(lines):
if line.startswith('| Discord ID'):
if line.startswith("| Discord ID"):
header_idx = i
break
if header_idx is None:
raise ValueError("Could not find table header")
headers = [h.strip() for h in lines[header_idx].split('|')[1:-1]]
headers = [h.strip() for h in lines[header_idx].split("|")[1:-1]]
applicants = []
for line in lines[header_idx + 2:]:
if not line.startswith('|'):
for line in lines[header_idx + 2 :]:
if not line.startswith("|"):
continue
cells = [c.strip() for c in line.split('|')[1:-1]]
cells = [c.strip() for c in line.split("|")[1:-1]]
if len(cells) < len(headers):
continue
@@ -124,11 +129,12 @@ def parse_table_md() -> list[dict]:
return applicants
def main():
with open('discord_verification.json', 'r') as f:
with open("discord_verification.json") as f:
verification = json.load(f)
verified_ids = set(v[0] for v in verification['verified'])
verified_ids = {v[0] for v in verification["verified"]}
print(f"Verified applicants: {len(verified_ids)}")
applicants = parse_table_md()
@@ -137,46 +143,49 @@ def main():
availability_results = []
for applicant in applicants:
discord_id = applicant.get('Discord ID', '')
discord_id = applicant.get("Discord ID", "")
if discord_id not in verified_ids:
continue
timezone = applicant.get('Timezone', '')
timezone = applicant.get("Timezone", "")
day_slots = {}
for day in DAYS:
time_str = applicant.get(day, '')
time_str = applicant.get(day, "")
day_slots[day] = parse_time_slots(time_str)
analysis = analyze_applicant_availability(timezone, day_slots)
availability_results.append({
'discord_id': discord_id,
'timezone': timezone,
'utc_offset': analysis['utc_offset'],
'available_blocks': analysis['available_blocks'],
'block_counts': analysis['block_counts'],
'total_unique_utc_hours': analysis['total_unique_utc_hours']
})
availability_results.append(
{
"discord_id": discord_id,
"timezone": timezone,
"utc_offset": analysis["utc_offset"],
"available_blocks": analysis["available_blocks"],
"block_counts": analysis["block_counts"],
"total_unique_utc_hours": analysis["total_unique_utc_hours"],
}
)
with open('availability_analysis.json', 'w') as f:
with open("availability_analysis.json", "w") as f:
json.dump(availability_results, f, indent=2)
block_distribution = defaultdict(int)
for result in availability_results:
for block in result['available_blocks']:
for block in result["available_blocks"]:
block_distribution[block] += 1
print(f"\n=== AVAILABILITY ANALYSIS COMPLETE ===")
print("\n=== AVAILABILITY ANALYSIS COMPLETE ===")
print(f"Analyzed: {len(availability_results)} applicants")
print(f"\nBlock Distribution (applicants available in each block):")
for block in ['mornings', 'afternoons', 'evenings', 'nights']:
print("\nBlock Distribution (applicants available in each block):")
for block in ["mornings", "afternoons", "evenings", "nights"]:
print(f" {block.capitalize()}: {block_distribution[block]}")
no_blocks = sum(1 for r in availability_results if not r['available_blocks'])
no_blocks = sum(1 for r in availability_results if not r["available_blocks"])
print(f"\nApplicants with no clear block availability: {no_blocks}")
print(f"\nResults saved to availability_analysis.json")
print("\nResults saved to availability_analysis.json")
if __name__ == "__main__":
main()
+9 -9
View File
@@ -1,12 +1,12 @@
#!/usr/bin/env python3
"""
Assign the Cohort role to all 155 participants.
"""Assign the Cohort role to all 155 participants.
Respects Discord rate limits with proper backoff and retry logic.
"""
import json
import os
import time
import requests
BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
@@ -14,10 +14,8 @@ GUILD_ID = "692816967895220344"
COHORT_ROLE_ID = "1464314780935258112"
BASE_URL = "https://discord.com/api/v10"
HEADERS = {
"Authorization": f"Bot {BOT_TOKEN}",
"Content-Length": "0"
}
HEADERS = {"Authorization": f"Bot {BOT_TOKEN}", "Content-Length": "0"}
def assign_role_with_retry(user_id: str, role_id: str, max_retries: int = 5) -> bool:
url = f"{BASE_URL}/guilds/{GUILD_ID}/members/{user_id}/roles/{role_id}"
@@ -35,21 +33,22 @@ def assign_role_with_retry(user_id: str, role_id: str, max_retries: int = 5) ->
if retry_after is None:
try:
retry_after = response.json().get("retry_after", 1)
except:
except Exception:
retry_after = 1
retry_after = float(retry_after)
print(f" Rate limited! Waiting {retry_after:.2f}s before retry...")
time.sleep(retry_after)
else:
print(f" Error {response.status_code}: {response.text}")
backoff_time = (2 ** attempt) * 0.5
backoff_time = (2**attempt) * 0.5
print(f" Retrying in {backoff_time:.2f}s...")
time.sleep(backoff_time)
return False
def main():
with open("team_assignments.json", "r") as f:
with open("team_assignments.json") as f:
teams = json.load(f)
all_users = []
@@ -82,5 +81,6 @@ def main():
print("-" * 50)
print(f"Complete! Success: {success_count}, Failed: {fail_count}")
if __name__ == "__main__":
main()
+6 -9
View File
@@ -1,22 +1,19 @@
#!/usr/bin/env python3
"""
Assign team-specific roles to all 155 participants.
"""Assign team-specific roles to all 155 participants.
Respects Discord rate limits with proper backoff and retry logic.
"""
import json
import os
import time
import requests
BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
GUILD_ID = "692816967895220344"
BASE_URL = "https://discord.com/api/v10"
HEADERS = {
"Authorization": f"Bot {BOT_TOKEN}",
"Content-Length": "0"
}
HEADERS = {"Authorization": f"Bot {BOT_TOKEN}", "Content-Length": "0"}
TEAM_ROLE_IDS = {
"Jade Jasmine": "1464314923780931677",
@@ -52,14 +49,14 @@ def assign_role_with_retry(user_id: str, role_id: str, max_retries: int = 5) ->
if retry_after is None:
try:
retry_after = response.json().get("retry_after", 1)
except:
except Exception:
retry_after = 1
retry_after = float(retry_after)
print(f" Rate limited! Waiting {retry_after:.2f}s before retry...")
time.sleep(retry_after)
else:
print(f" Error {response.status_code}: {response.text}")
backoff_time = (2 ** attempt) * 0.5
backoff_time = (2**attempt) * 0.5
print(f" Retrying in {backoff_time:.2f}s...")
time.sleep(backoff_time)
@@ -67,7 +64,7 @@ def assign_role_with_retry(user_id: str, role_id: str, max_retries: int = 5) ->
def main():
with open("team_assignments.json", "r") as f:
with open("team_assignments.json") as f:
teams = json.load(f)
print(f"Assigning team roles to {len(teams)} teams...")
+106 -73
View File
@@ -1,19 +1,19 @@
import json
import re
import time
import urllib.request
import urllib.error
from typing import Optional
import urllib.request
# GitHub API (no auth needed for public repos, but rate limited)
GITHUB_API = "https://api.github.com"
def extract_github_info(url: str) -> tuple[Optional[str], Optional[str]]:
def extract_github_info(url: str) -> tuple[str | None, str | None]:
"""Extract owner and repo from GitHub URL."""
# Handle various GitHub URL formats
patterns = [
r'github\.com/([^/]+)/([^/\s?#]+)', # github.com/owner/repo
r'github\.com/([^/\s?#]+)/?$', # github.com/owner (profile)
r"github\.com/([^/]+)/([^/\s?#]+)", # github.com/owner/repo
r"github\.com/([^/\s?#]+)/?$", # github.com/owner (profile)
]
for pattern in patterns:
@@ -21,12 +21,13 @@ def extract_github_info(url: str) -> tuple[Optional[str], Optional[str]]:
if match:
groups = match.groups()
if len(groups) == 2:
return groups[0], groups[1].rstrip('.git')
return groups[0], groups[1].rstrip(".git")
elif len(groups) == 1:
return groups[0], None
return None, None
def fetch_github_user(username: str) -> Optional[dict]:
def fetch_github_user(username: str) -> dict | None:
"""Fetch GitHub user profile."""
url = f"{GITHUB_API}/users/{username}"
req = urllib.request.Request(url)
@@ -36,9 +37,10 @@ def fetch_github_user(username: str) -> Optional[dict]:
try:
response = urllib.request.urlopen(req, timeout=10)
return json.loads(response.read().decode())
except Exception as e:
except Exception:
return None
def fetch_github_repos(username: str) -> list:
"""Fetch user's public repos."""
url = f"{GITHUB_API}/users/{username}/repos?per_page=100&sort=updated"
@@ -49,9 +51,10 @@ def fetch_github_repos(username: str) -> list:
try:
response = urllib.request.urlopen(req, timeout=10)
return json.loads(response.read().decode())
except Exception as e:
except Exception:
return []
def fetch_repo_languages(owner: str, repo: str) -> dict:
"""Fetch languages used in a repo."""
url = f"{GITHUB_API}/repos/{owner}/{repo}/languages"
@@ -62,21 +65,22 @@ def fetch_repo_languages(owner: str, repo: str) -> dict:
try:
response = urllib.request.urlopen(req, timeout=10)
return json.loads(response.read().decode())
except Exception as e:
except Exception:
return {}
def analyze_proficiency_text(text: str) -> tuple[str, list[str]]:
"""Analyze self-described proficiency text."""
text_lower = text.lower()
# Extract languages/technologies mentioned
tech_patterns = [
r'\b(python|java|javascript|typescript|c\+\+|c#|ruby|go|rust|swift|kotlin|php|perl|scala|r)\b',
r'\b(react|angular|vue|node|express|django|flask|spring|rails|laravel)\b',
r'\b(html|css|sass|scss|tailwind|bootstrap)\b',
r'\b(sql|mysql|postgresql|mongodb|redis|firebase)\b',
r'\b(docker|kubernetes|aws|azure|gcp|git)\b',
r'\b(machine learning|ml|ai|data science|tensorflow|pytorch)\b',
r"\b(python|java|javascript|typescript|c\+\+|c#|ruby|go|rust|swift|kotlin|php|perl|scala|r)\b",
r"\b(react|angular|vue|node|express|django|flask|spring|rails|laravel)\b",
r"\b(html|css|sass|scss|tailwind|bootstrap)\b",
r"\b(sql|mysql|postgresql|mongodb|redis|firebase)\b",
r"\b(docker|kubernetes|aws|azure|gcp|git)\b",
r"\b(machine learning|ml|ai|data science|tensorflow|pytorch)\b",
]
technologies = set()
@@ -85,126 +89,152 @@ def analyze_proficiency_text(text: str) -> tuple[str, list[str]]:
technologies.update(matches)
# Determine level from keywords
beginner_keywords = ['beginner', 'learning', 'new to', 'just started', 'basic', 'novice', 'early']
intermediate_keywords = ['intermediate', 'comfortable', 'familiar', 'some experience', 'worked with']
advanced_keywords = ['advanced', 'expert', 'senior', 'professional', 'years of experience', 'proficient', 'strong']
beginner_keywords = [
"beginner",
"learning",
"new to",
"just started",
"basic",
"novice",
"early",
]
intermediate_keywords = [
"intermediate",
"comfortable",
"familiar",
"some experience",
"worked with",
]
advanced_keywords = [
"advanced",
"expert",
"senior",
"professional",
"years of experience",
"proficient",
"strong",
]
level = 'intermediate' # default
level = "intermediate" # default
if any(kw in text_lower for kw in advanced_keywords):
level = 'advanced'
level = "advanced"
elif any(kw in text_lower for kw in beginner_keywords):
level = 'beginner'
level = "beginner"
elif any(kw in text_lower for kw in intermediate_keywords):
level = 'intermediate'
level = "intermediate"
return level, list(technologies)
def evaluate_applicant(applicant: dict, index: int, total: int) -> dict:
"""Evaluate a single applicant's technical proficiency."""
discord_id = applicant['discord_id']
project_url = applicant['project_url']
proficiency_self = applicant['proficiency_self']
project_reason = applicant['project_reason']
discord_id = applicant["discord_id"]
project_url = applicant["project_url"]
proficiency_self = applicant["proficiency_self"]
project_reason = applicant["project_reason"]
print(f"[{index+1}/{total}] Evaluating {discord_id}...")
print(f"[{index + 1}/{total}] Evaluating {discord_id}...")
result = {
'discord_id': discord_id,
'github_username': None,
'github_repos_count': 0,
'github_followers': 0,
'languages_from_github': [],
'languages_from_text': [],
'self_described_level': None,
'final_proficiency': 'intermediate', # default
'tech_stack': [],
'notes': []
"discord_id": discord_id,
"github_username": None,
"github_repos_count": 0,
"github_followers": 0,
"languages_from_github": [],
"languages_from_text": [],
"self_described_level": None,
"final_proficiency": "intermediate", # default
"tech_stack": [],
"notes": [],
}
# Analyze self-description
text_level, text_techs = analyze_proficiency_text(proficiency_self + " " + project_reason)
result['self_described_level'] = text_level
result['languages_from_text'] = text_techs
text_level, text_techs = analyze_proficiency_text(
proficiency_self + " " + project_reason
)
result["self_described_level"] = text_level
result["languages_from_text"] = text_techs
# Fetch GitHub data if URL provided
if project_url and 'github.com' in project_url:
if project_url and "github.com" in project_url:
owner, repo = extract_github_info(project_url)
if owner:
result['github_username'] = owner
result["github_username"] = owner
# Fetch user profile
user_data = fetch_github_user(owner)
if user_data:
result['github_repos_count'] = user_data.get('public_repos', 0)
result['github_followers'] = user_data.get('followers', 0)
result["github_repos_count"] = user_data.get("public_repos", 0)
result["github_followers"] = user_data.get("followers", 0)
# Fetch repos to get languages
repos = fetch_github_repos(owner)
all_languages = set()
for r in repos[:10]: # Check top 10 repos
if r.get('language'):
all_languages.add(r['language'].lower())
result['languages_from_github'] = list(all_languages)
if r.get("language"):
all_languages.add(r["language"].lower())
result["languages_from_github"] = list(all_languages)
# If specific repo provided, get its languages
if repo:
repo_langs = fetch_repo_languages(owner, repo)
for lang in repo_langs.keys():
for lang in repo_langs:
all_languages.add(lang.lower())
result['languages_from_github'] = list(all_languages)
result["languages_from_github"] = list(all_languages)
time.sleep(0.5) # Rate limiting
# Combine tech stack
all_tech = set(result['languages_from_github']) | set(result['languages_from_text'])
result['tech_stack'] = sorted(list(all_tech))
all_tech = set(result["languages_from_github"]) | set(result["languages_from_text"])
result["tech_stack"] = sorted(all_tech)
# Determine final proficiency
# Factors: self-description, GitHub activity, tech diversity
github_score = 0
if result['github_repos_count'] >= 20:
if result["github_repos_count"] >= 20:
github_score += 2
elif result['github_repos_count'] >= 10:
elif result["github_repos_count"] >= 10:
github_score += 1
if result['github_followers'] >= 50:
if result["github_followers"] >= 50:
github_score += 2
elif result['github_followers'] >= 10:
elif result["github_followers"] >= 10:
github_score += 1
tech_count = len(result['tech_stack'])
tech_count = len(result["tech_stack"])
if tech_count >= 6:
github_score += 2
elif tech_count >= 3:
github_score += 1
# Map self-described level to score
level_scores = {'beginner': 0, 'intermediate': 2, 'advanced': 4}
level_scores = {"beginner": 0, "intermediate": 2, "advanced": 4}
self_score = level_scores.get(text_level, 2)
# Combined score
total_score = github_score + self_score
if total_score >= 7:
result['final_proficiency'] = 'advanced'
result["final_proficiency"] = "advanced"
elif total_score >= 3:
result['final_proficiency'] = 'intermediate'
result["final_proficiency"] = "intermediate"
else:
result['final_proficiency'] = 'beginner'
result["final_proficiency"] = "beginner"
# Add notes
if not project_url or 'github.com' not in project_url:
result['notes'].append('No GitHub URL provided')
if result['github_repos_count'] == 0 and result['github_username']:
result['notes'].append('GitHub profile has no public repos')
if not project_url or "github.com" not in project_url:
result["notes"].append("No GitHub URL provided")
if result["github_repos_count"] == 0 and result["github_username"]:
result["notes"].append("GitHub profile has no public repos")
return result
def main():
# Load applicants
with open('applicants_to_evaluate.json', 'r') as f:
with open("applicants_to_evaluate.json") as f:
applicants = json.load(f)
print(f"Evaluating {len(applicants)} applicants...\n")
@@ -216,23 +246,26 @@ def main():
# Progress update every 10
if (i + 1) % 10 == 0:
print(f" Progress: {i+1}/{len(applicants)} complete")
print(f" Progress: {i + 1}/{len(applicants)} complete")
# Save results
with open('proficiency_evaluations.json', 'w') as f:
with open("proficiency_evaluations.json", "w") as f:
json.dump(evaluations, f, indent=2)
# Summary
beginner = sum(1 for e in evaluations if e['final_proficiency'] == 'beginner')
intermediate = sum(1 for e in evaluations if e['final_proficiency'] == 'intermediate')
advanced = sum(1 for e in evaluations if e['final_proficiency'] == 'advanced')
beginner = sum(1 for e in evaluations if e["final_proficiency"] == "beginner")
intermediate = sum(
1 for e in evaluations if e["final_proficiency"] == "intermediate"
)
advanced = sum(1 for e in evaluations if e["final_proficiency"] == "advanced")
print(f"\n=== EVALUATION COMPLETE ===")
print("\n=== EVALUATION COMPLETE ===")
print(f"Beginner: {beginner}")
print(f"Intermediate: {intermediate}")
print(f"Advanced: {advanced}")
print(f"Total: {len(evaluations)}")
print(f"\nResults saved to proficiency_evaluations.json")
print("\nResults saved to proficiency_evaluations.json")
if __name__ == "__main__":
main()
+78 -48
View File
@@ -1,61 +1,63 @@
import json
BLOCK_EMOJIS = {
'mornings': '🌅',
'afternoons': '☀️',
'evenings': '🌆',
'nights': '🌙'
}
BLOCK_EMOJIS = {"mornings": "🌅", "afternoons": "☀️", "evenings": "🌆", "nights": "🌙"}
def load_all_data():
"""Load all evaluation data files"""
with open('discord_verification.json', 'r') as f:
with open("discord_verification.json") as f:
verification = json.load(f)
with open('proficiency_evaluations.json', 'r') as f:
with open("proficiency_evaluations.json") as f:
proficiency = json.load(f)
with open('availability_analysis.json', 'r') as f:
with open("availability_analysis.json") as f:
availability = json.load(f)
with open('leadership_candidates.json', 'r') as f:
with open("leadership_candidates.json") as f:
candidates = json.load(f)
with open('leadership_evaluations.json', 'r') as f:
with open("leadership_evaluations.json") as f:
leadership = json.load(f)
return verification, proficiency, availability, candidates, leadership
def build_lookup_dicts(verification, proficiency, availability, leadership):
"""Build lookup dictionaries by discord_id"""
verified_usernames = {v[0]: v[1] for v in verification['verified']}
verified_usernames = {v[0]: v[1] for v in verification["verified"]}
prof_by_id = {p['discord_id']: p for p in proficiency}
prof_by_id = {p["discord_id"]: p for p in proficiency}
avail_by_id = {a['discord_id']: a for a in availability}
avail_by_id = {a["discord_id"]: a for a in availability}
lead_by_id = {l['discord_id']: l for l in leadership}
lead_by_id = {l["discord_id"]: l for l in leadership}
return verified_usernames, prof_by_id, avail_by_id, lead_by_id
def format_availability_blocks(blocks):
"""Format availability blocks with emojis"""
if not blocks:
return "No consistent availability"
formatted = []
for block in ['mornings', 'afternoons', 'evenings', 'nights']:
for block in ["mornings", "afternoons", "evenings", "nights"]:
if block in blocks:
formatted.append(f"{BLOCK_EMOJIS[block]} {block.capitalize()}")
return ", ".join(formatted)
def format_tech_stack(tech_stack):
"""Format tech stack list"""
if not tech_stack:
return "Not specified"
return ", ".join(sorted(tech_stack))
def generate_participants_md(non_leader_ids, verified_usernames, prof_by_id, avail_by_id):
def generate_participants_md(
non_leader_ids, verified_usernames, prof_by_id, avail_by_id
):
"""Generate participants.md for non-leaders"""
lines = [
"# Cohort Participants",
@@ -63,7 +65,7 @@ def generate_participants_md(non_leader_ids, verified_usernames, prof_by_id, ava
f"**Total Participants**: {len(non_leader_ids)}",
"",
"---",
""
"",
]
beginner_count = 0
@@ -78,16 +80,16 @@ def generate_participants_md(non_leader_ids, verified_usernames, prof_by_id, ava
prof = prof_by_id.get(discord_id, {})
avail = avail_by_id.get(discord_id, {})
proficiency = prof.get('final_proficiency', 'unknown')
tech_stack = prof.get('tech_stack', [])
blocks = avail.get('available_blocks', [])
notes = prof.get('notes', [])
proficiency = prof.get("final_proficiency", "unknown")
tech_stack = prof.get("tech_stack", [])
blocks = avail.get("available_blocks", [])
notes = prof.get("notes", [])
if proficiency == 'beginner':
if proficiency == "beginner":
beginner_count += 1
elif proficiency == 'intermediate':
elif proficiency == "intermediate":
intermediate_count += 1
elif proficiency == 'advanced':
elif proficiency == "advanced":
advanced_count += 1
lines.append(f"## {discord_id}")
@@ -99,10 +101,11 @@ def generate_participants_md(non_leader_ids, verified_usernames, prof_by_id, ava
lines.append(f"**Notes**: {', '.join(notes)}")
lines.append("")
verified_count = len([d for d in non_leader_ids if d in verified_usernames])
summary = [
"# Cohort Participants",
"",
f"**Total Participants**: {len([id for id in non_leader_ids if id in verified_usernames])}",
f"**Total Participants**: {verified_count}",
"",
"### Proficiency Breakdown",
f"- Beginner: {beginner_count}",
@@ -110,11 +113,12 @@ def generate_participants_md(non_leader_ids, verified_usernames, prof_by_id, ava
f"- Advanced: {advanced_count}",
"",
"---",
""
"",
]
return "\n".join(summary + lines[6:])
def leadership_fit_label(score):
"""Convert leadership score to label"""
if score >= 6:
@@ -126,7 +130,10 @@ def leadership_fit_label(score):
else:
return "Needs Review"
def generate_leaders_md(leader_ids, verified_usernames, prof_by_id, avail_by_id, lead_by_id):
def generate_leaders_md(
leader_ids, verified_usernames, prof_by_id, avail_by_id, lead_by_id
):
"""Generate leaders.md for leadership candidates"""
verified_leaders = [id for id in leader_ids if id in verified_usernames]
@@ -136,10 +143,14 @@ def generate_leaders_md(leader_ids, verified_usernames, prof_by_id, avail_by_id,
f"**Total Leaders**: {len(verified_leaders)}",
"",
"---",
""
"",
]
sorted_leaders = sorted(verified_leaders, key=lambda x: lead_by_id.get(x, {}).get('leadership_score', 0), reverse=True)
sorted_leaders = sorted(
verified_leaders,
key=lambda x: lead_by_id.get(x, {}).get("leadership_score", 0),
reverse=True,
)
for discord_id in sorted_leaders:
username = verified_usernames.get(discord_id, "Unknown")
@@ -147,18 +158,19 @@ def generate_leaders_md(leader_ids, verified_usernames, prof_by_id, avail_by_id,
avail = avail_by_id.get(discord_id, {})
lead = lead_by_id.get(discord_id, {})
proficiency = prof.get('final_proficiency', 'unknown')
tech_stack = prof.get('tech_stack', [])
blocks = avail.get('available_blocks', [])
proficiency = prof.get("final_proficiency", "unknown")
tech_stack = prof.get("tech_stack", [])
blocks = avail.get("available_blocks", [])
leadership_score = lead.get('leadership_score', 0)
leadership_fit = lead.get('leadership_fit', 'unknown')
leadership_notes = lead.get('notes', [])
prof_notes = prof.get('notes', [])
leadership_score = lead.get("leadership_score", 0)
leadership_fit = lead.get("leadership_fit", "unknown")
leadership_notes = lead.get("notes", [])
prof_notes = prof.get("notes", [])
lines.append(f"## {discord_id}")
lines.append(f"**Username**: @{username}")
lines.append(f"**Leadership Fit**: {leadership_fit.capitalize()} (Score: {leadership_score})")
fit = leadership_fit.capitalize()
lines.append(f"**Leadership Fit**: {fit} (Score: {leadership_score})")
lines.append(f"**Technical Proficiency**: {proficiency.capitalize()}")
lines.append(f"**Tech Stack**: {format_tech_stack(tech_stack)}")
lines.append(f"**Availability**: {format_availability_blocks(blocks)}")
@@ -168,9 +180,21 @@ def generate_leaders_md(leader_ids, verified_usernames, prof_by_id, avail_by_id,
lines.append(f"**Technical Notes**: {', '.join(prof_notes)}")
lines.append("")
excellent = sum(1 for id in verified_leaders if lead_by_id.get(id, {}).get('leadership_fit') == 'excellent')
good = sum(1 for id in verified_leaders if lead_by_id.get(id, {}).get('leadership_fit') == 'good')
adequate = sum(1 for id in verified_leaders if lead_by_id.get(id, {}).get('leadership_fit') == 'adequate')
excellent = sum(
1
for id in verified_leaders
if lead_by_id.get(id, {}).get("leadership_fit") == "excellent"
)
good = sum(
1
for id in verified_leaders
if lead_by_id.get(id, {}).get("leadership_fit") == "good"
)
adequate = sum(
1
for id in verified_leaders
if lead_by_id.get(id, {}).get("leadership_fit") == "adequate"
)
summary = [
"# Cohort Leaders",
@@ -183,11 +207,12 @@ def generate_leaders_md(leader_ids, verified_usernames, prof_by_id, avail_by_id,
f"- Adequate: {adequate}",
"",
"---",
""
"",
]
return "\n".join(summary + lines[6:])
def main():
verification, proficiency, availability, candidates, leadership = load_all_data()
@@ -195,22 +220,27 @@ def main():
verification, proficiency, availability, leadership
)
leader_ids = set(candidates['leaders'])
non_leader_ids = set(candidates['non_leaders'])
leader_ids = set(candidates["leaders"])
non_leader_ids = set(candidates["non_leaders"])
verified_ids = set(verified_usernames.keys())
leader_ids = leader_ids & verified_ids
non_leader_ids = non_leader_ids & verified_ids
participants_md = generate_participants_md(non_leader_ids, verified_usernames, prof_by_id, avail_by_id)
with open('participants.md', 'w') as f:
participants_md = generate_participants_md(
non_leader_ids, verified_usernames, prof_by_id, avail_by_id
)
with open("participants.md", "w") as f:
f.write(participants_md)
print(f"Generated participants.md with {len(non_leader_ids)} participants")
leaders_md = generate_leaders_md(leader_ids, verified_usernames, prof_by_id, avail_by_id, lead_by_id)
with open('leaders.md', 'w') as f:
leaders_md = generate_leaders_md(
leader_ids, verified_usernames, prof_by_id, avail_by_id, lead_by_id
)
with open("leaders.md", "w") as f:
f.write(leaders_md)
print(f"Generated leaders.md with {len(leader_ids)} leaders")
if __name__ == "__main__":
main()
+3 -3
View File
@@ -1,10 +1,10 @@
from datetime import datetime, timedelta
import json
from datetime import datetime, timedelta
# Generate hourly time slots from Feb 1 to March 3, 2026
# 24 hours a day, America/Los_Angeles timezone
start_date = datetime(2026, 2, 1, 0, 0) # Feb 1, 2026, midnight
end_date = datetime(2026, 3, 3, 23, 0) # March 3, 2026, 11pm
end_date = datetime(2026, 3, 3, 23, 0) # March 3, 2026, 11pm
times = []
current = start_date
@@ -18,7 +18,7 @@ print(f"First: {times[0]}")
print(f"Last: {times[-1]}")
# Save to file for use
with open('/home/naomi/docs/cohort/crabfit_timeslots.json', 'w') as f:
with open("/home/naomi/docs/cohort/crabfit_timeslots.json", "w") as f:
json.dump(times, f)
print("Saved to crabfit_timeslots.json")
+89 -29
View File
@@ -1,6 +1,7 @@
import json
import os
import time
import requests
# Amari's bot token
@@ -12,32 +13,75 @@ MESSAGE_IDS_FILE = "team_message_ids.json"
# Team channel IDs and role IDs
TEAMS = {
"Jade Jasmine": {"channel_id": "1464316501573107886", "role_id": "1464314923780931677"},
"Crimson Dahlia": {"channel_id": "1464316744909852682", "role_id": "1464315093402784015"},
"Rose Camellia": {"channel_id": "1464316751268286611", "role_id": "1464315098452726106"},
"Amber Wisteria": {"channel_id": "1464316761410113641", "role_id": "1464315105264275600"},
"Ivory Orchid": {"channel_id": "1464316770889240730", "role_id": "1464315109873684593"},
"Teal Iris": {"channel_id": "1464316776459407448", "role_id": "1464315114378498152"},
"Peach Gardenia": {"channel_id": "1464316785040953543", "role_id": "1464315118904152107"},
"Violet Carnation": {"channel_id": "1464316805261824032", "role_id": "1464315124251754559"},
"Azure Lotus": {"channel_id": "1464316814455472139", "role_id": "1464315128437801177"},
"Coral Sunflower": {"channel_id": "1464316819711066263", "role_id": "1464315132896088168"},
"Indigo Tulip": {"channel_id": "1464316826384072925", "role_id": "1464315138428633241"},
"Scarlet Hydrangea": {"channel_id": "1464316839306985506", "role_id": "1464315142710890520"},
"Mint Narcissus": {"channel_id": "1464316844251807952", "role_id": "1464315149203804405"},
"Sage Marigold": {"channel_id": "1464316850669093040", "role_id": "1464315153599299803"},
"Jade Jasmine": {
"channel_id": "1464316501573107886",
"role_id": "1464314923780931677",
},
"Crimson Dahlia": {
"channel_id": "1464316744909852682",
"role_id": "1464315093402784015",
},
"Rose Camellia": {
"channel_id": "1464316751268286611",
"role_id": "1464315098452726106",
},
"Amber Wisteria": {
"channel_id": "1464316761410113641",
"role_id": "1464315105264275600",
},
"Ivory Orchid": {
"channel_id": "1464316770889240730",
"role_id": "1464315109873684593",
},
"Teal Iris": {
"channel_id": "1464316776459407448",
"role_id": "1464315114378498152",
},
"Peach Gardenia": {
"channel_id": "1464316785040953543",
"role_id": "1464315118904152107",
},
"Violet Carnation": {
"channel_id": "1464316805261824032",
"role_id": "1464315124251754559",
},
"Azure Lotus": {
"channel_id": "1464316814455472139",
"role_id": "1464315128437801177",
},
"Coral Sunflower": {
"channel_id": "1464316819711066263",
"role_id": "1464315132896088168",
},
"Indigo Tulip": {
"channel_id": "1464316826384072925",
"role_id": "1464315138428633241",
},
"Scarlet Hydrangea": {
"channel_id": "1464316839306985506",
"role_id": "1464315142710890520",
},
"Mint Narcissus": {
"channel_id": "1464316844251807952",
"role_id": "1464315149203804405",
},
"Sage Marigold": {
"channel_id": "1464316850669093040",
"role_id": "1464315153599299803",
},
}
# Load team assignments and convert to dict by team name
with open("team_assignments.json", "r") as f:
with open("team_assignments.json") as f:
team_list = json.load(f)
team_data = {team["name"]: team for team in team_list}
# Load applicants to get project_url by discord_id
with open("applicants_to_evaluate.json", "r") as f:
with open("applicants_to_evaluate.json") as f:
applicants = json.load(f)
applicant_lookup = {str(a["discord_id"]): a for a in applicants}
def extract_github_username(url):
"""Extract GitHub username from various URL formats"""
if not url:
@@ -64,19 +108,26 @@ def extract_github_username(url):
# Handle standard GitHub URLs
if "github.com" in url:
# Remove protocol and github.com
path = url.replace("https://", "").replace("http://", "").replace("github.com/", "")
path = (
url.replace("https://", "")
.replace("http://", "")
.replace("github.com/", "")
)
# Get just the username (first part of path)
username = path.split("/")[0]
return username
return url
def build_message(team_name, role_id, leader_ids, participant_ids):
"""Build the welcome message for a team"""
lines = [
f"# {team_name}",
"",
f"Welcome, <@&{role_id}>. This is your private team channel — a space for you to collaborate, support one another, and build something meaningful together.",
f"Welcome, <@&{role_id}>. This is your private team channel — a space "
"for you to collaborate, support one another, and build something "
"meaningful together.",
"",
"## Roster",
"",
@@ -105,19 +156,21 @@ def build_message(team_name, role_id, leader_ids, participant_ids):
return "\n".join(lines)
def send_message(channel_id, content):
"""Send a message to a channel"""
url = f"https://discord.com/api/v10/channels/{channel_id}/messages"
headers = {
"Authorization": f"Bot {TOKEN}",
"Content-Type": "application/json"
}
headers = {"Authorization": f"Bot {TOKEN}", "Content-Type": "application/json"}
data = {"content": content}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 429:
retry_after = float(response.headers.get("Retry-After", response.headers.get("X-RateLimit-Reset-After", 1)))
retry_after = float(
response.headers.get(
"Retry-After", response.headers.get("X-RateLimit-Reset-After", 1)
)
)
print(f"Rate limited, waiting {retry_after}s...")
time.sleep(retry_after)
return send_message(channel_id, content)
@@ -128,6 +181,7 @@ def send_message(channel_id, content):
print(f"Error sending message: {response.status_code} - {response.text}")
return None
def pin_message(channel_id, message_id):
"""Pin a message in a channel"""
url = f"https://discord.com/api/v10/channels/{channel_id}/pins/{message_id}"
@@ -138,13 +192,18 @@ def pin_message(channel_id, message_id):
response = requests.put(url, headers=headers)
if response.status_code == 429:
retry_after = float(response.headers.get("Retry-After", response.headers.get("X-RateLimit-Reset-After", 1)))
retry_after = float(
response.headers.get(
"Retry-After", response.headers.get("X-RateLimit-Reset-After", 1)
)
)
print(f"Rate limited, waiting {retry_after}s...")
time.sleep(retry_after)
return pin_message(channel_id, message_id)
return response.status_code == 204
def main():
message_ids = {}
@@ -170,18 +229,18 @@ def main():
message_ids[team_name] = {
"channel_id": channel_id,
"message_id": message_id,
"role_id": role_id
"role_id": role_id,
}
print(f" Message sent! ID: {message_id}")
# Pin the message
print(f" Pinning message...")
print(" Pinning message...")
if pin_message(channel_id, message_id):
print(f" Pinned!")
print(" Pinned!")
else:
print(f" Failed to pin")
print(" Failed to pin")
else:
print(f" Failed to send message")
print(" Failed to send message")
# Small delay between teams
time.sleep(0.2)
@@ -193,5 +252,6 @@ def main():
print(f"\nDone! Message IDs saved to {MESSAGE_IDS_FILE}")
print(f"Successfully sent and pinned messages for {len(message_ids)} teams")
if __name__ == "__main__":
main()
+18 -16
View File
@@ -1,8 +1,8 @@
import json
import os
import time
import urllib.request
import urllib.error
import urllib.request
# Configuration
BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
@@ -10,7 +10,7 @@ GUILD_ID = "692816967895220344"
BASE_URL = "https://discord.com/api/v10"
# Read Discord IDs from table.md
with open("table.md", "r") as f:
with open("table.md") as f:
content = f.read()
lines = content.strip().split("\n")
@@ -33,7 +33,7 @@ headers = [h.strip() for h in header_line.split("|")[1:-1]]
discord_idx = 0 # Discord ID is the first column
discord_ids = []
for line in lines[header_idx + 2:]: # Skip header and separator
for line in lines[header_idx + 2 :]: # Skip header and separator
if not line.startswith("|"):
continue
cols = [c.strip() for c in line.split("|")[1:-1]]
@@ -59,15 +59,17 @@ for i, discord_id in enumerate(discord_ids):
data = json.loads(response.read().decode())
username = data.get("user", {}).get("username", "Unknown")
verified.append((discord_id, username))
print(f"[{i+1}/{len(discord_ids)}] ✓ {discord_id} - {username}")
print(f"[{i + 1}/{len(discord_ids)}] ✓ {discord_id} - {username}")
except urllib.error.HTTPError as e:
if e.code == 404:
missing.append(discord_id)
print(f"[{i+1}/{len(discord_ids)}] ✗ {discord_id} - NOT IN SERVER")
print(f"[{i + 1}/{len(discord_ids)}] ✗ {discord_id} - NOT IN SERVER")
elif e.code == 429:
# Rate limited - wait and retry
retry_after = json.loads(e.read().decode()).get("retry_after", 1)
print(f"[{i+1}/{len(discord_ids)}] Rate limited, waiting {retry_after}s...")
print(
f"[{i + 1}/{len(discord_ids)}] Rate limited, waiting {retry_after}s..."
)
time.sleep(retry_after + 0.5)
# Retry
try:
@@ -77,32 +79,32 @@ for i, discord_id in enumerate(discord_ids):
data = json.loads(response.read().decode())
username = data.get("user", {}).get("username", "Unknown")
verified.append((discord_id, username))
print(f"[{i+1}/{len(discord_ids)}] ✓ {discord_id} - {username} (after retry)")
msg = f"[{i + 1}/{len(discord_ids)}] ✓ {discord_id}"
print(f"{msg} - {username} (after retry)")
except urllib.error.HTTPError as e2:
if e2.code == 404:
missing.append(discord_id)
print(f"[{i+1}/{len(discord_ids)}] ✗ {discord_id} - NOT IN SERVER (after retry)")
msg = f"[{i + 1}/{len(discord_ids)}] ✗ {discord_id}"
print(f"{msg} - NOT IN SERVER (after retry)")
else:
errors.append((discord_id, f"HTTP {e2.code}"))
print(f"[{i+1}/{len(discord_ids)}] ? {discord_id} - Error {e2.code}")
print(
f"[{i + 1}/{len(discord_ids)}] ? {discord_id} - Error {e2.code}"
)
else:
errors.append((discord_id, f"HTTP {e.code}"))
print(f"[{i+1}/{len(discord_ids)}] ? {discord_id} - Error {e.code}")
print(f"[{i + 1}/{len(discord_ids)}] ? {discord_id} - Error {e.code}")
# Small delay to avoid rate limits
time.sleep(0.1)
print(f"\n=== SUMMARY ===")
print("\n=== SUMMARY ===")
print(f"Verified: {len(verified)}")
print(f"Missing: {len(missing)}")
print(f"Errors: {len(errors)}")
# Save results
with open("discord_verification.json", "w") as f:
json.dump({
"verified": verified,
"missing": missing,
"errors": errors
}, f, indent=2)
json.dump({"verified": verified, "missing": missing, "errors": errors}, f, indent=2)
print("\nResults saved to discord_verification.json")
+22 -2
View File
@@ -57,8 +57,28 @@ select = [
ignore = [
# Missing docstrings
"D100", "D101", "D102", "D103", "D104", "D105", "D106", "D107",
# Let's not require module docstrings for scripts
"D100",
# Allow print statements in scripts
"T201",
# Docstring punctuation - not critical for scripts
"D415",
# Magic values - acceptable in simple scripts
"PLR2004",
# Loop variable overwritten - common pattern
"PLW2901",
# Use sys.exit instead of exit - not critical
"PLR1722",
# Collapsible if statements - readability preference
"PLR5501",
# zip strict - not critical for scripts
"B905",
# Docstring summary line spacing - not critical
"D205",
# Function complexity - acceptable for scripts
"PLR0912", "PLR0915",
# Datetime timezone - scripts use local context
"DTZ001",
# Ambiguous variable names - context makes it clear
"E741",
]
[tool.ruff.lint.pydocstyle]
-165
View File
@@ -1,165 +0,0 @@
import { select } from "@inquirer/prompts";
import { execSync } from "child_process";
import { readdirSync, statSync, existsSync } from "fs";
import { join, relative } from "path";
interface ScriptOption {
name: string;
value: string;
description?: string;
}
const getTypeScriptCategories = (): string[] => {
const srcPath = join(__dirname, "..");
const entries = readdirSync(srcPath);
return entries
.filter((entry) => {
const fullPath = join(srcPath, entry);
return statSync(fullPath).isDirectory() && entry !== "utils" && entry !== "interfaces";
})
.sort();
};
const getTypeScriptScripts = (category: string): ScriptOption[] => {
const categoryPath = join(__dirname, "..", category);
const scripts: ScriptOption[] = [];
const walkDirectory = (dir: string) => {
const entries = readdirSync(dir);
for (const entry of entries) {
const fullPath = join(dir, entry);
const stat = statSync(fullPath);
if (stat.isDirectory()) {
walkDirectory(fullPath);
} else if (entry.endsWith(".ts") && entry !== "index.ts") {
const relativePath = relative(join(__dirname, ".."), fullPath);
scripts.push({
name: entry.replace(".ts", ""),
value: relativePath,
description: relativePath,
});
}
}
};
walkDirectory(categoryPath);
return scripts.sort((a, b) => a.name.localeCompare(b.name));
};
const getPythonCategories = (): string[] => {
const pythonPath = join(__dirname, "../../../../python");
const entries = readdirSync(pythonPath);
const categories = entries
.filter((entry) => {
const fullPath = join(pythonPath, entry);
return statSync(fullPath).isDirectory() &&
!entry.startsWith(".") &&
entry !== "__pycache__";
})
.sort();
// Also check for scripts in the root
const hasRootScripts = entries.some(entry => entry.endsWith(".py"));
if (hasRootScripts) {
categories.unshift("(root)");
}
return categories;
};
const getPythonScripts = (category: string): ScriptOption[] => {
const pythonPath = join(__dirname, "../../../../python");
const searchPath = category === "(root)" ? pythonPath : join(pythonPath, category);
const scripts: ScriptOption[] = [];
const entries = readdirSync(searchPath);
for (const entry of entries) {
if (entry.endsWith(".py") && !entry.startsWith("__")) {
const relativePath = category === "(root)" ? entry : join(category, entry);
scripts.push({
name: entry.replace(".py", ""),
value: relativePath,
description: relativePath,
});
}
}
return scripts.sort((a, b) => a.name.localeCompare(b.name));
};
const main = async () => {
console.log("🌸 Welcome to Ephemere Script Runner! 💖\n");
// Select language
const language = await select({
message: "Which language would you like to run?",
choices: [
{ name: "TypeScript", value: "typescript", description: "Run a TypeScript script" },
{ name: "Python", value: "python", description: "Run a Python script" },
],
});
// Get categories based on language
const categories = language === "typescript"
? getTypeScriptCategories()
: getPythonCategories();
if (categories.length === 0) {
console.error(`No categories found for ${language}!`);
process.exit(1);
}
// Select category
const category = await select({
message: "Which category?",
choices: categories.map(cat => ({
name: cat === "(root)" ? "Root Directory" : cat.charAt(0).toUpperCase() + cat.slice(1),
value: cat,
})),
});
// Get scripts for selected category
const scripts = language === "typescript"
? getTypeScriptScripts(category)
: getPythonScripts(category);
if (scripts.length === 0) {
console.error(`No scripts found in ${category}!`);
process.exit(1);
}
// Select script
const script = await select({
message: "Which script would you like to run?",
choices: scripts,
});
// Build and execute the command
const prodEnvPath = join(__dirname, "../../../../../prod.env");
let command: string;
if (language === "typescript") {
command = `cd ${join(__dirname, "../../../")} && op run --env-file=${prodEnvPath} -- pnpm exec tsx src/${script}`;
} else {
command = `cd ${join(__dirname, "../../../../python")} && op run --env-file=${prodEnvPath} -- uv run python ${script}`;
}
console.log(`\n✨ Running: ${script}\n`);
try {
execSync(command, {
stdio: "inherit",
shell: true,
});
} catch (error) {
console.error("\n❌ Script execution failed!");
process.exit(1);
}
};
main().catch(console.error);
+210
View File
@@ -0,0 +1,210 @@
/**
* @file Interactive script runner for ephemere project.
* @copyright 2025 Naomi Carrigan
* @license Naomi's Public License
* @author Naomi Carrigan
*/
import { execSync } from "node:child_process";
import { readdirSync, statSync } from "node:fs";
import { dirname, join, relative } from "node:path";
import { fileURLToPath } from "node:url";
import { select } from "@inquirer/prompts";
const currentFilename = fileURLToPath(import.meta.url);
const currentDirectory = dirname(currentFilename);
interface ScriptOption {
name: string;
value: string;
description?: string;
}
const getTypeScriptCategories = (): Array<string> => {
const sourcePath = join(currentDirectory, "..");
const entries = readdirSync(sourcePath);
return entries.
filter((entry) => {
const fullPath = join(sourcePath, entry);
const entryIsDirectory = statSync(fullPath).isDirectory();
return entryIsDirectory && entry !== "utils" && entry !== "interfaces";
}).
sort((a, b) => {
return a.localeCompare(b);
});
};
const getTypeScriptScripts = (category: string): Array<ScriptOption> => {
const categoryPath = join(currentDirectory, "..", category);
const scripts: Array<ScriptOption> = [];
const walkDirectory = (directory: string): void => {
const entries = readdirSync(directory);
for (const entry of entries) {
const fullPath = join(directory, entry);
const stat = statSync(fullPath);
if (stat.isDirectory()) {
walkDirectory(fullPath);
} else if (entry.endsWith(".ts") && entry !== "index.ts") {
const relativePath = relative(join(currentDirectory, ".."), fullPath);
scripts.push({
description: relativePath,
name: entry.replace(".ts", ""),
value: relativePath,
});
}
}
};
walkDirectory(categoryPath);
return scripts.sort((a, b) => {
return a.name.localeCompare(b.name);
});
};
const getPythonCategories = (): Array<string> => {
const pythonPath = join(currentDirectory, "../../../../python");
const entries = readdirSync(pythonPath);
const categories = entries.
filter((entry) => {
const fullPath = join(pythonPath, entry);
const entryIsDirectory = statSync(fullPath).isDirectory();
const isNotHidden = !entry.startsWith(".");
return entryIsDirectory && isNotHidden && entry !== "__pycache__";
}).
sort((a, b) => {
return a.localeCompare(b);
});
// Also check for scripts in the root
const hasRootScripts = entries.some((entry) => {
return entry.endsWith(".py");
});
if (hasRootScripts) {
categories.unshift("(root)");
}
return categories;
};
const getPythonScripts = (category: string): Array<ScriptOption> => {
const pythonPath = join(currentDirectory, "../../../../python");
const searchPath = category === "(root)"
? pythonPath
: join(pythonPath, category);
const scripts: Array<ScriptOption> = [];
const entries = readdirSync(searchPath);
for (const entry of entries) {
if (entry.endsWith(".py") && !entry.startsWith("__")) {
const relativePath = category === "(root)"
? entry
: join(category, entry);
scripts.push({
description: relativePath,
name: entry.replace(".py", ""),
value: relativePath,
});
}
}
return scripts.sort((a, b) => {
return a.name.localeCompare(b.name);
});
};
const selectLanguage = async(): Promise<string> => {
return await select({
choices: [
{
description: "Run a TypeScript script",
name: "TypeScript",
value: "typescript",
},
{
description: "Run a Python script",
name: "Python",
value: "python",
},
],
message: "Which language would you like to run?",
});
};
const selectCategory = async(categories: Array<string>): Promise<string> => {
return await select({
choices: categories.map((cat) => {
return {
name: cat === "(root)"
? "Root Directory"
: cat.charAt(0).toUpperCase() + cat.slice(1),
value: cat,
};
}),
message: "Which category?",
});
};
const buildCommand = (language: string, script: string): string => {
const environmentPath = join(currentDirectory, "../../../../../prod.env");
const typescriptDirectory = join(currentDirectory, "../../../");
const pythonDirectory = join(currentDirectory, "../../../../python");
return language === "typescript"
? `cd ${typescriptDirectory} && op run --env-file=${environmentPath} -- pnpm exec tsx src/${script}`
: `cd ${pythonDirectory} && op run --env-file=${environmentPath} -- uv run python ${script}`;
};
const executeScript = (script: string, command: string): void => {
console.log(`\n✨ Running: ${script}\n`);
try {
execSync(command, {
shell: "/bin/bash",
stdio: "inherit",
});
} catch {
console.error("\n❌ Script execution failed!");
process.exit(1);
}
};
const main = async(): Promise<void> => {
console.log("🌸 Welcome to Ephemere Script Runner! 💖\n");
const language = await selectLanguage();
const categories = language === "typescript"
? getTypeScriptCategories()
: getPythonCategories();
if (categories.length === 0) {
console.error(`No categories found for ${language}!`);
process.exit(1);
}
const category = await selectCategory(categories);
const scripts = language === "typescript"
? getTypeScriptScripts(category)
: getPythonScripts(category);
if (scripts.length === 0) {
console.error(`No scripts found in ${category}!`);
process.exit(1);
}
const script = await select({
choices: scripts,
message: "Which script would you like to run?",
});
const command = buildCommand(language, script);
executeScript(script, command);
};
await main();