bitbot-3.11-fork/src/EventManager.py

247 lines
8.7 KiB
Python

import itertools, time, traceback, typing
from src import Logging, utils
PRIORITY_URGENT = 0
PRIORITY_HIGH = 1
PRIORITY_MEDIUM = 2
PRIORITY_LOW = 3
PRIORITY_MONITOR = 4
DEFAULT_PRIORITY = PRIORITY_MEDIUM
DEFAULT_EVENT_DELIMITER = "."
DEFAULT_MULTI_DELIMITER = "|"
class Event(object):
def __init__(self, name: str, kwargs):
self.name = name
self.kwargs = kwargs
self.eaten = False
def __getitem__(self, key: str) -> typing.Any:
return self.kwargs[key]
def get(self, key: str, default=None) -> typing.Any:
return self.kwargs.get(key, default)
def __contains__(self, key: str) -> bool:
return key in self.kwargs
def eat(self):
self.eaten = True
CALLBACK_TYPE = typing.Callable[[Event], typing.Any]
class EventHook(object):
def __init__(self, event_name: str, func: CALLBACK_TYPE,
context: typing.Optional[str], priority: int,
kwargs: typing.List[typing.Tuple[str, typing.Any]]):
self.event_name = event_name
self.function = func
self.context = context
self.priority = priority
self.docstring = utils.parse.docstring(func.__doc__ or "")
self.call_count = 0
self._kwargs: typing.Dict[str, typing.Any] = {}
self._multi_kwargs: typing.Dict[str, typing.List[typing.Any]] = {}
for key, value in kwargs:
if key in self._multi_kwargs:
self._multi_kwargs[key].append(value)
elif key in self._kwargs:
self._multi_kwargs[key] = [self._kwargs.pop(key), value]
else:
self._kwargs[key] = value
def call(self, event: Event) -> typing.Any:
self.call_count += 1
return self.function(event)
def get_kwargs(self, key: str) -> typing.List[typing.Any]:
if key in self._kwargs:
return [self._kwargs[key]]
elif key in self._multi_kwargs:
return self._multi_kwargs[key].copy()
elif key in self.docstring.var_items:
return self.docstring.var_items[key]
elif key in self.docstring.items:
return [self.docstring.items[key]]
return []
def get_kwarg(self, key: str, default: typing.Any=None) -> typing.Any:
return (self.get_kwargs(key) or [default])[0]
class Events(object):
def __init__(self, root: "EventRoot", path: typing.List[str],
context: typing.Optional[str]):
self._root = root
self._path = path
self._context = context
def new_root(self):
return self._root._new_root()
def new_context(self, context: str):
return self._root._new_context(context)
def make_event(self, **kwargs):
return self._root._make_event(self._path, kwargs)
def on(self, subname):
parts = subname.split(DEFAULT_EVENT_DELIMITER)
new_path = self._path + parts
return Events(self._root, new_path, self._context)
def hook(self, func: CALLBACK_TYPE, priority: int = DEFAULT_PRIORITY,
**kwargs):
self._hook(func, priority, list(kwargs.items()))
def _hook(self, func: CALLBACK_TYPE, priority: int = DEFAULT_PRIORITY,
kwargs: typing.List[typing.Tuple[str, typing.Any]] = []):
for key, value in kwargs:
if key == "priority":
priority = value
break
self._root._hook(self._path, func, self._context, priority, kwargs)
def call(self, **kwargs):
return self._root._call(self._path, kwargs, True, self._context, None)
def call_unsafe(self, **kwargs):
return self._root._call(self._path, kwargs, False, self._context, None)
def _call_limited(self, maximum: int, safe: bool, kwargs):
return self._root._call(self._path, kwargs, safe, self._context,
maximum)
def call_limited(self, maximum: int, **kwargs):
return self._call_limited(maximum, True, kwargs)
def call_limited_unsafe(self, maximum: int, **kwargs):
return self._call_limited(maximum, False, kwargs)
def call_for_result(self, default=None, **kwargs):
return (self._call_limited(1, True, kwargs) or [default])[0]
def call_for_result_unsafe(self, default=None, **kwargs):
return (self._call_limited(1, False, kwargs) or [default])[0]
def get_children(self):
return self._root._get_children(self._path)
def get_hooks(self):
return self._root._get_hooks(self._path)
def purge_context(self, context: str):
self._root._purge_context(context)
def all_hooks(self):
return self._root.all_hooks()
class EventRoot(object):
def __init__(self, log: Logging.Log):
self.log = log
self._hooks: typing.Dict[str, typing.List[EventHook]] = {}
def _make_event(self, path: typing.List[str], kwargs: dict):
return Event(self._path_str(path), kwargs)
def _new_context(self, context: str):
return Events(self, [], context)
def _new_root(self):
return EventRoot(self.log).wrap()
def wrap(self):
return Events(self, [], None)
def _path_str(self, path: typing.List[str]):
path_lower = [p.lower() for p in path]
return DEFAULT_EVENT_DELIMITER.join(path_lower)
def _hook(self, path: typing.List[str], func: CALLBACK_TYPE,
context: typing.Optional[str], priority: int,
kwargs: typing.List[typing.Tuple[str, typing.Any]] = []
) -> EventHook:
path_str = self._path_str(path)
new_hook = EventHook(path_str, func, context, priority, kwargs)
if not path_str in self._hooks:
self._hooks[path_str] = []
hook_array = self._hooks[path_str]
hooked = False
for i, other_hook in enumerate(hook_array):
if other_hook.priority >= new_hook.priority:
hooked = True
hook_array.insert(i, new_hook)
break
if not hooked:
hook_array.append(new_hook)
return new_hook
def _call(self, path: typing.List[str], kwargs: dict, safe: bool,
context: typing.Optional[str], maximum: typing.Optional[int]
) -> typing.List[typing.Any]:
if not utils.is_main_thread():
raise RuntimeError("Can't call events outside of main thread")
returns: typing.List[typing.Any] = []
path_str = self._path_str(path)
if not path_str in self._hooks:
self.log.trace("not calling non-hooked event \"%s\" (params: %s)",
[path_str, str(kwargs)])
return returns
self.log.trace("calling event: \"%s\" (params: %s)",
[path_str, str(kwargs)])
start = time.monotonic()
# .copy() hooks so we don't call new hooks in this loop
mutable_hooks = self._hooks[path_str]
hooks = mutable_hooks.copy()
if maximum:
hooks = hooks[:maximum]
event = self._make_event(path, kwargs)
for hook in hooks:
if event.eaten:
break
if not hook in mutable_hooks:
# this hook has been removed while handling this event
continue
try:
returned = hook.call(event)
except Exception as e:
if safe:
self.log.error("failed to call event \"%s\"",
[path_str], exc_info=True)
continue
else:
raise
returns.append(returned)
total_milliseconds = (time.monotonic() - start) * 1000
self.log.trace("event \"%s\" called in %fms",
[path_str, total_milliseconds])
return returns
def _purge_context(self, context: str):
context_hooks: typing.Dict[str, typing.List[EventHook]] = {}
for path in self._hooks.keys():
for hook in self._hooks[path]:
if hook.context == context:
if not path in context_hooks:
context_hooks[path] = []
context_hooks[path].append(hook)
for path in context_hooks:
for hook in context_hooks[path]:
self._hooks[path].remove(hook)
if not self._hooks[path]:
del self._hooks[path]
def _get_children(self, path):
path_prefix = "%s%s" % (self._path_str(path), DEFAULT_EVENT_DELIMITER)
matches = []
for key in self._hooks.keys():
if key.startswith(path_prefix):
matches.append(key.replace(path_prefix, "", 1))
return matches
def _get_hooks(self, path):
path_str = self._path_str(path)
if path_str in self._hooks:
return self._hooks[path_str][:]
return []
def all_hooks(self):
return self._hooks.copy()