Adds new cogs including DataManager, Hiring, KofiShop, Logging, ModMail, MORS, ServiceReview, StaffMsg, and Translator to enhance bot functionality for data management, hiring processes, logging events, and more.
129 lines
4.8 KiB
Python
129 lines
4.8 KiB
Python
import datetime
import logging
from typing import Dict, Optional, Literal

import discord
from discord.ext import tasks
from redbot.core import commands, Config
|
|
|
|
# Closed set of cog names; used as the type annotation of the `cog_name`
# argument of the `policy` settings command.
SUPPORTED_COGS = Literal["ModMail", "Hiring", "ServiceReview", "StaffMsg", "Logging"]
|
|
|
|
class DataManager(commands.Cog):
    """
    A cog to automatically manage and purge old data from other cogs.
    """

    # Per-guild config value that holds the purgeable entries of each
    # supported cog. This needs to match what the other cogs register. The
    # purge loop reads each value as a mapping of
    # ``{entry_id: ISO-8601 timestamp string}``.
    _DATA_GROUPS = {
        "ModMail": "closed_threads",
        "Hiring": "closed_applications",
        "ServiceReview": "reviews",
        "StaffMsg": "logged_events",
        "Logging": "logged_events",
    }

    _log = logging.getLogger("red.datamanager")

    def __init__(self, bot):
        self.bot = bot
        self.config = Config.get_conf(self, identifier=1234567894, force_registration=True)
        default_guild = {
            "policies": {},  # {cog_name: days}
        }
        self.config.register_guild(**default_guild)
        self.data_purge_loop.start()

    def cog_unload(self):
        # Stop the background task so it does not outlive the cog.
        self.data_purge_loop.cancel()

    @tasks.loop(hours=24)
    async def data_purge_loop(self):
        """Periodically purges old data based on configured policies.

        For every guild the bot is in, each ``{cog_name: days}`` policy is
        applied to the matching cog's per-guild data. A policy is skipped
        when the cog is not loaded, has no ``config`` attribute, or has no
        known data group.
        """
        # One reference time per run keeps every entry judged against the
        # same instant.
        now = datetime.datetime.now(datetime.timezone.utc)

        for guild in self.bot.guilds:
            policies = await self.config.guild(guild).policies()
            for cog_name, days in policies.items():
                group_name = self._DATA_GROUPS.get(cog_name)
                cog = self.bot.get_cog(cog_name)
                if not group_name or not cog or not hasattr(cog, "config"):
                    continue

                # An exception escaping the loop body stops the task, which
                # would disable purging for every guild. Contain failures to
                # the single cog/guild pair and keep going.
                try:
                    retention = datetime.timedelta(days=days)
                    await self._purge_expired(cog, guild, group_name, now, retention)
                except Exception:
                    self._log.exception(
                        "Failed to purge %s data for guild %s", cog_name, guild.id
                    )

    @data_purge_loop.before_loop
    async def _wait_for_bot_ready(self):
        """Delay the first purge run until the bot reports it is ready."""
        await self.bot.wait_until_ready()

    async def _purge_expired(self, cog, guild, group_name, now, retention):
        """Delete entries older than *retention* from one cog's guild data."""
        async with cog.config.guild(guild).get_attr(group_name)() as data_log:
            # Collect first, delete second: the mapping must not change size
            # while it is being iterated.
            expired = [
                entry_id
                for entry_id, timestamp_str in data_log.items()
                if self._is_expired(timestamp_str, now, retention)
            ]
            for entry_id in expired:
                del data_log[entry_id]

    @staticmethod
    def _is_expired(timestamp_str, now, retention):
        """Return True when *timestamp_str* is older than *retention* at *now*.

        Values that cannot be parsed as ISO-8601 timestamps are never
        considered expired, so malformed entries are left untouched.
        """
        try:
            entry_timestamp = datetime.datetime.fromisoformat(timestamp_str)
        except (ValueError, TypeError):
            return False  # Skip invalid timestamps

        if entry_timestamp.tzinfo is None:
            # Subtracting a naive datetime from an aware one raises
            # TypeError, which previously made naive entries unpurgeable.
            # NOTE(review): naive timestamps are assumed to be UTC - confirm
            # against how the other cogs write them.
            entry_timestamp = entry_timestamp.replace(tzinfo=datetime.timezone.utc)

        return (now - entry_timestamp) > retention

    # --- SETTINGS COMMANDS ---
    @commands.group(aliases=["dmset"])  # type: ignore
    @commands.guild_only()
    @commands.admin_or_permissions(manage_guild=True)
    async def datamanagerset(self, ctx: commands.Context):
        """Configure DataManager settings."""
        pass

    @datamanagerset.command(name="policy")
    async def set_purge_policy(self, ctx: commands.Context, cog_name: SUPPORTED_COGS, days: int):
        """
        Set the data retention policy for a cog.

        Use 0 days to disable purging for a cog.
        """
        if not ctx.guild:
            return
        if days < 0:
            await ctx.send("Please provide a non-negative number of days.")
            return

        # Decide on the reply while the policies are open, but send it only
        # after the context manager has exited, so the config is not held
        # open across a network call.
        async with self.config.guild(ctx.guild).policies() as policies:
            if days == 0:
                if cog_name in policies:
                    del policies[cog_name]
                    message = f"Purge policy for `{cog_name}` has been removed."
                else:
                    message = f"No purge policy was set for `{cog_name}`."
            else:
                policies[cog_name] = days
                message = f"Data from `{cog_name}` will now be purged after {days} days."

        await ctx.send(message)

    @datamanagerset.command(name="view")
    async def view_policies(self, ctx: commands.Context):
        """View the current data retention policies."""
        if not ctx.guild:
            return

        policies = await self.config.guild(ctx.guild).policies()
        if not policies:
            await ctx.send("No data retention policies have been set for this server.")
            return

        lines = [
            "Data from the following cogs will be automatically purged after the specified duration:"
        ]
        lines.extend(f"- **{cog_name}**: {days} days" for cog_name, days in policies.items())

        embed = discord.Embed(
            title="Data Retention Policies",
            description="\n".join(lines),
            color=await ctx.embed_color(),
        )
        await ctx.send(embed=embed)
|
|
|
|
|
|
async def setup(bot):
    """Create a DataManager cog for *bot* and add it to the bot."""
    data_manager = DataManager(bot)
    await bot.add_cog(data_manager)
|
|
|