333 lines
13 KiB
Python
333 lines
13 KiB
Python
import itertools, time, traceback, typing
|
|
from src import Logging, utils
|
|
|
|
PRIORITY_URGENT = 0
|
|
PRIORITY_HIGH = 1
|
|
PRIORITY_MEDIUM = 2
|
|
PRIORITY_LOW = 3
|
|
PRIORITY_MONITOR = 4
|
|
|
|
DEFAULT_PRIORITY = PRIORITY_MEDIUM
|
|
DEFAULT_EVENT_DELIMITER = "."
|
|
DEFAULT_MULTI_DELIMITER = "|"
|
|
|
|
CALLBACK_TYPE = typing.Callable[["Event"], typing.Any]
|
|
|
|
class Event(object):
|
|
def __init__(self, name: str, **kwargs):
|
|
self.name = name
|
|
self.kwargs = kwargs
|
|
self.eaten = False
|
|
def __getitem__(self, key: str) -> typing.Any:
|
|
return self.kwargs[key]
|
|
def get(self, key: str, default=None) -> typing.Any:
|
|
return self.kwargs.get(key, default)
|
|
def __contains__(self, key: str) -> bool:
|
|
return key in self.kwargs
|
|
def eat(self):
|
|
self.eaten = True
|
|
|
|
class EventCallback(object):
|
|
def __init__(self, event_name: str, function: CALLBACK_TYPE, priority: int,
|
|
kwargs: dict, context: typing.Optional[str], one_shot: bool=False):
|
|
self.event_name = event_name
|
|
self.function = function
|
|
self.priority = priority
|
|
self.kwargs = kwargs
|
|
self.docstring = utils.parse.docstring(function.__doc__ or "")
|
|
self.context = context
|
|
self._one_shot = one_shot
|
|
|
|
def call(self, event: Event) -> typing.Any:
|
|
return self.function(event)
|
|
|
|
def get_kwarg(self, name: str, default=None) -> typing.Any:
|
|
if name in self.kwargs:
|
|
return self.kwargs[name]
|
|
elif name in self.docstring.items:
|
|
return self.docstring.items[name]
|
|
return default
|
|
|
|
class EventHook(object):
|
|
def __init__(self, log: Logging.Log, name: str = None,
|
|
parent: "EventHook" = None):
|
|
self.log = log
|
|
self.name = name
|
|
self.parent = parent
|
|
self._children = {} # type: typing.Dict[str, EventHook]
|
|
self._hooks = [] # type: typing.List[EventCallback]
|
|
self._replayed = False
|
|
self._stored_events = [] # type: typing.List[typing.Dict]
|
|
self._context_hooks = {} # type: typing.Dict[str, typing.List[EventCallback]]
|
|
|
|
def new_root(self) -> "EventHook":
|
|
return EventHook(self.log)
|
|
|
|
def _make_event(self, kwargs: dict) -> Event:
|
|
return Event(self._get_path(), **kwargs)
|
|
def make_event(self, **kwargs):
|
|
return self._make_event(kwargs)
|
|
|
|
def _get_path(self) -> str:
|
|
path = []
|
|
parent = self # type: typing.Optional[EventHook]
|
|
while not parent == None:
|
|
cast_parent = typing.cast(EventHook, parent)
|
|
if cast_parent.name == None:
|
|
break
|
|
|
|
path.append(typing.cast(str, cast_parent.name))
|
|
parent = cast_parent.parent
|
|
return DEFAULT_EVENT_DELIMITER.join(path[::-1])
|
|
|
|
def new_context(self, context: str) -> "EventHookContext":
|
|
return EventHookContext(self, context)
|
|
|
|
def hook(self, function: CALLBACK_TYPE, priority: int = DEFAULT_PRIORITY,
|
|
replay: bool = False, **kwargs) -> EventCallback:
|
|
return self._hook(function, None, priority, replay, kwargs)
|
|
def _context_hook(self, context: str, function: CALLBACK_TYPE,
|
|
priority: int, replay: bool, kwargs: dict) -> EventCallback:
|
|
return self._hook(function, context, priority, replay, kwargs)
|
|
def _hook(self, function: CALLBACK_TYPE, context: typing.Optional[str],
|
|
priority: int, replay: bool, kwargs: dict) -> EventCallback:
|
|
event_name = typing.cast(str, self.name)
|
|
callback = EventCallback(event_name, function, priority, kwargs,
|
|
context)
|
|
|
|
if context == None:
|
|
self._hooks.append(callback)
|
|
else:
|
|
context_str = typing.cast(str, context)
|
|
if not context in self._context_hooks:
|
|
self._context_hooks[context_str] = []
|
|
self._context_hooks[context_str].append(callback)
|
|
|
|
if replay and not self._replayed:
|
|
for kwargs in self._stored_events:
|
|
self._call(kwargs, True, None)
|
|
self._replayed = True
|
|
return callback
|
|
|
|
def unhook(self, callback: "EventCallback"):
|
|
if callback in self._hooks:
|
|
self._hooks.remove(callback)
|
|
|
|
empty = []
|
|
for context, hooks in self._context_hooks.items():
|
|
if callback in hooks:
|
|
hooks.remove(callback)
|
|
if not hooks:
|
|
empty.append(context)
|
|
for context in empty:
|
|
del self._context_hooks[context]
|
|
|
|
def _make_multiple_hook(self, source: "EventHook",
|
|
context: typing.Optional[str],
|
|
events: typing.Iterable[str]) -> "MultipleEventHook":
|
|
multiple_event_hook = MultipleEventHook()
|
|
for event in events:
|
|
event_hook = source.get_child(event)
|
|
if not context == None:
|
|
context_hook = event_hook.new_context(typing.cast(str, context))
|
|
multiple_event_hook._add(typing.cast(EventHook, context_hook))
|
|
else:
|
|
multiple_event_hook._add(event_hook)
|
|
return multiple_event_hook
|
|
|
|
def on(self, subevent: str, *extra_subevents: str,
|
|
delimiter: str = DEFAULT_EVENT_DELIMITER) -> "EventHook":
|
|
return self._on(subevent, extra_subevents, None, delimiter)
|
|
def _context_on(self, context: str, subevent: str,
|
|
extra_subevents: typing.Tuple[str, ...],
|
|
delimiter: str = DEFAULT_EVENT_DELIMITER) -> "EventHook":
|
|
return self._on(subevent, extra_subevents, context, delimiter)
|
|
def _on(self, subevent: str, extra_subevents: typing.Tuple[str, ...],
|
|
context: typing.Optional[str], delimiter: str) -> "EventHook":
|
|
if delimiter in subevent:
|
|
event_chain = subevent.split(delimiter)
|
|
event_obj = self
|
|
for event_name in event_chain:
|
|
if DEFAULT_MULTI_DELIMITER in event_name:
|
|
multiple_hook = self._make_multiple_hook(event_obj, context,
|
|
event_name.split(DEFAULT_MULTI_DELIMITER))
|
|
return typing.cast(EventHook, multiple_hook)
|
|
|
|
event_obj = event_obj.get_child(event_name)
|
|
if not context == None:
|
|
context_hook = event_obj.new_context(typing.cast(str, context))
|
|
return typing.cast(EventHook, context_hook)
|
|
return event_obj
|
|
|
|
if extra_subevents:
|
|
multiple_hook = self._make_multiple_hook(self, context,
|
|
(subevent,)+extra_subevents)
|
|
return typing.cast(EventHook, multiple_hook)
|
|
|
|
child = self.get_child(subevent)
|
|
if not context == None:
|
|
context_child = child.new_context(typing.cast(str, context))
|
|
child = typing.cast(EventHook, context_child)
|
|
return child
|
|
|
|
def call_for_result(self, default=None, **kwargs) -> typing.Any:
|
|
return (self.call_limited(1, **kwargs) or [default])[0]
|
|
def assure_call(self, **kwargs):
|
|
if not self._replayed:
|
|
self._stored_events.append(kwargs)
|
|
else:
|
|
self._call(kwargs, True, None)
|
|
def call(self, **kwargs) -> typing.List[typing.Any]:
|
|
return self._call(kwargs, True, None)
|
|
def call_limited(self, maximum: int, **kwargs) -> typing.List[typing.Any]:
|
|
return self._call(kwargs, True, maximum)
|
|
|
|
def call_unsafe_for_result(self, default=None, **kwargs) -> typing.Any:
|
|
return (self.call_unsafe_limited(1, **kwargs) or [default])[0]
|
|
def call_unsafe(self, **kwargs) -> typing.List[typing.Any]:
|
|
return self._call(kwargs, False, None)
|
|
def call_unsafe_limited(self, maximum: int, **kwargs
|
|
) -> typing.List[typing.Any]:
|
|
return self._call(kwargs, False, maximum)
|
|
|
|
def _call(self, kwargs: dict, safe: bool, maximum: typing.Optional[int]
|
|
) -> typing.List[typing.Any]:
|
|
if not utils.is_main_thread():
|
|
raise RuntimeError("Can't call events outside of main thread")
|
|
|
|
event_path = self._get_path()
|
|
hooks = self.get_hooks()
|
|
returns = [] # type: typing.List[typing.Any]
|
|
|
|
if not hooks:
|
|
self.log.trace("not calling non-hooked event \"%s\" (params: %s)",
|
|
[event_path, kwargs])
|
|
return returns
|
|
|
|
self.log.trace("calling event: \"%s\" (params: %s)",
|
|
[event_path, kwargs])
|
|
start = time.monotonic()
|
|
|
|
event = self._make_event(kwargs)
|
|
for hook in hooks[:maximum]:
|
|
if event.eaten:
|
|
break
|
|
try:
|
|
returns.append(hook.call(event))
|
|
except Exception as e:
|
|
if safe:
|
|
self.log.error("failed to call event \"%s\"",
|
|
[self._get_path()], exc_info=True)
|
|
else:
|
|
raise
|
|
|
|
if hook._one_shot:
|
|
self.unhook(hook)
|
|
|
|
total_milliseconds = (time.monotonic() - start) * 1000
|
|
self.log.trace("event \"%s\" called in %fms",
|
|
[event_path, total_milliseconds])
|
|
|
|
self.check_purge()
|
|
|
|
return returns
|
|
|
|
def get_child(self, child_name: str) -> "EventHook":
|
|
child_name_lower = child_name.lower()
|
|
if not child_name_lower in self._children:
|
|
self._children[child_name_lower] = EventHook(self.log,
|
|
child_name_lower, self)
|
|
return self._children[child_name_lower]
|
|
def remove_child(self, child_name: str):
|
|
child_name_lower = child_name.lower()
|
|
if child_name_lower in self._children:
|
|
del self._children[child_name_lower]
|
|
|
|
def check_purge(self):
|
|
if self.is_empty() and not self.parent == None:
|
|
self.parent.remove_child(self.name)
|
|
self.parent.check_purge()
|
|
|
|
def remove_context(self, context: str):
|
|
del self._context_hooks[context]
|
|
self.check_purge()
|
|
def has_context(self, context: str) -> bool:
|
|
return context in self._context_hooks
|
|
def purge_context(self, context: str):
|
|
if self.has_context(context):
|
|
self.remove_context(context)
|
|
|
|
for child_name in self.get_children()[:]:
|
|
child = self.get_child(child_name)
|
|
child.purge_context(context)
|
|
|
|
def get_hooks(self) -> typing.List[EventCallback]:
|
|
return sorted(self._hooks + sum(self._context_hooks.values(), []),
|
|
key=lambda e: e.priority)
|
|
def get_children(self) -> typing.List[str]:
|
|
return list(self._children.keys())
|
|
def is_empty(self) -> bool:
|
|
return (len(self.get_hooks())+len(self.get_children())) == 0
|
|
|
|
class MultipleEventHook(object):
|
|
def __init__(self):
|
|
self._event_hooks = set([])
|
|
def _add(self, event_hook: EventHook):
|
|
self._event_hooks.add(event_hook)
|
|
|
|
def hook(self, function: CALLBACK_TYPE, **kwargs):
|
|
for event_hook in self._event_hooks:
|
|
event_hook.hook(function, **kwargs)
|
|
|
|
def call_limited(self, maximum: int, **kwargs) -> typing.List[typing.Any]:
|
|
returns = []
|
|
for event_hook in self._event_hooks:
|
|
returns.append(event_hook.call_limited(maximum, **kwargs))
|
|
return returns
|
|
def call(self, **kwargs) -> typing.List[typing.Any]:
|
|
returns = []
|
|
for event_hook in self._event_hooks:
|
|
returns.append(event_hook.call(**kwargs))
|
|
return returns
|
|
|
|
class EventHookContext(object):
|
|
def __init__(self, parent, context):
|
|
self._parent = parent
|
|
self.context = context
|
|
|
|
def make_event(self, **kwargs):
|
|
return self._parent.make_event(**kwargs)
|
|
|
|
def hook(self, function: CALLBACK_TYPE, priority: int = DEFAULT_PRIORITY,
|
|
replay: bool = False, **kwargs) -> EventCallback:
|
|
return self._parent._context_hook(self.context, function, priority,
|
|
replay, kwargs)
|
|
def unhook(self, callback: EventCallback):
|
|
self._parent.unhook(callback)
|
|
|
|
def on(self, subevent: str, *extra_subevents,
|
|
delimiter: str = DEFAULT_EVENT_DELIMITER) -> EventHook:
|
|
return self._parent._context_on(self.context, subevent,
|
|
extra_subevents, delimiter)
|
|
|
|
def call_for_result(self, default=None, **kwargs) -> typing.Any:
|
|
return self._parent.call_for_result(default, **kwargs)
|
|
def assure_call(self, **kwargs):
|
|
self._parent.assure_call(**kwargs)
|
|
def call(self, **kwargs) -> typing.List[typing.Any]:
|
|
return self._parent.call(**kwargs)
|
|
def call_limited(self, maximum: int, **kwargs) -> typing.List[typing.Any]:
|
|
return self._parent.call_limited(maximum, **kwargs)
|
|
|
|
def call_unsafe_for_result(self, default=None, **kwargs) -> typing.Any:
|
|
return self._parent.call_unsafe_for_result(default, **kwargs)
|
|
def call_unsafe(self, **kwargs) -> typing.List[typing.Any]:
|
|
return self._parent.call_unsafe(**kwargs)
|
|
def call_unsafe_limited(self, maximum: int, **kwargs
|
|
) -> typing.List[typing.Any]:
|
|
return self._parent.call_unsafe_limited(maximum, **kwargs)
|
|
|
|
def get_hooks(self) -> typing.List[EventCallback]:
|
|
return self._parent.get_hooks()
|
|
def get_children(self) -> typing.List[EventHook]:
|
|
return self._parent.get_children()
|