refactor/rewrite channel_op.py, split highlight spam protection out

This commit is contained in:
jesopo 2019-08-14 14:38:47 +01:00
parent 4d8ce63efd
commit fa279aab93
2 changed files with 144 additions and 274 deletions

View file

@ -5,17 +5,8 @@
from src import ModuleManager, utils from src import ModuleManager, utils
class UserNotFoundException(Exception): KICK_REASON = "your behavior is not conducive to the desired environment"
pass
class InvalidTimeoutException(Exception):
pass
@utils.export("channelset", utils.IntSetting("highlight-spam-threshold",
"Set the number of nicknames in a message that qualifies as spam"))
@utils.export("channelset", utils.BoolSetting("highlight-spam-protection",
"Enable/Disable highlight spam protection"))
@utils.export("channelset", utils.BoolSetting("highlight-spam-ban",
"Enable/Disable banning highlight spammers instead of just kicking"))
@utils.export("channelset", utils.Setting("ban-format", @utils.export("channelset", utils.Setting("ban-format",
"Set ban format ($n = nick, $u = username, $h = hostname)", "Set ban format ($n = nick, $u = username, $h = hostname)",
example="*!$u@$h")) example="*!$u@$h"))
@ -23,7 +14,60 @@ class InvalidTimeoutException(Exception):
["qmode", "insp", "unreal", "none"], ["qmode", "insp", "unreal", "none"],
"Set this server's method of muting users")) "Set this server's method of muting users"))
class Module(ModuleManager.BaseModule): class Module(ModuleManager.BaseModule):
_name = "ChanOp" def _parse_time(self, args, min_args):
if args[0][0] == "+":
if len(args[1:]) < min_args:
raise utils.EventError("Not enough arguments")
time = utils.from_pretty_time(args[0][1:])
if time == None:
raise utils.EventError("Invalid timeframe")
return time, args[1:]
return None, args
def _kick(self, server, channel, target_nickname, reason):
target_user = server.get_user(target_nickname, create=False)
if target_user and channel.has_user(target_user):
reason = " ".join(reason) or KICK_REASON
channel.send_kick(target_user.nickname, reason)
else:
raise utils.EventError("No such user")
@utils.hook("received.command.kick")
@utils.hook("received.command.k", alias_of="k")
@utils.kwarg("min_args", 1)
@utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "kick")
@utils.kwarg("usage", "<nickname> [reason]")
def kick(self, event):
self._kick(event["server"], event["target"], event["args_split"][0],
event["args_split"][1:])
def _format_hostmask(self, user, s):
return s.replace("$n", user.nickname).replace("$u", user.username
).replace("$h", user.hostname)
def _get_hostmask(self, channel, user):
format = channel.get_setting("ban-format", "*!$u@$h")
hostmask_split = [
self._format_hostmask(user, s) for s in format.split("$$")]
return "$".join(hostmask_split)
def _ban(self, server, channel, target, allow_hostmask, time, add):
hostmask = None
target_user = server.get_user(target, create=False)
if target_user and channel.has_user(target_user):
hostmask = self._get_hostmask(channel, target_user)
else:
if not allow_hostmask:
raise utils.EventError("No such user")
hostmask = target
if not add:
channel.send_unban(hostmask)
else:
channel.send_ban(hostmask)
if not time == None:
self.timers.add_persistent("unban", time, server_id=server.id,
channel_name=channel.name, hostmask=hostmask)
@utils.hook("timer.unban") @utils.hook("timer.unban")
def _timer_unban(self, event): def _timer_unban(self, event):
@ -32,284 +76,79 @@ class Module(ModuleManager.BaseModule):
channel = server.channels.get(event["channel_name"]) channel = server.channels.get(event["channel_name"])
channel.send_unban(event["hostmask"]) channel.send_unban(event["hostmask"])
def _kick(self, server, channel, nickname, reason): @utils.hook("received.command.ban")
target_user = server.get_user(nickname) @utils.kwarg("min_args", 1)
if channel.has_user(target_user): @utils.kwarg("require_mode", "o")
channel.send_kick(nickname, reason) @utils.kwarg("require_access", "ban")
else: @utils.kwarg("usage", "[+time] <target>")
raise UserNotFoundException("That user is not in this channel")
def _kick_command(self, event, channel, args_split):
target = args_split[0]
reason = " ".join(args_split[1:]) or None
try:
self._kick(event["server"], channel, target, reason)
except UserNotFoundException as e:
event["stderr"].write(str(e))
@utils.hook("received.command.kick", private_only=True, min_args=2)
def private_kick(self, event):
"""
:help: Kick a user from the current channel
:usage: <nickname> [reason]
:prefix: Kick
"""
channel = event["server"].channels.get(event["args_split"][0])
event["check_assert"](utils.Check("channel-access", channel, "kick"))
self._kick_command(event, channel, event["args_split"][1:])
@utils.hook("received.command.k", alias_of="kick")
@utils.hook("received.command.kick", channel_only=True, min_args=1)
def kick(self, event):
"""
:help: Kick a user from the current channel
:usage: <nickname> [reason]
:require_mode: o
:require_access: kick
:prefix: Kick
"""
self._kick_command(event, event["target"], event["args_split"])
def _ban_format(self, user, s):
return s.replace("$n", user.nickname).replace("$u", user.username
).replace("$h", user.hostname)
def _ban_user(self, channel, ban, user):
if channel.has_user(user):
format = channel.get_setting("ban-format", "*!$u@$h")
hostmask_split = format.split("$$")
hostmask_split = [self._ban_format(user, s) for s in hostmask_split]
hostmask = "".join(hostmask_split)
if ban:
channel.send_ban(hostmask)
else:
channel.send_unban(hostmask)
return hostmask
else:
raise UserNotFoundException("That user is not in this channel")
def _ban(self, server, channel, ban, target):
target_user = server.get_user(target)
if channel.has_user(target_user):
return self._ban_user(channel, ban, target_user)
else:
if ban:
channel.send_ban(target)
else:
channel.send_unban(target)
return target
@utils.hook("received.command.ban", private_only=True, min_args=2)
def private_ban(self, event):
"""
:help: Ban a user/hostmask from the current channel
:usage: <channel> <nickname/hostmask>
"""
channel = event["server"].channels.get(event["args_split"][0])
event["check_assert"](utils.Check("channel-access", channel, "ban"))
self._ban(event["server"], channel, True, event["args_split"][1])
@utils.hook("received.command.ban", channel_only=True, min_args=1)
def ban(self, event): def ban(self, event):
""" time, args = self._parse_time(event["args_split"], 1)
:help: Ban a user/hostmask from the current channel self._ban(event["server"], event["target"], args[0], True, time, True)
:usage: <nickname/hostmask>
:require_mode: o
:require_access: ban
"""
self._ban(event["server"], event["target"], True,
event["args_split"][0])
def _temp_ban(self, event, accept_hostmask): @utils.hook("received.command.unban")
timeout = utils.from_pretty_time(event["args_split"][1]) @utils.kwarg("min_args", 1)
if not timeout: @utils.kwarg("require_mode", "o")
raise InvalidTimeoutException( @utils.kwarg("require_access", "ban")
"Please provided a valid time above 0 seconds") @utils.kwarg("usage", "<target>")
if accept_hostmask:
hostmask = self._ban(event["server"], event["target"], True,
event["args_split"][0])
else:
hostmask = self._ban_user(event["target"], True,
event["server"].get_user(event["args_split"][0]))
self.timers.add_persistent("unban", timeout,
server_id=event["server"].id,
channel_name=event["target"].name, hostmask=hostmask)
@utils.hook("received.command.tb", alias_of="tempban")
@utils.hook("received.command.tempban", channel_only=True, min_args=2)
def temp_ban(self, event):
"""
:help: Temporarily ban someone from the current channel
:usage: <nickname/hostmask> <time> [reason]
:require_mode: o
:require_access: ban
:prefix: Tempban
"""
try:
self._temp_ban(event, True)
except InvalidTimeoutException as e:
event["stderr"].write(str(e))
@utils.hook("received.command.tkb", alias_of="tempkickban")
@utils.hook("received.command.tempkickban", channel_only=True,
min_args=2)
def temp_kick_ban(self, event):
"""
:help: Temporarily kick and ban someone from the current channel
:usage: <nickname> <time> [reason]
:require_mode: o
:require_access: kickban
:prefix: TKB
"""
reason = " ".join(event["args_split"][2:]) or None
try:
self._temp_ban(event, False)
self._kick(event["server"], event["target"], event["args_split"][0],
reason)
except InvalidTimeoutException as e:
event["stderr"].write(str(e))
except UserNotFoundException as e:
event["stderr"].write(str(e))
@utils.hook("received.command.unban", private_only=True, min_args=2)
def private_unban(self, event):
"""
:help: Unban a user/hostmask from the current channel
:usage: <channel> <nickname/hostmask>
"""
channel = event["server"].channels.get(event["args_split"][0])
event["check_assert"](utils.Check("channel-access", channel, "ban"))
self._ban(event["server"], channel, False, event["args_split"][1])
@utils.hook("received.command.unban", channel_only=True, min_args=1)
def unban(self, event): def unban(self, event):
""" self._ban(event["server"], event["target"], event["args_split"][0],
:help: Unban a user/hostmask from the current channel True, None, False)
:usage: <nickname/hostmask>
:require_mode: o
:require_access: ban
"""
self._ban(event["server"], event["target"], False,
event["args_split"][0])
@utils.hook("received.command.kickban")
@utils.hook("received.command.kb", alias_of="kickban") @utils.hook("received.command.kb", alias_of="kickban")
@utils.hook("received.command.kickban", channel_only=True, min_args=1) @utils.kwarg("min_args", 1)
@utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "kickban")
@utils.kwarg("usage", "[+time] <nickname> [reason]")
def kickban(self, event): def kickban(self, event):
""" time, args = self._parse_time(event["args_split"], 1)
:help: Kick and ban a user from the current channel self._ban(event["server"], event["target"], args[0], False, time)
:usage: <nickname> [reason] self._kick(event["server"], event["target"], args[0], args[1:])
:require_mode: o
:require_access: kickban
:prefix: Kickban
"""
target = event["args_split"][0]
reason = " ".join(event["args_split"][1:]) or None
try:
self._ban(event["server"], event["target"], True, target)
self._kick(event["server"], event["target"], target, reason)
except UserNotFoundException as e:
event["stderr"].write(str(e))
@utils.hook("received.command.op", channel_only=True) @utils.hook("received.command.op")
@utils.hook("received.command.deop")
@utils.kwarg("channel_only", True)
@utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "op")
@utils.kwarg("usage", "[nickname]")
def op(self, event): def op(self, event):
""" add = event.name == "received.command.op"
:help: Op a user in the current channel target = event["args_split"][0] if event["args"] else event[
:usage: [nickname] "user"].nickname
:require_mode: o event["target"].send_mode("+o" if add else "-o", target)
:require_access: op
"""
target = event["user"].nickname if not event["args_split"] else event[
"args_split"][0]
event["target"].send_mode("+o", [target])
@utils.hook("received.command.deop", channel_only=True)
def deop(self, event):
"""
:help: Remove op from a user in the current channel
:usage: [nickname]
:require_mode: o
:require_access: op
"""
target = event["user"].nickname if not event["args_split"] else event[
"args_split"][0]
event["target"].send_mode("-o", [target])
@utils.hook("received.command.voice", channel_only=True) @utils.hook("received.command.voice")
def voice(self, event): @utils.hook("received.command.devoice")
""" @utils.kwarg("channel_only", True)
:help: Voice a user in the current channel @utils.kwarg("require_mode", "o")
:usage: [nickname] @utils.kwarg("require_access", "voice")
:require_mode: o @utils.kwarg("usage", "[nickname]")
:require_access: voice def op(self, event):
""" add = event.name == "received.command.voice"
target = event["user"].nickname if not event["args_split"] else event[ target = event["args_split"][0] if event["args"] else event[
"args_split"][0] "user"].nickname
event["target"].send_mode("+v", [target]) event["target"].send_mode("+v" if add else "-v", target)
@utils.hook("received.command.devoice", channel_only=True)
def devoice(self, event):
"""
:help: Remove voice from a user in the current channel
:usage: [nickname]
:require_mode: o
:require_access: voice
"""
target = event["user"].nickname if not event["args_split"] else event[
"args_split"][0]
event["target"].send_mode("-v", [target])
@utils.hook("received.command.topic", min_args=1, channel_only=True, @utils.hook("received.command.topic")
remove_empty=False) @utils.kwarg("min_args", 1)
@utils.kwarg("channel_only", True)
@utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "topic")
@utils.kwarg("remove_empty", False)
@utils.kwarg("usage", "<topic>")
def topic(self, event): def topic(self, event):
"""
:help: Set the topic in the current channel
:usage: <topic>
:require_mode: o
:require_access: topic
"""
event["target"].send_topic(event["args"]) event["target"].send_topic(event["args"])
@utils.hook("received.command.tappend", min_args=1, channel_only=True,
remove_empty=False) @utils.hook("received.command.tappend")
@utils.kwarg("min_args", 1)
@utils.kwarg("channel_only", True)
@utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "topic")
@utils.kwarg("remove_empty", False)
@utils.kwarg("usage", "<topic>")
def tappend(self, event): def tappend(self, event):
"""
:help: Append to the topic in the current channel
:usage: <topic>
:require_mode: o
:require_access: topic
"""
event["target"].send_topic(event["target"].topic + event["args"]) event["target"].send_topic(event["target"].topic + event["args"])
@utils.hook("received.message.channel")
def highlight_spam(self, event):
if event["channel"].get_setting("highlight-spam-protection", False):
nicknames = list(map(lambda user: user.nickname,
event["channel"].users)) + [event["server"].nickname]
highlights = set(nicknames) & set(event["message_split"])
if len(highlights) > 1 and len(highlights) >= event["channel"
].get_setting("highlight-spam-threshold", 10):
has_mode = event["channel"].mode_or_above(event["user"], "v")
should_ban = event["channel"].get_setting("highlight-spam-ban",
False)
if not has_mode:
if should_ban:
event["channel"].send_ban("*!%s@%s" % (
event["user"].username, event["user"].hostname))
event["channel"].send_kick(event["user"].nickname,
"highlight spam detected")
@utils.hook("received.command.leave", channel_only=True)
def leave(self, event):
"""
:help: Part me from the current channel
:require_mode: o
"""
event["target"].send_part()
def _mute_method(self, server, user): def _mute_method(self, server, user):
mask = "*!*@%s" % user.hostname mask = "*!*@%s" % user.hostname
mute_method = server.get_setting("mute-method", "qmode").lower() mute_method = server.get_setting("mute-method", "qmode").lower()

31
modules/highlight_spam.py Normal file
View file

@ -0,0 +1,31 @@
#--depends-on config
from src import ModuleManager, utils
@utils.export("channelset", utils.IntSetting("highlight-spam-threshold",
"Set the number of nicknames in a message that qualifies as spam"))
@utils.export("channelset", utils.BoolSetting("highlight-spam-protection",
"Enable/Disable highlight spam protection"))
@utils.export("channelset", utils.BoolSetting("highlight-spam-ban",
"Enable/Disable banning highlight spammers instead of just kicking"))
class Module(ModuleManager.BaseModule):
@utils.hook("received.message.channel")
def highlight_spam(self, event):
if event["channel"].get_setting("highlight-spam-protection", False):
nicknames = list(map(lambda user: user.nickname,
event["channel"].users))
highlights = set(nicknames) & set(event["message_split"])
print(highlights)
if len(highlights) > 1 and len(highlights) >= event["channel"
].get_setting("highlight-spam-threshold", 10):
has_mode = event["channel"].mode_or_above(event["user"], "v")
should_ban = event["channel"].get_setting("highlight-spam-ban",
False)
if not has_mode:
if should_ban:
event["channel"].send_ban("*!%s@%s" % (
event["user"].username, event["user"].hostname))
event["channel"].send_kick(event["user"].nickname,
"highlight spam detected")