2019-02-25 10:36:17 +00:00
|
|
|
import datetime, decimal, enum, io, ipaddress, re, threading, typing
|
2019-02-10 12:36:52 +00:00
|
|
|
from src.utils import cli, consts, irc, http, parse, security
|
2018-10-03 12:22:37 +00:00
|
|
|
|
2019-02-22 11:23:36 +00:00
|
|
|
class Direction(enum.Enum):
    """Two-member enumeration distinguishing sending from receiving."""

    # Explicit integer values so members can be looked up by value.
    Send = 0
    Recv = 1
|
2019-02-22 11:23:36 +00:00
|
|
|
|
2019-02-17 14:15:40 +00:00
|
|
|
# strptime() patterns for ISO 8601 timestamps, without and with a fractional
# seconds field.
ISO8601_PARSE = "%Y-%m-%dT%H:%M:%S%z"
ISO8601_PARSE_MICROSECONDS = "%Y-%m-%dT%H:%M:%S.%f%z"

# strftime() patterns used by iso8601_format(). Date/time and UTC offset are
# kept apart so an optional milliseconds field can be placed between them.
ISO8601_FORMAT_DT = "%Y-%m-%dT%H:%M:%S"
ISO8601_FORMAT_TZ = "%z"

# strftime() pattern used by datetime_human().
DATETIME_HUMAN = "%Y/%m/%d %H:%M:%S"


def iso8601_format(dt: datetime.datetime, milliseconds: bool=False) -> str:
    """Format `dt` as an ISO 8601 style timestamp string.

    The result is `ISO8601_FORMAT_DT`, optionally followed by "." and a
    three-digit milliseconds field, followed by the `%z` UTC offset (which
    strftime renders as an empty string for naive datetimes).

    :param dt: the datetime to format.
    :param milliseconds: when true, include ".mmm" after the seconds.
    :return: the formatted timestamp.
    """
    dt_format = dt.strftime(ISO8601_FORMAT_DT)
    tz_format = dt.strftime(ISO8601_FORMAT_TZ)

    ms_format = ""
    if milliseconds:
        # Zero-pad to three digits: a fractional field is positional, so
        # 5ms must be written ".005" (".5" would read back as 500ms).
        ms_format = ".%03d" % (dt.microsecond // 1000)

    return "%s%s%s" % (dt_format, ms_format, tz_format)


def iso8601_format_now(milliseconds: bool=False) -> str:
    """Return the current UTC time formatted by iso8601_format().

    :param milliseconds: forwarded to iso8601_format().
    """
    # An aware "now" in UTC; avoids the deprecated naive utcnow().
    now = datetime.datetime.now(datetime.timezone.utc)
    return iso8601_format(now, milliseconds=milliseconds)


def iso8601_parse(s: str, microseconds: bool=False) -> datetime.datetime:
    """Parse an ISO 8601 style timestamp that carries a UTC offset.

    :param s: the timestamp text.
    :param microseconds: when true, `s` is expected to contain a fractional
        seconds field (parsed with `ISO8601_PARSE_MICROSECONDS`).
    :return: the parsed datetime.
    :raises ValueError: (from strptime) if `s` does not match the pattern.
    """
    fmt = ISO8601_PARSE_MICROSECONDS if microseconds else ISO8601_PARSE
    return datetime.datetime.strptime(s, fmt)


def datetime_human(dt: datetime.datetime) -> str:
    """Format `dt` using the `DATETIME_HUMAN` pattern (YYYY/MM/DD HH:MM:SS)."""
    return dt.strftime(DATETIME_HUMAN)
|
|
|
|
|
2018-10-03 12:22:37 +00:00
|
|
|
# Lengths of common time units, expressed in seconds.
TIME_SECOND = 1
TIME_MINUTE = TIME_SECOND*60
TIME_HOUR = TIME_MINUTE*60
TIME_DAY = TIME_HOUR*24
TIME_WEEK = TIME_DAY*7


def time_unit(seconds: int) -> typing.Tuple[int, str]:
    """Express `seconds` in the largest unit (week/day/hour/minute) it reaches.

    The amount is truncated to a whole number of that unit; anything under a
    minute is reported in seconds. The unit name gets an "s" suffix only when
    the amount is greater than one.

    :param seconds: a duration in seconds.
    :return: a `(amount, unit_name)` tuple, e.g. `(2, "hours")`.
    """
    # Ordered largest first so the first threshold reached is the one used.
    scales = (
        (TIME_WEEK, "week"),
        (TIME_DAY, "day"),
        (TIME_HOUR, "hour"),
        (TIME_MINUTE, "minute"),
    )

    for size, name in scales:
        if seconds >= size:
            amount = int(seconds/size)
            break
    else:
        # No threshold reached: report the value itself, in seconds.
        amount, name = int(seconds), "second"

    label = name if amount <= 1 else "%ss" % name  # pluralise the unit
    return (amount, label)
|
2018-10-03 12:22:37 +00:00
|
|
|
|
|
|
|
# Matches one "<digits><unit>" chunk such as "10m"; the unit letter is
# case-insensitive. Written as a raw string so that "\d" reaches the regex
# engine instead of being an invalid string escape.
REGEX_PRETTYTIME = re.compile(r"\d+[wdhms]", re.I)

# Number of seconds in each unit accepted by from_pretty_time().
SECONDS_MINUTES = 60
SECONDS_HOURS = SECONDS_MINUTES*60
SECONDS_DAYS = SECONDS_HOURS*24
SECONDS_WEEKS = SECONDS_DAYS*7


def from_pretty_time(pretty_time: str) -> typing.Optional[int]:
    """Convert text such as "1w2d 3h4m5s" into a number of seconds.

    Every `<digits><unit>` chunk found in `pretty_time` is converted to
    seconds and summed; any other text is ignored.

    :param pretty_time: text containing zero or more duration chunks, with
        units w(eeks), d(ays), h(ours), m(inutes) or s(econds).
    :return: the total number of seconds, or None when the total is not
        greater than zero (including when no chunk was found).
    """
    multipliers = {
        "m": SECONDS_MINUTES,
        "h": SECONDS_HOURS,
        "d": SECONDS_DAYS,
        "w": SECONDS_WEEKS,
    }

    seconds = 0
    for match in REGEX_PRETTYTIME.findall(pretty_time):
        number, unit = int(match[:-1]), match[-1].lower()
        # Units without an entry (i.e. "s") count as plain seconds.
        seconds += number*multipliers.get(unit, 1)

    if seconds > 0:
        return seconds
    return None
|
2018-10-03 12:22:37 +00:00
|
|
|
|
2018-10-30 14:58:48 +00:00
|
|
|
# Unit ranks used by to_pretty_time(): the larger the unit of time, the
# smaller its rank. UNIT_MINIMUM is one past the smallest unit.
UNIT_MINIMUM = 6
UNIT_SECOND = 5
UNIT_MINUTE = 4
UNIT_HOUR = 3
UNIT_DAY = 2
UNIT_WEEK = 1


def to_pretty_time(total_seconds: int, minimum_unit: int=UNIT_SECOND,
        max_units: int=UNIT_MINIMUM) -> str:
    """Render a duration in seconds as text such as "1w 2d 3h 4m 5s".

    :param total_seconds: the duration to render.
    :param minimum_unit: rank (one of the `UNIT_*` constants) of the smallest
        unit to include; units with a greater rank are left out.
    :param max_units: maximum number of components to emit.
    :return: space-separated non-zero components, largest unit first. A zero
        duration gives "0s"; if no component qualifies the result is "".
    """
    if total_seconds == 0:
        return "0s"

    # Peel off each unit in turn, smallest first.
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)
    weeks, days = divmod(days, 7)

    # (amount, rank, suffix), ordered from the largest unit to the smallest.
    components = (
        (weeks, UNIT_WEEK, "w"),
        (days, UNIT_DAY, "d"),
        (hours, UNIT_HOUR, "h"),
        (minutes, UNIT_MINUTE, "m"),
        (seconds, UNIT_SECOND, "s"),
    )

    parts = []
    for amount, rank, suffix in components:
        if amount and minimum_unit >= rank and len(parts) < max_units:
            parts.append("%d%s" % (amount, suffix))
    return " ".join(parts)
|
2018-10-03 12:22:37 +00:00
|
|
|
|
2018-10-30 14:58:48 +00:00
|
|
|
def parse_number(s: str) -> str:
    """Expand a number with an optional k/m/b suffix into a plain number.

    Text that `decimal.Decimal` already accepts is returned unchanged.
    Otherwise the last character is taken as a case-insensitive unit
    ("k" = thousand, "m" = million, "b" = billion) and the rest as the
    number to scale.

    :param s: the text to parse, e.g. "1.5k".
    :return: the resulting number as a string, e.g. "1500.0".
    :raises ValueError: if `s` is empty, the numeric part is not a valid
        decimal, or the unit is not recognised.
    """
    if not s:
        # Without this, the unit lookup below would fail with IndexError.
        raise ValueError("Invalid format '%s' passed to parse_number" % s)

    try:
        decimal.Decimal(s)
    except (decimal.InvalidOperation, TypeError, ValueError):
        # Not a bare number; fall through and look for a unit suffix.
        pass
    else:
        return s

    unit = s[-1].lower()
    number_str = s[:-1]

    try:
        number = decimal.Decimal(number_str)
    except (decimal.InvalidOperation, TypeError, ValueError) as err:
        raise ValueError("Invalid format '%s' passed to parse_number" %
            number_str) from err

    multipliers = {
        "k": decimal.Decimal("1_000"),
        "m": decimal.Decimal("1_000_000"),
        "b": decimal.Decimal("1_000_000_000"),
    }
    if unit not in multipliers:
        raise ValueError("Unknown unit '%s' given to parse_number" % unit)

    return str(number*multipliers[unit])
|
|
|
|
|
2018-10-30 14:58:48 +00:00
|
|
|
def prevent_highlight(nickname: str) -> str:
    """Insert U+200C (zero width non-joiner) after the first character.

    The result looks like `nickname` but is no longer the same string;
    presumably this keeps clients from treating it as a mention of that
    nickname (the callers are not visible here).

    :param nickname: the nickname to alter.
    :return: the altered nickname; an empty string is returned unchanged.
    """
    if not nickname:
        # Nothing to split; indexing [0] would raise IndexError.
        return nickname
    return nickname[0]+"\u200c"+nickname[1:]
|
|
|
|
|
2018-10-16 13:47:01 +00:00
|
|
|
class EventError(Exception):
    """Base class for the event-related errors defined below."""


class EventsResultsError(EventError):
    """EventError carrying the fixed message "Failed to load results"."""

    def __init__(self):
        super().__init__("Failed to load results")


class EventsNotEnoughArgsError(EventError):
    """EventError stating the minimum number of arguments, `n`."""

    def __init__(self, n):
        message = "Not enough arguments (minimum %d)" % n
        super().__init__(message)


class EventsUsageError(EventError):
    """EventError whose message includes the given `usage` text."""

    def __init__(self, usage):
        message = "Not enough arguments, usage: %s" % usage
        super().__init__(message)
|
|
|
|
|
2019-07-26 10:58:06 +00:00
|
|
|
class BitBotMagic(object):
    """Container for the hooks, shared kwargs and exports attached to an
    object by the `hook`, `kwarg` and `export` decorators."""

    def __init__(self):
        # (event name, hook-specific kwargs) pairs, in registration order.
        self._hooks: typing.List[typing.Tuple[str, dict]] = []
        # (key, value) pairs applied to every hook returned by get_hooks().
        self._kwargs: typing.List[typing.Tuple[str, typing.Any]] = []
        # (key, value) pairs recorded through add_export().
        self._exports: typing.List[typing.Tuple[str, typing.Any]] = []

    def add_hook(self, hook: str, kwargs: dict) -> None:
        """Record a hook named `hook` together with its own kwargs."""
        self._hooks.append((hook, kwargs))

    def add_kwarg(self, key: str, value: typing.Any) -> None:
        """Record a (key, value) pair that is shared by every hook."""
        self._kwargs.append((key, value))

    def get_hooks(self) -> typing.List[
            typing.Tuple[str, typing.List[typing.Tuple[str, typing.Any]]]]:
        """Return `(hook, kwargs)` pairs, one per recorded hook.

        Each `kwargs` is a new list of (key, value) pairs: the shared pairs
        from add_kwarg() first, followed by the hook's own kwargs.
        """
        return [
            (hook, self._kwargs.copy()+list(kwargs.items()))
            for hook, kwargs in self._hooks]

    def add_export(self, key: str, value: typing.Any) -> None:
        """Record an exported (key, value) pair."""
        self._exports.append((key, value))

    def get_exports(self) -> typing.List[typing.Tuple[str, typing.Any]]:
        """Return a shallow copy of the recorded export pairs."""
        return self._exports.copy()
|
|
|
|
|
|
|
|
def get_magic(obj: typing.Any):
    """Return the BitBotMagic stored on `obj`, creating it on first use.

    The instance lives in the attribute named by `consts.BITBOT_MAGIC`.
    """
    attribute = consts.BITBOT_MAGIC
    if not has_magic(obj):
        # First access: attach a fresh, empty container.
        setattr(obj, attribute, BitBotMagic())
    return getattr(obj, attribute)
|
|
|
|
def has_magic(obj: typing.Any):
    """Tell whether `obj` has the attribute named by `consts.BITBOT_MAGIC`."""
    attribute = consts.BITBOT_MAGIC
    return hasattr(obj, attribute)
|
|
|
|
|
2018-10-30 14:58:48 +00:00
|
|
|
def hook(event: str, **kwargs):
    """Decorator factory that records a hook for `event` on a function.

    The decorated function is returned unchanged; `event` and `kwargs` are
    stored on its BitBotMagic through `add_hook`.
    """
    def _decorate(func):
        get_magic(func).add_hook(event, kwargs)
        return func
    return _decorate
|
2018-10-30 14:58:48 +00:00
|
|
|
def export(setting: str, value: typing.Any):
    """Decorator factory that records an export on the decorated object.

    The decorated object is returned unchanged; the `(setting, value)` pair
    is stored on its BitBotMagic through `add_export`.
    """
    def _decorate(module):
        get_magic(module).add_export(setting, value)
        return module
    return _decorate
|
2019-06-26 13:37:26 +00:00
|
|
|
def kwarg(key: str, value: typing.Any):
    """Decorator factory that records a shared kwarg on a function.

    The decorated function is returned unchanged; the `(key, value)` pair is
    stored on its BitBotMagic through `add_kwarg`.
    """
    def _decorate(func):
        get_magic(func).add_kwarg(key, value)
        return func
    return _decorate
|
2018-10-03 12:22:37 +00:00
|
|
|
|
2019-06-14 11:01:55 +00:00
|
|
|
class MultiCheck(object):
    """A sequence of `(request, args)` pairs, extendable with `|`."""

    def __init__(self,
            requests: typing.List[typing.Tuple[str, typing.List[str]]]):
        self._requests = requests

    def to_multi(self):
        """Return this object itself (it is already a MultiCheck)."""
        return self

    def requests(self):
        """Return a shallow copy of the stored `(request, args)` pairs."""
        return self._requests[:]

    def __or__(self, other: "Check"):
        """Return a new MultiCheck with `other`'s request/args appended."""
        extra = (other.request, other.args)
        return MultiCheck(self._requests+[extra])
|
2019-06-14 10:42:12 +00:00
|
|
|
class Check(object):
    """A single request name together with its string arguments."""

    def __init__(self, request: str, *args: str):
        self.request = request
        # Stored as a list rather than the tuple that *args provides.
        self.args = list(args)

    def to_multi(self):
        """Return a MultiCheck containing only this check."""
        own = (self.request, self.args)
        return MultiCheck([own])

    def __or__(self, other: "Check"):
        """Return a MultiCheck holding this check followed by `other`."""
        own = (self.request, self.args)
        theirs = (other.request, other.args)
        return MultiCheck([own, theirs])
|
2019-06-14 10:42:12 +00:00
|
|
|
|
2018-10-30 14:58:48 +00:00
|
|
|
# Signature of the key/value conversion callbacks accepted by top_10().
TOP_10_CALLABLE = typing.Callable[[typing.Any], typing.Any]


def top_10(items: typing.Dict[typing.Any, typing.Any],
        convert_key: TOP_10_CALLABLE=lambda x: x,
        value_format: TOP_10_CALLABLE=lambda x: x):
    """Format the (up to) ten entries of `items` with the greatest values.

    :param items: mapping whose values determine the ranking.
    :param convert_key: applied to each key before it is formatted.
    :param value_format: applied to each value before it is formatted.
    :return: a list of "key (value)" strings, greatest value first; entries
        with equal values appear in ascending key order.
    """
    # Sort by key first; the second sort is stable, so ties keep key order.
    by_key = sorted(items)
    ranked = sorted(by_key, key=items.get, reverse=True)

    return [
        "%s (%s)" % (convert_key(key), value_format(items[key]))
        for key in ranked[:10]]
|
2018-12-08 09:00:12 +00:00
|
|
|
|
|
|
|
class CaseInsensitiveDict(dict):
    """dict with string keys that are lower-cased on storage and on lookup.

    Keys are stored in lower case, so iterating yields lower-cased keys.
    Only the operations overridden below lower-case their key; other
    inherited methods (e.g. `update`) use the key exactly as given.
    """

    def __init__(self, other: typing.Dict[str, typing.Any]):
        dict.__init__(self, ((k.lower(), v) for k, v in other.items()))

    def __getitem__(self, key: str) -> typing.Any:
        return dict.__getitem__(self, key.lower())

    def __setitem__(self, key: str, value: typing.Any) -> typing.Any:
        return dict.__setitem__(self, key.lower(), value)

    def __delitem__(self, key: str) -> None:
        dict.__delitem__(self, key.lower())

    def __contains__(self, key: str):
        return dict.__contains__(self, key.lower())

    # dict's own get/pop/setdefault do not go through __getitem__ or
    # __setitem__, so they need the same key normalisation explicitly.
    def get(self, key: str, default: typing.Any=None) -> typing.Any:
        return dict.get(self, key.lower(), default)

    def pop(self, key: str, *default: typing.Any) -> typing.Any:
        return dict.pop(self, key.lower(), *default)

    def setdefault(self, key: str, default: typing.Any=None) -> typing.Any:
        return dict.setdefault(self, key.lower(), default)
|
2019-02-06 18:11:19 +00:00
|
|
|
|
|
|
|
def is_ip(s: str) -> bool:
    """Tell whether `s` is accepted by `ipaddress.ip_address`.

    :param s: candidate IPv4 or IPv6 address text.
    :return: False if `ipaddress.ip_address` raises ValueError, else True.
    """
    try:
        ipaddress.ip_address(s)
    except ValueError:
        return False
    else:
        return True
|
2019-02-25 10:36:17 +00:00
|
|
|
|
|
|
|
def is_main_thread() -> bool:
    """Tell whether the calling thread is the interpreter's main thread."""
    caller = threading.current_thread()
    return caller is threading.main_thread()
|
2019-06-28 22:16:05 +00:00
|
|
|
|
|
|
|
class Setting(object):
    """A named setting whose textual value is converted by `parse`.

    This base class accepts any value unchanged; subclasses override
    `parse` (returning None for values they reject) and may provide a
    class-level `example` or override `_format_example`.
    """

    # Default example text; subclasses or instances may override it.
    example: typing.Optional[str] = None

    def __init__(self, name: str, help: typing.Optional[str]=None,
            example: typing.Optional[str]=None):
        self.name = name
        self.help = help
        if example is not None:
            # Only shadow the class-level example when one was given.
            self.example = example

    def parse(self, value: str) -> typing.Any:
        """Return the parsed form of `value` (here: `value` unchanged)."""
        return value

    def get_example(self) -> typing.Optional[str]:
        """Return "Example: <example>" when an example is set, otherwise
        whatever `_format_example` returns."""
        if self.example is not None:
            return "Example: %s" % self.example
        return self._format_example()

    def _format_example(self) -> typing.Optional[str]:
        """Fallback example text; the base class has none."""
        return None


# Case-insensitive spellings accepted by BoolSetting.parse().
SETTING_TRUE = ["true", "yes", "on", "y"]
SETTING_FALSE = ["false", "no", "off", "n"]


class BoolSetting(Setting):
    """Setting that accepts the spellings in SETTING_TRUE/SETTING_FALSE."""

    example = "on"

    def parse(self, value: str) -> typing.Any:
        """Return True or False for a recognised spelling, else None."""
        value_lower = value.lower()
        if value_lower in SETTING_TRUE:
            return True
        if value_lower in SETTING_FALSE:
            return False
        return None


class IntSetting(Setting):
    """Setting that accepts non-negative whole numbers."""

    example = "10"

    def parse(self, value: str) -> typing.Any:
        """Return `value` as an int when it consists solely of decimal
        digits, else None.

        Leading zeros are allowed ("007" is 7) and "0" parses to 0.
        """
        # int() already ignores leading zeros, so nothing is stripped here;
        # stripping them first would turn "0" into "" and reject it.
        # isdecimal() only admits characters that int() can convert.
        if value.isdecimal():
            return int(value)
        return None


class OptionsSetting(Setting):
    """Setting restricted to a list of options, matched case-insensitively.

    The options come from `options_factory()` when a factory was given,
    otherwise from the fixed `options` list.
    """

    def __init__(self, name: str, options: typing.List[str],
            help: typing.Optional[str]=None,
            example: typing.Optional[str]=None,
            options_factory: typing.Optional[
                typing.Callable[[], typing.List[str]]]=None):
        self._options = options
        self._options_factory = options_factory
        Setting.__init__(self, name, help, example)

    def _get_options(self) -> typing.List[str]:
        """Return the current options, preferring the factory if set."""
        if self._options_factory is not None:
            return self._options_factory()
        return self._options

    def parse(self, value: str) -> typing.Any:
        """Return the option equal to `value` ignoring case, spelled as it
        appears in the options, or None when nothing matches."""
        value_lower = value.lower()
        for option in self._get_options():
            if option.lower() == value_lower:
                return option
        return None

    def _format_example(self) -> str:
        """Return "Options: 'a', 'b', ..." listing the current options."""
        options_str = ["'%s'" % option for option in self._get_options()]
        return "Options: %s" % ", ".join(options_str)
|