generated from nhcarrigan/template
feat: add remaining cohort scripts (#4)
CI / dependency-pin-check-typescript (push) Successful in 5s
CI / dependency-pin-check-python (push) Successful in 4s
Security Scan and Upload / Security & DefectDojo Upload (push) Successful in 54s
CI / python (push) Successful in 9m24s
CI / typescript (push) Successful in 9m41s
CI / dependency-pin-check-typescript (push) Successful in 5s
CI / dependency-pin-check-python (push) Successful in 4s
Security Scan and Upload / Security & DefectDojo Upload (push) Successful in 54s
CI / python (push) Successful in 9m24s
CI / typescript (push) Successful in 9m41s
### Explanation _No response_ ### Issue _No response_ ### Attestations - [ ] I have read and agree to the [Code of Conduct](https://docs.nhcarrigan.com/community/coc/) - [ ] I have read and agree to the [Community Guidelines](https://docs.nhcarrigan.com/community/guide/). - [ ] My contribution complies with the [Contributor Covenant](https://docs.nhcarrigan.com/dev/covenant/). ### Dependencies - [ ] I have pinned the dependencies to a specific patch version. ### Style - [ ] I have run the linter and resolved any errors. - [ ] My pull request uses an appropriate title, matching the conventional commit standards. - [ ] My scope of feat/fix/chore/etc. correctly matches the nature of changes in my pull request. ### Tests - [ ] My contribution adds new code, and I have added tests to cover it. - [ ] My contribution modifies existing code, and I have updated the tests to reflect these changes. - [ ] All new and existing tests pass locally with my changes. - [ ] Code coverage remains at or above the configured threshold. ### Documentation _No response_ ### Versioning _No response_ Reviewed-on: #4 Co-authored-by: Naomi Carrigan <commits@nhcarrigan.com> Co-committed-by: Naomi Carrigan <commits@nhcarrigan.com>
This commit was merged in pull request #4.
This commit is contained in:
Executable
+251
@@ -0,0 +1,251 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Discord Team Activity Checker
|
||||
Checks for team members who haven't sent messages in their channels within 36 hours
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import aiohttp
|
||||
|
||||
# Configuration
|
||||
DISCORD_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
|
||||
DISCORD_API_BASE = "https://discord.com/api/v10"
|
||||
INACTIVE_THRESHOLD_HOURS = 36
|
||||
|
||||
# Load team assignments from file
|
||||
with open("team_assignments.json") as f:
|
||||
team_data = json.load(f)
|
||||
|
||||
# Build TEAMS dictionary with channel IDs and member lists
|
||||
TEAMS = {}
|
||||
CHANNEL_IDS = {
|
||||
"Jade Jasmine": "1464316501573107886",
|
||||
"Crimson Dahlia": "1464316744909852682",
|
||||
"Rose Camellia": "1464316751268286611",
|
||||
"Amber Wisteria": "1464316761410113641",
|
||||
"Ivory Orchid": "1464316770889240730",
|
||||
"Teal Iris": "1464316776459407448",
|
||||
"Peach Gardenia": "1464316785040953543",
|
||||
"Violet Carnation": "1464316805261824032",
|
||||
"Azure Lotus": "1464316814455472139",
|
||||
"Coral Sunflower": "1464316819711066263",
|
||||
"Indigo Tulip": "1464316826384072925",
|
||||
"Scarlet Hydrangea": "1464316839306985506",
|
||||
"Mint Narcissus": "1464316844251807952",
|
||||
"Sage Marigold": "1464316850669093040",
|
||||
}
|
||||
|
||||
for team in team_data:
|
||||
team_name = team["name"]
|
||||
if team_name in CHANNEL_IDS:
|
||||
TEAMS[team_name] = {
|
||||
"channel_id": CHANNEL_IDS[team_name],
|
||||
"member_ids": team["leaders"] + team["participants"],
|
||||
}
|
||||
|
||||
|
||||
class DiscordActivityChecker:
|
||||
def __init__(self, token: str):
|
||||
self.token = token
|
||||
self.headers = {
|
||||
"Authorization": f"Bot {token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
self.session = None
|
||||
self.cutoff_time = datetime.now(timezone.utc) - timedelta(
|
||||
hours=INACTIVE_THRESHOLD_HOURS
|
||||
)
|
||||
|
||||
async def __aenter__(self):
|
||||
self.session = aiohttp.ClientSession()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
if self.session:
|
||||
await self.session.close()
|
||||
|
||||
async def get_user_info(self, user_id: str) -> dict:
|
||||
"""Get information about a specific user"""
|
||||
url = f"{DISCORD_API_BASE}/users/{user_id}"
|
||||
|
||||
async with self.session.get(url, headers=self.headers) as response:
|
||||
if response.status == 429:
|
||||
# Rate limited, wait a bit
|
||||
await asyncio.sleep(1)
|
||||
return None
|
||||
elif response.status != 200:
|
||||
print(f"Failed to get user {user_id}: {response.status}")
|
||||
return None
|
||||
|
||||
user_data = await response.json()
|
||||
return user_data
|
||||
|
||||
async def get_recent_message_authors(self, channel_id: str) -> set[str]:
|
||||
"""Get user IDs of everyone who sent a message in the last 36 hours"""
|
||||
url = f"{DISCORD_API_BASE}/channels/{channel_id}/messages?limit=100"
|
||||
active_users = set()
|
||||
|
||||
async with self.session.get(url, headers=self.headers) as response:
|
||||
if response.status != 200:
|
||||
print(
|
||||
f"Failed to get messages for channel {channel_id}: "
|
||||
f"{response.status}"
|
||||
)
|
||||
return active_users
|
||||
|
||||
messages = await response.json()
|
||||
|
||||
for message in messages:
|
||||
# Parse message timestamp
|
||||
timestamp = datetime.fromisoformat(
|
||||
message["timestamp"].replace("Z", "+00:00")
|
||||
)
|
||||
|
||||
# If message is within our threshold, add the author
|
||||
if timestamp > self.cutoff_time:
|
||||
active_users.add(message["author"]["id"])
|
||||
else:
|
||||
# Messages are returned newest first, so we can break early
|
||||
break
|
||||
|
||||
return active_users
|
||||
|
||||
async def send_message_to_channel(self, channel_id: str, content: str) -> bool:
|
||||
"""Send a message to a Discord channel"""
|
||||
url = f"{DISCORD_API_BASE}/channels/{channel_id}/messages"
|
||||
data = {"content": content}
|
||||
|
||||
async with self.session.post(url, headers=self.headers, json=data) as response:
|
||||
if response.status == 200:
|
||||
print(f"✅ Message sent to channel {channel_id}")
|
||||
return True
|
||||
else:
|
||||
print(
|
||||
f"❌ Failed to send message to channel {channel_id}: "
|
||||
f"{response.status}"
|
||||
)
|
||||
return False
|
||||
|
||||
async def check_team_activity(
|
||||
self, team_name: str, channel_id: str, member_ids: list[str]
|
||||
) -> dict:
|
||||
"""Check activity for a specific team"""
|
||||
print(f"Checking {team_name}...")
|
||||
|
||||
# Get active users in the channel
|
||||
active_users = await self.get_recent_message_authors(channel_id)
|
||||
|
||||
# Find inactive members
|
||||
inactive_members = []
|
||||
total_members = len(member_ids)
|
||||
|
||||
for member_id in member_ids:
|
||||
if member_id not in active_users:
|
||||
# Get user information
|
||||
user_info = await self.get_user_info(member_id)
|
||||
if user_info:
|
||||
# Skip bots
|
||||
if user_info.get("bot", False):
|
||||
total_members -= 1
|
||||
continue
|
||||
|
||||
inactive_members.append(
|
||||
{"id": member_id, "mention": f"<@{member_id}>"}
|
||||
)
|
||||
else:
|
||||
# If we can't get user info, still track them as inactive
|
||||
inactive_members.append(
|
||||
{"id": member_id, "mention": f"<@{member_id}>"}
|
||||
)
|
||||
|
||||
return {
|
||||
"team": team_name,
|
||||
"total_members": total_members,
|
||||
"inactive_members": inactive_members,
|
||||
"inactive_count": len(inactive_members),
|
||||
}
|
||||
|
||||
|
||||
async def main():
|
||||
"""Main function to check all teams"""
|
||||
print(
|
||||
f"Discord Activity Checker - Checking for inactivity over "
|
||||
f"{INACTIVE_THRESHOLD_HOURS} hours"
|
||||
)
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(hours=INACTIVE_THRESHOLD_HOURS)
|
||||
print(f"Cutoff time: {cutoff}")
|
||||
print("-" * 80)
|
||||
|
||||
async with DiscordActivityChecker(DISCORD_TOKEN) as checker:
|
||||
results = []
|
||||
|
||||
for team_name, team_info in TEAMS.items():
|
||||
channel_id = team_info["channel_id"]
|
||||
member_ids = team_info["member_ids"]
|
||||
|
||||
result = await checker.check_team_activity(
|
||||
team_name, channel_id, member_ids
|
||||
)
|
||||
results.append(result)
|
||||
|
||||
# Display results
|
||||
print("\n" + "=" * 80)
|
||||
print("INACTIVE MEMBERS REPORT")
|
||||
print("=" * 80)
|
||||
|
||||
# Check if user wants to send messages (via command line arg)
|
||||
send_messages = "--send" in sys.argv
|
||||
|
||||
for team_result in results:
|
||||
print(f"\n📋 {team_result['team']}")
|
||||
print(f" Total Members: {team_result['total_members']}")
|
||||
print(f" Inactive: {team_result['inactive_count']}")
|
||||
|
||||
if team_result["inactive_members"]:
|
||||
print(" Inactive Members:")
|
||||
for member in team_result["inactive_members"]:
|
||||
print(f" - {member['mention']}")
|
||||
|
||||
# Send message to team channel if requested
|
||||
if send_messages and team_result["inactive_count"] > 0:
|
||||
channel_id = TEAMS[team_result["team"]]["channel_id"]
|
||||
|
||||
# Build message
|
||||
mentions = "\n".join(
|
||||
[m["mention"] for m in team_result["inactive_members"]]
|
||||
)
|
||||
message = (
|
||||
"Good morning, the following people have not sent a "
|
||||
"message here in the last 36 hours. If you are on this "
|
||||
"list, please confirm you are still participating.\n\n"
|
||||
f"{mentions}"
|
||||
)
|
||||
|
||||
await checker.send_message_to_channel(channel_id, message)
|
||||
await asyncio.sleep(1) # Small delay between messages
|
||||
else:
|
||||
print(" ✅ All members are active!")
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
|
||||
# Save results to JSON
|
||||
with open("discord_activity_report.json", "w") as f:
|
||||
json.dump(
|
||||
{
|
||||
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||
"threshold_hours": INACTIVE_THRESHOLD_HOURS,
|
||||
"results": results,
|
||||
},
|
||||
f,
|
||||
indent=2,
|
||||
)
|
||||
|
||||
print("\n📄 Detailed report saved to discord_activity_report.json")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user