import discord import hashlib import asyncio import random import string import datetime from redbot.core import commands, checks, Config class Passwd(commands.Cog): def __init__(self, bot): self.bot = bot self.config = Config.get_conf(self, identifier=50204090) # Unique identifier # Default settings within the config default_guild = { "password_channels": {}, "password_attempts": {}, "admin_channel": None, "password_recovery_requests": {}, "channel_data": {}, "whitelist": [], "blacklist": [], } self.config.register_guild(**default_guild) # # ------------------------------------------------------------------------------------------------------------- # @commands.group(name="passwdprotect", aliases=["pwd"]) async def passwdprotect(self, ctx): """Password management commands""" if not ctx.invoked_subcommand: await ctx.send("Missing subcommand.") # # ------------------------------------------------------------------------------------------------------------- # @passwdprotect.command(name="verify", aliases=["ver"]) async def verify(self, ctx, password: str): """Verifies the password for the current channel.""" channel = ctx.channel async with self.config.guild(ctx.guild).password_channels() as password_channels: if channel.id in password_channels: stored_hash = password_channels[channel.id]["password_hash"] provided_hash = hashlib.sha256(password.encode("utf-8")).hexdigest() if stored_hash == provided_hash: # Password verified successfully role_id = password_channels[channel.id]["role_id"] role = ctx.guild.get_role(role_id) if role: await ctx.author.add_roles(role) await ctx.send(f"Password verified! You now have access to {channel.mention}.") else: await ctx.send("Password verified, but the access role seems to be missing. Please contact an admin.") else: await ctx.send("Incorrect password.") else: await ctx.send("This channel is not password protected.") @passwdprotect.group(name="channels", aliases=["chs", "--ch"]) async def channels(self, ctx): """Manage protected channels""" if not ctx.invoked_subcommand: await ctx.send("Invalid subcommand.") # # ------------------------------------------------------------------------------------------------------------- # @channels.command(name="add", aliases=["+"]) @commands.has_permissions(manage_channels=True) async def channels_add(self, ctx, channel: discord.TextChannel, password: str): """Adds a password to a channel""" hashed_password = self.generate_hashed_password(password) async with self.config.guild(ctx.guild).password_channels() as password_channels: # Check if the channel already has a password if channel.id in password_channels: await ctx.send(f"{channel.mention} already has a password. Use the `remove` command first.") return # Create the role role_name = f"{channel.name}_access" role = await ctx.guild.create_role(name=role_name) # Grant permissions to the role await channel.set_permissions(role, view_channel=True, send_messages=True) await channel.set_permissions(ctx.guild.default_role, view_channel=False, read_message_history=False) # Restrict access for default role # Store password, channel ID, and role ID in the config password_channels[channel.id] = { "password_hash": hashed_password, "role_id": role.id } await ctx.send(f"Password set for {channel.mention}") # # ------------------------------------------------------------------------------------------------------------- # @channels.command(name="remove", aliases=["-"]) @commands.has_permissions(manage_channels=True) async def channels_remove(self, ctx, channel: discord.TextChannel): """Removes the password from a text channel.""" async with self.config.guild(ctx.guild).password_channels() as password_channels: if channel.id in password_channels: # Delete the channel's data from the config del password_channels[channel.id] # Delete the associated role (optional) role_id = channel.get("role_id") if role_id: role = ctx.guild.get_role(role_id) if role: await role.delete() await ctx.send(f"Password removed from {channel.mention}") else: await ctx.send(f"{channel.mention} does not have a password.") @channels.command(name="fails", aliases=["--fail"]) @commands.has_permissions(manage_guild=True) async def channels_fails(self, ctx, limit: int = None): """Sets or checks the maximum failed password attempts.""" # ... handle failed attempt limit ... # # ------------------------------------------------------------------------------------------------------------- # @channels.command(name="whitelist", aliases=["wlist"]) @commands.has_permissions(manage_guild=True) async def channels_white(self, ctx, subcommand: str, user_or_role: discord.Object = None): """Manages the whitelist.""" guild_id = str(ctx.guild.id) password_collection = self.get_password_collection(guild_id) if not user_or_role: await ctx.send("Please specify a user or role to add/remove.") return if subcommand == "add": await self.add_to_whitelist(password_collection, user_or_role) elif subcommand == "remove": await self.remove_from_whitelist(password_collection, user_or_role) else: await ctx.send("Invalid subcommand. Use `add` or `remove`.") # # ------------------------------------------------------------------------------------------------------------- # @channels.command(name="blacklist", aliases=["blist"]) @commands.has_permissions(manage_guild=True) async def channels_black(self, ctx, subcommand: str, user_or_role: discord.Object = None): """Manages the blacklist.""" if not user_or_role: await ctx.send("Please specify a user or role to add/remove.") return if subcommand == "add": await self.add_to_blacklist(ctx, user_or_role) elif subcommand == "remove": await self.remove_from_blacklist(ctx, user_or_role) else: await ctx.send("Invalid subcommand. Use `add` or `remove`.") # # ------------------------------------------------------------------------------------------------------------- # @passwdprotect.group(name="admin", aliases=["adm"]) async def admin(self, ctx): """Admin commands for password protection""" if not ctx.invoked_subcommand: await ctx.send("Missing subcommand.") # # ------------------------------------------------------------------------------------------------------------- # @admin.command(name="recover", aliases=["rec"]) async def admin_recover(self, ctx): """Initiates password recovery process.""" requests = await self.config.guild(ctx.guild).password_recovery_requests() if not requests: await ctx.send("No pending password recovery requests.") return admin_channel_id = await self.config.guild(ctx.guild).admin_channel() if not admin_channel_id: await ctx.send("Admin notification channel not set. Use `[p]passwdprotect admin notify` to set it.") return admin_channel = self.bot.get_channel(admin_channel_id) # Add Approve/Deny buttons using View view = ui.View() view.add_item(ui.Button(style=discord.ButtonStyle.green, label="Approve", custom_id=f"approve_{user_id}_{channel.id}")) view.add_item(ui.Button(style=discord.ButtonStyle.red, label="Deny", custom_id=f"deny_{user_id}_{channel.id}")) for user_id, request_data in requests.items(): user = self.bot.get_user(int(user_id)) channel = self.bot.get_channel(request_data["channel_id"]) timestamp = request_data["timestamp"] # Log the request to the admin channel embed = discord.Embed(title="Password Recovery Request", color=discord.Color.gold()) embed.add_field(name="User", value=user.mention if user else f"User ID: {user_id}", inline=False) embed.add_field(name="Server", value=ctx.guild.name, inline=False) embed.add_field(name="Channel", value=channel.mention if channel else f"Channel ID: {request_data['channel_id']}", inline=False) embed.add_field(name="Highest Role", value=user.top_role.mention if user else "Unknown", inline=False) embed.set_footer(text=f"Requested at {timestamp}") # Add Approve/Deny buttons view = SimpleView() view.add_item(Button(style=discord.ButtonStyle.green, label="Approve", custom_id=f"approve_{user_id}_{channel.id}")) view.add_item(Button(style=discord.ButtonStyle.red, label="Deny", custom_id=f"deny_{user_id}_{channel.id}")) # Ping the role (if set) in the admin channel role_to_ping = await self.config.guild(ctx.guild).get_raw("admin_role") # Assuming you store the role ID in config if role_to_ping: await admin_channel.send(f"<@&{role_to_ping}>", embed=embed, view=view) else: await admin_channel.send(embed=embed, view=view) # # ------------------------------------------------------------------------------------------------------------- # @admin.command(name="notify", aliases=["noti"]) @commands.has_permissions(manage_guild=True) async def admin_notify(self, ctx, channel: discord.TextChannel = None): """Sets or removes the admin notification channel.""" if channel: # Set the admin notification channel await self.config.guild(ctx.guild).admin_channel.set(channel.id) await ctx.send(f"Admin notification channel set to {channel.mention}") else: # Remove the admin notification channel await self.config.guild(ctx.guild).admin_channel.set(None) await ctx.send("Admin notification channel removed.") # # ------------------------------------------------------------------------------------------------------------- # @commands.Cog.listener() async def on_interaction(self, interaction, channel: discord.TextChannel): if interaction.type == discord.InteractionType.component: custom_id = interaction.data["custom_id"] if custom_id.startswith("approve_") or custom_id.startswith("deny_"): action, user_id, channel_id = custom_id.split("_") user_id = int(user_id) channel_id = int(channel_id) async with self.config.guild(interaction.guild).password_recovery_requests() as requests: if user_id in requests: del requests[user_id] # Remove the request if action == "approve": temp_code = self.generate_temporary_access_code() # Implement this function expiry_time = datetime.datetime.now() + datetime.timedelta(hours=1) # 1-hour validity async with self.config.guild(interaction.guild).temp_access_codes() as temp_codes: temp_codes[temp_code] = {"user_id": user_id, "channel_id": channel_id, "expiry": expiry_time.isoformat()} user = self.bot.get_user(user_id) if user: await user.send(f"Here's your temporary access code for channel {channel.mention}: `{temp_code}`. It's valid for 1 hour.") elif user: await interaction.response.send_message(f"Approved password recovery for {user.mention if user else user_id} on channel {channel.mention}. Temporary code sent.", ephemeral=True) else: await interaction.response.send_message("Channel data not found. Cannot reset password.", ephemeral=True) # # ------------------------------------------------------------------------------------------------------------- # # Helper functions def generate_hashed_password(self, password): # This remains the same, no changes needed return hashlib.sha256(password.encode("utf-8")).hexdigest() # # ------------------------------------------------------------------------------------------------------------- # async def set_channel_password(self, ctx: commands.Context, channel_id, password): """Sets the password for a channel using the config. Args: ctx (commands.Context): The command context to access guild information. channel_id (int): The ID of the channel. password (str): The password to set. """ hashed_password = self.generate_hashed_password(password) async with self.config.guild(ctx.guild).password_channels() as password_channels: password_channels[channel_id] = { "password_hash": hashed_password } # # ------------------------------------------------------------------------------------------------------------- # def generate_temporary_access_code(self): """Generates a temporary access code.""" characters = string.ascii_letters + string.digits code = ''.join(random.choice(characters) for i in range(10)) # 10-character code timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") return f"{code}-{timestamp}" # # ------------------------------------------------------------------------------------------------------------- # async def add_to_whitelist(self, ctx, collection, user_or_role): """Adds a user or role to the whitelist.""" async with self.config.guild(ctx.guild).whitelist() as whitelist: user_or_role_id = user_or_role.id if any(item["id"] == user_or_role_id for item in whitelist): await ctx.send(f"{user_or_role} is already in the whitelist.") return whitelist.append({"id": user_or_role_id, "type": "user" if isinstance(user_or_role, discord.Member) else "role"}) embed = discord.Embed(title="Whitelist Confirmation", description=f"Are you sure you want to add {user_or_role} to the whitelist?") embed.add_field(name="Action", value="Add to Whitelist") embed.set_footer(text="React with ✅ to confirm or ❌ to cancel.") confirmation_msg = await ctx.send(embed=embed) try: reaction, user = await ctx.wait_for('reaction_add', timeout=30.0, check=lambda r, u: r.message.id == confirmation_msg.id and u == ctx.author and str(r.emoji) in ['✅', '❌']) if str(reaction.emoji) == '✅': await confirmation_msg.delete() await ctx.send(f"{user_or_role} added to the whitelist.") else: await confirmation_msg.delete() await ctx.send("Action cancelled.") except asyncio.TimeoutError: await confirmation_msg.delete() await ctx.send("Confirmation timed out. Action cancelled.") # # ------------------------------------------------------------------------------------------------------------- # async def remove_from_whitelist(self, ctx, collection, user_or_role): """Removes a user or role from the whitelist.""" async with self.config.guild(ctx.guild).whitelist() as whitelist: user_or_role_id = user_or_role.id whitelist[:] = [item for item in whitelist if item["id"] != user_or_role_id] await ctx.send(f"{user_or_role} removed from the whitelist.") # # ------------------------------------------------------------------------------------------------------------- # async def add_to_blacklist(self, ctx, user_or_role): """Adds a user or role to the blacklist.""" async with self.config.guild(ctx.guild).blacklist() as blacklist: user_or_role_id = user_or_role.id if any(item["id"] == user_or_role_id for item in blacklist): await ctx.send(f"{user_or_role} is already in the blacklist.") return blacklist.append({"id": user_or_role_id, "type": "user" if isinstance(user_or_role, discord.Member) else "role"}) await ctx.send(f"{user_or_role} added to the blacklist.") # # ------------------------------------------------------------------------------------------------------------- # async def remove_from_blacklist(self, ctx, user_or_role): """Removes a user or role from the blacklist.""" async with self.config.guild(ctx.guild).blacklist() as blacklist: user_or_role_id = user_or_role.id blacklist[:] = [item for item in blacklist if item["id"] != user_or_role_id] await ctx.send(f"{user_or_role} removed from the blacklist.") # # ------------------------------------------------------------------------------------------------------------- # async def cog_load(self): self.bot.add_listener(self.on_cog_reload_error, "Package loading failed") self.bot.add_listener(self.on_cog_reload_error, "SyntaxError:") self.bot.add_listener(self.on_cog_reload_error, "IndentationError:") # # ------------------------------------------------------------------------------------------------------------- # async def on_cog_reload_error(self, ctx, error): error_message = f"Error reloading cog '{ctx.cog.qualified_name}':\n```{error}```" self.error_logs.append(error_message)