Add a way to __or__ utils.Check
s, as a "if one of these passes" thing
This commit is contained in:
parent
d7fa2cfa24
commit
f05fc209b0
4 changed files with 37 additions and 12 deletions
|
@ -34,4 +34,4 @@ class Module(ModuleManager.BaseModule):
|
||||||
|
|
||||||
@utils.hook("check.command.channel-mode")
|
@utils.hook("check.command.channel-mode")
|
||||||
def check_command(self, event):
|
def check_command(self, event):
|
||||||
return self._check_command(event, event["check_args"][0])
|
return self._check_command(event, event["request_args"][0])
|
||||||
|
|
|
@ -130,12 +130,16 @@ class Module(ModuleManager.BaseModule):
|
||||||
|
|
||||||
return hook, args_split
|
return hook, args_split
|
||||||
|
|
||||||
def _check(self, context, kwargs, request=None, **extra_kwargs):
|
def _check(self, context, kwargs, requests=[]):
|
||||||
event_hook = self.events.on(context).on("command")
|
event_hook = self.events.on(context).on("command")
|
||||||
if not request == None:
|
|
||||||
event_hook = event_hook.on(request)
|
|
||||||
|
|
||||||
returns = event_hook.call_unsafe(**kwargs, **extra_kwargs)
|
returns = []
|
||||||
|
if requests:
|
||||||
|
for request, request_args in requests:
|
||||||
|
returns.append(event_hook.on(request).call_unsafe_for_result(
|
||||||
|
**kwargs, request_args=request_args))
|
||||||
|
else:
|
||||||
|
returns = event_hook.call_unsafe(**kwargs)
|
||||||
|
|
||||||
hard_fail = False
|
hard_fail = False
|
||||||
force_success = False
|
force_success = False
|
||||||
|
@ -174,10 +178,16 @@ class Module(ModuleManager.BaseModule):
|
||||||
break
|
break
|
||||||
|
|
||||||
if next_success:
|
if next_success:
|
||||||
|
multi_check = None
|
||||||
if isinstance(next_return, utils.Check):
|
if isinstance(next_return, utils.Check):
|
||||||
|
multi_check = next_return.to_multi()
|
||||||
|
elif isinstance(next_return, utils.MultiCheck):
|
||||||
|
multi_check = next_return
|
||||||
|
|
||||||
|
if multi_check:
|
||||||
check_success, check_message = self._check("check",
|
check_success, check_message = self._check("check",
|
||||||
check_kwargs, next_return.request,
|
check_kwargs, multi_check.requests)
|
||||||
check_args=next_return.args)
|
|
||||||
if not check_success:
|
if not check_success:
|
||||||
return False, check_message
|
return False, check_message
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -191,14 +191,10 @@ class Module(ModuleManager.BaseModule):
|
||||||
event["stdout"].write("Reset password for '%s'" %
|
event["stdout"].write("Reset password for '%s'" %
|
||||||
target.nickname)
|
target.nickname)
|
||||||
|
|
||||||
@utils.hook("preprocess.command")
|
def _check_command(self, event, permission, authenticated):
|
||||||
def preprocess_command(self, event):
|
|
||||||
if event["user"].admin_master:
|
if event["user"].admin_master:
|
||||||
return utils.consts.PERMISSION_FORCE_SUCCESS
|
return utils.consts.PERMISSION_FORCE_SUCCESS
|
||||||
|
|
||||||
permission = event["hook"].get_kwarg("permission", None)
|
|
||||||
authenticated = event["hook"].kwargs.get("authenticated", False)
|
|
||||||
|
|
||||||
identity_mechanism = event["server"].get_setting("identity-mechanism",
|
identity_mechanism = event["server"].get_setting("identity-mechanism",
|
||||||
"internal")
|
"internal")
|
||||||
identified_account = None
|
identified_account = None
|
||||||
|
@ -231,6 +227,16 @@ class Module(ModuleManager.BaseModule):
|
||||||
else:
|
else:
|
||||||
return utils.consts.PERMISSION_FORCE_SUCCESS
|
return utils.consts.PERMISSION_FORCE_SUCCESS
|
||||||
|
|
||||||
|
@utils.hook("preprocess.command")
|
||||||
|
def preprocess_command(self, event):
|
||||||
|
permission = event["hook"].get_kwarg("permission", None)
|
||||||
|
authenticated = event["hook"].kwargs.get("authenticated", False)
|
||||||
|
return self._check_command(event, permission, authenticated)
|
||||||
|
|
||||||
|
@utils.hook("check.command.permission")
|
||||||
|
def check_command(self, event):
|
||||||
|
return self._check_command(event, event["request_args"][0], False)
|
||||||
|
|
||||||
@utils.hook("received.command.mypermissions", authenticated=True)
|
@utils.hook("received.command.mypermissions", authenticated=True)
|
||||||
def my_permissions(self, event):
|
def my_permissions(self, event):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -185,10 +185,19 @@ def export(setting: str, value: typing.Any):
|
||||||
return module
|
return module
|
||||||
return _export_func
|
return _export_func
|
||||||
|
|
||||||
|
class MultiCheck(object):
|
||||||
|
def __init__(self,
|
||||||
|
requests: typing.List[typing.Tuple[str, typing.List[str]]]):
|
||||||
|
self.requests = requests
|
||||||
class Check(object):
|
class Check(object):
|
||||||
def __init__(self, request: str, *args: typing.List[str]):
|
def __init__(self, request: str, *args: typing.List[str]):
|
||||||
self.request = request
|
self.request = request
|
||||||
self.args = args
|
self.args = args
|
||||||
|
def to_multi(self):
|
||||||
|
return MultiCheck([(self.request, self.args)])
|
||||||
|
def __or__(self, other: "Check"):
|
||||||
|
return MultiCheck([(self.request, self.args),
|
||||||
|
(other.request, other.args)])
|
||||||
|
|
||||||
TOP_10_CALLABLE = typing.Callable[[typing.Any], typing.Any]
|
TOP_10_CALLABLE = typing.Callable[[typing.Any], typing.Any]
|
||||||
def top_10(items: typing.Dict[typing.Any, typing.Any],
|
def top_10(items: typing.Dict[typing.Any, typing.Any],
|
||||||
|
|
Loading…
Reference in a new issue