#--depends-on config
#--depends-on permissions

import enum, re, shlex, string, traceback, typing
from src import EventManager, IRCLine, ModuleManager, utils
from . import outs

# Name of the setting that selects how command replies are sent.
COMMAND_METHOD = "command-method"
# Values accepted for the COMMAND_METHOD setting.
COMMAND_METHODS = ["PRIVMSG", "NOTICE"]

# Suffix appended to the sent part of a reply that had to be truncated.
STR_MORE = " (more...)"
# Encoded size of STR_MORE; passed as the truncation margin in Module._out.
STR_MORE_LEN = len(STR_MORE.encode("utf8"))
# Prefix put in front of the overflow text that is queued for later.
STR_CONTINUED = "(...continued)"
# Characters at which Module._out tries to break an over-long reply.
WORD_BOUNDARIES = [" "]
# Stripped from the start of the first word of a private message.
NON_ALPHANUMERIC = [char for char in string.printable if not char.isalnum()]


class OutType(enum.Enum):
    """Kind of output being sent; selects the prefix colour in Module._out."""
    OUT = 1
    ERR = 2


class BadContextException(Exception):
    """Raised when a command exists but not for the context it was used in.

    `required_context` is "channel" or "private" (see
    Module._find_command_hook).
    """
    def __init__(self, required_context):
        self.required_context = required_context
        Exception.__init__(self)


class CommandEvent(object):
    """Mutable command/args pair handed to "get.command" listeners.

    Module._find_command_hook reads `command` and `args` back after the
    event has been called, so listeners may rewrite either attribute.
    """
    def __init__(self, command, args):
        self.command = command
        self.args = args


SETTING_COMMANDMETHOD = utils.OptionsSetting(COMMAND_METHODS, COMMAND_METHOD,
    "Set the method used to respond to commands")


@utils.export("channelset", utils.Setting("command-prefix",
    "Set the command prefix used in this channel", example="!"))
@utils.export("serverset", utils.Setting("command-prefix",
    "Set the command prefix used on this server", example="!"))
@utils.export("serverset", SETTING_COMMANDMETHOD)
@utils.export("channelset", SETTING_COMMANDMETHOD)
@utils.export("botset", SETTING_COMMANDMETHOD)
@utils.export("channelset", utils.BoolSetting("hide-prefix",
    "Disable/enable hiding prefix in command reponses"))
@utils.export("channelset", utils.BoolSetting("commands",
    "Disable/enable responding to commands in-channel"))
@utils.export("channelset", utils.BoolSetting("prefixed-commands",
    "Disable/enable responding to prefixed commands in-channel"))
class Module(ModuleManager.BaseModule):
    """Finds, checks and runs command hooks for channel and private messages."""

    @utils.hook("new.user")
    @utils.hook("new.channel")
    def new(self, event):
        """Look up the newly created user or channel from the event.

        NOTE(review): the resolved target is not used for anything here.
        """
        if "user" in event:
            target = event["user"]
        else:
            target = event["channel"]

    def has_command(self, command):
        """Return whether `command` (lower-cased) is a child of
        "received.command"."""
        children = self.events.on("received").on("command").get_children()
        return command.lower() in children

    def get_hooks(self, command):
        """Return the hooks registered under "received.command.<command>"."""
        return self.events.on("received.command").on(command).get_hooks()

    def is_highlight(self, server, s):
        """Return whether `s` is one of the bot's own nicknames followed by
        ":" or ",".

        Returns False for an empty string or a string without that suffix.
        """
        if s and s[-1] in [":", ","]:
            return server.is_own_nickname(s[:-1])
        return False

    def _command_method(self, server, target):
        """Return the upper-cased reply method for `target`.

        The target setting falls back to the server setting, then the bot
        setting, then "PRIVMSG".
        """
        bot_method = self.bot.get_setting(COMMAND_METHOD, "PRIVMSG")
        server_method = server.get_setting(COMMAND_METHOD, bot_method)
        return target.get_setting(COMMAND_METHOD, server_method).upper()

    def _find_command_hook(self, server, target, is_channel, command, args):
        """Find the hook that should handle `command` in this context.

        If `command` is not registered, "get.command" listeners get a
        CommandEvent and may rewrite the command and args before the lookup.

        Returns a tuple (hook, command, args_split). `hook` is None when
        nothing matched, and `args_split` is empty when `args` is empty.

        Raises:
            ValueError: a hook is an alias of a command that is not
                registered.
            BadContextException: hooks exist but all were skipped as
                channel-only or private-only.
        """
        if not self.has_command(command):
            command_event = CommandEvent(command, args)
            self.events.on("get.command").call(command=command_event,
                server=server, target=target, is_channel=is_channel)

            command = command_event.command
            args = command_event.args

        hook = None
        args_split = []
        channel_skip = False
        private_skip = False
        if self.has_command(command):
            for potential_hook in self.get_hooks(command):
                alias_of = self._get_alias_of(potential_hook)
                if alias_of:
                    if self.has_command(alias_of):
                        potential_hook = self.get_hooks(alias_of)[0]
                    else:
                        raise ValueError(
                            "'%s' is an alias of unknown command '%s'"
                            % (command.lower(), alias_of.lower()))

                if not is_channel and potential_hook.get_kwarg(
                        "channel_only", False):
                    channel_skip = True
                    continue
                if is_channel and potential_hook.get_kwarg(
                        "private_only", False):
                    private_skip = True
                    continue

                hook = potential_hook

                if args:
                    argparse = hook.get_kwarg("argparse", "plain")
                    if argparse == "shlex":
                        try:
                            args_split = shlex.split(args)
                        except ValueError:
                            # shlex rejects unbalanced quotes (e.g. a lone
                            # apostrophe); fall back to plain splitting
                            # rather than letting user input abort the
                            # message handler.
                            args_split = args.split(" ")
                    elif argparse == "plain":
                        args_split = args.split(" ")

                # The first hook that fits the context wins.
                break

        if not hook and (private_skip or channel_skip):
            raise BadContextException("channel" if channel_skip else "private")

        return hook, command, args_split

    def _check(self, context, kwargs, requests=None):
        """Run the "<context>.command" checks and reduce them to a verdict.

        With `requests` (pairs of request name and request args), each named
        sub-event is called for a single result; otherwise every hook on the
        event is called.

        Returns (True, None) on success or (False, message) on failure.
        A hard fail or forced success stops the scan of results. A plain
        error fails the check unless a later result forces success.
        """
        event_hook = self.events.on(context).on("command")

        if requests:
            returns = []
            for request, request_args in requests:
                returns.append(event_hook.on(request).call_for_result_unsafe(
                    **kwargs, request_args=request_args))
        else:
            returns = event_hook.call_unsafe(**kwargs)

        hard_fail = False
        force_success = False
        error = None
        for returned in returns:
            if not returned:
                continue

            result_type, message = returned
            if result_type == utils.consts.PERMISSION_HARD_FAIL:
                error = message
                hard_fail = True
                break
            elif result_type == utils.consts.PERMISSION_FORCE_SUCCESS:
                force_success = True
                break
            elif result_type == utils.consts.PERMISSION_ERROR:
                error = message

        if hard_fail:
            return False, error
        elif not force_success and error:
            return False, error
        else:
            return True, None

    def _check_assert(self, check_kwargs, user,
            check: typing.Union[utils.Check, utils.MultiCheck]):
        """Run `check` and raise utils.EventError if it does not pass.

        The error message is prefixed with the nickname of `user`.
        """
        checks = check.to_multi() # both Check and MultiCheck has this func
        is_success, message = self._check("check", check_kwargs,
            checks.requests())
        if not is_success:
            raise utils.EventError("%s: %s" % (user.nickname, message))

    def command(self, server, target, target_str, is_channel, user, command,
            args_split, line, hook, **kwargs):
        """Run `hook` for `command` after the preprocess checks pass.

        Extra keyword arguments are merged into the event kwargs. The
        "postprocess" checks always run afterwards, whether or not the hook
        was called.

        Returns the `eaten` attribute of the event passed to the hook, or
        False when the preprocess checks failed.
        """
        module_name = (self._get_prefix(hook) or
            self.bot.modules.from_context(hook.context).title)

        stdout = outs.StdOut(module_name)
        stderr = outs.StdOut(module_name)

        if hook.get_kwarg("remove_empty", True):
            args_split = list(filter(None, args_split))

        event_kwargs = {"hook": hook, "user": user, "server": server,
            "target": target, "target_str": target_str,
            "is_channel": is_channel, "line": line, "args_split": args_split,
            "command": command, "args": " ".join(args_split),
            "stdout": stdout, "stderr": stderr, "tags": {}}
        event_kwargs.update(kwargs)

        def check_assert(check):
            # Lets a hook run further checks against this same event.
            return self._check_assert(event_kwargs, user, check)
        event_kwargs["check_assert"] = check_assert

        eaten = False

        check_success, check_message = self._check("preprocess", event_kwargs)
        if check_success:
            new_event = self.events.on(hook.event_name).make_event(
                **event_kwargs)
            self.log.trace("calling command '%s': %s",
                [command, new_event.kwargs])

            try:
                hook.call(new_event)
            except utils.EventError as e:
                stderr.write(str(e))
            eaten = new_event.eaten
        elif check_message:
            stderr.write("%s: %s" % (user.nickname, check_message))

        self._check("postprocess", event_kwargs)
        # postprocess - send stdout/stderr and typing tag

        return eaten

    @utils.hook("postprocess.command")
    @utils.kwarg("priority", EventManager.PRIORITY_LOW)
    def postprocess(self, event):
        """Send stdout if it has text, otherwise stderr, otherwise nothing."""
        if event["stdout"].has_text():
            out_type = OutType.OUT
            obj = event["stdout"]
        elif event["stderr"].has_text():
            out_type = OutType.ERR
            obj = event["stderr"]
        else:
            return

        self._out(event["server"], event["target"], event["target_str"], obj,
            out_type, event["tags"])

    def _out(self, server, target, target_str, obj, type, tags):
        """Send the next piece of text from `obj` to `target_str`.

        The prefix of `obj`, if set, is coloured green for OutType.OUT and
        red otherwise. If the line has to be truncated, the sent part gets
        STR_MORE appended and the rest is put back into `obj` prefixed with
        STR_CONTINUED.

        Raises:
            ValueError: the configured command method is not one of
                COMMAND_METHODS.
        """
        if type == OutType.OUT:
            color = utils.consts.GREEN
        else:
            color = utils.consts.RED

        line_str = obj.pop()
        if obj.prefix:
            line_str = "[%s] %s" % (
                utils.irc.color(obj.prefix, color), line_str)
        method = self._command_method(server, target)

        if method not in COMMAND_METHODS:
            raise ValueError("Unknown command-method '%s'" % method)

        line = IRCLine.ParsedLine(method, [target_str, line_str],
            tags=tags)
        valid, trunc = line.truncate(server.hostmask(), margin=STR_MORE_LEN)

        if trunc:
            if trunc[0] not in WORD_BOUNDARIES:
                # The cut landed mid-word: move the partial word from the
                # end of the sent part to the start of the overflow.
                for boundary in WORD_BOUNDARIES:
                    left, *right = valid.rsplit(boundary, 1)
                    if right:
                        valid = left
                        trunc = right[0]+trunc
            obj.insert("%s %s" % (STR_CONTINUED, trunc))
            valid = valid+STR_MORE
        line = IRCLine.parse_line(valid)
        server.send(line)

    @utils.hook("preprocess.command")
    def _check_min_args(self, event):
        """Hard-fail a command that was given fewer than `min_args` args.

        Returns None (no objection) when the hook sets no `min_args` or
        enough arguments were given.
        """
        min_args = event["hook"].get_kwarg("min_args")
        if min_args and len(event["args_split"]) < min_args:
            usage = self._get_usage(event["hook"], event["command"],
                event["command_prefix"])
            if usage:
                error = "Not enough arguments, usage: %s" % usage
            else:
                error = "Not enough arguments (minimum: %d)" % min_args
            return utils.consts.PERMISSION_HARD_FAIL, error

    def _command_prefix(self, server, channel):
        """Return the channel's command prefix, else the server's, else "!"."""
        return channel.get_setting("command-prefix",
            server.get_setting("command-prefix", "!"))

    @utils.hook("received.message.channel", priority=EventManager.PRIORITY_LOW)
    def channel_message(self, event):
        """Handle a channel message as a command or a regex command.

        A command is recognised when the first word starts with the command
        prefix, or when the first word highlights the bot and a second word
        follows. Messages with no command are offered to the
        "command.regex" hooks instead.
        """
        commands_enabled = event["channel"].get_setting("commands", True)
        if not commands_enabled:
            return

        command_prefix = self._command_prefix(event["server"],
            event["channel"])
        message_split = event["message_split"]
        command = None
        args = ""
        # Guard against an empty split so an empty message falls through to
        # the regex hooks instead of raising IndexError.
        if message_split and message_split[0].startswith(command_prefix):
            if not event["channel"].get_setting("prefixed-commands",True):
                return
            command = message_split[0].replace(
                command_prefix, "", 1).lower()
            if " " in event["message"]:
                args = event["message"].split(" ", 1)[1]
        elif len(message_split) > 1 and self.is_highlight(
                event["server"], message_split[0]):
            command = message_split[1].lower()
            if event["message"].count(" ") > 1:
                args = event["message"].split(" ", 2)[2]

        if command:
            try:
                hook, command, args_split = self._find_command_hook(
                    event["server"], event["channel"], True, command, args)
            except BadContextException:
                event["channel"].send_message(
                    "%s: That command is not valid in a channel" %
                    event["user"].nickname)
                return

            if hook:
                # Known commands are not run from action messages.
                if event["action"]:
                    return

                self.command(event["server"], event["channel"],
                    event["target_str"], True, event["user"], command,
                    args_split, event["line"], hook,
                    command_prefix=command_prefix)
            else:
                self.events.on("unknown.command").call(server=event["server"],
                    target=event["channel"], user=event["user"],
                    command=command, command_prefix=command_prefix,
                    is_channel=True)
        else:
            regex_hooks = self.events.on("command.regex").get_hooks()
            for hook in regex_hooks:
                if event["action"] and hook.get_kwarg("ignore_action", True):
                    continue

                pattern = hook.get_kwarg("pattern", None)
                if pattern:
                    match = re.search(pattern, event["message"])
                    if match:
                        command = hook.get_kwarg("command", "")
                        res = self.command(event["server"], event["channel"],
                            event["target_str"], True, event["user"], command,
                            "", event["line"], hook, match=match,
                            message=event["message"], command_prefix="",
                            action=event["action"])

                        # Stop at the first regex hook whose event was eaten.
                        if res:
                            break

    @utils.hook("received.message.private", priority=EventManager.PRIORITY_LOW)
    def private_message(self, event):
        """Handle a private message, treating its first word as a command.

        Empty messages and action messages are ignored.
        """
        if event["message_split"] and not event["action"]:
            command = event["message_split"][0].lower()

            # this should help catch commands when people try to do prefixed
            # commands ('!help' rather than 'help') in PM
            command = command.lstrip("".join(NON_ALPHANUMERIC))

            args = ""
            if " " in event["message"]:
                args = event["message"].split(" ", 1)[1]

            try:
                hook, command, args_split = self._find_command_hook(
                    event["server"], event["user"], False, command, args)
            except BadContextException:
                event["user"].send_message(
                    "That command is not valid in a PM")
                return

            if hook:
                self.command(event["server"], event["user"],
                    event["user"].nickname, False, event["user"], command,
                    args_split, event["line"], hook, command_prefix="")
            else:
                self.events.on("unknown.command").call(server=event["server"],
                    target=event["user"], user=event["user"],
                    command=command, command_prefix="", is_channel=False)

    def _get_usage(self, hook, command, command_prefix=""):
        """Return the hook's usage strings joined with " | ", or None.

        Each usage is preceded by the prefixed command name.
        """
        command = "%s%s" % (command_prefix, command)
        usages = hook.get_kwargs("usage")
        if usages:
            return " | ".join(
                "%s %s" % (command, usage) for usage in usages)
        return None

    def _get_prefix(self, hook):
        """Return the hook's "prefix" kwarg, or None if unset."""
        return hook.get_kwarg("prefix", None)

    def _get_alias_of(self, hook):
        """Return the hook's "alias_of" kwarg, or None if unset."""
        return hook.get_kwarg("alias_of", None)

    @utils.hook("send.stdout")
    def _stdout(self, event):
        """Send the event's message as normal output."""
        self._send_out(event, OutType.OUT)

    @utils.hook("send.stderr")
    def _stderr(self, event):
        """Send the event's message as error output."""
        self._send_out(event, OutType.ERR)

    def _send_out(self, event, type):
        """Send `event["message"]` to `event["target"]`.

        The prefix is `event["module_name"]` unless the event sets
        "hide_prefix". The destination is `event["target_str"]` if present,
        otherwise the target's name.
        """
        target = event["target"]
        stdout = outs.StdOut(event["module_name"])
        stdout.write(event["message"])
        if event.get("hide_prefix", False):
            stdout.prefix = None

        target_str = event.get("target_str", target.name)
        # _out requires a tags argument; send an empty tag set, as
        # command() does.
        self._out(event["server"], target, target_str, stdout, type, {})

    @utils.hook("check.command.self")
    def check_command_self(self, event):
        """Pass when the first request arg, lower-cased by the server,
        equals the user's name."""
        if event["server"].irc_lower(event["request_args"][0]
                ) == event["user"].name:
            return utils.consts.PERMISSION_FORCE_SUCCESS, None
        else:
            return (utils.consts.PERMISSION_ERROR,
                "You do not have permission to do this")

    @utils.hook("check.command.is-channel")
    def check_command_is_channel(self, event):
        """Pass only when the command was used in a channel."""
        if event["is_channel"]:
            return utils.consts.PERMISSION_FORCE_SUCCESS, None
        else:
            return (utils.consts.PERMISSION_ERROR,
                "This command can only be used in-channel")