Files
ephemere/python/cohort/discord_activity_checker.py
T
naomi f5e8deca59
CI / dependency-pin-check-typescript (push) Successful in 5s
CI / dependency-pin-check-python (push) Successful in 4s
Security Scan and Upload / Security & DefectDojo Upload (push) Successful in 54s
CI / python (push) Successful in 9m24s
CI / typescript (push) Successful in 9m41s
feat: add remaining cohort scripts (#4)
### Explanation

_No response_

### Issue

_No response_

### Attestations

- [ ] I have read and agree to the [Code of Conduct](https://docs.nhcarrigan.com/community/coc/)
- [ ] I have read and agree to the [Community Guidelines](https://docs.nhcarrigan.com/community/guide/).
- [ ] My contribution complies with the [Contributor Covenant](https://docs.nhcarrigan.com/dev/covenant/).

### Dependencies

- [ ] I have pinned the dependencies to a specific patch version.

### Style

- [ ] I have run the linter and resolved any errors.
- [ ] My pull request uses an appropriate title, matching the conventional commit standards.
- [ ] My scope of feat/fix/chore/etc. correctly matches the nature of changes in my pull request.

### Tests

- [ ] My contribution adds new code, and I have added tests to cover it.
- [ ] My contribution modifies existing code, and I have updated the tests to reflect these changes.
- [ ] All new and existing tests pass locally with my changes.
- [ ] Code coverage remains at or above the configured threshold.

### Documentation

_No response_

### Versioning

_No response_

Reviewed-on: #4
Co-authored-by: Naomi Carrigan <commits@nhcarrigan.com>
Co-committed-by: Naomi Carrigan <commits@nhcarrigan.com>
2026-02-02 12:36:44 -08:00

252 lines
8.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""Discord Team Activity Checker
Checks for team members who haven't sent messages in their channels within 36 hours
"""
import asyncio
import json
import os
import sys
from datetime import datetime, timedelta, timezone
import aiohttp
# Configuration
DISCORD_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
DISCORD_API_BASE = "https://discord.com/api/v10"
INACTIVE_THRESHOLD_HOURS = 36
# Load team assignments from file
with open("team_assignments.json") as f:
team_data = json.load(f)
# Build TEAMS dictionary with channel IDs and member lists
TEAMS = {}
CHANNEL_IDS = {
"Jade Jasmine": "1464316501573107886",
"Crimson Dahlia": "1464316744909852682",
"Rose Camellia": "1464316751268286611",
"Amber Wisteria": "1464316761410113641",
"Ivory Orchid": "1464316770889240730",
"Teal Iris": "1464316776459407448",
"Peach Gardenia": "1464316785040953543",
"Violet Carnation": "1464316805261824032",
"Azure Lotus": "1464316814455472139",
"Coral Sunflower": "1464316819711066263",
"Indigo Tulip": "1464316826384072925",
"Scarlet Hydrangea": "1464316839306985506",
"Mint Narcissus": "1464316844251807952",
"Sage Marigold": "1464316850669093040",
}
for team in team_data:
team_name = team["name"]
if team_name in CHANNEL_IDS:
TEAMS[team_name] = {
"channel_id": CHANNEL_IDS[team_name],
"member_ids": team["leaders"] + team["participants"],
}
class DiscordActivityChecker:
def __init__(self, token: str):
self.token = token
self.headers = {
"Authorization": f"Bot {token}",
"Content-Type": "application/json",
}
self.session = None
self.cutoff_time = datetime.now(timezone.utc) - timedelta(
hours=INACTIVE_THRESHOLD_HOURS
)
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def get_user_info(self, user_id: str) -> dict:
"""Get information about a specific user"""
url = f"{DISCORD_API_BASE}/users/{user_id}"
async with self.session.get(url, headers=self.headers) as response:
if response.status == 429:
# Rate limited, wait a bit
await asyncio.sleep(1)
return None
elif response.status != 200:
print(f"Failed to get user {user_id}: {response.status}")
return None
user_data = await response.json()
return user_data
async def get_recent_message_authors(self, channel_id: str) -> set[str]:
"""Get user IDs of everyone who sent a message in the last 36 hours"""
url = f"{DISCORD_API_BASE}/channels/{channel_id}/messages?limit=100"
active_users = set()
async with self.session.get(url, headers=self.headers) as response:
if response.status != 200:
print(
f"Failed to get messages for channel {channel_id}: "
f"{response.status}"
)
return active_users
messages = await response.json()
for message in messages:
# Parse message timestamp
timestamp = datetime.fromisoformat(
message["timestamp"].replace("Z", "+00:00")
)
# If message is within our threshold, add the author
if timestamp > self.cutoff_time:
active_users.add(message["author"]["id"])
else:
# Messages are returned newest first, so we can break early
break
return active_users
async def send_message_to_channel(self, channel_id: str, content: str) -> bool:
"""Send a message to a Discord channel"""
url = f"{DISCORD_API_BASE}/channels/{channel_id}/messages"
data = {"content": content}
async with self.session.post(url, headers=self.headers, json=data) as response:
if response.status == 200:
print(f"✅ Message sent to channel {channel_id}")
return True
else:
print(
f"❌ Failed to send message to channel {channel_id}: "
f"{response.status}"
)
return False
async def check_team_activity(
self, team_name: str, channel_id: str, member_ids: list[str]
) -> dict:
"""Check activity for a specific team"""
print(f"Checking {team_name}...")
# Get active users in the channel
active_users = await self.get_recent_message_authors(channel_id)
# Find inactive members
inactive_members = []
total_members = len(member_ids)
for member_id in member_ids:
if member_id not in active_users:
# Get user information
user_info = await self.get_user_info(member_id)
if user_info:
# Skip bots
if user_info.get("bot", False):
total_members -= 1
continue
inactive_members.append(
{"id": member_id, "mention": f"<@{member_id}>"}
)
else:
# If we can't get user info, still track them as inactive
inactive_members.append(
{"id": member_id, "mention": f"<@{member_id}>"}
)
return {
"team": team_name,
"total_members": total_members,
"inactive_members": inactive_members,
"inactive_count": len(inactive_members),
}
async def main():
"""Main function to check all teams"""
print(
f"Discord Activity Checker - Checking for inactivity over "
f"{INACTIVE_THRESHOLD_HOURS} hours"
)
cutoff = datetime.now(timezone.utc) - timedelta(hours=INACTIVE_THRESHOLD_HOURS)
print(f"Cutoff time: {cutoff}")
print("-" * 80)
async with DiscordActivityChecker(DISCORD_TOKEN) as checker:
results = []
for team_name, team_info in TEAMS.items():
channel_id = team_info["channel_id"]
member_ids = team_info["member_ids"]
result = await checker.check_team_activity(
team_name, channel_id, member_ids
)
results.append(result)
# Display results
print("\n" + "=" * 80)
print("INACTIVE MEMBERS REPORT")
print("=" * 80)
# Check if user wants to send messages (via command line arg)
send_messages = "--send" in sys.argv
for team_result in results:
print(f"\n📋 {team_result['team']}")
print(f" Total Members: {team_result['total_members']}")
print(f" Inactive: {team_result['inactive_count']}")
if team_result["inactive_members"]:
print(" Inactive Members:")
for member in team_result["inactive_members"]:
print(f" - {member['mention']}")
# Send message to team channel if requested
if send_messages and team_result["inactive_count"] > 0:
channel_id = TEAMS[team_result["team"]]["channel_id"]
# Build message
mentions = "\n".join(
[m["mention"] for m in team_result["inactive_members"]]
)
message = (
"Good morning, the following people have not sent a "
"message here in the last 36 hours. If you are on this "
"list, please confirm you are still participating.\n\n"
f"{mentions}"
)
await checker.send_message_to_channel(channel_id, message)
await asyncio.sleep(1) # Small delay between messages
else:
print(" ✅ All members are active!")
print("\n" + "=" * 80)
# Save results to JSON
with open("discord_activity_report.json", "w") as f:
json.dump(
{
"generated_at": datetime.now(timezone.utc).isoformat(),
"threshold_hours": INACTIVE_THRESHOLD_HOURS,
"results": results,
},
f,
indent=2,
)
print("\n📄 Detailed report saved to discord_activity_report.json")
if __name__ == "__main__":
asyncio.run(main())