6 Commits

Author SHA1 Message Date
859ac84b68 feat: add password protection cog for channels
Adds new Discord bot cog enabling password-protected channels.
Includes verification, channel management, whitelist/blacklist,
and admin password recovery features.
2025-09-11 17:22:36 -04:00
5ed8002b40 feat: add Mass Outreach & Review System
Introduces multi-step workflow for user outreach, time selection, and mass reviews with temporary data storage for process efficiency.
2025-09-11 17:21:42 -04:00
c94667ee52 feat: Adds ModMail cog with core setup
Introduces a private, forum-based ModMail system for relaying user DMs to staff threads, enhancing moderation and support through persistent message storage.

Relies on 'rich' requirement for better formatting; includes metadata for easy installation and help integration.
2025-09-11 17:18:22 -04:00
83facc2fb9 feat: add initial files for KofiShop cog 2025-09-11 17:17:40 -04:00
5b9c27fcad feat: add kBump database bridge cog
Integrates MongoDB connection to kBump services for Red bot, enabling data reading and interaction.

Adds command to check server bump status with detailed embed output.
2025-09-11 17:16:29 -04:00
c9431d0c86 feat: Add /workorder.9tkd to .gitignore
Includes .vs/ and pyproject.toml to prevent tracking unintended files
Fixes license typo in README.md from GPU-3.0 to GPL-3.0
2025-09-11 17:15:29 -04:00
20 changed files with 552 additions and 2 deletions

5
.gitignore vendored
View File

@@ -174,3 +174,8 @@ cython_debug/
# PyPI configuration file
.pypirc
/workorder.9tkd
.vs/
pyproject.toml

View File

@@ -65,7 +65,7 @@ As this is my main focus, All the support is welcomed & extremely appreicated! I
- [Cog-Creators](https://github.com/Cog-Creators)
---
## License
This repository and is cogs are protected under the GPU-3.0 License.
For Further information visit [GPC-30 License](https://git.kitsunic.org/kitsunicWorks/unstable-cogs/src/branch/main/LICENSE)
This repository and is cogs are protected under the GPL-3.0 License.
For Further information visit [GPL-30 License](https://git.kitsunic.org/kitsunicWorks/unstable-cogs/src/branch/main/LICENSE)
Copyright (C) 2025-present kitsunicWorks unstable-cog

4
kbump/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .kbump import kBump
async def setup(bot):
await bot.add_cog(kBump(bot))

8
kbump/info.json Normal file
View File

@@ -0,0 +1,8 @@
{
"author": ["kitsunicWorks"],
"name": "kBump",
"short": "A bridge to the kBump database.",
"description": "Allows this Red bot to read and interact with data from the separate kBump services.",
"requirements": ["pymongo"],
"tags": ["utility", "database", "bridge"]
}

104
kbump/kbump.py Normal file
View File

@@ -0,0 +1,104 @@
import discord
import pymongo
from redbot.core import checks, commands
from redbot.core.bot import Red
class kBump(commands.Cog):
"""
A bridge to the kBump MongoDB database.
"""
def __init__(self, bot: Red):
self.bot = bot
self.client = None
self.servers_collection = None
# Create a background task to connect to the DB after the bot is ready
self.db_init_task = self.bot.loop.create_task(self.initialize_db())
async def initialize_db(self):
"""Initializes the database connection."""
await self.bot.wait_until_ready()
try:
# Retrieve the secure URI we set with the '[p]set api' command
uri = (await self.bot.get_shared_api_tokens("mongodb")).get("uri")
if not uri:
print("[kBump] MongoDB URI not set! Use '[p]set api mongodb uri ...' to set it.")
return
self.client = pymongo.MongoClient(uri)
# Connect to the same database and collection as your other bot
db = self.client["BytesBump"]
self.servers_collection = db["servers"]
print("[kBump] Successfully connected to the BytesBump MongoDB.")
except pymongo.errors.PyMongoError as e:
print(f"[kBump] PyMongo error connecting to BytesBump MongoDB: {e}")
return
except Exception as e:
print(f"[kBump] General error connecting to BytesBump MongoDB: {e}")
def cog_unload(self):
"""Close the client connection when the cog is unloaded."""
if self.client:
try:
self.client.close()
print("[kBump] Closed connection to the BytesBump MongoDB.")
except Exception as e:
print(f"[kBump] Error closing connection to the BytesBump MongoDB: {e}")
# Cancel the background task
self.db_init_task.cancel()
@commands.command()
@commands.guild_only()
@checks.mod_or_permissions(manage_guild=True)
async def bumpstatus(self, ctx: commands.Context, guild: discord.Guild = None):
"""
Checks the kBump setup status for a server.
If no server is provided, it checks the current server.
"""
try:
if not self.servers_collection:
await ctx.send("Database connection is not available. Please check the console logs.")
return
if not self.servers_collection:
await ctx.send("The kBump database connection is not yet initialized. Please wait for the initialization task to complete.")
return
target_guild = guild or ctx.guild
server_data = self.servers_collection.find_one({"_id": target_guild.id})
if not server_data:
await ctx.send(f"No kBump data found for the server: `{target_guild.name}`.")
return
# Rest of the code remains the same
except pymongo.errors.PyMongoError as e:
print(f"[kBump] PyMongo error getting kBump data for server: {e}")
await ctx.send(f"Error getting kBump data for the server: `{target_guild.name}`. Please check the console logs for more information.")
return
except Exception as e:
print(f"[kBump] General error getting kBump data for server: {e}")
await ctx.send(f"Error getting kBump data for the server: `{target_guild.name}`. Please check the console logs for more information.")
return
embed_color = discord.Color(server_data.get('color', 0x2F3136))
embed = discord.Embed(
title=f"kBump Status for {target_guild.name}",
color=embed_color
)
embed.set_thumbnail(url=target_guild.icon.url if target_guild.icon else None)
description = server_data.get('description', 'Not set.')
embed.add_field(name="📝 Description", value=f"```{description[:1000]}```", inline=False)
custom_tags = server_data.get('tags', 'Not set.')
embed.add_field(name="🏷️ Custom Tags", value=custom_tags, inline=False)
discovery_tags = server_data.get('discovery_tags')
if discovery_tags:
embed.add_field(name="🌐 Official Discovery Tags", value=discovery_tags, inline=False)
vanity = server_data.get('vanity_code')
if vanity:
embed.add_field(name="🔗 Vanity URL", value=f"`discord.gg/{vanity}`", inline=True)
invite = server_data.get('invite_code')
if invite:
embed.add_field(name="✉️ Invite Code", value=f"`{invite}`", inline=True)
await ctx.send(embed=embed)

0
kofi-shop/README.md Normal file
View File

0
kofi-shop/__init__.py Normal file
View File

16
kofi-shop/info.json Normal file
View File

@@ -0,0 +1,16 @@
{
"author": [ "unstableCogs" ],
"install_msg": "Thank you for installing the Ko-fi Shop cog! Use `[p]help KofiShop` for commands.",
"name": "KofiShop",
"short": "An interactive front-end for a Ko-fi store.",
"description": "Allows users to place orders and submit reviews for items available on the client's Ko-fi store. This cog acts as a bridge to an external shop.",
"tags": [
"kofi",
"shop",
"store",
"review",
"utility"
],
"requirements": [ "rich" ],
"end_user_data_statement": "This cog persistently stores user IDs, order details, and submitted reviews to manage shop history and transactions."
}

0
kofi-shop/kofi-shop.py Normal file
View File

0
modmail/README.md Normal file
View File

0
modmail/__init__.py Normal file
View File

16
modmail/info.json Normal file
View File

@@ -0,0 +1,16 @@
{
"author": [ "unstableCogs" ],
"install_msg": "Thank you for installing the ModMail cog! Please use `[p]help ModMail` for setup commands.",
"name": "ModMail",
"short": "A private, forum-based ModMail system.",
"description": "A comprehensive ModMail system that relays user DMs to a private forum channel for staff to review and respond to. This serves as the core for all ticket-based interactions.",
"tags": [
"modmail",
"moderation",
"support",
"tickets",
"utility"
],
"requirements": [ "rich" ],
"end_user_data_statement": "This cog persistently stores user IDs and message content from ModMail threads for moderation and support history."
}

0
modmail/modmail.py Normal file
View File

0
mors/README.md Normal file
View File

0
mors/__init__.py Normal file
View File

16
mors/info.json Normal file
View File

@@ -0,0 +1,16 @@
{
"author": [ "unstableCogs" ],
"install_msg": "Thank you for installing the Mass Outreach & Review System! For a full command list, please see the wiki.",
"name": "MassOutreach",
"short": "A multi-step system for mass outreach and reviews.",
"description": "Manages a complete workflow for user outreach. The process begins with /game, uses /over for time selection, and concludes with a /pudding-head command for the user to submit a mass review.",
"tags": [
"mass",
"outreach",
"review",
"tickets",
"utility"
],
"requirements": [ "rich" ],
"end_user_data_statement": "This cog stores user IDs and ticket information temporarily for the duration of the outreach process. Submitted reviews are stored persistently."
}

0
mors/mors.py Normal file
View File

5
pp/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from .passwdprotect import Passwd
async def setup(bot):
cog = Passwd(bot)
await bot.add_cog(cog)

10
pp/info.json Normal file
View File

@@ -0,0 +1,10 @@
{
"author": ["kitsunic"],
"description": "A cog for password protected channels",
"end_user_data_statement": "This cog stores user, avatar, and guild data. A simple delete request to bot to remove your data or guild data",
"install_msg": "Thanks for installing & testing.",
"min_bot_version": "3.5.0",
"short": "A pwd channel protection cog",
"tags": ["embed"],
"type": "COG"
}

366
pp/passwdprotect.py Normal file
View File

@@ -0,0 +1,366 @@
import discord
import hashlib
import asyncio
import random
import string
import datetime
from redbot.core import commands, checks, Config
class Passwd(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(self, identifier=50204090) # Unique identifier
# Default settings within the config
default_guild = {
"password_channels": {},
"password_attempts": {},
"admin_channel": None,
"password_recovery_requests": {},
"channel_data": {},
"whitelist": [],
"blacklist": [],
}
self.config.register_guild(**default_guild)
#
# -------------------------------------------------------------------------------------------------------------
#
@commands.group(name="passwdprotect", aliases=["pwd"])
async def passwdprotect(self, ctx):
"""Password management commands"""
if not ctx.invoked_subcommand:
await ctx.send("Missing subcommand.")
#
# -------------------------------------------------------------------------------------------------------------
#
@passwdprotect.command(name="verify", aliases=["ver"])
async def verify(self, ctx, password: str):
"""Verifies the password for the current channel."""
channel = ctx.channel
async with self.config.guild(ctx.guild).password_channels() as password_channels:
if channel.id in password_channels:
stored_hash = password_channels[channel.id]["password_hash"]
provided_hash = hashlib.sha256(password.encode("utf-8")).hexdigest()
if stored_hash == provided_hash:
# Password verified successfully
role_id = password_channels[channel.id]["role_id"]
role = ctx.guild.get_role(role_id)
if role:
await ctx.author.add_roles(role)
await ctx.send(f"Password verified! You now have access to {channel.mention}.")
else:
await ctx.send("Password verified, but the access role seems to be missing. Please contact an admin.")
else:
await ctx.send("Incorrect password.")
else:
await ctx.send("This channel is not password protected.")
@passwdprotect.group(name="channels", aliases=["chs", "--ch"])
async def channels(self, ctx):
"""Manage protected channels"""
if not ctx.invoked_subcommand:
await ctx.send("Invalid subcommand.")
#
# -------------------------------------------------------------------------------------------------------------
#
@channels.command(name="add", aliases=["+"])
@commands.has_permissions(manage_channels=True)
async def channels_add(self, ctx, channel: discord.TextChannel, password: str):
"""Adds a password to a channel"""
hashed_password = self.generate_hashed_password(password)
async with self.config.guild(ctx.guild).password_channels() as password_channels:
# Check if the channel already has a password
if channel.id in password_channels:
await ctx.send(f"{channel.mention} already has a password. Use the `remove` command first.")
return
# Create the role
role_name = f"{channel.name}_access"
role = await ctx.guild.create_role(name=role_name)
# Grant permissions to the role
await channel.set_permissions(role, view_channel=True, send_messages=True)
await channel.set_permissions(ctx.guild.default_role, view_channel=False, read_message_history=False) # Restrict access for default role
# Store password, channel ID, and role ID in the config
password_channels[channel.id] = {
"password_hash": hashed_password,
"role_id": role.id
}
await ctx.send(f"Password set for {channel.mention}")
#
# -------------------------------------------------------------------------------------------------------------
#
@channels.command(name="remove", aliases=["-"])
@commands.has_permissions(manage_channels=True)
async def channels_remove(self, ctx, channel: discord.TextChannel):
"""Removes the password from a text channel."""
async with self.config.guild(ctx.guild).password_channels() as password_channels:
if channel.id in password_channels:
# Delete the channel's data from the config
del password_channels[channel.id]
# Delete the associated role (optional)
role_id = channel.get("role_id")
if role_id:
role = ctx.guild.get_role(role_id)
if role:
await role.delete()
await ctx.send(f"Password removed from {channel.mention}")
else:
await ctx.send(f"{channel.mention} does not have a password.")
@channels.command(name="fails", aliases=["--fail"])
@commands.has_permissions(manage_guild=True)
async def channels_fails(self, ctx, limit: int = None):
"""Sets or checks the maximum failed password attempts."""
# ... handle failed attempt limit ...
#
# -------------------------------------------------------------------------------------------------------------
#
@channels.command(name="whitelist", aliases=["wlist"])
@commands.has_permissions(manage_guild=True)
async def channels_white(self, ctx, subcommand: str, user_or_role: discord.Object = None):
"""Manages the whitelist."""
guild_id = str(ctx.guild.id)
password_collection = self.get_password_collection(guild_id)
if not user_or_role:
await ctx.send("Please specify a user or role to add/remove.")
return
if subcommand == "add":
await self.add_to_whitelist(password_collection, user_or_role)
elif subcommand == "remove":
await self.remove_from_whitelist(password_collection, user_or_role)
else:
await ctx.send("Invalid subcommand. Use `add` or `remove`.")
#
# -------------------------------------------------------------------------------------------------------------
#
@channels.command(name="blacklist", aliases=["blist"])
@commands.has_permissions(manage_guild=True)
async def channels_black(self, ctx, subcommand: str, user_or_role: discord.Object = None):
"""Manages the blacklist."""
if not user_or_role:
await ctx.send("Please specify a user or role to add/remove.")
return
if subcommand == "add":
await self.add_to_blacklist(ctx, user_or_role)
elif subcommand == "remove":
await self.remove_from_blacklist(ctx, user_or_role)
else:
await ctx.send("Invalid subcommand. Use `add` or `remove`.")
#
# -------------------------------------------------------------------------------------------------------------
#
@passwdprotect.group(name="admin", aliases=["adm"])
async def admin(self, ctx):
"""Admin commands for password protection"""
if not ctx.invoked_subcommand:
await ctx.send("Missing subcommand.")
#
# -------------------------------------------------------------------------------------------------------------
#
@admin.command(name="recover", aliases=["rec"])
async def admin_recover(self, ctx):
"""Initiates password recovery process."""
requests = await self.config.guild(ctx.guild).password_recovery_requests()
if not requests:
await ctx.send("No pending password recovery requests.")
return
admin_channel_id = await self.config.guild(ctx.guild).admin_channel()
if not admin_channel_id:
await ctx.send("Admin notification channel not set. Use `[p]passwdprotect admin notify` to set it.")
return
admin_channel = self.bot.get_channel(admin_channel_id)
# Add Approve/Deny buttons using View
view = ui.View()
view.add_item(ui.Button(style=discord.ButtonStyle.green, label="Approve", custom_id=f"approve_{user_id}_{channel.id}"))
view.add_item(ui.Button(style=discord.ButtonStyle.red, label="Deny", custom_id=f"deny_{user_id}_{channel.id}"))
for user_id, request_data in requests.items():
user = self.bot.get_user(int(user_id))
channel = self.bot.get_channel(request_data["channel_id"])
timestamp = request_data["timestamp"]
# Log the request to the admin channel
embed = discord.Embed(title="Password Recovery Request", color=discord.Color.gold())
embed.add_field(name="User", value=user.mention if user else f"User ID: {user_id}", inline=False)
embed.add_field(name="Server", value=ctx.guild.name, inline=False)
embed.add_field(name="Channel", value=channel.mention if channel else f"Channel ID: {request_data['channel_id']}", inline=False)
embed.add_field(name="Highest Role", value=user.top_role.mention if user else "Unknown", inline=False)
embed.set_footer(text=f"Requested at {timestamp}")
# Add Approve/Deny buttons
view = SimpleView()
view.add_item(Button(style=discord.ButtonStyle.green, label="Approve", custom_id=f"approve_{user_id}_{channel.id}"))
view.add_item(Button(style=discord.ButtonStyle.red, label="Deny", custom_id=f"deny_{user_id}_{channel.id}"))
# Ping the role (if set) in the admin channel
role_to_ping = await self.config.guild(ctx.guild).get_raw("admin_role") # Assuming you store the role ID in config
if role_to_ping:
await admin_channel.send(f"<@&{role_to_ping}>", embed=embed, view=view)
else:
await admin_channel.send(embed=embed, view=view)
#
# -------------------------------------------------------------------------------------------------------------
#
@admin.command(name="notify", aliases=["noti"])
@commands.has_permissions(manage_guild=True)
async def admin_notify(self, ctx, channel: discord.TextChannel = None):
"""Sets or removes the admin notification channel."""
if channel:
# Set the admin notification channel
await self.config.guild(ctx.guild).admin_channel.set(channel.id)
await ctx.send(f"Admin notification channel set to {channel.mention}")
else:
# Remove the admin notification channel
await self.config.guild(ctx.guild).admin_channel.set(None)
await ctx.send("Admin notification channel removed.")
#
# -------------------------------------------------------------------------------------------------------------
#
@commands.Cog.listener()
async def on_interaction(self, interaction, channel: discord.TextChannel):
if interaction.type == discord.InteractionType.component:
custom_id = interaction.data["custom_id"]
if custom_id.startswith("approve_") or custom_id.startswith("deny_"):
action, user_id, channel_id = custom_id.split("_")
user_id = int(user_id)
channel_id = int(channel_id)
async with self.config.guild(interaction.guild).password_recovery_requests() as requests:
if user_id in requests:
del requests[user_id] # Remove the request
if action == "approve":
temp_code = self.generate_temporary_access_code() # Implement this function
expiry_time = datetime.datetime.now() + datetime.timedelta(hours=1) # 1-hour validity
async with self.config.guild(interaction.guild).temp_access_codes() as temp_codes:
temp_codes[temp_code] = {"user_id": user_id, "channel_id": channel_id, "expiry": expiry_time.isoformat()}
user = self.bot.get_user(user_id)
if user:
await user.send(f"Here's your temporary access code for channel {channel.mention}: `{temp_code}`. It's valid for 1 hour.")
elif user:
await interaction.response.send_message(f"Approved password recovery for {user.mention if user else user_id} on channel {channel.mention}. Temporary code sent.", ephemeral=True)
else:
await interaction.response.send_message("Channel data not found. Cannot reset password.", ephemeral=True)
#
# -------------------------------------------------------------------------------------------------------------
#
# Helper functions
def generate_hashed_password(self, password):
# This remains the same, no changes needed
return hashlib.sha256(password.encode("utf-8")).hexdigest()
#
# -------------------------------------------------------------------------------------------------------------
#
async def set_channel_password(self, ctx: commands.Context, channel_id, password):
"""Sets the password for a channel using the config.
Args:
ctx (commands.Context): The command context to access guild information.
channel_id (int): The ID of the channel.
password (str): The password to set.
"""
hashed_password = self.generate_hashed_password(password)
async with self.config.guild(ctx.guild).password_channels() as password_channels:
password_channels[channel_id] = {
"password_hash": hashed_password
}
#
# -------------------------------------------------------------------------------------------------------------
#
def generate_temporary_access_code(self):
"""Generates a temporary access code."""
characters = string.ascii_letters + string.digits
code = ''.join(random.choice(characters) for i in range(10)) # 10-character code
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
return f"{code}-{timestamp}"
#
# -------------------------------------------------------------------------------------------------------------
#
async def add_to_whitelist(self, ctx, collection, user_or_role):
"""Adds a user or role to the whitelist."""
async with self.config.guild(ctx.guild).whitelist() as whitelist:
user_or_role_id = user_or_role.id
if any(item["id"] == user_or_role_id for item in whitelist):
await ctx.send(f"{user_or_role} is already in the whitelist.")
return
whitelist.append({"id": user_or_role_id, "type": "user" if isinstance(user_or_role, discord.Member) else "role"})
embed = discord.Embed(title="Whitelist Confirmation", description=f"Are you sure you want to add {user_or_role} to the whitelist?")
embed.add_field(name="Action", value="Add to Whitelist")
embed.set_footer(text="React with ✅ to confirm or ❌ to cancel.")
confirmation_msg = await ctx.send(embed=embed)
try:
reaction, user = await ctx.wait_for('reaction_add', timeout=30.0, check=lambda r, u: r.message.id == confirmation_msg.id and u == ctx.author and str(r.emoji) in ['', ''])
if str(reaction.emoji) == '':
await confirmation_msg.delete()
await ctx.send(f"{user_or_role} added to the whitelist.")
else:
await confirmation_msg.delete()
await ctx.send("Action cancelled.")
except asyncio.TimeoutError:
await confirmation_msg.delete()
await ctx.send("Confirmation timed out. Action cancelled.")
#
# -------------------------------------------------------------------------------------------------------------
#
async def remove_from_whitelist(self, ctx, collection, user_or_role):
"""Removes a user or role from the whitelist."""
async with self.config.guild(ctx.guild).whitelist() as whitelist:
user_or_role_id = user_or_role.id
whitelist[:] = [item for item in whitelist if item["id"] != user_or_role_id]
await ctx.send(f"{user_or_role} removed from the whitelist.")
#
# -------------------------------------------------------------------------------------------------------------
#
async def add_to_blacklist(self, ctx, user_or_role):
"""Adds a user or role to the blacklist."""
async with self.config.guild(ctx.guild).blacklist() as blacklist:
user_or_role_id = user_or_role.id
if any(item["id"] == user_or_role_id for item in blacklist):
await ctx.send(f"{user_or_role} is already in the blacklist.")
return
blacklist.append({"id": user_or_role_id, "type": "user" if isinstance(user_or_role, discord.Member) else "role"})
await ctx.send(f"{user_or_role} added to the blacklist.")
#
# -------------------------------------------------------------------------------------------------------------
#
async def remove_from_blacklist(self, ctx, user_or_role):
"""Removes a user or role from the blacklist."""
async with self.config.guild(ctx.guild).blacklist() as blacklist:
user_or_role_id = user_or_role.id
blacklist[:] = [item for item in blacklist if item["id"] != user_or_role_id]
await ctx.send(f"{user_or_role} removed from the blacklist.")
#
# -------------------------------------------------------------------------------------------------------------
#
async def cog_load(self):
self.bot.add_listener(self.on_cog_reload_error, "Package loading failed")
self.bot.add_listener(self.on_cog_reload_error, "SyntaxError:")
self.bot.add_listener(self.on_cog_reload_error, "IndentationError:")
#
# -------------------------------------------------------------------------------------------------------------
#
async def on_cog_reload_error(self, ctx, error):
error_message = f"Error reloading cog '{ctx.cog.qualified_name}':\n```{error}```"
self.error_logs.append(error_message)