Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emergency Staff Alert #48

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions cogs/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,18 @@ async def on_command_error(self, ctx, error):
elif isinstance(error, commands.MissingRequiredArgument):
await ctx.send_help(ctx.command)
elif isinstance(error, commands.CommandOnCooldown):
await ctx.send(
f"You're on cooldown! Try again in **{time.human_timedelta(timedelta(seconds=error.retry_after))}**.",
ephemeral=True,
)
retry_after = lambda seconds: time.human_timedelta(timedelta(seconds=seconds))
name = error.type.name
if name == "user":
await ctx.send(
f"You're on cooldown! Try again in **{retry_after(error.retry_after)}**.",
ephemeral=True,
)
else:
number = error.cooldown.rate
per_text = f"per {name}" if name != 'default' else 'globally'
fmt = f"**{number} time{'s' if number > 1 else ''} {per_text}** every **{retry_after(error.cooldown.per)}**"
await ctx.send(f"This command can only be used {fmt}! Try again in **{retry_after(error.retry_after)}**.", ephemeral=True)
elif isinstance(error, commands.CheckFailure):
await ctx.send(error, ephemeral=True)
elif isinstance(error, commands.UserInputError):
Expand Down
245 changes: 242 additions & 3 deletions cogs/moderation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import textwrap
from typing import Optional, Union

import discord
Expand All @@ -12,6 +13,7 @@
from discord.ui import button

from helpers import checks, constants, time
from helpers.context import GuiduckContext
from helpers.pagination import AsyncEmbedFieldsPageSource
from helpers.utils import FakeUser, FetchUserConverter, with_attachment_urls

Expand Down Expand Up @@ -327,14 +329,75 @@ async def execute(self, ctx):
await super().execute(ctx)


class EmergencyAlertBan(Action):
type = "emergency_alert_ban"
past_tense = "permanently banned from emergency staff alerts"
emoji = "\N{BELL WITH CANCELLATION STROKE}"
color = discord.Color.magenta()

async def execute(self, ctx):
await ctx.bot.mongo.db.member.update_one(
{"_id": {"id": self.target.id, "guild_id": ctx.guild.id}},
{"$set": {"emergency_alert_banned": True}, "$unset": {"emergency_alert_banned_until": 1}},
upsert=True,
)
await super().execute(ctx)


class EmergencyAlertTempBan(Action):
type = "emergency_alert_temporary_ban"
past_tense = "banned from emergency staff alerts"
emoji = "\N{BELL WITH CANCELLATION STROKE}"
color = discord.Color.magenta()

async def execute(self, ctx):
await ctx.bot.mongo.db.member.update_one(
{"_id": {"id": self.target.id, "guild_id": ctx.guild.id}},
{"$set": {"emergency_alert_banned_until": self.expires_at}, "$unset": {"emergency_alert_banned": 1}},
upsert=True,
)
await super().execute(ctx)


class EmergencyAlertUnban(Action):
type = "emergency_alert_unban"
past_tense = "removed from emergency staff alert ban"
emoji = "\N{BELL}"
color = discord.Color.green()

async def execute(self, ctx):
await ctx.bot.mongo.db.member.update_one(
{"_id": {"id": self.target.id, "guild_id": ctx.guild.id}},
{"$unset": {"emergency_alert_banned": 1, "emergency_alert_banned_until": 1}},
upsert=True,
)
await super().execute(ctx)


@dataclass
class FakeContext:
bot: commands.Bot
guild: discord.Guild


cls_dict = {
x.type: x for x in (Kick, Ban, Unban, Warn, Note, Timeout, Untimeout, Mute, Unmute, TradingMute, TradingUnmute)
x.type: x
for x in (
Kick,
Ban,
Unban,
Warn,
Note,
Timeout,
Untimeout,
Mute,
Unmute,
TradingMute,
TradingUnmute,
EmergencyAlertBan,
EmergencyAlertTempBan,
EmergencyAlertUnban,
)
}


Expand Down Expand Up @@ -365,6 +428,40 @@ async def convert(self, ctx, arg):
raise commands.MemberNotFound(arg)


EMERGENCY_COOLDOWN_HOURS = 1
EMERGENCY_ROLE_NAME = "Emergency Staff"


class EmergencyView(discord.ui.View):
def __init__(self, ctx: GuiduckContext):
super().__init__(timeout=None)
self.ctx = ctx
self.message: discord.Message

@discord.ui.button(label="Resolve", style=discord.ButtonStyle.green)
async def resolve(self, interaction: discord.Interaction, button: discord.Button):
await interaction.response.defer()
button.label = "Resolved"
button.disabled = True

embed = self.message.embeds[0]
embed.color = discord.Color.green()
embed.set_footer(text=f"Resolved by @{interaction.user} ({interaction.user.id})")
await self.message.edit(embed=embed, view=self)

async def interaction_check(self, interaction: discord.Interaction):
user = interaction.user
checks = any(role.id in constants.TRIAL_MODERATOR_ROLES for role in getattr(user, "roles", [])) or user.id in {
self.ctx.bot.owner_id,
self.ctx.author.id,
*self.ctx.bot.owner_ids,
}
if not checks:
await interaction.response.send_message("You can't use this!", ephemeral=True)
return False
return True


class Moderation(commands.Cog):
"""For moderation."""

Expand Down Expand Up @@ -492,6 +589,142 @@ async def on_member_kick(self, target, entry):
)
await self.save_action(action)

@commands.hybrid_group(
aliases=("emergency-staff", "alert", "alert-staff"),
help=textwrap.dedent(
f"""
Emergency command to alert staff members with the *{EMERGENCY_ROLE_NAME}* role.

Do no abuse. Meant for use during emergencies that need immediate staff attention.
"""
),
cooldown_after_parsing=True,
fallback="send",
invoke_without_subcommand=True,
)
@commands.cooldown(1, EMERGENCY_COOLDOWN_HOURS * 60 * 60, commands.BucketType.guild) # Cooldown per guild
@checks.is_not_emergency_alert_banned()
@commands.guild_only()
async def emergency(self, ctx: GuiduckContext, *, reason: str):
WitherredAway marked this conversation as resolved.
Show resolved Hide resolved
"""Emergency command to alert staff members with the role."""

role = discord.utils.get(ctx.guild.roles, name=EMERGENCY_ROLE_NAME)
if role is None:
ctx.command.reset_cooldown(ctx)
return await ctx.send(
f"No role name *{EMERGENCY_ROLE_NAME}* found in this server. Please ask an Administrator to create one.",
ephemeral=True,
)

number_staff = len(role.members)
confirm_embed = discord.Embed(
color=discord.Color.red(),
title="🚨 Emergency Staff Alert",
description=(
f"This command is designed for use in case of emergencies actively happening in our server(s) that need"
f" immediate staff attention. This will ping **{number_staff}** staff member{'' if number_staff == 1 else 's'}"
f" currently assigned to the {role.mention} role, and you will be assisted shortly."
),
).add_field(
name="Are you sure that you want to send an Emergency Staff Alert for the following reason?",
value=reason,
inline=False,
)

if not await ctx.confirm(timeout=120, embeds=[confirm_embed, constants.EMERGENCY_RULES_EMBED], ephemeral=True):
ctx.command.reset_cooldown(ctx)
return await ctx.send("Aborted.", ephemeral=True)

alert_embed = discord.Embed(color=discord.Color.red(), title="🚨 Emergency Staff Alert Issued", description="")
alert_embed.set_author(
name=f"{ctx.author} ({ctx.author.id})",
icon_url=ctx.author.display_avatar,
)
alert_embed.add_field(name="Reason", value=reason, inline=False)
view = EmergencyView(ctx)
view.add_item(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't this button be part of the class?

Copy link
Contributor Author

@WitherredAway WitherredAway Jan 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To my knowledge the only ways are to either 1) clear_items() in the __init__ and then add the buttons manually or 2) Have a separate button subclass for the resolve button and then add them in the __init__ manually. This way felt easier to me but definitely can do one of those two (or any other way i may be unaware of), what do you think?

discord.ui.Button(
label="Logs",
url=f"https://admin.poketwo.net/logs/{ctx.guild.id}/{ctx.channel.id}?before={ctx.message.id+1}",
)
)
view.message = await ctx.reply(
role.mention,
embed=alert_embed,
view=view,
mention_author=False,
allowed_mentions=discord.AllowedMentions(roles=[role]),
)

@emergency.error
async def emergency_error(self, ctx: GuiduckContext, error):
if isinstance(error, commands.CommandOnCooldown):
await ctx.send(
f"An Emergency Staff Alert has already been issued recently and is currently on cooldown. Please use `?report` instead if necessary.",
ephemeral=True,
)

@emergency.command(name="ban", usage="<target> [expires_at] [reason]")
@checks.is_trial_moderator()
@commands.guild_only()
async def emergency_ban(self, ctx: GuiduckContext, target: discord.Member, *, time_and_reason):
"""Temporarily or permanently bans a member from using the Emergency Staff Alert command.

You must have the Trial Moderator role to use this.
"""

if any(role.id in constants.TRIAL_MODERATOR_ROLES for role in getattr(target, "roles", [])):
return await ctx.send("You can't punish that person!", ephemeral=True)

expires_at, reason = await self.parse_time_and_reason(ctx, time_and_reason)

permanent_ban = expires_at is None
action_cls = EmergencyAlertBan if permanent_ban else EmergencyAlertTempBan
action = action_cls(
target=target,
user=ctx.author,
reason=reason,
guild_id=ctx.guild.id,
created_at=ctx.message.created_at,
expires_at=expires_at,
)
await action.execute(ctx)
await action.notify()

if permanent_ban:
return await ctx.send(
f"Permanently banned **{target}** from issuing emergency staff alerts (Case #{action._id}).",
ephemeral=True,
)
else:
return await ctx.send(
f"Banned **{target}** from issuing emergency staff alerts for **{time.human_timedelta(action.duration)}** (Case #{action._id}).",
ephemeral=True,
)

@emergency.command(name="unban", usage="<target> [reason]")
@checks.is_trial_moderator()
@commands.guild_only()
async def emergency_unban(self, ctx: GuiduckContext, target: discord.Member, *, reason=None):
"""Unbans a member who has been banned from using the Emergency Staff Alert command.

You must have the Trial Moderator role to use this.
"""

action = EmergencyAlertUnban(
target=target,
user=ctx.author,
reason=reason,
guild_id=ctx.guild.id,
)
await action.execute(ctx)
await action.notify()

await ctx.send(
f"Unbanned **{target}** from issuing emergency staff alerts (Case #{action._id}).",
ephemeral=True,
)

async def run_purge(self, ctx, limit, check):
class ConfirmPurgeView(discord.ui.View):
@button(label=f"Purge up to {limit} messages", style=discord.ButtonStyle.danger)
Expand Down Expand Up @@ -626,7 +859,9 @@ async def note(self, ctx, target: Union[discord.Member, discord.User], *, note:
if len(note) == 0:
return await ctx.send_help(ctx.command)
elif len(note) > constants.EMBED_FIELD_CHAR_LIMIT:
return await ctx.send(f"History notes (including attachment URLs) can be at most {constants.EMBED_FIELD_CHAR_LIMIT} characters.")
return await ctx.send(
f"History notes (including attachment URLs) can be at most {constants.EMBED_FIELD_CHAR_LIMIT} characters."
)

action = Note(
target=target,
Expand Down Expand Up @@ -881,6 +1116,8 @@ async def reverse_raw_action(self, raw_action):
action_type = SymbolicUntimeout
elif action.type == "trading_mute":
action_type = TradingUnmute
elif action.type in ("emergency_alert_ban", "emergency_alert_temporary_ban"):
action_type = EmergencyAlertUnban
else:
return

Expand Down Expand Up @@ -975,7 +1212,9 @@ async def history_note(self, ctx, id: int, *, note: Optional[str] = ""):
if len(note) == 0:
return await ctx.send_help(ctx.command)
elif len(note) > constants.EMBED_FIELD_CHAR_LIMIT:
return await ctx.send(f"History notes (including attachment URLs) can be at most {constants.EMBED_FIELD_CHAR_LIMIT} characters.")
return await ctx.send(
f"History notes (including attachment URLs) can be at most {constants.EMBED_FIELD_CHAR_LIMIT} characters."
)

reset = note.lower() == "reset"

Expand Down
1 change: 1 addition & 0 deletions cogs/role_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
813433839471820810: 930346845547409439,
732712709514199110: 930346847443255306,
794438698241884200: 930346848529547314,
1193628150899425390: 1193629193641136218
}

ROLE_MAPPING_SUPPORT_TO_COMMUNITY = {v: k for k, v in ROLE_MAPPING_COMMUNITY_TO_SUPPORT.items()}
Expand Down
31 changes: 31 additions & 0 deletions helpers/checks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from datetime import datetime, timedelta, timezone
from discord.ext import commands

from . import constants
from . import time


class NotInGuild(commands.CheckFailure):
pass


class EmergencyAlertBanned(commands.CheckFailure):
pass


def is_community_manager():
return commands.check_any(commands.is_owner(), commands.has_any_role(*constants.COMMUNITY_MANAGER_ROLES))

Expand Down Expand Up @@ -44,3 +50,28 @@ async def predicate(ctx):
return user.get("level", 0) >= level

return commands.check(predicate)


def is_not_emergency_alert_banned():
async def predicate(ctx):
member = await ctx.bot.mongo.db.member.find_one({"_id": {"id": ctx.author.id, "guild_id": ctx.guild.id}})
now = datetime.now(timezone.utc)

permanently_banned = member.get("emergency_alert_banned")
if permanently_banned:
ctx.command.reset_cooldown(ctx)
raise EmergencyAlertBanned(
"You've been permanently banned from issuing emergency staff alerts due to violation(s) of its rules. If you think that this was a mistake, please contact a staff member."
)

temp_banned_until = member.get("emergency_alert_banned_until")
temp_banned = temp_banned_until is not None
if temp_banned and temp_banned_until > now:
ctx.command.reset_cooldown(ctx)
duration_seconds = (temp_banned_until - now).total_seconds()
raise EmergencyAlertBanned(
f"You've been banned from issuing emergency staff alerts for **{time.human_timedelta(timedelta(seconds=duration_seconds))}** due to violation(s) of its rules."
)
return True

return commands.check(predicate)
Loading