feat: add password protection cog for channels
Adds new Discord bot cog enabling password-protected channels. Includes verification, channel management, whitelist/blacklist, and admin password recovery features.
This commit is contained in:
5
pp/__init__.py
Normal file
5
pp/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .passwdprotect import Passwd
|
||||
|
||||
async def setup(bot):
|
||||
cog = Passwd(bot)
|
||||
await bot.add_cog(cog)
|
||||
10
pp/info.json
Normal file
10
pp/info.json
Normal file
@@ -0,0 +1,10 @@
|
||||
{
|
||||
"author": ["kitsunic"],
|
||||
"description": "A cog for password protected channels",
|
||||
"end_user_data_statement": "This cog stores user, avatar, and guild data. A simple delete request to bot to remove your data or guild data",
|
||||
"install_msg": "Thanks for installing & testing.",
|
||||
"min_bot_version": "3.5.0",
|
||||
"short": "A pwd channel protection cog",
|
||||
"tags": ["embed"],
|
||||
"type": "COG"
|
||||
}
|
||||
366
pp/passwdprotect.py
Normal file
366
pp/passwdprotect.py
Normal file
@@ -0,0 +1,366 @@
|
||||
import discord
|
||||
import hashlib
|
||||
import asyncio
|
||||
import random
|
||||
import string
|
||||
import datetime
|
||||
|
||||
|
||||
from redbot.core import commands, checks, Config
|
||||
|
||||
|
||||
class Passwd(commands.Cog):
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.config = Config.get_conf(self, identifier=50204090) # Unique identifier
|
||||
# Default settings within the config
|
||||
default_guild = {
|
||||
"password_channels": {},
|
||||
"password_attempts": {},
|
||||
"admin_channel": None,
|
||||
"password_recovery_requests": {},
|
||||
"channel_data": {},
|
||||
"whitelist": [],
|
||||
"blacklist": [],
|
||||
}
|
||||
self.config.register_guild(**default_guild)
|
||||
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
@commands.group(name="passwdprotect", aliases=["pwd"])
|
||||
async def passwdprotect(self, ctx):
|
||||
"""Password management commands"""
|
||||
if not ctx.invoked_subcommand:
|
||||
await ctx.send("Missing subcommand.")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
@passwdprotect.command(name="verify", aliases=["ver"])
|
||||
async def verify(self, ctx, password: str):
|
||||
"""Verifies the password for the current channel."""
|
||||
channel = ctx.channel
|
||||
|
||||
async with self.config.guild(ctx.guild).password_channels() as password_channels:
|
||||
if channel.id in password_channels:
|
||||
stored_hash = password_channels[channel.id]["password_hash"]
|
||||
provided_hash = hashlib.sha256(password.encode("utf-8")).hexdigest()
|
||||
|
||||
if stored_hash == provided_hash:
|
||||
# Password verified successfully
|
||||
role_id = password_channels[channel.id]["role_id"]
|
||||
role = ctx.guild.get_role(role_id)
|
||||
if role:
|
||||
await ctx.author.add_roles(role)
|
||||
await ctx.send(f"Password verified! You now have access to {channel.mention}.")
|
||||
else:
|
||||
await ctx.send("Password verified, but the access role seems to be missing. Please contact an admin.")
|
||||
else:
|
||||
await ctx.send("Incorrect password.")
|
||||
else:
|
||||
await ctx.send("This channel is not password protected.")
|
||||
|
||||
@passwdprotect.group(name="channels", aliases=["chs", "--ch"])
|
||||
async def channels(self, ctx):
|
||||
"""Manage protected channels"""
|
||||
if not ctx.invoked_subcommand:
|
||||
await ctx.send("Invalid subcommand.")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
@channels.command(name="add", aliases=["+"])
|
||||
@commands.has_permissions(manage_channels=True)
|
||||
async def channels_add(self, ctx, channel: discord.TextChannel, password: str):
|
||||
"""Adds a password to a channel"""
|
||||
hashed_password = self.generate_hashed_password(password)
|
||||
async with self.config.guild(ctx.guild).password_channels() as password_channels:
|
||||
# Check if the channel already has a password
|
||||
if channel.id in password_channels:
|
||||
await ctx.send(f"{channel.mention} already has a password. Use the `remove` command first.")
|
||||
return
|
||||
|
||||
# Create the role
|
||||
role_name = f"{channel.name}_access"
|
||||
role = await ctx.guild.create_role(name=role_name)
|
||||
|
||||
# Grant permissions to the role
|
||||
await channel.set_permissions(role, view_channel=True, send_messages=True)
|
||||
await channel.set_permissions(ctx.guild.default_role, view_channel=False, read_message_history=False) # Restrict access for default role
|
||||
|
||||
# Store password, channel ID, and role ID in the config
|
||||
password_channels[channel.id] = {
|
||||
"password_hash": hashed_password,
|
||||
"role_id": role.id
|
||||
}
|
||||
|
||||
await ctx.send(f"Password set for {channel.mention}")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
@channels.command(name="remove", aliases=["-"])
|
||||
@commands.has_permissions(manage_channels=True)
|
||||
async def channels_remove(self, ctx, channel: discord.TextChannel):
|
||||
"""Removes the password from a text channel."""
|
||||
async with self.config.guild(ctx.guild).password_channels() as password_channels:
|
||||
if channel.id in password_channels:
|
||||
# Delete the channel's data from the config
|
||||
del password_channels[channel.id]
|
||||
|
||||
# Delete the associated role (optional)
|
||||
role_id = channel.get("role_id")
|
||||
if role_id:
|
||||
role = ctx.guild.get_role(role_id)
|
||||
if role:
|
||||
await role.delete()
|
||||
|
||||
await ctx.send(f"Password removed from {channel.mention}")
|
||||
else:
|
||||
await ctx.send(f"{channel.mention} does not have a password.")
|
||||
|
||||
@channels.command(name="fails", aliases=["--fail"])
|
||||
@commands.has_permissions(manage_guild=True)
|
||||
async def channels_fails(self, ctx, limit: int = None):
|
||||
"""Sets or checks the maximum failed password attempts."""
|
||||
# ... handle failed attempt limit ...
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
@channels.command(name="whitelist", aliases=["wlist"])
|
||||
@commands.has_permissions(manage_guild=True)
|
||||
async def channels_white(self, ctx, subcommand: str, user_or_role: discord.Object = None):
|
||||
"""Manages the whitelist."""
|
||||
guild_id = str(ctx.guild.id)
|
||||
password_collection = self.get_password_collection(guild_id)
|
||||
|
||||
if not user_or_role:
|
||||
await ctx.send("Please specify a user or role to add/remove.")
|
||||
return
|
||||
|
||||
if subcommand == "add":
|
||||
await self.add_to_whitelist(password_collection, user_or_role)
|
||||
elif subcommand == "remove":
|
||||
await self.remove_from_whitelist(password_collection, user_or_role)
|
||||
else:
|
||||
await ctx.send("Invalid subcommand. Use `add` or `remove`.")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
@channels.command(name="blacklist", aliases=["blist"])
|
||||
@commands.has_permissions(manage_guild=True)
|
||||
async def channels_black(self, ctx, subcommand: str, user_or_role: discord.Object = None):
|
||||
"""Manages the blacklist."""
|
||||
if not user_or_role:
|
||||
await ctx.send("Please specify a user or role to add/remove.")
|
||||
return
|
||||
|
||||
if subcommand == "add":
|
||||
await self.add_to_blacklist(ctx, user_or_role)
|
||||
elif subcommand == "remove":
|
||||
await self.remove_from_blacklist(ctx, user_or_role)
|
||||
else:
|
||||
await ctx.send("Invalid subcommand. Use `add` or `remove`.")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
@passwdprotect.group(name="admin", aliases=["adm"])
|
||||
async def admin(self, ctx):
|
||||
"""Admin commands for password protection"""
|
||||
if not ctx.invoked_subcommand:
|
||||
await ctx.send("Missing subcommand.")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
@admin.command(name="recover", aliases=["rec"])
|
||||
async def admin_recover(self, ctx):
|
||||
"""Initiates password recovery process."""
|
||||
requests = await self.config.guild(ctx.guild).password_recovery_requests()
|
||||
if not requests:
|
||||
await ctx.send("No pending password recovery requests.")
|
||||
return
|
||||
|
||||
admin_channel_id = await self.config.guild(ctx.guild).admin_channel()
|
||||
if not admin_channel_id:
|
||||
await ctx.send("Admin notification channel not set. Use `[p]passwdprotect admin notify` to set it.")
|
||||
return
|
||||
|
||||
admin_channel = self.bot.get_channel(admin_channel_id)
|
||||
# Add Approve/Deny buttons using View
|
||||
view = ui.View()
|
||||
view.add_item(ui.Button(style=discord.ButtonStyle.green, label="Approve", custom_id=f"approve_{user_id}_{channel.id}"))
|
||||
view.add_item(ui.Button(style=discord.ButtonStyle.red, label="Deny", custom_id=f"deny_{user_id}_{channel.id}"))
|
||||
|
||||
for user_id, request_data in requests.items():
|
||||
user = self.bot.get_user(int(user_id))
|
||||
channel = self.bot.get_channel(request_data["channel_id"])
|
||||
timestamp = request_data["timestamp"]
|
||||
|
||||
# Log the request to the admin channel
|
||||
embed = discord.Embed(title="Password Recovery Request", color=discord.Color.gold())
|
||||
embed.add_field(name="User", value=user.mention if user else f"User ID: {user_id}", inline=False)
|
||||
embed.add_field(name="Server", value=ctx.guild.name, inline=False)
|
||||
embed.add_field(name="Channel", value=channel.mention if channel else f"Channel ID: {request_data['channel_id']}", inline=False)
|
||||
embed.add_field(name="Highest Role", value=user.top_role.mention if user else "Unknown", inline=False)
|
||||
embed.set_footer(text=f"Requested at {timestamp}")
|
||||
|
||||
# Add Approve/Deny buttons
|
||||
view = SimpleView()
|
||||
view.add_item(Button(style=discord.ButtonStyle.green, label="Approve", custom_id=f"approve_{user_id}_{channel.id}"))
|
||||
view.add_item(Button(style=discord.ButtonStyle.red, label="Deny", custom_id=f"deny_{user_id}_{channel.id}"))
|
||||
|
||||
# Ping the role (if set) in the admin channel
|
||||
role_to_ping = await self.config.guild(ctx.guild).get_raw("admin_role") # Assuming you store the role ID in config
|
||||
if role_to_ping:
|
||||
await admin_channel.send(f"<@&{role_to_ping}>", embed=embed, view=view)
|
||||
else:
|
||||
await admin_channel.send(embed=embed, view=view)
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
@admin.command(name="notify", aliases=["noti"])
|
||||
@commands.has_permissions(manage_guild=True)
|
||||
async def admin_notify(self, ctx, channel: discord.TextChannel = None):
|
||||
"""Sets or removes the admin notification channel."""
|
||||
if channel:
|
||||
# Set the admin notification channel
|
||||
await self.config.guild(ctx.guild).admin_channel.set(channel.id)
|
||||
await ctx.send(f"Admin notification channel set to {channel.mention}")
|
||||
else:
|
||||
# Remove the admin notification channel
|
||||
await self.config.guild(ctx.guild).admin_channel.set(None)
|
||||
await ctx.send("Admin notification channel removed.")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
@commands.Cog.listener()
|
||||
async def on_interaction(self, interaction, channel: discord.TextChannel):
|
||||
if interaction.type == discord.InteractionType.component:
|
||||
custom_id = interaction.data["custom_id"]
|
||||
if custom_id.startswith("approve_") or custom_id.startswith("deny_"):
|
||||
action, user_id, channel_id = custom_id.split("_")
|
||||
user_id = int(user_id)
|
||||
channel_id = int(channel_id)
|
||||
|
||||
async with self.config.guild(interaction.guild).password_recovery_requests() as requests:
|
||||
if user_id in requests:
|
||||
del requests[user_id] # Remove the request
|
||||
|
||||
if action == "approve":
|
||||
temp_code = self.generate_temporary_access_code() # Implement this function
|
||||
expiry_time = datetime.datetime.now() + datetime.timedelta(hours=1) # 1-hour validity
|
||||
|
||||
async with self.config.guild(interaction.guild).temp_access_codes() as temp_codes:
|
||||
temp_codes[temp_code] = {"user_id": user_id, "channel_id": channel_id, "expiry": expiry_time.isoformat()}
|
||||
|
||||
user = self.bot.get_user(user_id)
|
||||
if user:
|
||||
await user.send(f"Here's your temporary access code for channel {channel.mention}: `{temp_code}`. It's valid for 1 hour.")
|
||||
elif user:
|
||||
await interaction.response.send_message(f"Approved password recovery for {user.mention if user else user_id} on channel {channel.mention}. Temporary code sent.", ephemeral=True)
|
||||
else:
|
||||
await interaction.response.send_message("Channel data not found. Cannot reset password.", ephemeral=True)
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
# Helper functions
|
||||
def generate_hashed_password(self, password):
|
||||
# This remains the same, no changes needed
|
||||
return hashlib.sha256(password.encode("utf-8")).hexdigest()
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
async def set_channel_password(self, ctx: commands.Context, channel_id, password):
|
||||
"""Sets the password for a channel using the config.
|
||||
|
||||
Args:
|
||||
ctx (commands.Context): The command context to access guild information.
|
||||
channel_id (int): The ID of the channel.
|
||||
password (str): The password to set.
|
||||
"""
|
||||
hashed_password = self.generate_hashed_password(password)
|
||||
async with self.config.guild(ctx.guild).password_channels() as password_channels:
|
||||
password_channels[channel_id] = {
|
||||
"password_hash": hashed_password
|
||||
}
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
def generate_temporary_access_code(self):
|
||||
"""Generates a temporary access code."""
|
||||
characters = string.ascii_letters + string.digits
|
||||
code = ''.join(random.choice(characters) for i in range(10)) # 10-character code
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
return f"{code}-{timestamp}"
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
async def add_to_whitelist(self, ctx, collection, user_or_role):
|
||||
"""Adds a user or role to the whitelist."""
|
||||
async with self.config.guild(ctx.guild).whitelist() as whitelist:
|
||||
user_or_role_id = user_or_role.id
|
||||
if any(item["id"] == user_or_role_id for item in whitelist):
|
||||
await ctx.send(f"{user_or_role} is already in the whitelist.")
|
||||
return
|
||||
|
||||
whitelist.append({"id": user_or_role_id, "type": "user" if isinstance(user_or_role, discord.Member) else "role"})
|
||||
|
||||
embed = discord.Embed(title="Whitelist Confirmation", description=f"Are you sure you want to add {user_or_role} to the whitelist?")
|
||||
embed.add_field(name="Action", value="Add to Whitelist")
|
||||
embed.set_footer(text="React with ✅ to confirm or ❌ to cancel.")
|
||||
confirmation_msg = await ctx.send(embed=embed)
|
||||
|
||||
try:
|
||||
reaction, user = await ctx.wait_for('reaction_add', timeout=30.0, check=lambda r, u: r.message.id == confirmation_msg.id and u == ctx.author and str(r.emoji) in ['✅', '❌'])
|
||||
if str(reaction.emoji) == '✅':
|
||||
await confirmation_msg.delete()
|
||||
await ctx.send(f"{user_or_role} added to the whitelist.")
|
||||
else:
|
||||
await confirmation_msg.delete()
|
||||
await ctx.send("Action cancelled.")
|
||||
except asyncio.TimeoutError:
|
||||
await confirmation_msg.delete()
|
||||
await ctx.send("Confirmation timed out. Action cancelled.")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
async def remove_from_whitelist(self, ctx, collection, user_or_role):
|
||||
"""Removes a user or role from the whitelist."""
|
||||
async with self.config.guild(ctx.guild).whitelist() as whitelist:
|
||||
user_or_role_id = user_or_role.id
|
||||
whitelist[:] = [item for item in whitelist if item["id"] != user_or_role_id]
|
||||
await ctx.send(f"{user_or_role} removed from the whitelist.")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
async def add_to_blacklist(self, ctx, user_or_role):
|
||||
"""Adds a user or role to the blacklist."""
|
||||
async with self.config.guild(ctx.guild).blacklist() as blacklist:
|
||||
user_or_role_id = user_or_role.id
|
||||
if any(item["id"] == user_or_role_id for item in blacklist):
|
||||
await ctx.send(f"{user_or_role} is already in the blacklist.")
|
||||
return
|
||||
|
||||
blacklist.append({"id": user_or_role_id, "type": "user" if isinstance(user_or_role, discord.Member) else "role"})
|
||||
|
||||
await ctx.send(f"{user_or_role} added to the blacklist.")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
async def remove_from_blacklist(self, ctx, user_or_role):
|
||||
"""Removes a user or role from the blacklist."""
|
||||
async with self.config.guild(ctx.guild).blacklist() as blacklist:
|
||||
user_or_role_id = user_or_role.id
|
||||
blacklist[:] = [item for item in blacklist if item["id"] != user_or_role_id]
|
||||
await ctx.send(f"{user_or_role} removed from the blacklist.")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
async def cog_load(self):
|
||||
self.bot.add_listener(self.on_cog_reload_error, "Package loading failed")
|
||||
self.bot.add_listener(self.on_cog_reload_error, "SyntaxError:")
|
||||
self.bot.add_listener(self.on_cog_reload_error, "IndentationError:")
|
||||
#
|
||||
# -------------------------------------------------------------------------------------------------------------
|
||||
#
|
||||
async def on_cog_reload_error(self, ctx, error):
|
||||
error_message = f"Error reloading cog '{ctx.cog.qualified_name}':\n```{error}```"
|
||||
self.error_logs.append(error_message)
|
||||
Reference in New Issue
Block a user