chore: lints
CI / dependency-pin-check-python (pull_request) Successful in 4s
CI / dependency-pin-check-typescript (pull_request) Successful in 4s
Security Scan and Upload / Security & DefectDojo Upload (pull_request) Successful in 55s
CI / python (pull_request) Successful in 9m25s
CI / typescript (pull_request) Successful in 9m46s

This commit is contained in:
2026-02-02 12:26:17 -08:00
parent 38f1eacbb3
commit 9cca617d3f
4 changed files with 204 additions and 104 deletions
+70 -42
View File
@@ -1,17 +1,15 @@
#!/usr/bin/env python3
"""
Discord Team Activity Checker
"""Discord Team Activity Checker
Checks for team members who haven't sent messages in their channels within 36 hours
"""
import asyncio
import aiohttp
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Set
import json
import time
import sys
import os
import sys
from datetime import datetime, timedelta, timezone
import aiohttp
# Configuration
DISCORD_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
@@ -19,7 +17,7 @@ DISCORD_API_BASE = "https://discord.com/api/v10"
INACTIVE_THRESHOLD_HOURS = 36
# Load team assignments from file
with open("team_assignments.json", "r") as f:
with open("team_assignments.json") as f:
team_data = json.load(f)
# Build TEAMS dictionary with channel IDs and member lists
@@ -38,7 +36,7 @@ CHANNEL_IDS = {
"Indigo Tulip": "1464316826384072925",
"Scarlet Hydrangea": "1464316839306985506",
"Mint Narcissus": "1464316844251807952",
"Sage Marigold": "1464316850669093040"
"Sage Marigold": "1464316850669093040",
}
for team in team_data:
@@ -46,18 +44,21 @@ for team in team_data:
if team_name in CHANNEL_IDS:
TEAMS[team_name] = {
"channel_id": CHANNEL_IDS[team_name],
"member_ids": team["leaders"] + team["participants"]
"member_ids": team["leaders"] + team["participants"],
}
class DiscordActivityChecker:
def __init__(self, token: str):
self.token = token
self.headers = {
"Authorization": f"Bot {token}",
"Content-Type": "application/json"
"Content-Type": "application/json",
}
self.session = None
self.cutoff_time = datetime.now(timezone.utc) - timedelta(hours=INACTIVE_THRESHOLD_HOURS)
self.cutoff_time = datetime.now(timezone.utc) - timedelta(
hours=INACTIVE_THRESHOLD_HOURS
)
async def __aenter__(self):
self.session = aiohttp.ClientSession()
@@ -67,7 +68,7 @@ class DiscordActivityChecker:
if self.session:
await self.session.close()
async def get_user_info(self, user_id: str) -> Dict:
async def get_user_info(self, user_id: str) -> dict:
"""Get information about a specific user"""
url = f"{DISCORD_API_BASE}/users/{user_id}"
@@ -83,21 +84,26 @@ class DiscordActivityChecker:
user_data = await response.json()
return user_data
async def get_recent_message_authors(self, channel_id: str) -> Set[str]:
async def get_recent_message_authors(self, channel_id: str) -> set[str]:
"""Get user IDs of everyone who sent a message in the last 36 hours"""
url = f"{DISCORD_API_BASE}/channels/{channel_id}/messages?limit=100"
active_users = set()
async with self.session.get(url, headers=self.headers) as response:
if response.status != 200:
print(f"Failed to get messages for channel {channel_id}: {response.status}")
print(
f"Failed to get messages for channel {channel_id}: "
f"{response.status}"
)
return active_users
messages = await response.json()
for message in messages:
# Parse message timestamp
timestamp = datetime.fromisoformat(message["timestamp"].replace("Z", "+00:00"))
timestamp = datetime.fromisoformat(
message["timestamp"].replace("Z", "+00:00")
)
# If message is within our threshold, add the author
if timestamp > self.cutoff_time:
@@ -118,10 +124,15 @@ class DiscordActivityChecker:
print(f"✅ Message sent to channel {channel_id}")
return True
else:
print(f"❌ Failed to send message to channel {channel_id}: {response.status}")
print(
f"❌ Failed to send message to channel {channel_id}: "
f"{response.status}"
)
return False
async def check_team_activity(self, team_name: str, channel_id: str, member_ids: List[str]) -> Dict:
async def check_team_activity(
self, team_name: str, channel_id: str, member_ids: list[str]
) -> dict:
"""Check activity for a specific team"""
print(f"Checking {team_name}...")
@@ -142,28 +153,31 @@ class DiscordActivityChecker:
total_members -= 1
continue
inactive_members.append({
"id": member_id,
"mention": f"<@{member_id}>"
})
inactive_members.append(
{"id": member_id, "mention": f"<@{member_id}>"}
)
else:
# If we can't get user info, still track them as inactive
inactive_members.append({
"id": member_id,
"mention": f"<@{member_id}>"
})
inactive_members.append(
{"id": member_id, "mention": f"<@{member_id}>"}
)
return {
"team": team_name,
"total_members": total_members,
"inactive_members": inactive_members,
"inactive_count": len(inactive_members)
"inactive_count": len(inactive_members),
}
async def main():
"""Main function to check all teams"""
print(f"Discord Activity Checker - Checking for inactivity over {INACTIVE_THRESHOLD_HOURS} hours")
print(f"Cutoff time: {datetime.now(timezone.utc) - timedelta(hours=INACTIVE_THRESHOLD_HOURS)}")
print(
f"Discord Activity Checker - Checking for inactivity over "
f"{INACTIVE_THRESHOLD_HOURS} hours"
)
cutoff = datetime.now(timezone.utc) - timedelta(hours=INACTIVE_THRESHOLD_HOURS)
print(f"Cutoff time: {cutoff}")
print("-" * 80)
async with DiscordActivityChecker(DISCORD_TOKEN) as checker:
@@ -173,7 +187,9 @@ async def main():
channel_id = team_info["channel_id"]
member_ids = team_info["member_ids"]
result = await checker.check_team_activity(team_name, channel_id, member_ids)
result = await checker.check_team_activity(
team_name, channel_id, member_ids
)
results.append(result)
# Display results
@@ -189,18 +205,25 @@ async def main():
print(f" Total Members: {team_result['total_members']}")
print(f" Inactive: {team_result['inactive_count']}")
if team_result['inactive_members']:
if team_result["inactive_members"]:
print(" Inactive Members:")
for member in team_result['inactive_members']:
for member in team_result["inactive_members"]:
print(f" - {member['mention']}")
# Send message to team channel if requested
if send_messages and team_result['inactive_count'] > 0:
channel_id = TEAMS[team_result['team']]['channel_id']
if send_messages and team_result["inactive_count"] > 0:
channel_id = TEAMS[team_result["team"]]["channel_id"]
# Build message
mentions = "\n".join([m['mention'] for m in team_result['inactive_members']])
message = f"Good morning, the following people have not sent a message here in the last 36 hours. If you are on this list, please confirm you are still participating.\n\n{mentions}"
mentions = "\n".join(
[m["mention"] for m in team_result["inactive_members"]]
)
message = (
"Good morning, the following people have not sent a "
"message here in the last 36 hours. If you are on this "
"list, please confirm you are still participating.\n\n"
f"{mentions}"
)
await checker.send_message_to_channel(channel_id, message)
await asyncio.sleep(1) # Small delay between messages
@@ -211,13 +234,18 @@ async def main():
# Save results to JSON
with open("discord_activity_report.json", "w") as f:
json.dump({
"generated_at": datetime.now(timezone.utc).isoformat(),
"threshold_hours": INACTIVE_THRESHOLD_HOURS,
"results": results
}, f, indent=2)
json.dump(
{
"generated_at": datetime.now(timezone.utc).isoformat(),
"threshold_hours": INACTIVE_THRESHOLD_HOURS,
"results": results,
},
f,
indent=2,
)
print("\n📄 Detailed report saved to discord_activity_report.json")
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())