Merge branch 'develop' into patch-4

This commit is contained in:
David Schultz 2021-06-24 20:10:16 -05:00 committed by GitHub
commit e63729fc86
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 216 additions and 119 deletions

View file

@ -16,7 +16,7 @@ If you wish to create backups of your BitBot instance (which you should, [borgba
I run BitBot as-a-service on most popular networks (willing to add more networks!) and offer github/gitea/gitlab webhook to IRC notifications for free to FOSS projects. Contact me for more information!
## Contact/Support
Come say hi at [#bitbot on freenode](https://webchat.freenode.net/?channels=#bitbot)
Come say hi at `#bitbot` on irc.libera.chat
## License
This project is licensed under GNU General Public License v2.0 - see [LICENSE](LICENSE) for details.

View file

@ -4,6 +4,8 @@ REASON = "User is banned from this channel"
@utils.export("channelset", utils.BoolSetting("ban-enforce",
"Whether or not to parse new bans and kick who they affect"))
@utils.export("channelset", utils.IntSetting("ban-enforce-max",
"Do not enforce ban if the ban effects more than this many users. Default is half of total channel users."))
class Module(ModuleManager.BaseModule):
@utils.hook("received.mode.channel")
def on_mode(self, event):
@ -14,6 +16,10 @@ class Module(ModuleManager.BaseModule):
if mode[0] == "+" and mode[1] == "b":
bans.append(arg)
affected = 0
defaultmax = len(event["channel"].users) // 2
realmax = event["channel"].get_setting("ban-enforce-max", defaultmax)
if bans:
umasks = {u.hostmask(): u for u in event["channel"].users}
for ban in bans:
@ -21,7 +27,10 @@ class Module(ModuleManager.BaseModule):
matches = list(utils.irc.hostmask_match_many(
umasks.keys(), mask))
for match in matches:
affected = affected + 1
kicks.add(umasks[match])
if kicks:
if affected > realmax:
return
nicks = [u.nickname for u in kicks]
event["channel"].send_kicks(sorted(nicks), REASON)

View file

@ -83,7 +83,7 @@ class Module(ModuleManager.BaseModule):
channel = server.channels.get(channel_name)
args = timer.kwargs.get("args", [timer.kwargs.get("arg", None)])
if args:
if any(args):
channel.send_modes(args, False)
else:
channel.send_mode(timer.kwargs["mode"], False)
@ -238,7 +238,7 @@ class Module(ModuleManager.BaseModule):
if event["spec"][1]:
self.timers.add_persistent("unmode", event["spec"][1],
channel=event["spec"][0].id, mode="m")
channel=event["spec"][0].id, mode="-m")
@utils.hook("received.command.cunmute")
@utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "high,cmute")

View file

@ -43,14 +43,22 @@ class Module(ModuleManager.BaseModule):
failed = []
for list in lists:
record = self._check_list(list.hostname, address)
if not record == None:
reason = list.process(record) or "unknown"
if record is not None:
a_record, txt_record = record
reason = list.process(a_record, txt_record) or "unknown"
failed.append((list.hostname, reason))
return failed
def _check_list(self, list, address):
list_address = "%s.%s" % (address, list)
try:
return dns.resolver.query(list_address, "A")[0].to_text()
a_record = dns.resolver.resolve(list_address, "A")[0].to_text()
except dns.resolver.NXDOMAIN:
return None
try:
txt_record = dns.resolver.resolve(list_address, "TXT")[0].to_text()
except:
txt_record = None
return (a_record, txt_record)

View file

@ -5,53 +5,62 @@ class DNSBL(object):
if not hostname == None:
self.hostname = hostname
def process(self, result: str):
return result
def process(self, a_record, txt_record):
out = a_record
if txt_record is not None:
out += f" - {txt_record}"
return out
class ZenSpamhaus(DNSBL):
hostname = "zen.spamhaus.org"
def process(self, result):
result = result.rsplit(".", 1)[1]
def process(self, a_record, txt_record):
result = a_record.rsplit(".", 1)[1]
if result in ["2", "3", "9"]:
return "spam"
desc = "spam"
elif result in ["4", "5", "6", "7"]:
return "exploits"
desc = "exploits"
else:
desc = "unknown"
return f"{result} - {desc}"
class EFNetRBL(DNSBL):
hostname = "rbl.efnetrbl.org"
def process(self, result):
result = result.rsplit(".", 1)[1]
def process(self, a_record, txt_record):
result = a_record.rsplit(".", 1)[1]
if result == "1":
return "proxy"
desc = "proxy"
elif result in ["2", "3"]:
return "spamtap"
desc = "spamtap"
elif result == "4":
return "tor"
desc = "tor"
elif result == "5":
return "flooding"
desc = "flooding"
return f"{result} - {desc}"
class DroneBL(DNSBL):
hostname = "dnsbl.dronebl.org"
def process(self, result):
result = result.rsplit(".", 1)[1]
if result in ["8", "9", "10", "11", "14"]:
return "proxy"
elif result in ["3", "6", "7"]:
return "flooding"
elif result in ["12", "13", "15", "16"]:
return "exploits"
class AbuseAtCBL(DNSBL):
hostname = "cbl.abuseat.org"
def process(self, result):
result = result.rsplit(".", 1)[1]
def process(self, a_record, txt_record):
result = a_record.rsplit(".", 1)[1]
if result == "2":
return "abuse"
desc = "abuse"
else:
desc = "unknown"
return f"{result} - {desc}"
class TorExitDan(DNSBL):
hostname = "torexit.dan.me.uk"
def process(self, a_record, txt_record):
return "tor exit"
DEFAULT_LISTS = [
ZenSpamhaus(),
EFNetRBL(),
DroneBL(),
AbuseAtCBL()
AbuseAtCBL(),
TorExitDan()
]
def default_lists():

View file

@ -31,7 +31,7 @@ class Module(ModuleManager.BaseModule):
_name = "Webhooks"
def on_load(self):
self._github = github.GitHub(self.log)
self._github = github.GitHub(self.log, self.exports)
self._gitea = gitea.Gitea()
self._gitlab = gitlab.GitLab()
@ -135,6 +135,10 @@ class Module(ModuleManager.BaseModule):
for output, url in outputs:
output = "(%s) %s" % (
utils.irc.color(source, colors.COLOR_REPO), output)
if channel.get_setting("git-prevent-highlight", False):
output = self._prevent_highlight(server, channel,
output)
if url:
if channel.get_setting("git-shorten-urls", False):
@ -142,10 +146,6 @@ class Module(ModuleManager.BaseModule):
context=channel) or url
output = "%s - %s" % (output, url)
if channel.get_setting("git-prevent-highlight", False):
output = self._prevent_highlight(server, channel,
output)
hide_prefix = channel.get_setting("git-hide-prefix", False)
self.events.on("send.stdout").call(target=channel,
module_name=webhook_name, server=server, message=output,
@ -228,6 +228,9 @@ class Module(ModuleManager.BaseModule):
if existing_hook:
raise utils.EventError("There's already a hook for %s" %
hook_name)
if hook_name == None:
command = "%s%s" % (event["command_prefix"], event["command"])
raise utils.EventError("Not enough arguments (Usage: %s add <hook>)" % command)
all_hooks[hook_name] = {
"events": DEFAULT_EVENT_CATEGORIES.copy(),

View file

@ -5,6 +5,7 @@ COMMIT_URL = "https://github.com/%s/commit/%s"
COMMIT_RANGE_URL = "https://github.com/%s/compare/%s...%s"
CREATE_URL = "https://github.com/%s/tree/%s"
PR_URL = "https://github.com/%s/pull/%s"
PR_COMMIT_RANGE_URL = "https://github.com/%s/pull/%s/files/%s..%s"
PR_COMMIT_URL = "https://github.com/%s/pull/%s/commits/%s"
@ -77,19 +78,19 @@ COMMENT_ACTIONS = {
}
COMMENT_MAX = 100
CHECK_RUN_CONCLUSION = {
"success": "passed",
"failure": "failed",
"neutral": "finished",
"cancelled": "was cancelled",
"timed_out": "timed out",
"action_required": "requires action"
CHECK_SUITE_CONCLUSION = {
"success": ("passed", colors.COLOR_POSITIVE),
"failure": ("failed", colors.COLOR_NEGATIVE),
"neutral": ("finished", colors.COLOR_NEUTRAL),
"cancelled": ("was cancelled", colors.COLOR_NEGATIVE),
"timed_out": ("timed out", colors.COLOR_NEGATIVE),
"action_required": ("requires action", colors.COLOR_NEUTRAL)
}
CHECK_RUN_FAILURES = ["failure", "cancelled", "timed_out", "action_required"]
class GitHub(object):
def __init__(self, log):
def __init__(self, log, exports):
self.log = log
self.exports = exports
def is_private(self, data, headers):
if "repository" in data:
@ -125,6 +126,8 @@ class GitHub(object):
category_action = None
if "review" in data and "state" in data["review"]:
category = "%s+%s" % (event, data["review"]["state"])
elif "check_suite" in data and "conclusion" in data["check_suite"]:
category = "%s+%s" % (event, data["check_suite"]["conclusion"])
if action:
if category:
@ -159,8 +162,8 @@ class GitHub(object):
out = self.delete(full_name, data)
elif event == "release":
out = self.release(full_name, data)
elif event == "check_run":
out = self.check_run(data)
elif event == "check_suite":
out = self.check_suite(full_name, data)
elif event == "fork":
out = self.fork(full_name, data)
elif event == "ping":
@ -272,7 +275,7 @@ class GitHub(object):
colored_branch = utils.irc.color(branch, colors.COLOR_BRANCH)
sender = utils.irc.bold(data["sender"]["login"])
author = utils.irc.bold(data["sender"]["login"])
author = utils.irc.bold(data["pull_request"]["user"]["login"])
number = utils.irc.color("#%s" % data["pull_request"]["number"],
colors.COLOR_ID)
identifier = "%s by %s" % (number, author)
@ -433,44 +436,32 @@ class GitHub(object):
url = self._short_url(data["release"]["html_url"])
return ["%s %s a release%s - %s" % (author, action, name, url)]
def check_run(self, data):
name = data["check_run"]["name"]
commit = self._short_hash(data["check_run"]["head_sha"])
def check_suite(self, full_name, data):
suite = data["check_suite"]
commit = self._short_hash(suite["head_sha"])
commit = utils.irc.color(commit, utils.consts.LIGHTBLUE)
pr = ""
url = ""
if data["check_run"]["details_url"]:
url = data["check_run"]["details_url"]
url = " - %s" % self.exports.get("shorturl-any")(url)
if suite["pull_requests"]:
pr_num = suite["pull_requests"][0]["number"]
pr = "/PR%s" % utils.irc.color("#%s" % pr_num, colors.COLOR_ID)
url = self._short_url(PR_URL % (full_name, pr_num))
url = " - %s" % url
duration = ""
if data["check_run"]["completed_at"]:
started_at = self._iso8601(data["check_run"]["started_at"])
completed_at = self._iso8601(data["check_run"]["completed_at"])
if completed_at > started_at:
seconds = (completed_at-started_at).total_seconds()
duration = " in %s" % utils.datetime.format.to_pretty_time(
seconds)
name = suite["app"]["name"]
conclusion = suite["conclusion"]
conclusion, conclusion_color = CHECK_SUITE_CONCLUSION[conclusion]
conclusion = utils.irc.color(conclusion, conclusion_color)
status = data["check_run"]["status"]
status_str = ""
if status == "queued":
status_str = utils.irc.bold("queued")
elif status == "in_progress":
status_str = utils.irc.bold("started")
elif status == "completed":
conclusion = data["check_run"]["conclusion"]
conclusion_color = colors.COLOR_POSITIVE
if conclusion in CHECK_RUN_FAILURES:
conclusion_color = colors.COLOR_NEGATIVE
if conclusion == "neutral":
conclusion_color = colors.COLOR_NEUTRAL
created_at = self._iso8601(suite["created_at"])
updated_at = self._iso8601(suite["updated_at"])
seconds = (updated_at-created_at).total_seconds()
duration = utils.datetime.format.to_pretty_time(seconds)
status_str = utils.irc.color(
CHECK_RUN_CONCLUSION[conclusion], conclusion_color)
return ["[build @%s] %s: %s%s%s" % (
commit, name, status_str, duration, url)]
return ["[build @%s%s] %s: %s in %s%s" % (
commit, pr, name, conclusion, duration, url)]
def fork(self, full_name, data):
forker = utils.irc.bold(data["sender"]["login"])

View file

@ -21,6 +21,7 @@ def _parse(value):
@utils.export("channelset", utils.FunctionSetting(_parse, "dns-nameserver",
"Set DNS nameserver", example="8.8.8.8"))
class Module(ModuleManager.BaseModule):
@utils.hook("received.command.dig", alias_of="dns")
@utils.hook("received.command.dns", min_args=1)
def dns(self, event):
"""
@ -55,7 +56,7 @@ class Module(ModuleManager.BaseModule):
for record_type in record_types:
record_type_strip = record_type.rstrip("?").upper()
try:
query_result = resolver.query(hostname, record_type_strip,
query_result = resolver.resolve(hostname, record_type_strip,
lifetime=4)
query_results = [q.to_text() for q in query_result]
results.append([record_type_strip, query_result.rrset.ttl,

View file

@ -14,6 +14,18 @@ REGEX_PARENS = re.compile(r"\(([^)]+)\)(\+\+|--)")
@utils.export("channelset", utils.BoolSetting("karma-pattern",
"Enable/disable parsing ++/-- karma format"))
class Module(ModuleManager.BaseModule):
def listify(self, items):
if type(items) != list:
items = list(items)
listified = ""
if len(items) > 2:
listified = ', '.join(items[:-1]) + ', and ' + items[-1]
elif len(items) > 1:
listified = items[0] + ' and ' + items[1]
elif items:
listified = items[0]
return listified
def _karma_str(self, karma):
karma_str = str(karma)
if karma < 0:
@ -66,7 +78,8 @@ class Module(ModuleManager.BaseModule):
self._set_throttle(sender, positive)
karma_str = self._karma_str(karma)
karma_total = self._karma_str(self._get_karma(server, target))
karma_total = sum(self._get_karma(server, target).values())
karma_total = self._karma_str(karma_total)
return True, "%s now has %s karma (%s from %s)" % (
target, karma_total, karma_str, sender.nickname)
@ -118,18 +131,35 @@ class Module(ModuleManager.BaseModule):
target = event["user"].nickname
target = self._get_target(event["server"], target)
karma = self._karma_str(self._get_karma(event["server"], target))
karma = sum(self._get_karma(event["server"], target).values())
karma = self._karma_str(karma)
event["stdout"].write("%s has %s karma" % (target, karma))
def _get_karma(self, server, target):
@utils.hook("received.command.karmawho")
@utils.spec("!<target>string")
def karmawho(self, event):
target = event["server"].irc_lower(event["spec"][0])
karma = self._get_karma(event["server"], target, True)
karma = sorted(list(karma.items()),
key=lambda k: abs(k[1]),
reverse=True)
parts = ["%s (%d)" % (n, v) for n, v in karma]
if len(parts) == 0:
event["stdout"].write("%s has no karma." % target)
return
event["stdout"].write("%s has karma from: %s" %
(target, self.listify(parts)))
def _get_karma(self, server, target, own=False):
settings = dict(server.get_all_user_settings("karma-%s" % target))
target_lower = server.irc_lower(target)
if target_lower in settings:
if target_lower in settings and not own:
del settings[target_lower]
return sum(settings.values())
return settings
@utils.hook("received.command.resetkarma")
@utils.kwarg("min_args", 2)

View file

@ -2,6 +2,7 @@
#--require-config opencagedata-api-key
import typing
import pytz
from src import ModuleManager, utils
URL_OPENCAGE = "https://api.opencagedata.com/geocode/v1/json"
@ -19,6 +20,11 @@ class Module(ModuleManager.BaseModule):
if page and page["results"]:
result = page["results"][0]
timezone = result["annotations"]["timezone"]["name"]
try:
pytz.timezone(timezone)
except pytz.exceptions.UnknownTimeZoneError:
return None
lat = result["geometry"]["lat"]
lon = result["geometry"]["lng"]

View file

@ -4,7 +4,7 @@
from src import ModuleManager, utils
@utils.export("set", utils.Setting("pronouns", "Set your pronouns",
example="she/her"))
example="they/them"))
class Module(ModuleManager.BaseModule):
@utils.hook("received.command.pronouns")
def pronouns(self, event):

View file

@ -9,11 +9,11 @@ RSS_INTERVAL = 60 # 1 minute
SETTING_BIND = utils.Setting("rss-bindhost",
"Which local address to bind to for RSS requests", example="127.0.0.1")
@utils.export("botset", utils.IntSetting("rss-interval",
"Interval (in seconds) between RSS polls", example="120"))
@utils.export("channelset", utils.BoolSetting("rss-shorten",
"Whether or not to shorten RSS urls"))
@utils.export("channelset", utils.Setting("rss-format", "Format of RSS announcements", example="$longtitle: $title - $link [$author]"))
@utils.export("serverset", SETTING_BIND)
@utils.export("channelset", SETTING_BIND)
class Module(ModuleManager.BaseModule):
@ -22,12 +22,12 @@ class Module(ModuleManager.BaseModule):
self.timers.add("rss-feeds", self._timer,
self.bot.get_setting("rss-interval", RSS_INTERVAL))
def _format_entry(self, server, feed_title, entry, shorten):
def _format_entry(self, server, channel, feed_title, entry, shorten):
title = utils.parse.line_normalise(utils.http.strip_html(
entry["title"]))
author = entry.get("author", None)
author = " by %s" % author if author else ""
author = entry.get("author", "unknown author")
author = "%s" % author if author else ""
link = entry.get("link", None)
if shorten:
@ -35,11 +35,18 @@ class Module(ModuleManager.BaseModule):
link = self.exports.get("shorturl")(server, link)
except:
pass
link = " - %s" % link if link else ""
link = "%s" % link if link else ""
feed_title_str = "%s: " % feed_title if feed_title else ""
feed_title_str = "%s" % feed_title if feed_title else ""
# just in case the format starts keyerroring and you're not sure why
self.log.trace("RSS Entry: " + str(entry))
try:
format = channel.get_setting("rss-format", "$longtitle: $title by $author - $link").replace("$longtitle", feed_title_str).replace("$title", title).replace("$link", link).replace("$author", author).format(**entry)
except KeyError:
self.log.warn(f"Failed to format RSS entry for {channel}. Falling back to default format.")
format = f"{feed_title_str}: {title} by {author} - {link}"
return "%s%s%s%s" % (feed_title_str, title, author, link)
return format
def _timer(self, timer):
start_time = time.monotonic()
@ -106,7 +113,7 @@ class Module(ModuleManager.BaseModule):
valid += 1
shorten = channel.get_setting("rss-shorten", False)
output = self._format_entry(server, feed_title, entry,
output = self._format_entry(server, channel, feed_title, entry,
shorten)
self.events.on("send.stdout").call(target=channel,
@ -200,10 +207,10 @@ class Module(ModuleManager.BaseModule):
title, entries = self._get_entries(url)
if not entries:
raise utils.EventError("Failed to get RSS entries")
raise utils.EventError("%s has no entries" % url)
shorten = event["target"].get_setting("rss-shorten", False)
out = self._format_entry(event["server"], title, entries[0],
out = self._format_entry(event["server"], event["target"], title, entries[0],
shorten)
event["stdout"].write(out)
else:

View file

@ -4,7 +4,7 @@
import re, traceback
from src import ModuleManager, utils
REGEX_SED = re.compile("^(?:(\\S+)[:,] )?s/")
REGEX_SED = re.compile(r"^(?:(\S+)[:,] )?s([/,`#]).*\2")
@utils.export("channelset",
utils.BoolSetting("sed","Disable/Enable sed in a channel"))
@ -35,7 +35,7 @@ class Module(ModuleManager.BaseModule):
sed.replace = utils.irc.bold(sed.replace)
if self._closest_setting(event, "sed-sender-only", False):
for_user = event["user"].nickname
for_user = event["user"].nickname_lower
match_line = None
match_message = None

View file

@ -41,7 +41,7 @@ class Module(ModuleManager.BaseModule):
@utils.export("shorturl-any")
def _shorturl_any(self, url):
return self._call_shortener(server, None, "bitly", url) or url
return self._call_shortener(None, None, "bitly", url) or url
@utils.export("shorturl")
def _shorturl(self, server, url, context=None):

View file

@ -2,7 +2,7 @@ import datetime, html, time
from src import utils
def _timestamp(dt):
seconds_since = time.time()-dt.timestamp()
seconds_since = time.time()-dt.replace(tzinfo=datetime.timezone.utc).timestamp()
timestamp = utils.datetime.format.to_pretty_since(
seconds_since, max_units=2)
return "%s ago" % timestamp

View file

@ -81,7 +81,7 @@ class Module(ModuleManager.BaseModule):
location_str = "(%s) %s" % (nickname, location_str)
event["stdout"].write(
"%s | %s/%s | %s | Humidity: %s | Wind: %s/%s" % (
"%s | %s/%s | %s | Humidity: %s | Wind: %s (%s)" % (
location_str, celsius, fahrenheit, description, humidity,
wind_speed_k, wind_speed_m))
else:

View file

@ -1,17 +1,16 @@
beautifulsoup4 ==4.8.0
cryptography >=3.3.2
dataclasses ==0.6;python_version<'3.7'
dnspython ==1.16.0
feedparser ==5.2.1
dnspython ==2.0.0
feedparser ==6.0.2
html5lib ==1.0.1
isodate ==0.6.0
lxml ==4.6.2
lxml ==4.6.3
netifaces ==0.10.9
PySocks ==1.7.1
python-dateutil ==2.8.1
pytz ==2019.2
requests ==2.22.0
scrypt ==0.8.13
suds-jurko ==0.6
tornado ==6.0.3
tweepy ==3.8.0

View file

@ -116,7 +116,8 @@ class Bot(object):
self._trigger_both()
return returned
func_queue = queue.Queue(1) # type: queue.Queue[str]
func_queue: queue.Queue[typing.Tuple[TriggerResult, typing.Any]
] = queue.Queue(1)
def _action():
try:
@ -134,7 +135,8 @@ class Bot(object):
if trigger_threads:
self._trigger_both()
if type == TriggerResult.Exception:
if (type == TriggerResult.Exception and
isinstance(returned, Exception)):
raise returned
elif type == TriggerResult.Return:
return returned

View file

@ -95,8 +95,12 @@ class Server(IRCObject.Object):
self.connection_params.bindhost,
self.connection_params.tls,
tls_verify=self.get_setting("ssl-verify", True),
cert=self.bot.config.get("tls-certificate", None),
key=self.bot.config.get("tls-key", None))
cert=self.bot.config.get("tls-certificate", '').format(
DATA=self.bot.data_directory
) or None,
key=self.bot.config.get("tls-key", '').format(
DATA=self.bot.data_directory
))
self.events.on("preprocess.connect").call(server=self)
self.socket.connect()

View file

@ -151,6 +151,27 @@ class Module(ModuleManager.BaseModule):
return
event["stdout"].write("Added server '%s'" % alias)
@utils.hook("received.command.delserver")
@utils.kwarg("help", "Delete a server")
@utils.kwarg("pemission", "delserver")
@utils.spec("!<alias>word")
def del_server(self, event):
alias = event["spec"][0]
sid = self.bot.database.servers.by_alias(alias)
if sid == None:
event["stderr"].write("Server '%s' does not exist" % alias)
return
if self._server_from_alias(alias):
event["stderr"].write("You must disconnect from %s before deleting it" % alias)
return
try:
self.bot.database.servers.delete(sid)
except Exception as e:
event["stderr"].write("Failed to delete server")
self.log.error("failed to add server \"%s\"", [alias], exc_info=True)
return
event["stderr"].write("Server '%s' has been deleted" % alias)
@utils.hook("received.command.editserver")
@utils.kwarg("help", "Edit server details")
@utils.kwarg("permission", "editserver")

View file

@ -5,15 +5,15 @@ def bool_input(s: str):
return not result or result[0].lower() in ["", "y"]
def add_server():
alias = input("alias: ")
hostname = input("hostname: ")
alias = input("alias (display name): ")
hostname = input("hostname (address of server): ")
port = int(input("port: "))
tls = bool_input("tls?")
password = input("password?: ")
password = input("password (optional, leave blank to skip): ")
nickname = input("nickname: ")
username = input("username: ")
realname = input("realname: ")
bindhost = input("bindhost?: ")
username = input("username (optional): ")
realname = input("realname (optional): ")
bindhost = input("bindhost (optional): ")
return irc.IRCConnectionParameters(-1, alias, hostname, port, password, tls,
bindhost, nickname, username, realname)

View file

@ -7,7 +7,7 @@ from requests_toolbelt.adapters import source
REGEX_URL = re.compile("https?://\S+", re.I)
PAIRED_CHARACTERS = ["<>", "()"]
PAIRED_CHARACTERS = [("<", ">"), ("(", ")")]
# best-effort tidying up of URLs
def url_sanitise(url: str):

View file

@ -44,7 +44,7 @@ class SedMatch(Sed):
return None
def _sed_split(s: str) -> typing.List[str]:
tokens = _tokens(s, "/")
tokens = _tokens(s, s[1])
if tokens and (not tokens[-1] == (len(s)-1)):
tokens.append(len(s))

View file

@ -25,13 +25,20 @@ def ssl_wrap(sock: socket.socket, cert: str=None, key: str=None,
def constant_time_compare(a: typing.AnyStr, b: typing.AnyStr) -> bool:
return hmac.compare_digest(a, b)
import scrypt
import hashlib
def password(byte_n: int=32) -> str:
return binascii.hexlify(os.urandom(byte_n)).decode("utf8")
def salt(byte_n: int=64) -> str:
return base64.b64encode(os.urandom(byte_n)).decode("utf8")
def hash(given_salt: str, data: str):
return base64.b64encode(scrypt.hash(data, given_salt)).decode("utf8")
hash = hashlib.scrypt(
data.encode("utf8"),
salt=given_salt.encode("utf8"),
n=1<<14,
r=8,
p=1
)
return base64.b64encode(hash).decode("ascii")
def hash_verify(salt: str, data: str, compare: str):
given_hash = hash(salt, data)
return constant_time_compare(given_hash, compare)