From 859ac84b6837f9bed0f50798f4cd4fdf843877e2 Mon Sep 17 00:00:00 2001 From: Unstable Kitsune Date: Thu, 11 Sep 2025 17:22:36 -0400 Subject: [PATCH] feat: add password protection cog for channels Adds new Discord bot cog enabling password-protected channels. Includes verification, channel management, whitelist/blacklist, and admin password recovery features. --- pp/__init__.py | 5 + pp/info.json | 10 ++ pp/passwdprotect.py | 366 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 381 insertions(+) create mode 100644 pp/__init__.py create mode 100644 pp/info.json create mode 100644 pp/passwdprotect.py diff --git a/pp/__init__.py b/pp/__init__.py new file mode 100644 index 0000000..6277d92 --- /dev/null +++ b/pp/__init__.py @@ -0,0 +1,5 @@ +from .passwdprotect import Passwd + +async def setup(bot): + cog = Passwd(bot) + await bot.add_cog(cog) diff --git a/pp/info.json b/pp/info.json new file mode 100644 index 0000000..a407a96 --- /dev/null +++ b/pp/info.json @@ -0,0 +1,10 @@ +{ + "author": ["kitsunic"], + "description": "A cog for password protected channels", + "end_user_data_statement": "This cog stores user, avatar, and guild data. A simple delete request to bot to remove your data or guild data", + "install_msg": "Thanks for installing & testing.", + "min_bot_version": "3.5.0", + "short": "A pwd channel protection cog", + "tags": ["embed"], + "type": "COG" +} diff --git a/pp/passwdprotect.py b/pp/passwdprotect.py new file mode 100644 index 0000000..d19e936 --- /dev/null +++ b/pp/passwdprotect.py @@ -0,0 +1,366 @@ +import discord +import hashlib +import asyncio +import random +import string +import datetime + + +from redbot.core import commands, checks, Config + + +class Passwd(commands.Cog): + def __init__(self, bot): + self.bot = bot + self.config = Config.get_conf(self, identifier=50204090) # Unique identifier + # Default settings within the config + default_guild = { + "password_channels": {}, + "password_attempts": {}, + "admin_channel": None, + "password_recovery_requests": {}, + "channel_data": {}, + "whitelist": [], + "blacklist": [], + } + self.config.register_guild(**default_guild) + +# +# ------------------------------------------------------------------------------------------------------------- +# + @commands.group(name="passwdprotect", aliases=["pwd"]) + async def passwdprotect(self, ctx): + """Password management commands""" + if not ctx.invoked_subcommand: + await ctx.send("Missing subcommand.") +# +# ------------------------------------------------------------------------------------------------------------- +# + @passwdprotect.command(name="verify", aliases=["ver"]) + async def verify(self, ctx, password: str): + """Verifies the password for the current channel.""" + channel = ctx.channel + + async with self.config.guild(ctx.guild).password_channels() as password_channels: + if channel.id in password_channels: + stored_hash = password_channels[channel.id]["password_hash"] + provided_hash = hashlib.sha256(password.encode("utf-8")).hexdigest() + + if stored_hash == provided_hash: + # Password verified successfully + role_id = password_channels[channel.id]["role_id"] + role = ctx.guild.get_role(role_id) + if role: + await ctx.author.add_roles(role) + await ctx.send(f"Password verified! You now have access to {channel.mention}.") + else: + await ctx.send("Password verified, but the access role seems to be missing. Please contact an admin.") + else: + await ctx.send("Incorrect password.") + else: + await ctx.send("This channel is not password protected.") + + @passwdprotect.group(name="channels", aliases=["chs", "--ch"]) + async def channels(self, ctx): + """Manage protected channels""" + if not ctx.invoked_subcommand: + await ctx.send("Invalid subcommand.") +# +# ------------------------------------------------------------------------------------------------------------- +# + @channels.command(name="add", aliases=["+"]) + @commands.has_permissions(manage_channels=True) + async def channels_add(self, ctx, channel: discord.TextChannel, password: str): + """Adds a password to a channel""" + hashed_password = self.generate_hashed_password(password) + async with self.config.guild(ctx.guild).password_channels() as password_channels: + # Check if the channel already has a password + if channel.id in password_channels: + await ctx.send(f"{channel.mention} already has a password. Use the `remove` command first.") + return + + # Create the role + role_name = f"{channel.name}_access" + role = await ctx.guild.create_role(name=role_name) + + # Grant permissions to the role + await channel.set_permissions(role, view_channel=True, send_messages=True) + await channel.set_permissions(ctx.guild.default_role, view_channel=False, read_message_history=False) # Restrict access for default role + + # Store password, channel ID, and role ID in the config + password_channels[channel.id] = { + "password_hash": hashed_password, + "role_id": role.id + } + + await ctx.send(f"Password set for {channel.mention}") +# +# ------------------------------------------------------------------------------------------------------------- +# + @channels.command(name="remove", aliases=["-"]) + @commands.has_permissions(manage_channels=True) + async def channels_remove(self, ctx, channel: discord.TextChannel): + """Removes the password from a text channel.""" + async with self.config.guild(ctx.guild).password_channels() as password_channels: + if channel.id in password_channels: + # Delete the channel's data from the config + del password_channels[channel.id] + + # Delete the associated role (optional) + role_id = channel.get("role_id") + if role_id: + role = ctx.guild.get_role(role_id) + if role: + await role.delete() + + await ctx.send(f"Password removed from {channel.mention}") + else: + await ctx.send(f"{channel.mention} does not have a password.") + + @channels.command(name="fails", aliases=["--fail"]) + @commands.has_permissions(manage_guild=True) + async def channels_fails(self, ctx, limit: int = None): + """Sets or checks the maximum failed password attempts.""" + # ... handle failed attempt limit ... +# +# ------------------------------------------------------------------------------------------------------------- +# + @channels.command(name="whitelist", aliases=["wlist"]) + @commands.has_permissions(manage_guild=True) + async def channels_white(self, ctx, subcommand: str, user_or_role: discord.Object = None): + """Manages the whitelist.""" + guild_id = str(ctx.guild.id) + password_collection = self.get_password_collection(guild_id) + + if not user_or_role: + await ctx.send("Please specify a user or role to add/remove.") + return + + if subcommand == "add": + await self.add_to_whitelist(password_collection, user_or_role) + elif subcommand == "remove": + await self.remove_from_whitelist(password_collection, user_or_role) + else: + await ctx.send("Invalid subcommand. Use `add` or `remove`.") +# +# ------------------------------------------------------------------------------------------------------------- +# + @channels.command(name="blacklist", aliases=["blist"]) + @commands.has_permissions(manage_guild=True) + async def channels_black(self, ctx, subcommand: str, user_or_role: discord.Object = None): + """Manages the blacklist.""" + if not user_or_role: + await ctx.send("Please specify a user or role to add/remove.") + return + + if subcommand == "add": + await self.add_to_blacklist(ctx, user_or_role) + elif subcommand == "remove": + await self.remove_from_blacklist(ctx, user_or_role) + else: + await ctx.send("Invalid subcommand. Use `add` or `remove`.") +# +# ------------------------------------------------------------------------------------------------------------- +# + @passwdprotect.group(name="admin", aliases=["adm"]) + async def admin(self, ctx): + """Admin commands for password protection""" + if not ctx.invoked_subcommand: + await ctx.send("Missing subcommand.") +# +# ------------------------------------------------------------------------------------------------------------- +# + @admin.command(name="recover", aliases=["rec"]) + async def admin_recover(self, ctx): + """Initiates password recovery process.""" + requests = await self.config.guild(ctx.guild).password_recovery_requests() + if not requests: + await ctx.send("No pending password recovery requests.") + return + + admin_channel_id = await self.config.guild(ctx.guild).admin_channel() + if not admin_channel_id: + await ctx.send("Admin notification channel not set. Use `[p]passwdprotect admin notify` to set it.") + return + + admin_channel = self.bot.get_channel(admin_channel_id) + # Add Approve/Deny buttons using View + view = ui.View() + view.add_item(ui.Button(style=discord.ButtonStyle.green, label="Approve", custom_id=f"approve_{user_id}_{channel.id}")) + view.add_item(ui.Button(style=discord.ButtonStyle.red, label="Deny", custom_id=f"deny_{user_id}_{channel.id}")) + + for user_id, request_data in requests.items(): + user = self.bot.get_user(int(user_id)) + channel = self.bot.get_channel(request_data["channel_id"]) + timestamp = request_data["timestamp"] + + # Log the request to the admin channel + embed = discord.Embed(title="Password Recovery Request", color=discord.Color.gold()) + embed.add_field(name="User", value=user.mention if user else f"User ID: {user_id}", inline=False) + embed.add_field(name="Server", value=ctx.guild.name, inline=False) + embed.add_field(name="Channel", value=channel.mention if channel else f"Channel ID: {request_data['channel_id']}", inline=False) + embed.add_field(name="Highest Role", value=user.top_role.mention if user else "Unknown", inline=False) + embed.set_footer(text=f"Requested at {timestamp}") + + # Add Approve/Deny buttons + view = SimpleView() + view.add_item(Button(style=discord.ButtonStyle.green, label="Approve", custom_id=f"approve_{user_id}_{channel.id}")) + view.add_item(Button(style=discord.ButtonStyle.red, label="Deny", custom_id=f"deny_{user_id}_{channel.id}")) + + # Ping the role (if set) in the admin channel + role_to_ping = await self.config.guild(ctx.guild).get_raw("admin_role") # Assuming you store the role ID in config + if role_to_ping: + await admin_channel.send(f"<@&{role_to_ping}>", embed=embed, view=view) + else: + await admin_channel.send(embed=embed, view=view) +# +# ------------------------------------------------------------------------------------------------------------- +# + @admin.command(name="notify", aliases=["noti"]) + @commands.has_permissions(manage_guild=True) + async def admin_notify(self, ctx, channel: discord.TextChannel = None): + """Sets or removes the admin notification channel.""" + if channel: + # Set the admin notification channel + await self.config.guild(ctx.guild).admin_channel.set(channel.id) + await ctx.send(f"Admin notification channel set to {channel.mention}") + else: + # Remove the admin notification channel + await self.config.guild(ctx.guild).admin_channel.set(None) + await ctx.send("Admin notification channel removed.") +# +# ------------------------------------------------------------------------------------------------------------- +# + @commands.Cog.listener() + async def on_interaction(self, interaction, channel: discord.TextChannel): + if interaction.type == discord.InteractionType.component: + custom_id = interaction.data["custom_id"] + if custom_id.startswith("approve_") or custom_id.startswith("deny_"): + action, user_id, channel_id = custom_id.split("_") + user_id = int(user_id) + channel_id = int(channel_id) + + async with self.config.guild(interaction.guild).password_recovery_requests() as requests: + if user_id in requests: + del requests[user_id] # Remove the request + + if action == "approve": + temp_code = self.generate_temporary_access_code() # Implement this function + expiry_time = datetime.datetime.now() + datetime.timedelta(hours=1) # 1-hour validity + + async with self.config.guild(interaction.guild).temp_access_codes() as temp_codes: + temp_codes[temp_code] = {"user_id": user_id, "channel_id": channel_id, "expiry": expiry_time.isoformat()} + + user = self.bot.get_user(user_id) + if user: + await user.send(f"Here's your temporary access code for channel {channel.mention}: `{temp_code}`. It's valid for 1 hour.") + elif user: + await interaction.response.send_message(f"Approved password recovery for {user.mention if user else user_id} on channel {channel.mention}. Temporary code sent.", ephemeral=True) + else: + await interaction.response.send_message("Channel data not found. Cannot reset password.", ephemeral=True) +# +# ------------------------------------------------------------------------------------------------------------- +# + # Helper functions + def generate_hashed_password(self, password): + # This remains the same, no changes needed + return hashlib.sha256(password.encode("utf-8")).hexdigest() +# +# ------------------------------------------------------------------------------------------------------------- +# + async def set_channel_password(self, ctx: commands.Context, channel_id, password): + """Sets the password for a channel using the config. + + Args: + ctx (commands.Context): The command context to access guild information. + channel_id (int): The ID of the channel. + password (str): The password to set. + """ + hashed_password = self.generate_hashed_password(password) + async with self.config.guild(ctx.guild).password_channels() as password_channels: + password_channels[channel_id] = { + "password_hash": hashed_password + } +# +# ------------------------------------------------------------------------------------------------------------- +# + def generate_temporary_access_code(self): + """Generates a temporary access code.""" + characters = string.ascii_letters + string.digits + code = ''.join(random.choice(characters) for i in range(10)) # 10-character code + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + return f"{code}-{timestamp}" +# +# ------------------------------------------------------------------------------------------------------------- +# + async def add_to_whitelist(self, ctx, collection, user_or_role): + """Adds a user or role to the whitelist.""" + async with self.config.guild(ctx.guild).whitelist() as whitelist: + user_or_role_id = user_or_role.id + if any(item["id"] == user_or_role_id for item in whitelist): + await ctx.send(f"{user_or_role} is already in the whitelist.") + return + + whitelist.append({"id": user_or_role_id, "type": "user" if isinstance(user_or_role, discord.Member) else "role"}) + + embed = discord.Embed(title="Whitelist Confirmation", description=f"Are you sure you want to add {user_or_role} to the whitelist?") + embed.add_field(name="Action", value="Add to Whitelist") + embed.set_footer(text="React with ✅ to confirm or ❌ to cancel.") + confirmation_msg = await ctx.send(embed=embed) + + try: + reaction, user = await ctx.wait_for('reaction_add', timeout=30.0, check=lambda r, u: r.message.id == confirmation_msg.id and u == ctx.author and str(r.emoji) in ['✅', '❌']) + if str(reaction.emoji) == '✅': + await confirmation_msg.delete() + await ctx.send(f"{user_or_role} added to the whitelist.") + else: + await confirmation_msg.delete() + await ctx.send("Action cancelled.") + except asyncio.TimeoutError: + await confirmation_msg.delete() + await ctx.send("Confirmation timed out. Action cancelled.") +# +# ------------------------------------------------------------------------------------------------------------- +# + async def remove_from_whitelist(self, ctx, collection, user_or_role): + """Removes a user or role from the whitelist.""" + async with self.config.guild(ctx.guild).whitelist() as whitelist: + user_or_role_id = user_or_role.id + whitelist[:] = [item for item in whitelist if item["id"] != user_or_role_id] + await ctx.send(f"{user_or_role} removed from the whitelist.") +# +# ------------------------------------------------------------------------------------------------------------- +# + async def add_to_blacklist(self, ctx, user_or_role): + """Adds a user or role to the blacklist.""" + async with self.config.guild(ctx.guild).blacklist() as blacklist: + user_or_role_id = user_or_role.id + if any(item["id"] == user_or_role_id for item in blacklist): + await ctx.send(f"{user_or_role} is already in the blacklist.") + return + + blacklist.append({"id": user_or_role_id, "type": "user" if isinstance(user_or_role, discord.Member) else "role"}) + + await ctx.send(f"{user_or_role} added to the blacklist.") +# +# ------------------------------------------------------------------------------------------------------------- +# + async def remove_from_blacklist(self, ctx, user_or_role): + """Removes a user or role from the blacklist.""" + async with self.config.guild(ctx.guild).blacklist() as blacklist: + user_or_role_id = user_or_role.id + blacklist[:] = [item for item in blacklist if item["id"] != user_or_role_id] + await ctx.send(f"{user_or_role} removed from the blacklist.") +# +# ------------------------------------------------------------------------------------------------------------- +# + async def cog_load(self): + self.bot.add_listener(self.on_cog_reload_error, "Package loading failed") + self.bot.add_listener(self.on_cog_reload_error, "SyntaxError:") + self.bot.add_listener(self.on_cog_reload_error, "IndentationError:") +# +# ------------------------------------------------------------------------------------------------------------- +# + async def on_cog_reload_error(self, ctx, error): + error_message = f"Error reloading cog '{ctx.cog.qualified_name}':\n```{error}```" + self.error_logs.append(error_message) \ No newline at end of file