generated from nhcarrigan/template
a40188413a
All Python cohort scripts now use DATA_DIR = Path(__file__).parent.parent.parent / "data" to correctly resolve the repo-root data/ directory regardless of the working directory set by run.sh. All TypeScript scripts have expanded JSDoc headers documenting data file requirements and environment variables.
212 lines
6.7 KiB
Python
212 lines
6.7 KiB
Python
"""Analyse applicant availability from a markdown table and produce UTC block stats.
|
|
|
|
Reads a markdown table of availability responses and a Discord verification file,
|
|
then produces a JSON summary of coverage across morning/afternoon/evening UTC blocks
|
|
for each day of the week.
|
|
|
|
Data files (place in data/):
|
|
- table.md Markdown table of applicant availability responses
|
|
- discord_verification.json Discord ID verification results (from verify_discord.py)
|
|
|
|
Outputs (written to data/):
|
|
- availability_analysis.json UTC block distribution per applicant
|
|
|
|
Env vars:
|
|
- None
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
# Directory holding the input/output data files: the "data" folder reached by
# going three .parent steps up from this file's path.
# NOTE(review): presumably the repository root's data/ directory - confirm
# against the actual script location.
DATA_DIR = Path(__file__).parent.parent.parent / "data"

# Day names, Monday first. They are used as the keys looked up in each
# applicant row and in the per-day slot mapping.
DAYS = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

# Named UTC blocks as (start_hour, end_hour) pairs. get_utc_blocks_for_hour
# tests them as half-open ranges: start <= hour < end.
UTC_BLOCKS = {
    "mornings": (6, 12),  # 06:00 - 12:00 UTC
    "afternoons": (12, 18),  # 12:00 - 18:00 UTC
    "evenings": (18, 24),  # 18:00 - 00:00 UTC
    "nights": (0, 6),  # 00:00 - 06:00 UTC
}
|
|
|
|
|
|
def parse_utc_offset(timezone_str: str) -> float:
    """Extract the UTC offset, in hours, from a timezone label.

    Looks for the first ``UTC<sign><hours>[:<minutes>]`` token in the string,
    e.g. ``'America/New_York (UTC-5)'`` -> ``-5.0`` and
    ``'Asia/Kolkata (UTC+5:30)'`` -> ``5.5``.

    Args:
        timezone_str: Free-form timezone text that may contain a ``UTC+H``,
            ``UTC-H`` or ``UTC+H:MM`` style marker.

    Returns:
        The signed offset in hours as a float, or ``0.0`` when the string
        contains no such marker.
    """
    match = re.search(r"UTC([+-]?)(\d+)(?::(\d+))?", timezone_str)
    if match is None:
        return 0.0

    sign, hours, minutes = match.groups()
    # Division always yields a float, so the result is a float even for
    # whole-hour offsets.
    magnitude = int(hours) + int(minutes or 0) / 60

    # Take the sign from the text rather than from int(hours): "-0:30" parses
    # to hours == 0, which would otherwise lose the minus sign and yield +0.5.
    return -magnitude if sign == "-" else magnitude
|
|
|
|
|
|
def parse_time_slots(time_str: str) -> list[tuple[int, int]]:
    """Turn a cell such as '07:00-08:00; 19:00-20:00' into hour pairs.

    The text is split on ';' and the first ``H:MM-H:MM`` range found in each
    piece contributes one ``(start_hour, end_hour)`` tuple. Only the hour
    components are kept; the minutes are matched but discarded.

    Args:
        time_str: Raw cell text. Empty values and 'n/a' / 'na' (any case)
            mean "no availability".

    Returns:
        A list of ``(start_hour, end_hour)`` tuples in the order they appear;
        pieces without a recognisable range are skipped.
    """
    if not time_str or time_str.lower() in ("n/a", "na"):
        return []

    range_pattern = r"(\d{1,2}):(\d{2})\s*-\s*(\d{1,2}):(\d{2})"
    found_ranges = (
        re.search(range_pattern, piece.strip()) for piece in time_str.split(";")
    )
    return [
        (int(found.group(1)), int(found.group(3)))
        for found in found_ranges
        if found
    ]
|
|
|
|
|
|
def local_hour_to_utc(local_hour: int, utc_offset: float) -> int:
    """Convert a local clock hour to the UTC hour that contains it.

    Args:
        local_hour: Hour on the local clock. Values outside 0-23 are accepted
            and wrapped modulo 24.
        utc_offset: Local offset from UTC in hours (e.g. ``-5`` or ``5.5``).

    Returns:
        The UTC hour in the range 0-23. With a fractional offset the result is
        the hour in which the shifted time falls (18:30 -> 18).
    """
    # Floor instead of truncating toward zero: int() alone maps both -0.5 and
    # +0.5 to hour 0, so two local hours would collapse onto the same UTC hour
    # around midnight. Float floor-division by 1 rounds down for either sign.
    return int((local_hour - utc_offset) // 1) % 24
|
|
|
|
|
|
def get_utc_blocks_for_hour(utc_hour: int) -> list[str]:
    """Return the names of the UTC blocks whose hour range contains ``utc_hour``.

    Each entry of ``UTC_BLOCKS`` is treated as a half-open range, so an hour
    belongs to a block when ``start <= utc_hour < end``.

    Args:
        utc_hour: Hour of the day in UTC.

    Returns:
        Matching block names in ``UTC_BLOCKS`` order; empty when the hour lies
        outside every range.
    """
    return [
        name
        for name, (first_hour, stop_hour) in UTC_BLOCKS.items()
        if first_hour <= utc_hour < stop_hour
    ]
|
|
|
|
|
|
def analyze_applicant_availability(timezone_str: str, day_slots: dict) -> dict:
    """Summarise one applicant's weekly availability in terms of UTC blocks.

    Every local hour covered by a slot is converted to UTC and tallied against
    the block(s) it falls in. Tallies are summed over all days in ``DAYS``; a
    block is reported as available once it has at least three tallied hours.

    Args:
        timezone_str: Timezone label; its UTC offset is extracted with
            ``parse_utc_offset``.
        day_slots: Mapping of day name to a list of
            ``(start_hour, end_hour)`` local-time tuples. Days missing from
            the mapping are treated as having no slots.

    Returns:
        A dict with the keys ``utc_offset``, ``timezone``,
        ``available_blocks``, ``block_counts`` and ``total_unique_utc_hours``.
    """
    utc_offset = parse_utc_offset(timezone_str)

    block_counts = defaultdict(int)
    all_utc_hours = set()

    for day in DAYS:
        for start_hour, end_hour in day_slots.get(day, []):
            # A slot that ends earlier than it starts (22:00-02:00,
            # 23:00-00:00) runs past midnight. range(start, end) would be
            # empty and the slot silently dropped, so extend the end into the
            # next day; local_hour_to_utc wraps hours modulo 24.
            if end_hour < start_hour:
                end_hour += 24

            for hour in range(start_hour, end_hour):
                utc_hour = local_hour_to_utc(hour, utc_offset)
                all_utc_hours.add(utc_hour)
                for block in get_utc_blocks_for_hour(utc_hour):
                    block_counts[block] += 1

    # Indexing the defaultdict (rather than .get) is deliberate: it inserts a
    # zero for blocks never seen, so block_counts in the result always lists
    # all four blocks.
    available_blocks = [
        block
        for block in ["mornings", "afternoons", "evenings", "nights"]
        if block_counts[block] >= 3
    ]

    return {
        "utc_offset": utc_offset,
        "timezone": timezone_str,
        "available_blocks": available_blocks,
        "block_counts": dict(block_counts),
        "total_unique_utc_hours": len(all_utc_hours),
    }
|
|
|
|
|
|
def parse_table_md() -> list[dict]:
    """Parse ``table.md`` in ``DATA_DIR`` into one dict per table row.

    The header is the first line starting with ``| Discord ID``. Its cells
    become the dict keys, and each later line starting with ``|`` becomes one
    row, with every cell stripped of surrounding whitespace.

    Returns:
        A list of ``{header: cell}`` dicts in file order. Rows with fewer
        cells than the header are skipped; extra cells are ignored.

    Raises:
        ValueError: If no header line starting with ``| Discord ID`` exists.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(DATA_DIR / "table.md", encoding="utf-8") as f:
        lines = f.read().strip().split("\n")

    header_idx = next(
        (i for i, line in enumerate(lines) if line.startswith("| Discord ID")),
        None,
    )
    if header_idx is None:
        raise ValueError("Could not find table header")

    # split("|") yields an empty piece before the first and after the last
    # pipe; [1:-1] drops both.
    headers = [h.strip() for h in lines[header_idx].split("|")[1:-1]]

    applicants = []
    # +2 skips the header row and the line directly after it (presumably the
    # markdown |---| separator row).
    for line in lines[header_idx + 2 :]:
        if not line.startswith("|"):
            continue

        cells = [c.strip() for c in line.split("|")[1:-1]]
        if len(cells) < len(headers):
            continue

        applicants.append(dict(zip(headers, cells)))

    return applicants
|
|
|
|
|
|
def main():
    """Run the availability analysis and write the per-applicant JSON report.

    Steps:
      1. Load ``discord_verification.json`` and collect the first element of
         every entry under its ``"verified"`` key as the set of verified IDs.
      2. Parse the availability table and keep only rows whose "Discord ID"
         cell is in that set.
      3. Analyse each remaining row and write all results to
         ``availability_analysis.json`` in ``DATA_DIR``.
      4. Print a summary of how many applicants are available in each block.
    """
    # Both files are opened explicitly as UTF-8 so reading and writing do not
    # depend on the platform's locale encoding.
    with open(DATA_DIR / "discord_verification.json", encoding="utf-8") as f:
        verification = json.load(f)

    verified_ids = {v[0] for v in verification["verified"]}
    print(f"Verified applicants: {len(verified_ids)}")

    applicants = parse_table_md()
    print(f"Total applicants in table: {len(applicants)}")

    availability_results = []

    for applicant in applicants:
        discord_id = applicant.get("Discord ID", "")
        if discord_id not in verified_ids:
            continue

        timezone = applicant.get("Timezone", "")
        day_slots = {day: parse_time_slots(applicant.get(day, "")) for day in DAYS}

        analysis = analyze_applicant_availability(timezone, day_slots)

        # Built key by key (not by unpacking `analysis`) to keep the key order
        # of the emitted JSON stable.
        availability_results.append(
            {
                "discord_id": discord_id,
                "timezone": timezone,
                "utc_offset": analysis["utc_offset"],
                "available_blocks": analysis["available_blocks"],
                "block_counts": analysis["block_counts"],
                "total_unique_utc_hours": analysis["total_unique_utc_hours"],
            }
        )

    with open(DATA_DIR / "availability_analysis.json", "w", encoding="utf-8") as f:
        json.dump(availability_results, f, indent=2)

    # Number of applicants counted as available in each block.
    block_distribution = defaultdict(int)
    for result in availability_results:
        for block in result["available_blocks"]:
            block_distribution[block] += 1

    print("\n=== AVAILABILITY ANALYSIS COMPLETE ===")
    print(f"Analyzed: {len(availability_results)} applicants")
    print("\nBlock Distribution (applicants available in each block):")
    for block in ["mornings", "afternoons", "evenings", "nights"]:
        print(f" {block.capitalize()}: {block_distribution[block]}")

    no_blocks = sum(1 for r in availability_results if not r["available_blocks"])
    print(f"\nApplicants with no clear block availability: {no_blocks}")

    print("\nResults saved to availability_analysis.json")
|
|
|
|
|
|
# Run the analysis only when this file is executed directly, not when it is
# imported as a module.
if __name__ == "__main__":
    main()
|