change command specs to be compiled at runtime by a decorator

This commit is contained in:
jesopo 2020-01-25 13:58:13 +00:00
parent 85c13cbbd7
commit 341b314104
6 changed files with 79 additions and 53 deletions

View file

@ -49,7 +49,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "kick") @utils.kwarg("require_access", "kick")
@utils.kwarg("usage", "<nickname> [reason]") @utils.kwarg("usage", "<nickname> [reason]")
@utils.kwarg("spec", "!r~channel !cuser ?...") @utils.spec("!r~channel !cuser ?...")
def kick(self, event): def kick(self, event):
self._kick(event["server"], event["target"], event["spec"][0], self._kick(event["server"], event["target"], event["spec"][0],
event["spec"][1]) event["spec"][1])
@ -99,7 +99,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "ban") @utils.kwarg("require_access", "ban")
@utils.kwarg("usage", "[+time] <target>") @utils.kwarg("usage", "[+time] <target>")
@utils.kwarg("spec", "!r~channel ?time !user|text") @utils.spec("!r~channel ?time !user|text")
def ban(self, event): def ban(self, event):
self._ban(event["server"], event["spec"][0], event["spec"][2], True, self._ban(event["server"], event["spec"][0], event["spec"][2], True,
event["spec"][1], True) event["spec"][1], True)
@ -108,7 +108,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "ban") @utils.kwarg("require_access", "ban")
@utils.kwarg("usage", "<target>") @utils.kwarg("usage", "<target>")
@utils.kwarg("spec", "!r~channel !user|word") @utils.spec("!r~channel !user|word")
def unban(self, event): def unban(self, event):
self._ban(event["server"], event["spec"][0], event["spec"][1], self._ban(event["server"], event["spec"][0], event["spec"][1],
True, None, False) True, None, False)
@ -118,7 +118,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "kickban") @utils.kwarg("require_access", "kickban")
@utils.kwarg("usage", "[+time] <nickname> [reason]") @utils.kwarg("usage", "[+time] <nickname> [reason]")
@utils.kwarg("spec", "!r~channel ?time !cuser| ?...") @utils.spec("!r~channel ?time !cuser| ?...")
def kickban(self, event): def kickban(self, event):
self._ban(event["server"], event["spec"][0], event["spec"][2], self._ban(event["server"], event["spec"][0], event["spec"][2],
False, event["spec"][1], True) False, event["spec"][1], True)
@ -130,7 +130,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "op") @utils.kwarg("require_access", "op")
@utils.kwarg("usage", "[nickname]") @utils.kwarg("usage", "[nickname]")
@utils.kwarg("spec", "!r~channel !ruser") @utils.spec("!r~channel !ruser")
def op(self, event): def op(self, event):
self._op(True, event["spec"]) self._op(True, event["spec"])
@ -139,7 +139,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "op") @utils.kwarg("require_access", "op")
@utils.kwarg("usage", "[nickname]") @utils.kwarg("usage", "[nickname]")
@utils.kwarg("spec", "!r~channel !ruser") @utils.spec("!r~channel !ruser")
def deop(self, event): def deop(self, event):
self._op(False, event["spec"]) self._op(False, event["spec"])
@ -151,7 +151,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "voice") @utils.kwarg("require_access", "voice")
@utils.kwarg("usage", "[nickname]") @utils.kwarg("usage", "[nickname]")
@utils.kwarg("spec", "!r~channel !ruser") @utils.spec("!r~channel !ruser")
def voice(self, event): def voice(self, event):
add = event["command"] == "voice" add = event["command"] == "voice"
event["spec"][0].send_mode("+v" if add else "-v", [event["spec"][1]]) event["spec"][0].send_mode("+v" if add else "-v", [event["spec"][1]])
@ -161,7 +161,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_access", "topic") @utils.kwarg("require_access", "topic")
@utils.kwarg("remove_empty", False) @utils.kwarg("remove_empty", False)
@utils.kwarg("usage", "<topic>") @utils.kwarg("usage", "<topic>")
@utils.kwarg("spec", "!r~channel !...") @utils.spec("!r~channel !...")
def topic(self, event): def topic(self, event):
event["spec"][0].send_topic(event["spec"][1]) event["spec"][0].send_topic(event["spec"][1])
@ -170,7 +170,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_access", "topic") @utils.kwarg("require_access", "topic")
@utils.kwarg("remove_empty", False) @utils.kwarg("remove_empty", False)
@utils.kwarg("usage", "<topic>") @utils.kwarg("usage", "<topic>")
@utils.kwarg("spec", "!r~channel !...") @utils.spec("!r~channel !...")
def tappend(self, event): def tappend(self, event):
event["spec"][0].send_topic(event["spec"][0].topic + event["spec"][1]) event["spec"][0].send_topic(event["spec"][0].topic + event["spec"][1])
@ -193,7 +193,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_access", "quiet") @utils.kwarg("require_access", "quiet")
@utils.kwarg("help", "Quiet a given user") @utils.kwarg("help", "Quiet a given user")
@utils.kwarg("usage", "[+time] <nickname>") @utils.kwarg("usage", "[+time] <nickname>")
@utils.kwarg("spec", "!r~channel ?time !user|word") @utils.spec("!r~channel ?time !user|word")
def quiet(self, event): def quiet(self, event):
self._quiet(event["server"], True, event["spec"]) self._quiet(event["server"], True, event["spec"])
@ -203,7 +203,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_access", "unquiet") @utils.kwarg("require_access", "unquiet")
@utils.kwarg("help", "Unquiet a given user") @utils.kwarg("help", "Unquiet a given user")
@utils.kwarg("usage", "<nickname>") @utils.kwarg("usage", "<nickname>")
@utils.kwarg("spec", "!r~channel !user|word") @utils.spec("!r~channel !user|word")
def unquiet(self, event): def unquiet(self, event):
self._quiet(event["server"], False, event["spec"]) self._quiet(event["server"], False, event["spec"])
@ -239,7 +239,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_access", "invite") @utils.kwarg("require_access", "invite")
@utils.kwarg("help", "Invite a given user") @utils.kwarg("help", "Invite a given user")
@utils.kwarg("usage", "<nickname>") @utils.kwarg("usage", "<nickname>")
@utils.kwarg("spec", "!r~channel !word") @utils.spec("!r~channel !word")
def invite(self, event): def invite(self, event):
user_nickname = event["args_split"][0] user_nickname = event["args_split"][0]
@ -264,7 +264,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "flags") @utils.kwarg("require_access", "flags")
@utils.kwarg("usage", "<nickname> [flags]") @utils.kwarg("usage", "<nickname> [flags]")
@utils.kwarg("spec", "!r~channel !ouser ?...") @utils.spec("!r~channel !ouser ?...")
def flags(self, event): def flags(self, event):
target = event["spec"][1] target = event["spec"][1]
current_flags = event["spec"][0].get_user_setting(target.get_id(), current_flags = event["spec"][0].get_user_setting(target.get_id(),
@ -355,7 +355,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_access", "cmute") @utils.kwarg("require_access", "cmute")
@utils.kwarg("help", "Mute the current channel") @utils.kwarg("help", "Mute the current channel")
@utils.kwarg("usage", "[+time]") @utils.kwarg("usage", "[+time]")
@utils.kwarg("spec", "!r~channel ?time") @utils.spec("!r~channel ?time")
def cmute(self, event): def cmute(self, event):
event["spec"][0].send_mode("+m") event["spec"][0].send_mode("+m")
@ -373,7 +373,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "cmute") @utils.kwarg("require_access", "cmute")
@utils.kwarg("help", "Mute the current channel") @utils.kwarg("help", "Mute the current channel")
@utils.kwarg("spec", "!r~channel") @utils.spec("!r~channel")
def cunmute(self, event): def cunmute(self, event):
self._cunmute(event["spec"][0]) self._cunmute(event["spec"][0])
@ -421,7 +421,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("help", "Clear a given channel list mode (e.g. +b)") @utils.kwarg("help", "Clear a given channel list mode (e.g. +b)")
@utils.kwarg("usage", "<type> [mask]") @utils.kwarg("usage", "<type> [mask]")
@utils.kwarg("usage", "+<mode> [mask]") @utils.kwarg("usage", "+<mode> [mask]")
@utils.kwarg("spec", "!r~channel !word ?word") @utils.spec("!r~channel !word ?word")
def clear(self, event): def clear(self, event):
mode, mode_list = self._list_query_event( mode, mode_list = self._list_query_event(
event["server"], event["spec"][0], event["spec"][1], event["server"], event["spec"][0], event["spec"][1],
@ -437,7 +437,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("help", "Search a given channel list mode (e.g. +b)") @utils.kwarg("help", "Search a given channel list mode (e.g. +b)")
@utils.kwarg("usage", "<type> [mask]") @utils.kwarg("usage", "<type> [mask]")
@utils.kwarg("usage", "+<mode> [mask]") @utils.kwarg("usage", "+<mode> [mask]")
@utils.kwarg("spec", "!r~channel !word ?word") @utils.spec("!r~channel !word ?word")
def lsearch(self, event): def lsearch(self, event):
mode, mode_list = self._list_query_event( mode, mode_list = self._list_query_event(
event["server"], event["spec"][0], event["spec"][1], event["server"], event["spec"][0], event["spec"][1],

View file

@ -6,14 +6,14 @@ class Module(ModuleManager.BaseModule):
@utils.hook("received.command.echo") @utils.hook("received.command.echo")
@utils.kwarg("remove_empty", False) @utils.kwarg("remove_empty", False)
@utils.kwarg("help", "Echo a string back") @utils.kwarg("help", "Echo a string back")
@utils.kwarg("spec", "!...") @utils.spec("!...")
def echo(self, event): def echo(self, event):
event["stdout"].write(event["spec"][0]) event["stdout"].write(event["spec"][0])
@utils.hook("received.command.action") @utils.hook("received.command.action")
@utils.kwarg("remove_empty", False) @utils.kwarg("remove_empty", False)
@utils.kwarg("help", "Make the bot send a /me") @utils.kwarg("help", "Make the bot send a /me")
@utils.kwarg("spec", "!...") @utils.spec("!...")
def action(self, event): def action(self, event):
event["target"].send_message("\x01ACTION %s\x01" % event["spec"][0]) event["target"].send_message("\x01ACTION %s\x01" % event["spec"][0])
@ -21,6 +21,6 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("permission", "say") @utils.kwarg("permission", "say")
@utils.kwarg("remove_empty", False) @utils.kwarg("remove_empty", False)
@utils.kwarg("help", "Send a message to a target") @utils.kwarg("help", "Send a message to a target")
@utils.kwarg("spec", "!word !...") @utils.spec("!word !...")
def msg(self, event): def msg(self, event):
event["server"].send_message(event["spec"][0], event["spec"][1]) event["server"].send_message(event["spec"][0], event["spec"][1])

View file

@ -38,12 +38,12 @@ class Module(ModuleManager.BaseModule):
n = 0 n = 0
error = None error = None
if spec_type == "time" and args: if spec_type.name == "time" and args:
time, _ = utils.parse.timed_args(args) time, _ = utils.parse.timed_args(args)
chunk = time chunk = time
n = 1 n = 1
error = "Invalid timeframe" error = "Invalid timeframe"
elif spec_type == "rchannel": elif spec_type.name == "rchannel":
if channel: if channel:
chunk = channel chunk = channel
elif args: elif args:
@ -53,53 +53,53 @@ class Module(ModuleManager.BaseModule):
error = "No such channel" error = "No such channel"
else: else:
error = "No channel provided" error = "No channel provided"
elif spec_type == "channel" and args: elif spec_type.name == "channel" and args:
if args[0] in server.channels: if args[0] in server.channels:
chunk = server.channels.get(args[0]) chunk = server.channels.get(args[0])
n = 1 n = 1
error = "No such channel" error = "No such channel"
elif spec_type == "cuser" and args: elif spec_type.name == "cuser" and args:
tuser = server.get_user(args[0], create=False) tuser = server.get_user(args[0], create=False)
if tuser and channel.has_user(tuser): if tuser and channel.has_user(tuser):
chunk = tuser chunk = tuser
n = 1 n = 1
error = "That user is not in this channel" error = "That user is not in this channel"
elif spec_type == "ruser": elif spec_type.name == "ruser":
if args: if args:
chunk = server.get_user(args[0], create=False) chunk = server.get_user(args[0], create=False)
n = 1 n = 1
else: else:
chunk = user chunk = user
error = "No such user" error = "No such user"
elif spec_type == "user": elif spec_type.name == "user":
if args: if args:
chunk = server.get_user(args[0], create=False) chunk = server.get_user(args[0], create=False)
n = 1 n = 1
error = "No such user" error = "No such user"
else: else:
error = "No user provided" error = "No user provided"
elif spec_type == "ouser" and args: elif spec_type.name == "ouser" and args:
if server.has_user_id(args[0]): if server.has_user_id(args[0]):
chunk = server.get_user(args[0]) chunk = server.get_user(args[0])
n = 1 n = 1
error = "Unknown nickname" error = "Unknown nickname"
elif spec_type == "word": elif spec_type.name == "word":
if args: if args:
chunk = args[0] chunk = args[0]
n = 1 n = 1
elif spec_type == "...": elif spec_type.name == "...":
if args: if args:
chunk = " ".join(args) chunk = " ".join(args)
n = max(1, len(args)) n = max(1, len(args))
options.append([chunk, n, error]) options.append([spec_type, chunk, n, error])
return options return options
@utils.hook("preprocess.command") @utils.hook("preprocess.command")
@utils.kwarg("priority", EventManager.PRIORITY_HIGH) @utils.kwarg("priority", EventManager.PRIORITY_HIGH)
def preprocess(self, event): def preprocess(self, event):
spec = event["hook"].get_kwarg("spec", None) spec_types = event["hook"].get_kwarg("spec", None)
if not spec == None: if not spec_types == None:
server = event["server"] server = event["server"]
channel = event["target"] if event["is_channel"] else None channel = event["target"] if event["is_channel"] else None
user = event["user"] user = event["user"]
@ -108,30 +108,20 @@ class Module(ModuleManager.BaseModule):
out = [] out = []
kwargs = {"channel": channel} kwargs = {"channel": channel}
for word in spec.split(): for item in spec_types:
optional = word[0] == "?"
word = word[1:]
raw_spec_types = word.split("|")
spec_types = [t.replace("~", "", 1) for t in raw_spec_types]
options = self._spec_chunk(server, kwargs["channel"], user, options = self._spec_chunk(server, kwargs["channel"], user,
spec_types, args) item.types, args)
found = None found = None
first_error = None first_error = None
for i, (chunk, n, error) in enumerate(options): for spec_type, chunk, n, error in options:
spec_type = spec_types[i]
raw_spec_type = raw_spec_types[i]
if not chunk == None: if not chunk == None:
if "~" in raw_spec_type: if spec_type.exported:
kwargs[raw_spec_type.split("~", 1)[1]] = chunk kwargs[spec_type.exported] = chunk
found = True found = True
args = args[n:] args = args[n:]
if len(spec_types) > 1: if len(item.types) > 1:
chunk = [spec_type, chunk] chunk = [spec_type, chunk]
found = chunk found = chunk
break break
@ -143,7 +133,7 @@ class Module(ModuleManager.BaseModule):
out.append(found) out.append(found)
if not optional and not found: if not item.optional and not found:
error = first_error or "Invalid arguments" error = first_error or "Invalid arguments"
return utils.consts.PERMISSION_HARD_FAIL, error return utils.consts.PERMISSION_HARD_FAIL, error

View file

@ -2,7 +2,7 @@ import contextlib, enum, ipaddress, multiprocessing, queue, signal, threading
import typing import typing
from . import cli, consts, datetime, decorators, irc, http, parse, security from . import cli, consts, datetime, decorators, irc, http, parse, security
from .decorators import export, hook, kwarg from .decorators import export, hook, kwarg, spec
from .settings import (BoolSetting, FunctionSetting, IntRangeSetting, from .settings import (BoolSetting, FunctionSetting, IntRangeSetting,
IntSetting, OptionsSetting, sensitive_format, SensitiveSetting, Setting) IntSetting, OptionsSetting, sensitive_format, SensitiveSetting, Setting)
from .errors import (EventError, EventNotEnoughArgsError, EventResultsError, from .errors import (EventError, EventNotEnoughArgsError, EventResultsError,

View file

@ -1,4 +1,5 @@
import typing import typing
from .parse import argument_spec
BITBOT_MAGIC = "__bitbot" BITBOT_MAGIC = "__bitbot"
@ -42,10 +43,18 @@ def export(setting: str, value: typing.Any):
magic.add_export(setting, value) magic.add_export(setting, value)
return module return module
return _export_func return _export_func
def kwarg(key: str, value: typing.Any):
def _kwarg_func(func): def _kwarg(key: str, value: typing.Any, func: typing.Any):
magic = get_magic(func) magic = get_magic(func)
magic.add_kwarg(key, value) magic.add_kwarg(key, value)
return func return func
def kwarg(key: str, value: typing.Any):
def _kwarg_func(func):
return _kwarg(key, value, func)
return _kwarg_func return _kwarg_func
def spec(spec: str):
def _spec_func(func):
return _kwarg("spec", argument_spec(spec), func)
return _spec_func

View file

@ -155,3 +155,30 @@ def format_token_replace(s: str, vars: typing.Dict[str, str],
for i, token in tokens: for i, token in tokens:
s = s[:i] + vars[token.replace(sigil, "", 1)] + s[i+len(token):] s = s[:i] + vars[token.replace(sigil, "", 1)] + s[i+len(token):]
return s return s
class ArgumentSpecType(object):
def __init__(self, name: str, exported: str):
self.name = name
self.exported = exported
class ArgumentSpec(object):
def __init__(self, optional: bool, types: typing.List[ArgumentSpecType]):
self.optional = optional
self.types = types
def argument_spec(spec: str) -> typing.List[ArgumentSpec]:
out: typing.List[ArgumentSpec] = []
for type_names_str in spec.split(" "):
optional = type_names_str[0] == "?"
type_names_str = type_names_str[1:]
spec_types: typing.List[ArgumentSpecType] = []
for type_name in type_names_str.split("|"):
exported_name = ""
if "~" in type_name:
exported_name = type_name
type_name = type_name.replace("~", "", 1)
spec_types.append(ArgumentSpecType(type_name, exported_name))
out.append(ArgumentSpec(optional, spec_types))
return out