Refactor modules/channel_op.py and add !tempban/!tempkickban
This commit is contained in:
parent
c415cbfd80
commit
16dc2c39a9
2 changed files with 105 additions and 35 deletions
|
@ -1,5 +1,10 @@
|
|||
from src import ModuleManager, Utils
|
||||
|
||||
class UserNotFoundException(Exception):
|
||||
pass
|
||||
class InvalidTimeoutException(Exception):
|
||||
pass
|
||||
|
||||
@Utils.export("channelset", {"setting": "highlight-spam-threshold",
|
||||
"help": "Set the number of nicknames in a message that qualifies as spam",
|
||||
"validate": Utils.int_or_none})
|
||||
|
@ -14,6 +19,20 @@ from src import ModuleManager, Utils
|
|||
class Module(ModuleManager.BaseModule):
|
||||
_name = "Channel Op"
|
||||
|
||||
@Utils.hook("timer.unban")
|
||||
def _timer_unban(self, event):
|
||||
server = self.bot.get_server(event["server_id"])
|
||||
if server.has_channel(event["channel_name"]):
|
||||
channel = server.get_channel(event["channel_name"])
|
||||
channel.send_unban(event["hostmask"])
|
||||
|
||||
def _kick(self, server, channel, nickname, reason):
|
||||
target_user = server.get_user(nickname)
|
||||
if channel.has_user(target_user):
|
||||
channel.send_kick(nickname, reason)
|
||||
else:
|
||||
raise UserNotFoundException("That user is not in this channel")
|
||||
|
||||
@Utils.hook("received.command.kick|k", channel_only=True,
|
||||
require_mode="o", usage="<nickname> [reason]", min_args=1)
|
||||
def kick(self, event):
|
||||
|
@ -21,32 +40,41 @@ class Module(ModuleManager.BaseModule):
|
|||
Kick a user from the current channel
|
||||
"""
|
||||
target = event["args_split"][0]
|
||||
target_user = event["server"].get_user(target)
|
||||
if event["args_split"][1:]:
|
||||
reason = " ".join(event["args_split"][1:])
|
||||
else:
|
||||
reason = None
|
||||
event["stderr"].set_prefix("Kick")
|
||||
if event["target"].has_user(target_user):
|
||||
if not event["server"].is_own_nickname(target):
|
||||
event["target"].send_kick(target, reason)
|
||||
else:
|
||||
event["stderr"].write("Nope.")
|
||||
else:
|
||||
event["stderr"].write("That user is not in this channel")
|
||||
reason = " ".join(event["args_split"][1:]) or None
|
||||
|
||||
try:
|
||||
self._kick(event["server"], event["target"], target, reason)
|
||||
except UserNotFoundException:
|
||||
event["stderr"].set_prefix("Kick")
|
||||
event["stderr"].write(str(e))
|
||||
|
||||
def _ban_format(self, user, s):
|
||||
return s.replace("$n", user.nickname).replace("$u", user.username
|
||||
).replace("$h", user.hostname)
|
||||
def _ban(self, channel, ban, user):
|
||||
format = channel.get_setting("ban-format", "*!$u@$h")
|
||||
hostmask_split = format.split("$$")
|
||||
hostmask_split = [self._ban_format(user, s) for s in hostmask_split]
|
||||
hostmask = "".join(hostmask_split)
|
||||
if ban:
|
||||
channel.send_ban(hostmask)
|
||||
def _ban_user(self, channel, ban, user):
|
||||
if channel.has_user(user):
|
||||
format = channel.get_setting("ban-format", "*!$u@$h")
|
||||
hostmask_split = format.split("$$")
|
||||
hostmask_split = [self._ban_format(user, s) for s in hostmask_split]
|
||||
hostmask = "".join(hostmask_split)
|
||||
if ban:
|
||||
channel.send_ban(hostmask)
|
||||
else:
|
||||
channel.send_unban(hostmask)
|
||||
return hostmask
|
||||
else:
|
||||
channel.send_unban(hostmask)
|
||||
raise UserNotFoundException("That user is not in this channel")
|
||||
|
||||
def _ban(self, server, channel, ban, target):
|
||||
target_user = server.get_user(target)
|
||||
if channel.has_user(target_user):
|
||||
return this._ban_user(channel, ban, target_user)
|
||||
else:
|
||||
if ban:
|
||||
event["target"].send_ban(target)
|
||||
else:
|
||||
event["target"].send_unban(target)
|
||||
return target
|
||||
|
||||
@Utils.hook("received.command.ban", channel_only=True, min_args=1,
|
||||
require_mode="o", usage="<nickname/hostmask>")
|
||||
|
@ -54,22 +82,55 @@ class Module(ModuleManager.BaseModule):
|
|||
"""
|
||||
Ban a user/hostmask from the current channel
|
||||
"""
|
||||
target_user = event["server"].get_user(event["args_split"][0])
|
||||
if event["target"].has_user(target_user):
|
||||
self._ban(event["target"], True, target_user)
|
||||
self._ban(event["server"], event["target"], True,
|
||||
event["args_split"][0])
|
||||
|
||||
def _temp_ban(self, event, accept_hostmask):
|
||||
timeout = Utils.from_pretty_time(event["args_split"][1])
|
||||
if not timeout:
|
||||
raise InvalidTimeoutException(
|
||||
"Please provided a valid time above 0 seconds")
|
||||
|
||||
if accept_hostmask:
|
||||
hostmask = self._ban(event["server"], event["target"], True,
|
||||
event["args_split"][0])
|
||||
else:
|
||||
event["target"].send_ban(event["args_split"][0])
|
||||
hostmask = self._ban_user(event["target"], True,
|
||||
event["server"].get_user(event["args_split"][0]))
|
||||
|
||||
self.bot.timers.add_persistent("unban", timeout,
|
||||
server_id=event["server"].id,
|
||||
channel_name=event["target"].name, hostmask=hostmask)
|
||||
@Utils.hook("received.command.tempban|tban|tb", channel_only=True,
|
||||
min_args=2, require_mode="o", usage="<nickname/hostmask>")
|
||||
def temp_ban(self, event):
|
||||
try:
|
||||
self._temp_ban(event, True)
|
||||
except InvalidTimeoutException as e:
|
||||
event["stderr"].set_prefix("Tempban")
|
||||
event["stderr"].write(str(e))
|
||||
@Utils.hook("received.command.tempkickban|tkban|tkb", channel_only=True,
|
||||
min_args=2, require_mode="o", usage="<nickname/hostmask>")
|
||||
def temp_kick_ban(self, event):
|
||||
reason = " ".join(event["args_split"][2:]) or None
|
||||
event["stderr"].set_prefix("TKB")
|
||||
try:
|
||||
self._temp_ban(event, False)
|
||||
self._kick(event["server"], event["target"], event["args_split"][0],
|
||||
reason)
|
||||
except InvalidTimeoutException as e:
|
||||
event["stderr"].write(str(e))
|
||||
except UserNotFoundException as e:
|
||||
event["stderr"].write(str(e))
|
||||
|
||||
@Utils.hook("received.command.unban", channel_only=True, min_args=1,
|
||||
require_mode="o", usage="<nickname/hostmask>")
|
||||
def unban(self, event):
|
||||
"""
|
||||
Unban a user/hostmask from the current channel
|
||||
"""
|
||||
target_user = event["server"].get_user(event["args_split"][0])
|
||||
if event["target"].has_user(target_user):
|
||||
self._ban(event["target"], False, target_user)
|
||||
else:
|
||||
event["target"].send_unban(event["args_split"][0])
|
||||
self._ban(event["server"], event["target"], False,
|
||||
event["args_split"][0])
|
||||
|
||||
@Utils.hook("received.command.kickban|kb", channel_only=True,
|
||||
require_mode="o", usage="<nickname> [reason]", min_args=1)
|
||||
|
@ -77,11 +138,14 @@ class Module(ModuleManager.BaseModule):
|
|||
"""
|
||||
Kick and ban a user from the current channel
|
||||
"""
|
||||
if event["server"].has_user(event["args_split"][0]):
|
||||
self.ban(event)
|
||||
self.kick(event)
|
||||
else:
|
||||
event["stderr"].write("That user is not in this channel")
|
||||
target = event["args_split"][0]
|
||||
reason = " ".join(event["args_split"][1:]) or None
|
||||
try:
|
||||
self._ban(event["server"], event["target"], True, target)
|
||||
self._kick(event["server"], event["target"], target, reason)
|
||||
except UserNotFoundException as e:
|
||||
event["stderr"].set_prefix("Kickban")
|
||||
event["stderr"].write(str(e))
|
||||
|
||||
@Utils.hook("received.command.op", channel_only=True,
|
||||
require_mode="o", usage="[nickname]")
|
||||
|
|
|
@ -37,6 +37,12 @@ class Bot(object):
|
|||
if connect and new_server.get_setting("connect", True):
|
||||
self.connect(new_server)
|
||||
return new_server
|
||||
|
||||
def get_server(self, id):
|
||||
for server in self.servers.values():
|
||||
if server.id == id:
|
||||
return server
|
||||
|
||||
def connect(self, server):
|
||||
try:
|
||||
server.connect()
|
||||
|
|
Loading…
Reference in a new issue