1 Commits

Author SHA1 Message Date
859ac84b68 feat: add password protection cog for channels
Adds new Discord bot cog enabling password-protected channels.
Includes verification, channel management, whitelist/blacklist,
and admin password recovery features.
2025-09-11 17:22:36 -04:00
3 changed files with 381 additions and 0 deletions

5
pp/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from .passwdprotect import Passwd
async def setup(bot):
cog = Passwd(bot)
await bot.add_cog(cog)

10
pp/info.json Normal file
View File

@@ -0,0 +1,10 @@
{
"author": ["kitsunic"],
"description": "A cog for password protected channels",
"end_user_data_statement": "This cog stores user, avatar, and guild data. A simple delete request to bot to remove your data or guild data",
"install_msg": "Thanks for installing & testing.",
"min_bot_version": "3.5.0",
"short": "A pwd channel protection cog",
"tags": ["embed"],
"type": "COG"
}

366
pp/passwdprotect.py Normal file
View File

@@ -0,0 +1,366 @@
import discord
import hashlib
import asyncio
import random
import string
import datetime
from redbot.core import commands, checks, Config
class Passwd(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(self, identifier=50204090) # Unique identifier
# Default settings within the config
default_guild = {
"password_channels": {},
"password_attempts": {},
"admin_channel": None,
"password_recovery_requests": {},
"channel_data": {},
"whitelist": [],
"blacklist": [],
}
self.config.register_guild(**default_guild)
#
# -------------------------------------------------------------------------------------------------------------
#
@commands.group(name="passwdprotect", aliases=["pwd"])
async def passwdprotect(self, ctx):
"""Password management commands"""
if not ctx.invoked_subcommand:
await ctx.send("Missing subcommand.")
#
# -------------------------------------------------------------------------------------------------------------
#
@passwdprotect.command(name="verify", aliases=["ver"])
async def verify(self, ctx, password: str):
"""Verifies the password for the current channel."""
channel = ctx.channel
async with self.config.guild(ctx.guild).password_channels() as password_channels:
if channel.id in password_channels:
stored_hash = password_channels[channel.id]["password_hash"]
provided_hash = hashlib.sha256(password.encode("utf-8")).hexdigest()
if stored_hash == provided_hash:
# Password verified successfully
role_id = password_channels[channel.id]["role_id"]
role = ctx.guild.get_role(role_id)
if role:
await ctx.author.add_roles(role)
await ctx.send(f"Password verified! You now have access to {channel.mention}.")
else:
await ctx.send("Password verified, but the access role seems to be missing. Please contact an admin.")
else:
await ctx.send("Incorrect password.")
else:
await ctx.send("This channel is not password protected.")
@passwdprotect.group(name="channels", aliases=["chs", "--ch"])
async def channels(self, ctx):
"""Manage protected channels"""
if not ctx.invoked_subcommand:
await ctx.send("Invalid subcommand.")
#
# -------------------------------------------------------------------------------------------------------------
#
@channels.command(name="add", aliases=["+"])
@commands.has_permissions(manage_channels=True)
async def channels_add(self, ctx, channel: discord.TextChannel, password: str):
"""Adds a password to a channel"""
hashed_password = self.generate_hashed_password(password)
async with self.config.guild(ctx.guild).password_channels() as password_channels:
# Check if the channel already has a password
if channel.id in password_channels:
await ctx.send(f"{channel.mention} already has a password. Use the `remove` command first.")
return
# Create the role
role_name = f"{channel.name}_access"
role = await ctx.guild.create_role(name=role_name)
# Grant permissions to the role
await channel.set_permissions(role, view_channel=True, send_messages=True)
await channel.set_permissions(ctx.guild.default_role, view_channel=False, read_message_history=False) # Restrict access for default role
# Store password, channel ID, and role ID in the config
password_channels[channel.id] = {
"password_hash": hashed_password,
"role_id": role.id
}
await ctx.send(f"Password set for {channel.mention}")
#
# -------------------------------------------------------------------------------------------------------------
#
@channels.command(name="remove", aliases=["-"])
@commands.has_permissions(manage_channels=True)
async def channels_remove(self, ctx, channel: discord.TextChannel):
"""Removes the password from a text channel."""
async with self.config.guild(ctx.guild).password_channels() as password_channels:
if channel.id in password_channels:
# Delete the channel's data from the config
del password_channels[channel.id]
# Delete the associated role (optional)
role_id = channel.get("role_id")
if role_id:
role = ctx.guild.get_role(role_id)
if role:
await role.delete()
await ctx.send(f"Password removed from {channel.mention}")
else:
await ctx.send(f"{channel.mention} does not have a password.")
@channels.command(name="fails", aliases=["--fail"])
@commands.has_permissions(manage_guild=True)
async def channels_fails(self, ctx, limit: int = None):
"""Sets or checks the maximum failed password attempts."""
# ... handle failed attempt limit ...
#
# -------------------------------------------------------------------------------------------------------------
#
@channels.command(name="whitelist", aliases=["wlist"])
@commands.has_permissions(manage_guild=True)
async def channels_white(self, ctx, subcommand: str, user_or_role: discord.Object = None):
"""Manages the whitelist."""
guild_id = str(ctx.guild.id)
password_collection = self.get_password_collection(guild_id)
if not user_or_role:
await ctx.send("Please specify a user or role to add/remove.")
return
if subcommand == "add":
await self.add_to_whitelist(password_collection, user_or_role)
elif subcommand == "remove":
await self.remove_from_whitelist(password_collection, user_or_role)
else:
await ctx.send("Invalid subcommand. Use `add` or `remove`.")
#
# -------------------------------------------------------------------------------------------------------------
#
@channels.command(name="blacklist", aliases=["blist"])
@commands.has_permissions(manage_guild=True)
async def channels_black(self, ctx, subcommand: str, user_or_role: discord.Object = None):
"""Manages the blacklist."""
if not user_or_role:
await ctx.send("Please specify a user or role to add/remove.")
return
if subcommand == "add":
await self.add_to_blacklist(ctx, user_or_role)
elif subcommand == "remove":
await self.remove_from_blacklist(ctx, user_or_role)
else:
await ctx.send("Invalid subcommand. Use `add` or `remove`.")
#
# -------------------------------------------------------------------------------------------------------------
#
@passwdprotect.group(name="admin", aliases=["adm"])
async def admin(self, ctx):
"""Admin commands for password protection"""
if not ctx.invoked_subcommand:
await ctx.send("Missing subcommand.")
#
# -------------------------------------------------------------------------------------------------------------
#
@admin.command(name="recover", aliases=["rec"])
async def admin_recover(self, ctx):
"""Initiates password recovery process."""
requests = await self.config.guild(ctx.guild).password_recovery_requests()
if not requests:
await ctx.send("No pending password recovery requests.")
return
admin_channel_id = await self.config.guild(ctx.guild).admin_channel()
if not admin_channel_id:
await ctx.send("Admin notification channel not set. Use `[p]passwdprotect admin notify` to set it.")
return
admin_channel = self.bot.get_channel(admin_channel_id)
# Add Approve/Deny buttons using View
view = ui.View()
view.add_item(ui.Button(style=discord.ButtonStyle.green, label="Approve", custom_id=f"approve_{user_id}_{channel.id}"))
view.add_item(ui.Button(style=discord.ButtonStyle.red, label="Deny", custom_id=f"deny_{user_id}_{channel.id}"))
for user_id, request_data in requests.items():
user = self.bot.get_user(int(user_id))
channel = self.bot.get_channel(request_data["channel_id"])
timestamp = request_data["timestamp"]
# Log the request to the admin channel
embed = discord.Embed(title="Password Recovery Request", color=discord.Color.gold())
embed.add_field(name="User", value=user.mention if user else f"User ID: {user_id}", inline=False)
embed.add_field(name="Server", value=ctx.guild.name, inline=False)
embed.add_field(name="Channel", value=channel.mention if channel else f"Channel ID: {request_data['channel_id']}", inline=False)
embed.add_field(name="Highest Role", value=user.top_role.mention if user else "Unknown", inline=False)
embed.set_footer(text=f"Requested at {timestamp}")
# Add Approve/Deny buttons
view = SimpleView()
view.add_item(Button(style=discord.ButtonStyle.green, label="Approve", custom_id=f"approve_{user_id}_{channel.id}"))
view.add_item(Button(style=discord.ButtonStyle.red, label="Deny", custom_id=f"deny_{user_id}_{channel.id}"))
# Ping the role (if set) in the admin channel
role_to_ping = await self.config.guild(ctx.guild).get_raw("admin_role") # Assuming you store the role ID in config
if role_to_ping:
await admin_channel.send(f"<@&{role_to_ping}>", embed=embed, view=view)
else:
await admin_channel.send(embed=embed, view=view)
#
# -------------------------------------------------------------------------------------------------------------
#
@admin.command(name="notify", aliases=["noti"])
@commands.has_permissions(manage_guild=True)
async def admin_notify(self, ctx, channel: discord.TextChannel = None):
"""Sets or removes the admin notification channel."""
if channel:
# Set the admin notification channel
await self.config.guild(ctx.guild).admin_channel.set(channel.id)
await ctx.send(f"Admin notification channel set to {channel.mention}")
else:
# Remove the admin notification channel
await self.config.guild(ctx.guild).admin_channel.set(None)
await ctx.send("Admin notification channel removed.")
#
# -------------------------------------------------------------------------------------------------------------
#
@commands.Cog.listener()
async def on_interaction(self, interaction, channel: discord.TextChannel):
if interaction.type == discord.InteractionType.component:
custom_id = interaction.data["custom_id"]
if custom_id.startswith("approve_") or custom_id.startswith("deny_"):
action, user_id, channel_id = custom_id.split("_")
user_id = int(user_id)
channel_id = int(channel_id)
async with self.config.guild(interaction.guild).password_recovery_requests() as requests:
if user_id in requests:
del requests[user_id] # Remove the request
if action == "approve":
temp_code = self.generate_temporary_access_code() # Implement this function
expiry_time = datetime.datetime.now() + datetime.timedelta(hours=1) # 1-hour validity
async with self.config.guild(interaction.guild).temp_access_codes() as temp_codes:
temp_codes[temp_code] = {"user_id": user_id, "channel_id": channel_id, "expiry": expiry_time.isoformat()}
user = self.bot.get_user(user_id)
if user:
await user.send(f"Here's your temporary access code for channel {channel.mention}: `{temp_code}`. It's valid for 1 hour.")
elif user:
await interaction.response.send_message(f"Approved password recovery for {user.mention if user else user_id} on channel {channel.mention}. Temporary code sent.", ephemeral=True)
else:
await interaction.response.send_message("Channel data not found. Cannot reset password.", ephemeral=True)
#
# -------------------------------------------------------------------------------------------------------------
#
# Helper functions
def generate_hashed_password(self, password):
# This remains the same, no changes needed
return hashlib.sha256(password.encode("utf-8")).hexdigest()
#
# -------------------------------------------------------------------------------------------------------------
#
async def set_channel_password(self, ctx: commands.Context, channel_id, password):
"""Sets the password for a channel using the config.
Args:
ctx (commands.Context): The command context to access guild information.
channel_id (int): The ID of the channel.
password (str): The password to set.
"""
hashed_password = self.generate_hashed_password(password)
async with self.config.guild(ctx.guild).password_channels() as password_channels:
password_channels[channel_id] = {
"password_hash": hashed_password
}
#
# -------------------------------------------------------------------------------------------------------------
#
def generate_temporary_access_code(self):
"""Generates a temporary access code."""
characters = string.ascii_letters + string.digits
code = ''.join(random.choice(characters) for i in range(10)) # 10-character code
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
return f"{code}-{timestamp}"
#
# -------------------------------------------------------------------------------------------------------------
#
async def add_to_whitelist(self, ctx, collection, user_or_role):
"""Adds a user or role to the whitelist."""
async with self.config.guild(ctx.guild).whitelist() as whitelist:
user_or_role_id = user_or_role.id
if any(item["id"] == user_or_role_id for item in whitelist):
await ctx.send(f"{user_or_role} is already in the whitelist.")
return
whitelist.append({"id": user_or_role_id, "type": "user" if isinstance(user_or_role, discord.Member) else "role"})
embed = discord.Embed(title="Whitelist Confirmation", description=f"Are you sure you want to add {user_or_role} to the whitelist?")
embed.add_field(name="Action", value="Add to Whitelist")
embed.set_footer(text="React with ✅ to confirm or ❌ to cancel.")
confirmation_msg = await ctx.send(embed=embed)
try:
reaction, user = await ctx.wait_for('reaction_add', timeout=30.0, check=lambda r, u: r.message.id == confirmation_msg.id and u == ctx.author and str(r.emoji) in ['', ''])
if str(reaction.emoji) == '':
await confirmation_msg.delete()
await ctx.send(f"{user_or_role} added to the whitelist.")
else:
await confirmation_msg.delete()
await ctx.send("Action cancelled.")
except asyncio.TimeoutError:
await confirmation_msg.delete()
await ctx.send("Confirmation timed out. Action cancelled.")
#
# -------------------------------------------------------------------------------------------------------------
#
async def remove_from_whitelist(self, ctx, collection, user_or_role):
"""Removes a user or role from the whitelist."""
async with self.config.guild(ctx.guild).whitelist() as whitelist:
user_or_role_id = user_or_role.id
whitelist[:] = [item for item in whitelist if item["id"] != user_or_role_id]
await ctx.send(f"{user_or_role} removed from the whitelist.")
#
# -------------------------------------------------------------------------------------------------------------
#
async def add_to_blacklist(self, ctx, user_or_role):
"""Adds a user or role to the blacklist."""
async with self.config.guild(ctx.guild).blacklist() as blacklist:
user_or_role_id = user_or_role.id
if any(item["id"] == user_or_role_id for item in blacklist):
await ctx.send(f"{user_or_role} is already in the blacklist.")
return
blacklist.append({"id": user_or_role_id, "type": "user" if isinstance(user_or_role, discord.Member) else "role"})
await ctx.send(f"{user_or_role} added to the blacklist.")
#
# -------------------------------------------------------------------------------------------------------------
#
async def remove_from_blacklist(self, ctx, user_or_role):
"""Removes a user or role from the blacklist."""
async with self.config.guild(ctx.guild).blacklist() as blacklist:
user_or_role_id = user_or_role.id
blacklist[:] = [item for item in blacklist if item["id"] != user_or_role_id]
await ctx.send(f"{user_or_role} removed from the blacklist.")
#
# -------------------------------------------------------------------------------------------------------------
#
async def cog_load(self):
self.bot.add_listener(self.on_cog_reload_error, "Package loading failed")
self.bot.add_listener(self.on_cog_reload_error, "SyntaxError:")
self.bot.add_listener(self.on_cog_reload_error, "IndentationError:")
#
# -------------------------------------------------------------------------------------------------------------
#
async def on_cog_reload_error(self, ctx, error):
error_message = f"Error reloading cog '{ctx.cog.qualified_name}':\n```{error}```"
self.error_logs.append(error_message)