bitbot-3.11-fork/src/EventManager.py

289 lines
11 KiB
Python
Raw Normal View History

import itertools, time, traceback, typing
from src import Logging, utils
2016-03-29 11:56:58 +00:00
# Hook priorities. Hooks are sorted by this value in ascending order before
# being called, so PRIORITY_URGENT callbacks run first and PRIORITY_MONITOR
# callbacks run last.
(
    PRIORITY_URGENT,
    PRIORITY_HIGH,
    PRIORITY_MEDIUM,
    PRIORITY_LOW,
    PRIORITY_MONITOR,
) = range(5)

# Priority given to a hook when the caller does not specify one.
DEFAULT_PRIORITY = PRIORITY_MEDIUM

# Separator between the levels of an event path, e.g. "received.message".
DEFAULT_EVENT_DELIMITER = "."
# Separator between alternative names within a single path level.
DEFAULT_MULTI_DELIMITER = "|"

# Signature every event callback has to satisfy: it receives the Event.
CALLBACK_TYPE = typing.Callable[["Event"], typing.Any]
2016-03-29 11:56:58 +00:00
class Event(object):
    """A single fired event: its name plus the keyword payload.

    The payload can be read with mapping-style access (``event["key"]``,
    ``event.get("key")``, ``"key" in event``).  A callback may call
    :meth:`eat` to set the ``eaten`` flag on the event.
    """

    def __init__(self, name: str, **kwargs):
        self.eaten = False
        self.kwargs = kwargs
        self.name = name

    def __contains__(self, key: str) -> bool:
        """Return whether *key* is present in the payload."""
        return key in self.kwargs

    def __getitem__(self, key: str) -> typing.Any:
        """Return the payload value for *key*; raises KeyError if absent."""
        payload = self.kwargs
        return payload[key]

    def get(self, key: str, default=None) -> typing.Any:
        """Return the payload value for *key*, or *default* if absent."""
        payload = self.kwargs
        return payload.get(key, default)

    def eat(self):
        """Mark this event as eaten."""
        self.eaten = True
class EventCallback(object):
    """A registered callback together with its priority and metadata.

    Metadata comes from two places: the keyword arguments supplied when the
    callback was hooked, and the result of ``utils.parse.docstring`` applied
    to the function's ``__doc__``.
    """

    def __init__(self, function: CALLBACK_TYPE, priority: int, kwargs: dict):
        self.function = function
        self.priority = priority
        self.kwargs = kwargs
        self.docstring = utils.parse.docstring(function.__doc__)

    def call(self, event: Event) -> typing.Any:
        """Invoke the wrapped function with *event* and return its result."""
        target = self.function
        return target(event)

    def get_kwarg(self, name: str, default=None) -> typing.Any:
        """Look up metadata *name*, preferring the hook keyword arguments.

        A falsy value found in the keyword arguments (including a falsy
        *default*) does not win: the lookup then falls through to the
        docstring items, and only after that to *default*.
        """
        explicit = self.kwargs.get(name, default)
        if explicit:
            return explicit
        return self.docstring.items.get(name, default)
2016-03-29 11:56:58 +00:00
class EventHook(object):
    """One node of the hierarchical event tree.

    A node has a name, an optional parent and lazily created children that
    are stored under their lower-cased names.  The path of a node is the
    names of its ancestors joined with DEFAULT_EVENT_DELIMITER.  Callbacks
    are registered either directly on the node or under a named context, so
    that everything belonging to one context can be purged together.
    """

    def __init__(self, log: Logging.Log, name: typing.Optional[str] = None,
            parent: typing.Optional["EventHook"] = None):
        self.log = log
        self.name = name
        self.parent = parent
        # Child nodes keyed by lower-cased name.
        self._children = {}
        # Callbacks registered without a context.
        self._hooks = []
        # kwargs queued by assure_call(); set to None as soon as the first
        # callback is registered, after which nothing is queued any more.
        self._stored_events = []
        # Callbacks grouped by context name.
        self._context_hooks = {}

    def _make_event(self, kwargs: dict) -> Event:
        """Build the Event passed to callbacks, named after this node's path."""
        return Event(self._get_path(), **kwargs)

    def _get_path(self) -> str:
        """Return the delimiter-joined path from the outermost named node."""
        path = []
        node = self
        # Walk upwards and stop at the first unnamed node.
        while node is not None and node.name is not None:
            path.append(node.name)
            node = node.parent
        return DEFAULT_EVENT_DELIMITER.join(reversed(path))

    def new_context(self, context: str) -> "EventHookContext":
        """Return a view of this node that registers hooks under *context*."""
        return EventHookContext(self, context)

    def hook(self, function: CALLBACK_TYPE, priority: int = DEFAULT_PRIORITY,
            replay: bool = False, **kwargs) -> EventCallback:
        """Register *function* on this node without a context.

        :param priority: sort key for call order; lower values run first.
        :param replay: when true, events queued by assure_call() are
            dispatched right after registration.
        :param kwargs: metadata stored on the returned EventCallback.
        """
        return self._hook(function, None, priority, replay, kwargs)

    def _context_hook(self, context: str, function: CALLBACK_TYPE,
            priority: int, replay: bool, kwargs: dict) -> EventCallback:
        """Register *function* on this node under *context*."""
        return self._hook(function, context, priority, replay, kwargs)

    def _hook(self, function: CALLBACK_TYPE, context: typing.Optional[str],
            priority: int, replay: bool, kwargs: dict) -> EventCallback:
        """Create the EventCallback, store it and optionally replay events."""
        callback = EventCallback(function, priority, kwargs)
        if context is None:
            self._hooks.append(callback)
        else:
            self._context_hooks.setdefault(context, []).append(callback)

        if replay and self._stored_events is not None:
            for stored_kwargs in self._stored_events:
                self._call(stored_kwargs, True, None)
        # The queue is dropped after the first registration whether or not
        # it was replayed; assure_call() dispatches immediately from now on.
        self._stored_events = None
        return callback

    def unhook(self, callback: EventCallback):
        """Remove *callback* from this node, with or without a context."""
        if callback in self._hooks:
            self._hooks.remove(callback)

        # Collect emptied contexts first; the dict must not change size
        # while it is being iterated.
        empty = []
        for context, hooks in self._context_hooks.items():
            if callback in hooks:
                hooks.remove(callback)
                if not hooks:
                    empty.append(context)
        for context in empty:
            del self._context_hooks[context]

    def _make_multiple_hook(self, source: "EventHook",
            context: typing.Optional[str],
            events: typing.Iterable[str]) -> "MultipleEventHook":
        """Bundle the children of *source* named in *events* into one hook."""
        multiple_event_hook = MultipleEventHook()
        for event in events:
            event_hook = source.get_child(event)
            if context is not None:
                event_hook = event_hook.new_context(context)
            multiple_event_hook._add(event_hook)
        return multiple_event_hook

    def on(self, subevent: str, *extra_subevents,
            delimiter: str = DEFAULT_EVENT_DELIMITER) -> "EventHook":
        """Return the node(s) below this one addressed by *subevent*.

        *subevent* may be a delimiter-separated path.  Passing several names
        returns a MultipleEventHook over the direct children with those
        names.
        """
        return self._on(subevent, extra_subevents, None, delimiter)

    def _context_on(self, context: str, subevent: str,
            extra_subevents: typing.Iterable[str],
            delimiter: str = DEFAULT_EVENT_DELIMITER) -> "EventHook":
        """Same as on(), but the result registers hooks under *context*."""
        return self._on(subevent, extra_subevents, context, delimiter)

    def _on(self, subevent: str, extra_subevents: typing.Iterable[str],
            context: typing.Optional[str], delimiter: str) -> "EventHook":
        """Resolve *subevent* (and any extra names) relative to this node."""
        if delimiter in subevent:
            event_obj = self
            for event_name in subevent.split(delimiter):
                if DEFAULT_MULTI_DELIMITER in event_name:
                    # A multi-name level ends the walk: any path levels
                    # after it are not consulted.
                    return self._make_multiple_hook(event_obj, context,
                        event_name.split(DEFAULT_MULTI_DELIMITER))
                event_obj = event_obj.get_child(event_name)
            if context is not None:
                return event_obj.new_context(context)
            return event_obj

        if extra_subevents:
            # Unpacking accepts a tuple (from *args) as well as a list.
            return self._make_multiple_hook(self, context,
                (subevent, *extra_subevents))

        child = self.get_child(subevent)
        if context is not None:
            child = child.new_context(context)
        return child

    def call_for_result(self, default=None, **kwargs) -> typing.Any:
        """Call only the first hook and return its result.

        Returns *default* when no hook produced a return value (no hooks
        registered, or the hook raised and the error was logged).
        """
        return (self.call_limited(1, **kwargs) or [default])[0]

    def assure_call(self, **kwargs):
        """Dispatch the event, or queue it while no hook was ever registered.

        Queued events are delivered by a later hook(..., replay=True).
        """
        if self._stored_events is not None:
            self._stored_events.append(kwargs)
        else:
            self._call(kwargs, True, None)

    def call(self, **kwargs) -> typing.List[typing.Any]:
        """Call every hook; exceptions are logged instead of propagated."""
        return self._call(kwargs, True, None)

    def call_limited(self, maximum: int, **kwargs) -> typing.List[typing.Any]:
        """Call at most *maximum* hooks; exceptions are logged."""
        return self._call(kwargs, True, maximum)

    def call_unsafe_for_result(self, default=None, **kwargs) -> typing.Any:
        """Like call_for_result(), but exceptions from the hook propagate."""
        return (self.call_unsafe_limited(1, **kwargs) or [default])[0]

    def call_unsafe(self, **kwargs) -> typing.List[typing.Any]:
        """Call every hook; the first exception propagates to the caller."""
        return self._call(kwargs, False, None)

    def call_unsafe_limited(self, maximum: int, **kwargs
            ) -> typing.List[typing.Any]:
        """Call at most *maximum* hooks; exceptions propagate."""
        return self._call(kwargs, False, maximum)

    def _call(self, kwargs: dict, safe: bool, maximum: typing.Optional[int]
            ) -> typing.List[typing.Any]:
        """Dispatch one event to the hooks of this node.

        :param kwargs: payload of the Event handed to each hook.
        :param safe: log exceptions raised by hooks when true, re-raise
            them when false.
        :param maximum: number of hooks (in priority order) to consider;
            None means all of them.
        :returns: the return values of the hooks that completed.
        """
        event_path = self._get_path()
        self.log.trace("calling event: \"%s\" (params: %s)",
            [event_path, kwargs])
        start = time.monotonic()

        event = self._make_event(kwargs)
        returns = []
        for hook in self.get_hooks()[:maximum]:
            # A previous hook called event.eat(): stop dispatching.
            if event.eaten:
                break
            try:
                returns.append(hook.call(event))
            except Exception:
                if not safe:
                    raise
                self.log.error("failed to call event \"%s\"",
                    [event_path], exc_info=True)

        total_milliseconds = (time.monotonic() - start) * 1000
        self.log.trace("event \"%s\" called in %fms", [
            event_path, total_milliseconds])

        self.check_purge()
        return returns

    def get_child(self, child_name: str) -> "EventHook":
        """Return the child called *child_name*, creating it if needed.

        Names are case-insensitive; they are stored lower-cased.
        """
        child_name_lower = child_name.lower()
        if child_name_lower not in self._children:
            self._children[child_name_lower] = EventHook(self.log,
                child_name_lower, self)
        return self._children[child_name_lower]

    def remove_child(self, child_name: str):
        """Drop the child called *child_name*; unknown names are ignored."""
        self._children.pop(child_name.lower(), None)

    def check_purge(self):
        """Detach this node from its parent when it is empty.

        The check then continues upwards, so a chain of empty ancestors is
        removed as well.
        """
        if self.is_empty() and self.parent is not None:
            self.parent.remove_child(self.name)
            self.parent.check_purge()

    def remove_context(self, context: str):
        """Delete all hooks of *context* on this node.

        Raises KeyError if the context has no hooks here.
        """
        del self._context_hooks[context]

    def has_context(self, context: str) -> bool:
        """Return whether this node has hooks registered under *context*."""
        return context in self._context_hooks

    def purge_context(self, context: str):
        """Remove the hooks of *context* from this node and all descendants."""
        if self.has_context(context):
            self.remove_context(context)
        # Iterate over a snapshot of the children.
        for child in list(self._children.values()):
            child.purge_context(context)

    def get_hooks(self) -> typing.List[EventCallback]:
        """Return all callbacks of this node sorted by ascending priority.

        The sort is stable: callbacks with equal priority keep the order
        context-free hooks first, then each context in insertion order.
        """
        every_hook = itertools.chain(self._hooks,
            *self._context_hooks.values())
        return sorted(every_hook, key=lambda e: e.priority)

    def get_children(self) -> typing.List[str]:
        """Return the (lower-cased) names of the direct children."""
        return list(self._children.keys())

    def is_empty(self) -> bool:
        """Return whether this node has neither hooks nor children."""
        return not self.get_hooks() and not self._children
class MultipleEventHook(object):
def __init__(self):
self._event_hooks = set([])
def _add(self, event_hook: EventHook):
self._event_hooks.add(event_hook)
def hook(self, function: CALLBACK_TYPE, **kwargs):
for event_hook in self._event_hooks:
event_hook.hook(function, **kwargs)
def call_limited(self, maximum: int, **kwargs) -> typing.List[typing.Any]:
returns = []
for event_hook in self._event_hooks:
returns.append(event_hook.call_limited(maximum, **kwargs))
return returns
def call(self, **kwargs) -> typing.List[typing.Any]:
returns = []
for event_hook in self._event_hooks:
returns.append(event_hook.call(**kwargs))
return returns
class EventHookContext(object):
    """View of an event hook that registers callbacks under one context.

    Registration (``hook``) and navigation (``on``) carry the context along;
    every other method simply forwards to the wrapped hook.
    """

    def __init__(self, parent, context):
        # The wrapped hook that all operations are forwarded to.
        self._parent = parent
        self.context = context

    def hook(self, function: CALLBACK_TYPE, priority: int = DEFAULT_PRIORITY,
            replay: bool = False, **kwargs) -> EventCallback:
        """Register *function* on the wrapped hook under this context."""
        target = self._parent
        return target._context_hook(
            self.context, function, priority, replay, kwargs)

    def unhook(self, callback: EventCallback):
        """Remove *callback* from the wrapped hook."""
        target = self._parent
        target.unhook(callback)

    def on(self, subevent: str, *extra_subevents,
            delimiter: str = DEFAULT_EVENT_DELIMITER) -> EventHook:
        """Resolve *subevent* below the wrapped hook, keeping this context."""
        target = self._parent
        return target._context_on(
            self.context, subevent, extra_subevents, delimiter)

    def call_for_result(self, default=None, **kwargs) -> typing.Any:
        """Forward to the wrapped hook's call_for_result()."""
        target = self._parent
        return target.call_for_result(default, **kwargs)

    def assure_call(self, **kwargs):
        """Forward to the wrapped hook's assure_call()."""
        target = self._parent
        target.assure_call(**kwargs)

    def call(self, **kwargs) -> typing.List[typing.Any]:
        """Forward to the wrapped hook's call()."""
        target = self._parent
        return target.call(**kwargs)

    def call_limited(self, maximum: int, **kwargs) -> typing.List[typing.Any]:
        """Forward to the wrapped hook's call_limited()."""
        target = self._parent
        return target.call_limited(maximum, **kwargs)

    def call_unsafe_for_result(self, default=None, **kwargs) -> typing.Any:
        """Forward to the wrapped hook's call_unsafe_for_result()."""
        target = self._parent
        return target.call_unsafe_for_result(default, **kwargs)

    def call_unsafe(self, **kwargs) -> typing.List[typing.Any]:
        """Forward to the wrapped hook's call_unsafe()."""
        target = self._parent
        return target.call_unsafe(**kwargs)

    def call_unsafe_limited(self, maximum: int, **kwargs
            ) -> typing.List[typing.Any]:
        """Forward to the wrapped hook's call_unsafe_limited()."""
        target = self._parent
        return target.call_unsafe_limited(maximum, **kwargs)

    def get_hooks(self) -> typing.List[EventCallback]:
        """Return the wrapped hook's callbacks (all contexts, not only this one)."""
        target = self._parent
        return target.get_hooks()

    def get_children(self) -> typing.List[EventHook]:
        """Return whatever the wrapped hook's get_children() returns."""
        target = self._parent
        return target.get_children()