implement default spec argument types

This commit is contained in:
jesopo 2020-01-25 22:56:06 +00:00
parent 478223f88c
commit bd33ea5d8a
4 changed files with 103 additions and 68 deletions

View file

@ -49,7 +49,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "kick") @utils.kwarg("require_access", "kick")
@utils.kwarg("usage", "<nickname> [reason]") @utils.kwarg("usage", "<nickname> [reason]")
@utils.spec("!r~channel !cuser ?...") @utils.spec("!r~channel !cuser ?string")
def kick(self, event): def kick(self, event):
self._kick(event["server"], event["target"], event["spec"][0], self._kick(event["server"], event["target"], event["spec"][0],
event["spec"][1]) event["spec"][1])
@ -99,7 +99,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "ban") @utils.kwarg("require_access", "ban")
@utils.kwarg("usage", "[+time] <target>") @utils.kwarg("usage", "[+time] <target>")
@utils.spec("!r~channel ?time !user|text") @utils.spec("!r~channel ?time !user|word")
def ban(self, event): def ban(self, event):
self._ban(event["server"], event["spec"][0], event["spec"][2], True, self._ban(event["server"], event["spec"][0], event["spec"][2], True,
event["spec"][1], True) event["spec"][1], True)
@ -118,7 +118,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "kickban") @utils.kwarg("require_access", "kickban")
@utils.kwarg("usage", "[+time] <nickname> [reason]") @utils.kwarg("usage", "[+time] <nickname> [reason]")
@utils.spec("!r~channel ?time !cuser| ?...") @utils.spec("!r~channel ?time !cuser| ?string")
def kickban(self, event): def kickban(self, event):
self._ban(event["server"], event["spec"][0], event["spec"][2], self._ban(event["server"], event["spec"][0], event["spec"][2],
False, event["spec"][1], True) False, event["spec"][1], True)
@ -161,7 +161,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_access", "topic") @utils.kwarg("require_access", "topic")
@utils.kwarg("remove_empty", False) @utils.kwarg("remove_empty", False)
@utils.kwarg("usage", "<topic>") @utils.kwarg("usage", "<topic>")
@utils.spec("!r~channel !...") @utils.spec("!r~channel !string")
def topic(self, event): def topic(self, event):
event["spec"][0].send_topic(event["spec"][1]) event["spec"][0].send_topic(event["spec"][1])
@ -170,7 +170,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_access", "topic") @utils.kwarg("require_access", "topic")
@utils.kwarg("remove_empty", False) @utils.kwarg("remove_empty", False)
@utils.kwarg("usage", "<topic>") @utils.kwarg("usage", "<topic>")
@utils.spec("!r~channel !...") @utils.spec("!r~channel !string")
def tappend(self, event): def tappend(self, event):
event["spec"][0].send_topic(event["spec"][0].topic + event["spec"][1]) event["spec"][0].send_topic(event["spec"][0].topic + event["spec"][1])
@ -264,7 +264,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("require_mode", "o") @utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "flags") @utils.kwarg("require_access", "flags")
@utils.kwarg("usage", "<nickname> [flags]") @utils.kwarg("usage", "<nickname> [flags]")
@utils.spec("!r~channel !ouser ?...") @utils.spec("!r~channel !ouser ?string")
def flags(self, event): def flags(self, event):
target = event["spec"][1] target = event["spec"][1]
current_flags = event["spec"][0].get_user_setting(target.get_id(), current_flags = event["spec"][0].get_user_setting(target.get_id(),

View file

@ -6,14 +6,14 @@ class Module(ModuleManager.BaseModule):
@utils.hook("received.command.echo") @utils.hook("received.command.echo")
@utils.kwarg("remove_empty", False) @utils.kwarg("remove_empty", False)
@utils.kwarg("help", "Echo a string back") @utils.kwarg("help", "Echo a string back")
@utils.spec("!...") @utils.spec("!string")
def echo(self, event): def echo(self, event):
event["stdout"].write(event["spec"][0]) event["stdout"].write(event["spec"][0])
@utils.hook("received.command.action") @utils.hook("received.command.action")
@utils.kwarg("remove_empty", False) @utils.kwarg("remove_empty", False)
@utils.kwarg("help", "Make the bot send a /me") @utils.kwarg("help", "Make the bot send a /me")
@utils.spec("!...") @utils.spec("!string")
def action(self, event): def action(self, event):
event["target"].send_message("\x01ACTION %s\x01" % event["spec"][0]) event["target"].send_message("\x01ACTION %s\x01" % event["spec"][0])
@ -21,6 +21,6 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("permission", "say") @utils.kwarg("permission", "say")
@utils.kwarg("remove_empty", False) @utils.kwarg("remove_empty", False)
@utils.kwarg("help", "Send a message to a target") @utils.kwarg("help", "Send a message to a target")
@utils.spec("!word !...") @utils.spec("!word !string")
def msg(self, event): def msg(self, event):
event["server"].send_message(event["spec"][0], event["spec"][1]) event["server"].send_message(event["spec"][0], event["spec"][1])

View file

@ -27,79 +27,71 @@ from src import EventManager, ModuleManager, utils
# - "user" - an argument of a user's nickname # - "user" - an argument of a user's nickname
# - "ouser" - an argument of a potentially offline user's nickname # - "ouser" - an argument of a potentially offline user's nickname
# - "word" - one word from arguments # - "word" - one word from arguments
# - "..." - collect all remaining args in to a string # - "string" - collect all remaining args in to a string
class Module(ModuleManager.BaseModule): class Module(ModuleManager.BaseModule):
def _spec_chunk(self, server, channel, user, spec_types, args): def _spec_value(self, server, channel, user, argument_types, args):
options = [] options = []
first_error = None first_error = None
for spec_type in spec_types: for argument_type in argument_types:
chunk = None value = None
n = 0 n = 0
error = None error = None
if spec_type.type == "time" and args: simple_value, simple_count = argument_type.simple(args)
time, _ = utils.parse.timed_args(args) if not simple_count == -1:
chunk = time value = simple_value
n = 1 n = simple_count
error = "Invalid timeframe" error = argument_type.error()
elif spec_type.type == "rchannel": elif argument_type.type == "rchannel":
if channel: if channel:
chunk = channel value = channel
elif args: elif args:
n = 1 n = 1
if args[0] in server.channels: if args[0] in server.channels:
chunk = server.channels.get(args[0]) value = server.channels.get(args[0])
error = "No such channel" error = "No such channel"
else: else:
error = "No channel provided" error = "No channel provided"
elif spec_type.type == "channel" and args: elif argument_type.type == "channel" and args:
if args[0] in server.channels: if args[0] in server.channels:
chunk = server.channels.get(args[0]) value = server.channels.get(args[0])
n = 1 n = 1
error = "No such channel" error = "No such channel"
elif spec_type.type == "cuser" and args: elif argument_type.type == "cuser" and args:
tuser = server.get_user(args[0], create=False) tuser = server.get_user(args[0], create=False)
if tuser and channel.has_user(tuser): if tuser and channel.has_user(tuser):
chunk = tuser value = tuser
n = 1 n = 1
error = "That user is not in this channel" error = "That user is not in this channel"
elif spec_type.type == "ruser": elif argument_type.type == "ruser":
if args: if args:
chunk = server.get_user(args[0], create=False) value = server.get_user(args[0], create=False)
n = 1 n = 1
else: else:
chunk = user value = user
error = "No such user" error = "No such user"
elif spec_type.type == "user": elif argument_type.type == "user":
if args: if args:
chunk = server.get_user(args[0], create=False) value = server.get_user(args[0], create=False)
n = 1 n = 1
error = "No such user" error = "No such user"
else: else:
error = "No user provided" error = "No user provided"
elif spec_type.type == "ouser" and args: elif argument_type.type == "ouser" and args:
if server.has_user_id(args[0]): if server.has_user_id(args[0]):
chunk = server.get_user(args[0]) value = server.get_user(args[0])
n = 1 n = 1
error = "Unknown nickname" error = "Unknown nickname"
elif spec_type.type == "word":
if args:
chunk = args[0]
n = 1
elif spec_type.type == "...":
if args:
chunk = " ".join(args)
n = max(1, len(args))
options.append([spec_type, chunk, n, error]) options.append([argument_type, value, n, error])
return options return options
@utils.hook("preprocess.command") @utils.hook("preprocess.command")
@utils.kwarg("priority", EventManager.PRIORITY_HIGH) @utils.kwarg("priority", EventManager.PRIORITY_HIGH)
def preprocess(self, event): def preprocess(self, event):
spec_types = event["hook"].get_kwarg("spec", None) spec_arguments = event["hook"].get_kwarg("spec", None)
if not spec_types == None: if not spec_arguments == None:
server = event["server"] server = event["server"]
channel = event["target"] if event["is_channel"] else None channel = event["target"] if event["is_channel"] else None
user = event["user"] user = event["user"]
@ -108,22 +100,22 @@ class Module(ModuleManager.BaseModule):
out = [] out = []
kwargs = {"channel": channel} kwargs = {"channel": channel}
for item in spec_types: for spec_argument in spec_arguments:
options = self._spec_chunk(server, kwargs["channel"], user, options = self._spec_value(server, kwargs["channel"], user,
item.types, args) spec_argument.types, args)
found = None found = None
first_error = None first_error = None
for spec_type, chunk, n, error in options: for argument_type, value, n, error in options:
if not chunk == None: if not value == None:
if spec_type.exported: if argument_type.exported:
kwargs[spec_type.exported] = chunk kwargs[argument_type.exported] = value
found = True found = True
args = args[n:] args = args[n:]
if len(item.types) > 1: if len(spec_argument.types) > 1:
chunk = [spec_type, chunk] value = [argument_type.type, value]
found = chunk found = value
break break
elif not error and n > 0: elif not error and n > 0:
error = "Not enough arguments" error = "Not enough arguments"
@ -131,11 +123,11 @@ class Module(ModuleManager.BaseModule):
if error and not first_error: if error and not first_error:
first_error = error first_error = error
out.append(found) if not spec_argument.optional and not found:
if not item.optional and not found:
error = first_error or "Invalid arguments" error = first_error or "Invalid arguments"
return utils.consts.PERMISSION_HARD_FAIL, error return utils.consts.PERMISSION_HARD_FAIL, error
out.append(found)
kwargs["spec"] = out kwargs["spec"] = out
event["kwargs"].update(kwargs) event["kwargs"].update(kwargs)

View file

@ -156,36 +156,79 @@ def format_token_replace(s: str, vars: typing.Dict[str, str],
s = s[:i] + vars[token.replace(sigil, "", 1)] + s[i+len(token):] s = s[:i] + vars[token.replace(sigil, "", 1)] + s[i+len(token):]
return s return s
class ArgumentSpecType(object): class SpecArgumentType(object):
def __init__(self, type_name: str, name: str, exported: str): def __init__(self, type_name: str, name: typing.Optional[str], exported: str):
self.type = type_name self.type = type_name
self.name = name self._name = name
self.exported = exported self.exported = exported
class ArgumentSpec(object): def name(self) -> typing.Optional[str]:
def __init__(self, optional: bool, types: typing.List[ArgumentSpecType]): return self._name
def simple(self, args: typing.List[str]) -> typing.Tuple[typing.Any, int]:
return None, -1
def error(self) -> typing.Optional[str]:
return None
class SpecArgumentTypeWord(SpecArgumentType):
def simple(self, args: typing.List[str]) -> typing.Tuple[typing.Any, int]:
if args:
return args[0], 1
return None, 1
class SpecArgumentTypeWordLower(SpecArgumentTypeWord):
def simple(self, args: typing.List[str]) -> typing.Tuple[typing.Any, int]:
out = SpecArgumentTypeWord.simple(self, args)
if out[0]:
return out[0].lower(), out[1]
return out
class SpecArgumentTypeString(SpecArgumentType):
def name(self):
return "%s ..." % SpecArgumentType.name(self)
def simple(self, args: typing.List[str]) -> typing.Tuple[typing.Any, int]:
return " ".join(args), len(args)
class SpecArgumentTypeTime(SpecArgumentType):
def name(self):
return "+%s" % (SpecArgumentType.name(self) or "time")
def simple(self, args: typing.List[str]) -> typing.Tuple[typing.Any, int]:
time, _ = timed_args(args)
return time, 1
def error(self) -> typing.Optional[str]:
return "Invalid timeframe"
SPEC_ARGUMENT_TYPES = {
"word": SpecArgumentTypeWord,
"wordlower": SpecArgumentTypeWordLower,
"string": SpecArgumentTypeString,
"time": SpecArgumentTypeTime
}
class SpecArgument(object):
def __init__(self, optional: bool, types: typing.List[SpecArgumentType]):
self.optional = optional self.optional = optional
self.types = types self.types = types
def argument_spec(spec: str) -> typing.List[ArgumentSpec]: def argument_spec(spec: str) -> typing.List[SpecArgument]:
out: typing.List[ArgumentSpec] = [] out: typing.List[SpecArgument] = []
for spec_argument in spec.split(" "): for spec_argument in spec.split(" "):
optional = spec_argument[0] == "?" optional = spec_argument[0] == "?"
argument_types: typing.List[ArgumentSpecType] = [] argument_types: typing.List[SpecArgumentType] = []
for argument_type in spec_argument[1:].split("|"): for argument_type in spec_argument[1:].split("|"):
exported = "" exported = ""
if "~" in argument_type: if "~" in argument_type:
exported = argument_type.split("~", 1)[1] exported = argument_type.split("~", 1)[1]
argument_type = argument_type.replace("~", "", 1) argument_type = argument_type.replace("~", "", 1)
argument_type_name = argument_type argument_type_name: typing.Optional[str] = None
name_end = argument_type.find(">") name_end = argument_type.find(">")
if argument_type[0] == "<" and name_end > 0: if argument_type.startswith("<") and name_end > 0:
argument_type_name = argument_type[1:name_end] argument_type_name = argument_type[1:name_end]
argument_type = argument_type[name_end+1:] argument_type = argument_type[name_end+1:]
argument_types.append(ArgumentSpecType(argument_type, argument_type_class = SpecArgumentType
if argument_type in SPEC_ARGUMENT_TYPES:
argument_type_class = SPEC_ARGUMENT_TYPES[argument_type]
argument_types.append(argument_type_class(argument_type,
argument_type_name, exported)) argument_type_name, exported))
out.append(ArgumentSpec(optional, argument_types)) out.append(SpecArgument(optional, argument_types))
return out return out