485 lines
18 KiB
Python
485 lines
18 KiB
Python
#--depends-on channel_access
|
|
#--depends-on check_mode
|
|
#--depends-on commands
|
|
#--depends-on config
|
|
|
|
import enum
|
|
from src import ModuleManager, utils
|
|
|
|
QUIET_METHODS = {
|
|
"qmode": ["q", "", "728", "729"],
|
|
"insp": ["b", "m:", "367", "368"],
|
|
"unreal": ["b", "~q:", "367", "368"]
|
|
}
|
|
ABAN_METHODS = {
|
|
"chary": "$a:",
|
|
"insp": "R:",
|
|
"unreal": "~a:"
|
|
}
|
|
|
|
KICK_REASON = "your behavior is not conducive to the desired environment"
|
|
NO_QUIETS = "This network doesn't support quiets"
|
|
NO_ABANS = "This network doesn't support account bans"
|
|
|
|
class TargetType(enum.Enum):
|
|
NICKNAME = 1
|
|
MASK = 2
|
|
ACCOUNT = 3
|
|
|
|
KICK_REASON_SETTING = utils.Setting("default-kick-reason",
|
|
"Set the default kick reason", example="have a nice trip")
|
|
|
|
BAN_FORMATTING = "${n} = nick, ${u} = username, ${h} = hostname, ${a} = account"
|
|
@utils.export("channelset", utils.Setting("ban-format",
|
|
"Set ban format (%s)" % BAN_FORMATTING, example="*!${u}@${h}"))
|
|
|
|
@utils.export("channelset", utils.Setting("ban-format-account",
|
|
"Set ban format for users with accounts (%s)" % BAN_FORMATTING,
|
|
example="~a:${a}"))
|
|
|
|
@utils.export("serverset", utils.OptionsSetting(
|
|
list(QUIET_METHODS.keys()), "quiet-method",
|
|
"Set this server's method of muting users"))
|
|
@utils.export("serverset", utils.OptionsSetting(
|
|
list(ABAN_METHODS.keys()), "aban-method",
|
|
"Set this server's method of banning users by account"))
|
|
|
|
@utils.export("botset", KICK_REASON_SETTING)
|
|
@utils.export("serverset", KICK_REASON_SETTING)
|
|
@utils.export("channelset", KICK_REASON_SETTING)
|
|
class Module(ModuleManager.BaseModule):
|
|
_name = "ChanOp"
|
|
|
|
@utils.hook("timer.unmode")
|
|
def unmode(self, timer):
|
|
channel = self.bot.database.channels.by_id(timer.kwargs["channel"])
|
|
|
|
if channel:
|
|
server_id, channel_name = channel
|
|
server = self.bot.get_server_by_id(server_id)
|
|
if server and channel_name in server.channels:
|
|
channel = server.channels.get(channel_name)
|
|
|
|
args = timer.kwargs.get("args", [timer.kwargs.get("arg", None)])
|
|
if args:
|
|
channel.send_modes(args, False)
|
|
else:
|
|
channel.send_mode(timer.kwargs["mode"], False)
|
|
|
|
def _kick_reason(self, server, channel):
|
|
return channel.get_setting("default-kick-reason",
|
|
server.get_setting("default-kick-reason",
|
|
self.bot.get_setting("default-kick-reson", KICK_REASON)))
|
|
|
|
def _kick(self, server, channel, nicknames, reason):
|
|
reason = reason or self._kick_reason(server, channel)
|
|
channel.send_kicks(nicknames, reason)
|
|
|
|
def _format_hostmask(self, user, s):
|
|
vars = {}
|
|
vars["n"] = vars["nickname"] = user.nickname
|
|
vars["u"] = vars["username"] = user.username
|
|
vars["h"] = vars["hostname"] = user.hostname
|
|
vars["a"] = vars["account"] = user.account or ""
|
|
return utils.parse.format_token_replace(s, vars)
|
|
def _get_hostmask(self, channel, user):
|
|
if not user.account == None:
|
|
account_format = channel.get_setting("ban-format-account", None)
|
|
if not account_format == None:
|
|
return self._format_hostmask(user, account_format)
|
|
|
|
format = channel.get_setting("ban-format", "*!${u}@${h}")
|
|
return self._format_hostmask(user, format)
|
|
|
|
|
|
@utils.hook("received.command.topic")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.kwarg("require_access", "low,topic")
|
|
@utils.kwarg("remove_empty", False)
|
|
@utils.spec("!<#channel>r~channel !<topic>string")
|
|
def topic(self, event):
|
|
event["spec"][0].send_topic(event["spec"][1])
|
|
|
|
@utils.hook("received.command.tappend")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.kwarg("require_access", "low,topic")
|
|
@utils.kwarg("remove_empty", False)
|
|
@utils.spec("!<#channel>r~channel !<topic>string")
|
|
def tappend(self, event):
|
|
event["spec"][0].send_topic(event["spec"][0].topic + event["spec"][1])
|
|
|
|
def _quiet_method(self, server):
|
|
if server.quiet:
|
|
return server.quiet
|
|
|
|
method = server.get_setting("quiet-method", None)
|
|
return QUIET_METHODS.get(method, None)
|
|
|
|
def _aban_method(self, server):
|
|
method = server.get_setting("aban-method", None)
|
|
return ABAN_METHODS.get(method, None)
|
|
|
|
@utils.hook("received.command.invite")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.kwarg("require_access", "low,invite")
|
|
@utils.kwarg("help", "Invite a given user")
|
|
@utils.spec("!<#channel>r~channel !<nickname>word")
|
|
def invite(self, event):
|
|
user_nickname = event["args_split"][0]
|
|
|
|
event["target"].send_invite(user_nickname)
|
|
|
|
user = event["server"].get_user(user_nickname, create=False)
|
|
if user:
|
|
user_nickname = user.nickname
|
|
|
|
event["stdout"].write("Invited %s" % user_nickname)
|
|
|
|
def _parse_flags(self, s):
|
|
if s[0] == "+":
|
|
return True, list(s[1:])
|
|
elif s[0] == "-":
|
|
return False, list(s[1:])
|
|
else:
|
|
return None, None
|
|
|
|
@utils.hook("received.command.flags")
|
|
@utils.kwarg("help", "Configure access flags for a given user")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.kwarg("require_access", "admin,flags")
|
|
@utils.spec("!<#channel>r~channel !<nickname>ouser ?<flags>word")
|
|
def flags(self, event):
|
|
target = event["spec"][1]
|
|
current_flags = event["spec"][0].get_user_setting(target.get_id(),
|
|
"flags", "")
|
|
|
|
if not event["spec"][2]:
|
|
current_flags_str = ("+%s" % current_flags) if current_flags else ""
|
|
event["stdout"].write("Flags for %s: %s" %
|
|
(target, current_flags_str))
|
|
else:
|
|
is_add, parsed_flags = self._parse_flags(event["spec"][2])
|
|
new_flags = None
|
|
|
|
if is_add == None:
|
|
raise utils.EventError("Invalid flags format")
|
|
elif is_add:
|
|
new_flags = list(set(list(current_flags)+parsed_flags))
|
|
else:
|
|
new_flags = list(set(current_flags)-set(parsed_flags))
|
|
|
|
if new_flags:
|
|
# sort alphanumeric with uppercase after lowercase
|
|
new_flags = sorted(new_flags,
|
|
key=lambda c: ("0" if c.islower() else "1")+c)
|
|
|
|
new_flags_str = "".join(new_flags)
|
|
event["spec"][0].set_user_setting(target.get_id(), "flags",
|
|
new_flags_str)
|
|
|
|
self._check_flags(event["server"], event["spec"][0], target)
|
|
|
|
event["stdout"].write("Set flags for %s to +%s" % (
|
|
target.nickname, new_flags_str))
|
|
else:
|
|
event["spec"][0].del_user_setting(target.get_id(), "flags")
|
|
event["stdout"].write("Cleared flags for %s" % target.nickname)
|
|
|
|
@utils.hook("received.join")
|
|
def on_join(self, event):
|
|
self._check_flags(event["server"], event["channel"], event["user"])
|
|
@utils.hook("received.account.login")
|
|
@utils.hook("internal.identified")
|
|
def on_account(self, event):
|
|
for channel in event["user"].channels:
|
|
self._check_flags(event["server"], channel, event["user"])
|
|
|
|
def _check_flags(self, server, channel, user):
|
|
flags = channel.get_user_setting(user.get_id(), "flags", "")
|
|
|
|
if flags:
|
|
identified = not user._id_override == None
|
|
current_modes = channel.get_user_modes(user)
|
|
|
|
modes = []
|
|
kick_reason = None
|
|
for flag in list(flags):
|
|
if flag == "O" and identified:
|
|
modes.append(("o", user.nickname))
|
|
elif flag == "V" and identified:
|
|
modes.append(("v", user.nickname))
|
|
elif flag == "b":
|
|
modes.append(("b", self._get_hostmask(channel, user)))
|
|
kick_reason = "User is banned from this channel"
|
|
|
|
new_modes = []
|
|
for mode, arg in modes:
|
|
if not mode in current_modes:
|
|
new_modes.append((mode, arg))
|
|
|
|
if new_modes:
|
|
channel.send_modes(new_modes, True)
|
|
if not kick_reason == None:
|
|
channel.send_kick(user.nickname, kick_reason)
|
|
|
|
@utils.hook("received.command.cmute")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.kwarg("require_access", "high,cmute")
|
|
@utils.kwarg("help", "Mute the current channel")
|
|
@utils.spec("!<#channel>r~channel ?duration")
|
|
def cmute(self, event):
|
|
event["spec"][0].send_mode("+m")
|
|
|
|
if event["spec"][1]:
|
|
self.timers.add_persistent("unmode", event["spec"][1],
|
|
channel=event["spec"][0].id, mode="m")
|
|
@utils.hook("received.command.cunmute")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.kwarg("require_access", "high,cmute")
|
|
@utils.kwarg("help", "Mute the current channel")
|
|
@utils.spec("!<#channel>r~channel")
|
|
def cunmute(self, event):
|
|
event["spec"][0].send_mode("-m")
|
|
|
|
def _filter_mask(self, mask, mode_list):
|
|
parsed_mask = utils.irc.hostmask_parse(mask)
|
|
return list(utils.irc.hostmask_match_many(mode_list, parsed_mask))
|
|
def _filter_prefix(self, prefix, list):
|
|
return [l for l in list if prefix in l]
|
|
|
|
def _type_to_mode(self, server, channel, type):
|
|
if type[0] == "+":
|
|
if type[1:]:
|
|
if type[1] in channel.mode_lists:
|
|
return type[1], None
|
|
else:
|
|
raise utils.EventError("Unknown list mode")
|
|
else:
|
|
raise utils.EventError("Please provide a list mode")
|
|
elif type in ["quiets", "mutes"]:
|
|
quiet_method = self._quiet_method(server)
|
|
if quiet_method:
|
|
return quiet_method[0], quiet_method[1]
|
|
else:
|
|
raise utils.EventError(NO_QUIETS)
|
|
else:
|
|
raise utils.EventError("Unknown type '%s'" % type)
|
|
|
|
def _list_query_event(self, channel, list_mask, list_mode, list_prefix):
|
|
mode_list = list(channel.mode_lists[list_mode])
|
|
if list_prefix:
|
|
mode_list = self._filter_prefix(list_prefix, mode_list)
|
|
if list_mask:
|
|
mode_list = self._filter_mask(list_mask, mode_list)
|
|
|
|
return mode_list
|
|
|
|
@utils.hook("received.command.clear")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.kwarg("require_access", "admin,clear")
|
|
@utils.kwarg("help", "Clear a given channel list mode (e.g. +b)")
|
|
@utils.spec("!<#channel>r~channel !<type>word|<mode>word ?<mask>word")
|
|
def clear(self, event):
|
|
mode, prefix = self._type_to_mode(event["server"], event["spec"][0],
|
|
event["spec"][1])
|
|
|
|
mode_list = self._list_query_event(event["spec"][0], event["spec"][2],
|
|
mode, prefix)
|
|
|
|
event["spec"][0].send_modes([(mode, a) for a in mode_list], False)
|
|
|
|
@utils.hook("received.command.lsearch")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.kwarg("require_access", "high,lsearch")
|
|
@utils.kwarg("help", "Search a given channel list mode (e.g. +b)")
|
|
@utils.spec("!<#channel>r~channel !<type>word|<mode>word ?<mask>word")
|
|
def lsearch(self, event):
|
|
mode, prefix = self._type_to_mode(event["server"], event["spec"][0],
|
|
event["spec"][1])
|
|
|
|
mode_list = self._list_query_event(event["spec"][0], event["spec"][2],
|
|
mode, prefix)
|
|
|
|
if mode_list:
|
|
event["stdout"].write("%s: %s" %
|
|
(event["user"].nickname, " ".join(mode_list)))
|
|
else:
|
|
event["stderr"].write("%s: no matches" % event["user"].nickname)
|
|
|
|
def _find_mode(self, type, server):
|
|
if type == "ban":
|
|
return TargetType.MASK, "b", ""
|
|
elif type == "aban":
|
|
aban_method = self._aban_method(server)
|
|
if aban_method == None:
|
|
raise utils.EventError(NO_ABANS)
|
|
return TargetType.ACCOUNT, "b", aban_method
|
|
elif type == "invex":
|
|
if not "INVEX" in server.isupport:
|
|
raise utils.EventError(
|
|
"invexes are not supported on this network")
|
|
return TargetType.MASK, server.isupport["INVEX"] or "I", ""
|
|
elif type == "quiet":
|
|
quiet_method = self._quiet_method(server)
|
|
if quiet_method == None:
|
|
raise utils.EventError(NO_QUIETS)
|
|
mode, prefix, _, _ = quiet_method
|
|
return TargetType.MASK, mode, prefix
|
|
elif type == "op":
|
|
return TargetType.NICKNAME, "o", None
|
|
elif type =="voice":
|
|
return TargetType.NICKNAME, "v", None
|
|
|
|
@utils.hook("received.command.ban", require_access="high,ban", type="ban")
|
|
@utils.hook("received.command.aban", require_access="high,ban", type="aban")
|
|
@utils.hook("received.command.quiet", require_access="high,quiet",
|
|
type="quiet")
|
|
@utils.hook("received.command.invex", require_access="high,invex",
|
|
type="invex")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.spec("!r~channel ?duration !<mask>cmask|<nickname>user|<mask>word")
|
|
def mask_mode(self, event):
|
|
self._mask_mode(event["server"], event["user"], event["spec"],
|
|
event["hook"].get_kwarg("type"))
|
|
|
|
@utils.hook("received.command.op", require_access="high,op", type="op")
|
|
@utils.hook("received.command.voice", require_access="low,voice",
|
|
type="voice")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.spec("!r~channel ?duration ?<mask>cmask|<nickname>cuser")
|
|
def access_mode(self, event):
|
|
self._mask_mode(event["server"], event["user"], event["spec"],
|
|
event["hook"].get_kwarg("type"))
|
|
|
|
def _mask_kick(self, server, channel, target, reason):
|
|
if target[0] == "cmask":
|
|
users = target[1]
|
|
elif target[0] == "cuser":
|
|
users = [target[1]]
|
|
self._kick(server, channel, [u.nickname for u in users], reason)
|
|
|
|
@utils.hook("received.command.kick")
|
|
@utils.kwarg("require_access", "high,kick")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.spec(
|
|
"!r~channel !<mask>cmask|<nickname>cuser ?<reason>string")
|
|
def kick(self, event):
|
|
self._mask_kick(event["server"], event["spec"][0], event["spec"][1],
|
|
event["spec"][2])
|
|
|
|
@utils.hook("received.command.kickban", type="ban")
|
|
@utils.hook("received.command.akickban", type="aban")
|
|
@utils.kwarg("require_access", "high,kickban")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.spec(
|
|
"!r~channel ?duration !<mask>cmask|<nickname>cuser ?<reason>string")
|
|
def kickban(self, event):
|
|
self._mask_mode(event["server"], event["user"], event["spec"],
|
|
event["hook"].get_kwarg("type"))
|
|
self._mask_kick(event["server"], event["spec"][0], event["spec"][2],
|
|
event["spec"][3])
|
|
|
|
def _mask_mode(self, server, user, spec, type):
|
|
users = args = []
|
|
if spec[2] == None:
|
|
users = [user]
|
|
elif spec[2][0] == "cmask":
|
|
users = spec[2][1]
|
|
elif spec[2][0] in ["user", "cuser"]:
|
|
users = [spec[2][1]]
|
|
elif spec[2][0] == "word":
|
|
masks = [spec[2][1]]
|
|
|
|
target_type, mode, prefix = self._find_mode(type, server)
|
|
if users:
|
|
if target_type == TargetType.MASK:
|
|
args = [self._get_hostmask(spec[0], u) for u in users]
|
|
elif target_type == TargetType.NICKNAME:
|
|
args = [
|
|
u.nickname for u in users if not spec[0].has_mode(u, mode)]
|
|
elif target_type == TargetType.ACCOUNT:
|
|
args = [u.account for u in users if not u.account == None]
|
|
|
|
if args:
|
|
args = [(mode, "%s%s" % (prefix, a)) for a in args]
|
|
spec[0].send_modes(args, True)
|
|
|
|
if not spec[1] == None:
|
|
self.timers.add_persistent("unmode", spec[1],
|
|
channel=spec[0].id, args=args)
|
|
|
|
@utils.hook("received.command.unban", require_access="high,unban",
|
|
type="ban")
|
|
@utils.hook("received.command.unquiet", require_access="high,unquiet",
|
|
type="quiet")
|
|
@utils.hook("received.command.uninvex", require_access="high,uninvex",
|
|
type="invex")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.spec("!r~channel !<nickname>user|<mask>word")
|
|
def mask_unmode(self, event):
|
|
is_mask, mode, prefix = self._find_mode(
|
|
event["hook"].get_kwarg("type"), event["server"])
|
|
|
|
users = args = []
|
|
if event["spec"][1][0] == "user":
|
|
masks = [self._get_hostmask(event["spec"][0], event["spec"][1][1])]
|
|
elif event["spec"][1][0] == "word":
|
|
masks = self._list_query_event(event["spec"][0],
|
|
event["spec"][1][1], mode, prefix)
|
|
|
|
if masks:
|
|
event["spec"][0].send_modes([(mode, a) for a in masks], False)
|
|
|
|
@utils.hook("received.command.deop", require_access="high,deop", type="op")
|
|
@utils.hook("received.command.devoice", require_access="low,devoice",
|
|
type="voice")
|
|
@utils.kwarg("require_mode", "o")
|
|
@utils.spec("!r~channel ?<nickname>cuser|<mask>cmask")
|
|
def access_unmode(self, event):
|
|
if event["spec"][1] == None:
|
|
users = [event["user"]]
|
|
elif event["spec"][1][0] == "cuser":
|
|
users = [event["spec"][1][1]]
|
|
elif event["spec"][1][0] == "cmask":
|
|
users = event["spec"][1][1]
|
|
|
|
for i, user in enumerate(users):
|
|
if event["server"].is_own_nickname(user.nickname):
|
|
users.append(users.pop(i))
|
|
break
|
|
|
|
_, mode, _ = self._find_mode(
|
|
event["hook"].get_kwarg("type"), event["server"])
|
|
valid_nicks = [
|
|
u.nickname for u in users if event["spec"][0].has_mode(u, mode)]
|
|
|
|
if valid_nicks:
|
|
event["spec"][0].send_modes([(mode, a) for a in valid_nicks], False)
|
|
|
|
@utils.hook("received.command.down")
|
|
@utils.spec("!r~channel ?<nickname>cuser|<mask>cmask")
|
|
def down(self, event):
|
|
if not event["spec"][1] or event["spec"][1][1] == event["user"]:
|
|
users = [event["user"]]
|
|
else:
|
|
event["check_assert"](
|
|
utils.Check("channel-mode", "o")|
|
|
utils.Check("channel-access", "high,down"))
|
|
|
|
if event["spec"][1][0] == "cuser":
|
|
users = [event["spec"][1][1]]
|
|
elif event["spec"][1][0] == "cmask":
|
|
users = event["spec"][1][1]
|
|
|
|
for i, user in enumerate(users):
|
|
if event["server"].is_own_nickname(user.nickname):
|
|
users.append(users.pop(i))
|
|
break
|
|
|
|
modes = []
|
|
for user in users:
|
|
user_modes = event["spec"][0].get_user_modes(user)
|
|
modes.extend([(m, user.nickname) for m in user_modes])
|
|
|
|
if modes:
|
|
event["spec"][0].send_modes(modes, False)
|