move all datetime-related code from utils/__init__ to utils.datetime

This commit is contained in:
jesopo 2019-11-15 13:59:09 +00:00
parent 5e8cf06a45
commit 5d01db8514
20 changed files with 176 additions and 167 deletions

View file

@ -5,12 +5,11 @@ from src import ModuleManager, utils
RE_HUMAN_FORMAT = re.compile(r"(\d\d\d\d)-(\d?\d)-(\d?\d)")
HUMAN_FORMAT_HELP = "year-month-day (e.g. 2018-12-29)"
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
class Module(ModuleManager.BaseModule):
def _parse_date(self, dt: str):
if dt.lower() == "today":
return utils.datetime_utcnow()
return utils.datetime.datetime_utcnow()
else:
match = RE_HUMAN_FORMAT.match(dt)
if not match:
@ -23,7 +22,7 @@ class Module(ModuleManager.BaseModule):
).replace(tzinfo=datetime.timezone.utc)
def _date_str(self, dt: datetime.datetime):
return utils.date_human(dt)
return utils.datetime.date_human(dt)
def _round_up_day(self, dt: datetime.datetime):
return dt.date()+datetime.timedelta(days=1)
@ -53,12 +52,12 @@ class Module(ModuleManager.BaseModule):
badge_lower = badge.lower()
badges = self._get_badges(event["user"])
now = self._round_up_day(utils.datetime_utcnow())
now = self._round_up_day(utils.datetime.datetime_utcnow())
found_badge = self._find_badge(badges, badge)
if found_badge:
dt = utils.iso8601_parse(badges[found_badge])
dt = utils.datetime.iso8601_parse(badges[found_badge])
days_since = self._days_since(now, dt)
event["stdout"].write("(%s) %s on day %s (%s)" % (
event["user"].nickname, found_badge, days_since,
@ -75,10 +74,11 @@ class Module(ModuleManager.BaseModule):
if event["args"]:
user = event["server"].get_user(event["args_split"][0])
now = self._round_up_day(utils.datetime_utcnow())
now = self._round_up_day(utils.datetime.datetime_utcnow())
badges = []
for badge, date in self._get_badges(user).items():
days_since = self._days_since(now, utils.iso8601_parse(date))
days_since = self._days_since(now,
utils.datetime.iso8601_parse(date))
badges.append("%s on day %s" % (
badge, days_since))
@ -99,7 +99,7 @@ class Module(ModuleManager.BaseModule):
if badge_name.lower() == badge_lower:
raise utils.EventError("You already have a '%s' badge" % badge)
badges[badge] = utils.iso8601_format_now()
badges[badge] = utils.datetime.iso8601_format_now()
self._set_badges(event["user"], badges)
event["stdout"].write("Added '%s' badge" % badge)
@ -135,7 +135,7 @@ class Module(ModuleManager.BaseModule):
found_badge = self._find_badge(badges, badge)
if found_badge:
badges[found_badge] = utils.iso8601_format_now()
badges[found_badge] = utils.datetime.iso8601_format_now()
self._set_badges(event["user"], badges)
event["stdout"].write("Reset badge '%s'" % found_badge)
else:
@ -158,7 +158,7 @@ class Module(ModuleManager.BaseModule):
dt = self._parse_date(event["args_split"][-1])
badges[found_badge] = utils.iso8601_format(dt)
badges[found_badge] = utils.datetime.iso8601_format(dt)
self._set_badges(event["user"], badges)
event["stdout"].write("Updated '%s' badge to %s" % (
found_badge, self._date_str(dt)))
@ -176,7 +176,7 @@ class Module(ModuleManager.BaseModule):
found_badge = self._find_badge(badges, badge)
dt = self._parse_date(event["args_split"][-1])
badges[found_badge or badge] = utils.iso8601_format(dt)
badges[found_badge or badge] = utils.datetime.iso8601_format(dt)
self._set_badges(event["user"], badges)
add_or_update = "Added" if not found_badge else "Updated"

View file

@ -31,7 +31,7 @@ class Module(ModuleManager.BaseModule):
if args and args[0][0] == "+":
if len(args[1:]) < min_args:
raise utils.EventError("Not enough arguments")
time = utils.from_pretty_time(args[0][1:])
time = utils.datetime.from_pretty_time(args[0][1:])
if time == None:
raise utils.EventError("Invalid timeframe")
return time, args[1:]

View file

@ -190,7 +190,7 @@ class Module(ModuleManager.BaseModule):
time_left = self.bot.cache.until_expiration(cache)
event["stderr"].write("%s: Please wait %s before redeeming" % (
event["user"].nickname,
utils.to_pretty_time(math.ceil(time_left))))
utils.datetime.to_pretty_time(math.ceil(time_left))))
else:
event["stderr"].write(
"%s: You can only redeem coins when you have none" %
@ -525,7 +525,7 @@ class Module(ModuleManager.BaseModule):
"""
until = self._until_next_6_hour()
event["stdout"].write("Next lottery is in: %s" %
utils.to_pretty_time(until))
utils.datetime.to_pretty_time(until))
@utils.hook("received.command.lotterywinner")
def lottery_winner(self, event):

View file

@ -52,7 +52,7 @@ class Server(object):
activities.append([activity_id, content, timestamp])
return activities
def _make_activity(self, content):
timestamp = utils.iso8601_format_now()
timestamp = utils.datetime.iso8601_format_now()
activity_id = self._random_id()
self.bot.set_setting("ap-activity-%s" % activity_id,
[content, timestamp])

View file

@ -227,7 +227,8 @@ class Module(ModuleManager.BaseModule):
self._on_topic(event, event["setter"].nickname, "set",
event["channel"].topic)
dt = utils.iso8601_format(utils.datetime_timestamp(event["set_at"]))
dt = utils.datetime.iso8601_format(
utils.datetime.datetime_timestamp(event["set_at"]))
minimal = "topic set at %s" % dt
normal = "- %s" % minimal

View file

@ -178,7 +178,7 @@ class GitHub(object):
return url
def _iso8601(self, s):
return datetime.datetime.strptime(s, utils.ISO8601_PARSE)
return utils.datetime.iso8601_parse(s)
def ping(self, data):
return ["Received new webhook"]
@ -415,7 +415,7 @@ class GitHub(object):
completed_at = self._iso8601(data["check_run"]["completed_at"])
if completed_at > started_at:
seconds = (completed_at-started_at).total_seconds()
duration = " in %s" % utils.to_pretty_time(seconds)
duration = " in %s" % utils.datetime.to_pretty_time(seconds)
status = data["check_run"]["status"]
status_str = ""

View file

@ -2,7 +2,7 @@
import time
from src import ModuleManager, utils
SECONDS_MAX = utils.SECONDS_WEEKS*8
SECONDS_MAX = utils.datetime.SECONDS_WEEKS*8
SECONDS_MAX_DESCRIPTION = "8 weeks"
class Module(ModuleManager.BaseModule):

View file

@ -38,7 +38,7 @@ class Module(ModuleManager.BaseModule):
seen_info = " (%s%s)" % (seen_info["action"],
utils.consts.RESET)
since = utils.to_pretty_time(time.time()-seen_seconds,
since = utils.datetime.to_pretty_time(time.time()-seen_seconds,
max_units=2)
event["stdout"].write("%s was last seen %s ago%s" % (
event["args_split"][0], since, seen_info or ""))

View file

@ -30,7 +30,8 @@ class Module(ModuleManager.BaseModule):
def silence(self, event):
duration = SILENCE_TIME
if event["args"] and event["args_split"][0].startswith("+"):
duration = utils.from_pretty_time(event["args_split"][0][1:])
duration = utils.datetime.from_pretty_time(
event["args_split"][0][1:])
if duration == None:
raise utils.EventError("Invalid duration provided")

View file

@ -7,7 +7,8 @@ HIDDEN_MODES = set(["s", "p"])
class Module(ModuleManager.BaseModule):
def _uptime(self):
return utils.to_pretty_time(int(time.time()-self.bot.start_time))
return utils.datetime.to_pretty_time(
int(time.time()-self.bot.start_time))
@utils.hook("received.command.uptime")
def uptime(self, event):

View file

@ -94,14 +94,17 @@ class Module(ModuleManager.BaseModule):
if first_details:
first_nickname, first_timestamp, _ = first_details
timestamp_parsed = utils.iso8601_parse(first_timestamp)
timestamp_human = utils.datetime_human(timestamp_parsed)
timestamp_parsed = utils.datetime.iso8601_parse(
first_timestamp)
timestamp_human = utils.datetime.datetime_human(
timestamp_parsed)
message = "%s (first posted by %s at %s)" % (title,
first_nickname, timestamp_human)
else:
event["target"].set_setting(setting,
[event["user"].nickname, utils.iso8601_format_now(),
url])
[event["user"].nickname,
utils.datetime.iso8601_format_now(), url])
event["stdout"].write(message)
if code == -2:
self.log.debug("Not showing title for %s, too similar", [url])

View file

@ -8,8 +8,8 @@ class Module(ModuleManager.BaseModule):
messages = event["channel"].get_user_setting(event["user"].get_id(),
"to", [])
for nickname, message, timestamp in messages:
timestamp_parsed = utils.iso8601_parse(timestamp)
timestamp_human = utils.datetime_human(timestamp_parsed)
timestamp_parsed = utils.datetime.iso8601_parse(timestamp)
timestamp_human = utils.datetime.datetime_human(timestamp_parsed)
event["channel"].send_message("%s: <%s> %s (at %s UTC)" % (
event["user"].nickname, nickname, message, timestamp_human))
if messages:
@ -35,7 +35,7 @@ class Module(ModuleManager.BaseModule):
messages.append([event["user"].nickname,
" ".join(event["args_split"][1:]),
utils.iso8601_format_now()])
utils.datetime.iso8601_format_now()])
event["target"].set_user_setting(target_user.get_id(),
"to", messages)
event["stdout"].write("Message saved")

View file

@ -3,7 +3,7 @@ from src import utils
def _timestamp(dt):
seconds_since = time.time()-dt.timestamp()
since, unit = utils.time_unit(seconds_since)
since, unit = utils.datetime.time_unit(seconds_since)
return "%s %s ago" % (since, unit)
def _normalise(tweet):

View file

@ -58,7 +58,7 @@ class Module(ModuleManager.BaseModule):
if utc_offset > 0:
tz += "+"
tz += "%g" % utc_offset
human = utils.datetime_human(dt)
human = utils.datetime.datetime_human(dt)
out = None
if type == LocationType.USER:

View file

@ -79,8 +79,8 @@ class Module(ModuleManager.BaseModule):
since = ""
first_words = target.get_setting("first-words", None)
if not first_words == None:
since = " since %s" % utils.date_human(
utils.datetime_timestamp(first_words))
since = " since %s" % utils.datetime.date_human(
utils.datetime.datetime_timestamp(first_words))
event["stdout"].write("%s has used %d words (%d in %s)%s" % (
target.nickname, total, this_channel, event["target"].name, since))

View file

@ -45,9 +45,9 @@ class Module(ModuleManager.BaseModule):
content = self.get_video_page(video_id, "contentDetails").data[
"items"][0]["contentDetails"]
video_uploaded_at = utils.iso8601_parse(snippet["publishedAt"],
microseconds=True)
video_uploaded_at = utils.date_human(video_uploaded_at)
video_uploaded_at = utils.datetime.iso8601_parse(
snippet["publishedAt"], microseconds=True)
video_uploaded_at = utils.datetime.date_human(video_uploaded_at)
video_uploader = snippet["channelTitle"]
video_title = utils.irc.bold(snippet["title"])
video_views = self._number(statistics["viewCount"])

View file

@ -9,12 +9,12 @@ class LockFile(PollHook.PollHook):
self._next_lock = None
def available(self):
now = utils.datetime_utcnow()
now = utils.datetime.datetime_utcnow()
if os.path.exists(self._filename):
with open(self._filename, "r") as lock_file:
timestamp_str = lock_file.read().strip().split(" ", 1)[0]
timestamp = utils.iso8601_parse(timestamp_str)
timestamp = utils.datetime.iso8601_parse(timestamp_str)
if (now-timestamp).total_seconds() < EXPIRATION:
return False
@ -23,13 +23,14 @@ class LockFile(PollHook.PollHook):
def lock(self):
with open(self._filename, "w") as lock_file:
last_lock = utils.datetime_utcnow()
lock_file.write("%s" % utils.iso8601_format(last_lock))
last_lock = utils.datetime.datetime_utcnow()
lock_file.write("%s" % utils.datetime.iso8601_format(last_lock))
self._next_lock = last_lock+datetime.timedelta(
seconds=EXPIRATION/2)
def next(self):
return max(0, (self._next_lock-utils.datetime_utcnow()).total_seconds())
return max(0,
(self._next_lock-utils.datetime.datetime_utcnow()).total_seconds())
def call(self):
self.lock()

View file

@ -13,7 +13,7 @@ LEVELS = {
class BitBotFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
datetime_obj = datetime.datetime.fromtimestamp(record.created)
return utils.iso8601_format(datetime_obj, milliseconds=True)
return utils.datetime.iso8601_format(datetime_obj, milliseconds=True)
class HookedHandler(logging.StreamHandler):
def __init__(self, func: typing.Callable[[int, str], None]):

View file

@ -1,6 +1,6 @@
import contextlib, datetime, decimal, enum, io, ipaddress, multiprocessing
import contextlib, decimal, enum, io, ipaddress, multiprocessing
import queue, re, signal, threading, typing
from . import cli, consts, decorators, irc, http, parse, security
from . import cli, consts, datetime, decorators, irc, http, parse, security
from .decorators import export, hook, kwarg
from .settings import (BoolSetting, FunctionSetting, IntRangeSetting,
@ -10,129 +10,6 @@ class Direction(enum.Enum):
Send = 0
Recv = 1
ISO8601_PARSE = "%Y-%m-%dT%H:%M:%S%z"
ISO8601_PARSE_MICROSECONDS = "%Y-%m-%dT%H:%M:%S.%f%z"
ISO8601_FORMAT_DT = "%Y-%m-%dT%H:%M:%S"
ISO8601_FORMAT_TZ = "%z"
DATETIME_HUMAN = "%Y/%m/%d %H:%M:%S"
DATE_HUMAN = "%Y-%m-%d"
def datetime_utcnow() -> datetime.datetime:
return datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
def datetime_timestamp(seconds: float) -> datetime.datetime:
return datetime.datetime.fromtimestamp(seconds).replace(
tzinfo=datetime.timezone.utc)
def iso8601_format(dt: datetime.datetime, milliseconds: bool=False) -> str:
dt_format = dt.strftime(ISO8601_FORMAT_DT)
tz_format = dt.strftime(ISO8601_FORMAT_TZ)
ms_format = ""
if milliseconds:
ms_format = ".%s" % str(int(dt.microsecond/1000)).zfill(3)
return "%s%s%s" % (dt_format, ms_format, tz_format)
def iso8601_format_now(milliseconds: bool=False) -> str:
return iso8601_format(datetime_utcnow(), milliseconds=milliseconds)
def iso8601_parse(s: str, microseconds: bool=False) -> datetime.datetime:
fmt = ISO8601_PARSE_MICROSECONDS if microseconds else ISO8601_PARSE
return datetime.datetime.strptime(s, fmt)
def datetime_human(dt: datetime.datetime):
return datetime.datetime.strftime(dt, DATETIME_HUMAN)
def date_human(dt: datetime.datetime):
return datetime.datetime.strftime(dt, DATE_HUMAN)
TIME_SECOND = 1
TIME_MINUTE = TIME_SECOND*60
TIME_HOUR = TIME_MINUTE*60
TIME_DAY = TIME_HOUR*24
TIME_WEEK = TIME_DAY*7
def time_unit(seconds: int) -> typing.Tuple[int, str]:
since = None
unit = None
if seconds >= TIME_WEEK:
since = seconds/TIME_WEEK
unit = "week"
elif seconds >= TIME_DAY:
since = seconds/TIME_DAY
unit = "day"
elif seconds >= TIME_HOUR:
since = seconds/TIME_HOUR
unit = "hour"
elif seconds >= TIME_MINUTE:
since = seconds/TIME_MINUTE
unit = "minute"
else:
since = seconds
unit = "second"
since = int(since)
if since > 1:
unit = "%ss" % unit # pluralise the unit
return (since, unit)
REGEX_PRETTYTIME = re.compile(
r"(?:(\d+)w)?(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?", re.I)
SECONDS_MINUTES = 60
SECONDS_HOURS = SECONDS_MINUTES*60
SECONDS_DAYS = SECONDS_HOURS*24
SECONDS_WEEKS = SECONDS_DAYS*7
def from_pretty_time(pretty_time: str) -> typing.Optional[int]:
seconds = 0
match = re.match(REGEX_PRETTYTIME, pretty_time)
if match:
seconds += int(match.group(1) or 0)*SECONDS_WEEKS
seconds += int(match.group(2) or 0)*SECONDS_DAYS
seconds += int(match.group(3) or 0)*SECONDS_HOURS
seconds += int(match.group(4) or 0)*SECONDS_MINUTES
seconds += int(match.group(5) or 0)
if seconds > 0:
return seconds
return None
UNIT_MINIMUM = 6
UNIT_SECOND = 5
UNIT_MINUTE = 4
UNIT_HOUR = 3
UNIT_DAY = 2
UNIT_WEEK = 1
def to_pretty_time(total_seconds: int, minimum_unit: int=UNIT_SECOND,
max_units: int=UNIT_MINIMUM) -> str:
if total_seconds == 0:
return "0s"
minutes, seconds = divmod(total_seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
weeks, days = divmod(days, 7)
out = []
units = 0
if weeks and minimum_unit >= UNIT_WEEK and units < max_units:
out.append("%dw" % weeks)
units += 1
if days and minimum_unit >= UNIT_DAY and units < max_units:
out.append("%dd" % days)
units += 1
if hours and minimum_unit >= UNIT_HOUR and units < max_units:
out.append("%dh" % hours)
units += 1
if minutes and minimum_unit >= UNIT_MINUTE and units < max_units:
out.append("%dm" % minutes)
units += 1
if seconds and minimum_unit >= UNIT_SECOND and units < max_units:
out.append("%ds" % seconds)
units += 1
return " ".join(out)
def prevent_highlight(nickname: str) -> str:
return nickname[0]+"\u200c"+nickname[1:]

125
src/utils/datetime.py Normal file
View file

@ -0,0 +1,125 @@
import re, typing
import datetime as _datetime
ISO8601_PARSE = "%Y-%m-%dT%H:%M:%S%z"
ISO8601_PARSE_MICROSECONDS = "%Y-%m-%dT%H:%M:%S.%f%z"
ISO8601_FORMAT_DT = "%Y-%m-%dT%H:%M:%S"
ISO8601_FORMAT_TZ = "%z"
DATETIME_HUMAN = "%Y/%m/%d %H:%M:%S"
DATE_HUMAN = "%Y-%m-%d"
def datetime_utcnow() -> _datetime.datetime:
return _datetime.datetime.utcnow().replace(tzinfo=_datetime.timezone.utc)
def datetime_timestamp(seconds: float) -> _datetime.datetime:
return _datetime.datetime.fromtimestamp(seconds).replace(
tzinfo=_datetime.timezone.utc)
def iso8601_format(dt: _datetime.datetime, milliseconds: bool=False) -> str:
dt_format = dt.strftime(ISO8601_FORMAT_DT)
tz_format = dt.strftime(ISO8601_FORMAT_TZ)
ms_format = ""
if milliseconds:
ms_format = ".%s" % str(int(dt.microsecond/1000)).zfill(3)
return "%s%s%s" % (dt_format, ms_format, tz_format)
def iso8601_format_now(milliseconds: bool=False) -> str:
return iso8601_format(datetime_utcnow(), milliseconds=milliseconds)
def iso8601_parse(s: str, microseconds: bool=False) -> _datetime.datetime:
fmt = ISO8601_PARSE_MICROSECONDS if microseconds else ISO8601_PARSE
return _datetime.datetime.strptime(s, fmt)
def datetime_human(dt: _datetime.datetime):
return _datetime.datetime.strftime(dt, DATETIME_HUMAN)
def date_human(dt: _datetime.datetime):
return _datetime.datetime.strftime(dt, DATE_HUMAN)
TIME_SECOND = 1
TIME_MINUTE = TIME_SECOND*60
TIME_HOUR = TIME_MINUTE*60
TIME_DAY = TIME_HOUR*24
TIME_WEEK = TIME_DAY*7
def time_unit(seconds: int) -> typing.Tuple[int, str]:
since = None
unit = None
if seconds >= TIME_WEEK:
since = seconds/TIME_WEEK
unit = "week"
elif seconds >= TIME_DAY:
since = seconds/TIME_DAY
unit = "day"
elif seconds >= TIME_HOUR:
since = seconds/TIME_HOUR
unit = "hour"
elif seconds >= TIME_MINUTE:
since = seconds/TIME_MINUTE
unit = "minute"
else:
since = seconds
unit = "second"
since = int(since)
if since > 1:
unit = "%ss" % unit # pluralise the unit
return (since, unit)
REGEX_PRETTYTIME = re.compile(
r"(?:(\d+)w)?(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?", re.I)
SECONDS_MINUTES = 60
SECONDS_HOURS = SECONDS_MINUTES*60
SECONDS_DAYS = SECONDS_HOURS*24
SECONDS_WEEKS = SECONDS_DAYS*7
def from_pretty_time(pretty_time: str) -> typing.Optional[int]:
seconds = 0
match = re.match(REGEX_PRETTYTIME, pretty_time)
if match:
seconds += int(match.group(1) or 0)*SECONDS_WEEKS
seconds += int(match.group(2) or 0)*SECONDS_DAYS
seconds += int(match.group(3) or 0)*SECONDS_HOURS
seconds += int(match.group(4) or 0)*SECONDS_MINUTES
seconds += int(match.group(5) or 0)
if seconds > 0:
return seconds
return None
UNIT_MINIMUM = 6
UNIT_SECOND = 5
UNIT_MINUTE = 4
UNIT_HOUR = 3
UNIT_DAY = 2
UNIT_WEEK = 1
def to_pretty_time(total_seconds: int, minimum_unit: int=UNIT_SECOND,
max_units: int=UNIT_MINIMUM) -> str:
if total_seconds == 0:
return "0s"
minutes, seconds = divmod(total_seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
weeks, days = divmod(days, 7)
out = []
units = 0
if weeks and minimum_unit >= UNIT_WEEK and units < max_units:
out.append("%dw" % weeks)
units += 1
if days and minimum_unit >= UNIT_DAY and units < max_units:
out.append("%dd" % days)
units += 1
if hours and minimum_unit >= UNIT_HOUR and units < max_units:
out.append("%dh" % hours)
units += 1
if minutes and minimum_unit >= UNIT_MINUTE and units < max_units:
out.append("%dm" % minutes)
units += 1
if seconds and minimum_unit >= UNIT_SECOND and units < max_units:
out.append("%ds" % seconds)
units += 1
return " ".join(out)