import contextlib
import enum
import ipaddress
import multiprocessing
import queue
import signal
import threading
import typing

from . import cli, consts, datetime, decorators, irc, http, parse, security
from .decorators import export, hook, kwarg
from .settings import (BoolSetting, FunctionSetting, IntRangeSetting,
    IntSetting, OptionsSetting, sensitive_format, SensitiveSetting, Setting)
from .errors import (EventError, EventsNotEnoughArgsError, EventResultsError,
    EventUsageError)


class Direction(enum.Enum):
    """Two-valued enum distinguishing the send and receive directions."""
    Send = 0
    Recv = 1


def prevent_highlight(nickname: str) -> str:
    """Return `nickname` with U+200C inserted after its first character.

    U+200C is the zero-width non-joiner, so the result looks the same but is
    no longer the identical string; going by the function name this is meant
    to stop the nickname from triggering a highlight. An empty nickname is
    returned unchanged instead of raising IndexError.
    """
    if not nickname:
        return nickname
    return nickname[0]+"\u200c"+nickname[1:]


class MultiCheck(object):
    """A list of `(request, args)` pairs built by combining checks with `|`."""

    def __init__(self,
            requests: typing.List[typing.Tuple[str, typing.List[str]]]):
        self._requests = requests

    def to_multi(self):
        """Return self; lets callers treat Check and MultiCheck uniformly."""
        return self

    def requests(self):
        """Return a shallow copy of the stored `(request, args)` pairs."""
        return self._requests[:]

    def __or__(self, other: typing.Union["Check", "MultiCheck"]):
        """Return a new MultiCheck holding this object's pairs, then other's.

        `other` may be a Check or a MultiCheck; anything offering
        `to_multi()` is accepted.
        """
        return MultiCheck(self._requests+other.to_multi().requests())


class Check(object):
    """A single request name together with its string arguments."""

    def __init__(self, request: str, *args: str):
        self.request = request
        self.args = list(args)

    def to_multi(self):
        """Wrap this check in a one-element MultiCheck."""
        return MultiCheck([(self.request, self.args)])

    def __or__(self, other: typing.Union["Check", "MultiCheck"]):
        """Return a MultiCheck of this check followed by `other`'s pairs.

        `other` may be a Check or a MultiCheck.
        """
        return self.to_multi() | other


TOP_10_CALLABLE = typing.Callable[[typing.Any], typing.Any]


def top_10(items: typing.Dict[typing.Any, typing.Any],
        convert_key: TOP_10_CALLABLE=lambda x: x,
        value_format: TOP_10_CALLABLE=lambda x: x) -> typing.List[str]:
    """Format the (up to) ten entries of `items` with the largest values.

    Each entry becomes "<convert_key(key)> (<value_format(value)>)". Entries
    are ordered by value, largest first; entries with equal values appear in
    ascending key order.
    """
    # Pre-sort by key: the value sort below is stable, so ties on value keep
    # this ascending-key order.
    keys_by_name = sorted(items.keys())
    ranked_keys = sorted(keys_by_name, key=items.get, reverse=True)[:10]
    return ["%s (%s)" % (convert_key(key), value_format(items[key]))
        for key in ranked_keys]


class CaseInsensitiveDict(dict):
    """A dict with string keys that are lowercased on every access.

    Keys are stored lowercased, so iteration yields the lowercase forms.
    """

    def __init__(self, other: typing.Dict[str, typing.Any]):
        dict.__init__(self, ((k.lower(), v) for k, v in other.items()))

    def __getitem__(self, key: str) -> typing.Any:
        return dict.__getitem__(self, key.lower())

    def __setitem__(self, key: str, value: typing.Any) -> None:
        return dict.__setitem__(self, key.lower(), value)

    def __delitem__(self, key: str) -> None:
        return dict.__delitem__(self, key.lower())

    def __contains__(self, key: typing.Any) -> bool:
        """Case-insensitive membership test; raises TypeError for non-str."""
        if isinstance(key, str):
            return dict.__contains__(self, key.lower())
        else:
            raise TypeError("Expected string, not %r" % key)

    def get(self, key: str, default: typing.Any=None):
        return dict.get(self, key.lower(), default)

    def pop(self, key: str, *default: typing.Any) -> typing.Any:
        """Case-insensitive `dict.pop`; `default` is optional as in dict."""
        return dict.pop(self, key.lower(), *default)

    def setdefault(self, key: str, default: typing.Any=None) -> typing.Any:
        return dict.setdefault(self, key.lower(), default)

    def update(self, *args: typing.Any, **kwargs: typing.Any) -> None:
        """Like `dict.update`, but every incoming key is lowercased."""
        # Route through __setitem__ so the keys are normalised.
        for key, value in dict(*args, **kwargs).items():
            self[key] = value


def is_ip(s: str) -> bool:
    """Return True if `ipaddress.ip_address` accepts `s`, otherwise False."""
    try:
        ipaddress.ip_address(s)
    except ValueError:
        return False
    return True


def is_main_thread() -> bool:
    """Return True when called from the interpreter's main thread."""
    return threading.current_thread() is threading.main_thread()


class DeadlineExceededException(Exception):
    """Raised when a `deadline` or `deadline_process` timeout expires."""
    pass


def _raise_deadline():
    raise DeadlineExceededException()


@contextlib.contextmanager
def deadline(seconds: int=10):
    """Context manager raising DeadlineExceededException after `seconds`.

    Implemented with SIGALRM and the ITIMER_REAL interval timer. May be
    nested, but an inner deadline must not be longer than the time left on
    the enclosing one; otherwise ValueError is raised and the previous
    handler and timer are put back.

    NOTE(review): on exit the enclosing timer is re-armed with the time it
    had left when this deadline was entered, so time spent inside a nested
    deadline is not counted against the parent - confirm this is intended.
    """
    old_handler = signal.signal(signal.SIGALRM,
        lambda _1, _2: _raise_deadline())
    old_seconds, _ = signal.setitimer(signal.ITIMER_REAL, seconds, 0)

    try:
        if old_seconds != 0.0 and seconds > old_seconds:
            raise ValueError(
                "Deadline timeout larger than parent deadline (%s > %s)" %
                (seconds, old_seconds))

        yield
    finally:
        # Disarm our timer before swapping handlers: were it to fire after
        # the previous handler is back in place, that handler (possibly the
        # default action, which terminates the process) would run instead
        # of ours.
        signal.setitimer(signal.ITIMER_REAL, 0, 0)
        signal.signal(signal.SIGALRM, old_handler)
        if old_seconds:
            signal.setitimer(signal.ITIMER_REAL, old_seconds, 0)


DeadlineProcessReturnType = typing.TypeVar("DeadlineProcessReturnType")


def deadline_process(func: typing.Callable[[], DeadlineProcessReturnType],
        seconds: int=10) -> DeadlineProcessReturnType:
    """Run `func` in a child process, waiting at most `seconds` for it.

    Returns whatever `func` returned. If `func` raised an Exception, that
    exception is re-raised here. If no result arrives within `seconds`, the
    child is killed and DeadlineExceededException is raised.
    """
    q: multiprocessing.Queue[
        typing.Tuple[bool, DeadlineProcessReturnType]] = multiprocessing.Queue()

    def _wrap(func, q):
        # Runs in the child: send back (succeeded, result-or-exception).
        try:
            q.put((True, func()))
        except Exception as e:
            q.put((False, e))

    p = multiprocessing.Process(target=_wrap, args=(func, q))
    p.start()

    deadlined = False
    try:
        success, out = q.get(block=True, timeout=seconds)
    except queue.Empty:
        p.kill() # type: ignore  # to make mypy pass on Python 3.6
        # Reap the killed child so it does not linger as a zombie.
        p.join()
        deadlined = True
    finally:
        q.close()

    if deadlined:
        _raise_deadline()

    if success:
        return out
    else:
        raise out # type: ignore