Compare commits

..

3 commits

Author SHA1 Message Date
99e8002266
Change the VERSION to be clear this is a fork 2024-09-16 18:01:15 -05:00
3bf47d9ded
Bitbot Python3.11 Inital Commit 2024-05-20 22:23:16 -05:00
dependabot[bot]
b9ffe7b027
Bump requests from 2.22.0 to 2.31.0 (#365)
Bumps [requests](https://github.com/psf/requests) from 2.22.0 to 2.31.0.
- [Release notes](https://github.com/psf/requests/releases)
- [Changelog](https://github.com/psf/requests/blob/main/HISTORY.md)
- [Commits](https://github.com/psf/requests/compare/v2.22.0...v2.31.0)

---
updated-dependencies:
- dependency-name: requests
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-05-22 17:29:10 -05:00
41 changed files with 790 additions and 539 deletions

View file

@ -1 +1 @@
1.20.0-dev-3.11 fork
1.20.0-3.11 fork

View file

@ -94,6 +94,3 @@ bitly-api-key =
# https://help.github.com/en/articles/creating-a-personal-access-token-for-the-command-line
github-token =
# https://ipinfo.io/account/token
ipinfo-token =

View file

@ -2,7 +2,7 @@
* Move `docs/bot.conf.example` to `~/.bitbot/bot.conf` and fill in the config options you care about. Ones blank or removed will disable relevant functionality.
* Run `./bitbotd -a` to add a server.
* Run `./bitbotd` to start the bot or `./bitbotd -c /path/to/bot.conf` for non-standard config location (outside of `~/.bitbot`).
* Run `./bitbotd` to start the bot.
* Run `./bitbotctl command master-password` to get the master admin password (needed to add regular admin accounts)
* Join `#bitbot` on a server with the bot (or invite it to another channel)
* `/msg <bot> register <password here>` to register your nickname with the bot
@ -14,9 +14,6 @@
Generate a TLS keypair and point `bot.conf`'s `tls-key` to the private key and `tls-certificate` to the public key.
Below is an OpenSSL command example that will create a `bitbot-cert.pem` and `bitbot-key.pem` with `10y` validity (self-signed):
> openssl req -x509 -nodes -sha512 -newkey rsa:4096 -keyout bitbot-key.pem -out bitbot-cert.pem -days 3650 -subj "/CN=YourBotNick"
### Configure SASL
Configure the bot to use SASL to authenticate (usually used for `NickServ` identification)

View file

@ -15,7 +15,7 @@ Either set up a reverse proxy (with persisted Host header) with your favourite H
#### Apache2
* Run `$ a2enmod ssl proxy proxy_http` as root
* Copy example config file from [/docs/rest_api/apache2](/docs/rest_api/apache2) to `/etc/apache2/sites-enabled/`
* Edit `ServerName`, `SSLCertificateFile` and `SSLCertificateKeyFile`
* Edit `ServerName`, `SSLCertificateFile and `SSLCertificateKeyFile`
* `$ service apache2 restart` as root
#### Lighttpd

View file

@ -15,6 +15,6 @@ Listen 5000
ProxyRequests off
ProxyPass / http://[::1]:5001/
ProxyPassReverse / http://[::1]:5001/
ProxyPassReverse / http://[::1]:5001
ProxyPreserveHost on
</VirtualHost>

View file

@ -8,7 +8,7 @@ server {
location / {
proxy_pass http://[::1]:5001;
proxy_set_header Host $host:$server_port;
proxy_set_header Host $host:$port;
proxy_set_header X-Forwarded-For $remote_addr;
}
}

View file

@ -10,8 +10,6 @@
# which can be disabled with: systemctl --user disable systemd-tmpfiles-clean.timer
#
# After placing this script in the correct location, and with bitbot stopped, type:
# systemcl --user daemon-reload
# Afert that start bitbot with:
# systemctl --user enable bitbot_user.service --now
# This will enable the systemd script and launch bitbot

View file

@ -4,8 +4,6 @@ REASON = "User is banned from this channel"
@utils.export("channelset", utils.BoolSetting("ban-enforce",
"Whether or not to parse new bans and kick who they affect"))
@utils.export("channelset", utils.IntSetting("ban-enforce-max",
"Do not enforce ban if the ban effects more than this many users. Default is half of total channel users."))
class Module(ModuleManager.BaseModule):
@utils.hook("received.mode.channel")
def on_mode(self, event):
@ -16,10 +14,6 @@ class Module(ModuleManager.BaseModule):
if mode[0] == "+" and mode[1] == "b":
bans.append(arg)
affected = 0
defaultmax = len(event["channel"].users) // 2
realmax = event["channel"].get_setting("ban-enforce-max", defaultmax)
if bans:
umasks = {u.hostmask(): u for u in event["channel"].users}
for ban in bans:
@ -27,10 +21,7 @@ class Module(ModuleManager.BaseModule):
matches = list(utils.irc.hostmask_match_many(
umasks.keys(), mask))
for match in matches:
affected = affected + 1
kicks.add(umasks[match])
if kicks:
if affected > realmax:
return
nicks = [u.nickname for u in kicks]
event["channel"].send_kicks(sorted(nicks), REASON)

View file

@ -83,7 +83,7 @@ class Module(ModuleManager.BaseModule):
channel = server.channels.get(channel_name)
args = timer.kwargs.get("args", [timer.kwargs.get("arg", None)])
if any(args):
if args:
channel.send_modes(args, False)
else:
channel.send_mode(timer.kwargs["mode"], False)
@ -238,7 +238,7 @@ class Module(ModuleManager.BaseModule):
if event["spec"][1]:
self.timers.add_persistent("unmode", event["spec"][1],
channel=event["spec"][0].id, mode="-m")
channel=event["spec"][0].id, mode="m")
@utils.hook("received.command.cunmute")
@utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "high,cmute")

View file

@ -43,22 +43,14 @@ class Module(ModuleManager.BaseModule):
failed = []
for list in lists:
record = self._check_list(list.hostname, address)
if record is not None:
a_record, txt_record = record
reason = list.process(a_record, txt_record) or "unknown"
if not record == None:
reason = list.process(record) or "unknown"
failed.append((list.hostname, reason))
return failed
def _check_list(self, list, address):
list_address = "%s.%s" % (address, list)
try:
a_record = dns.resolver.resolve(list_address, "A")[0].to_text()
return dns.resolver.query(list_address, "A")[0].to_text()
except dns.resolver.NXDOMAIN:
return None
try:
txt_record = dns.resolver.resolve(list_address, "TXT")[0].to_text()
except:
txt_record = None
return (a_record, txt_record)

View file

@ -5,62 +5,53 @@ class DNSBL(object):
if not hostname == None:
self.hostname = hostname
def process(self, a_record, txt_record):
out = a_record
if txt_record is not None:
out += f" - {txt_record}"
return out
def process(self, result: str):
return result
class ZenSpamhaus(DNSBL):
hostname = "zen.spamhaus.org"
def process(self, a_record, txt_record):
result = a_record.rsplit(".", 1)[1]
def process(self, result):
result = result.rsplit(".", 1)[1]
if result in ["2", "3", "9"]:
desc = "spam"
return "spam"
elif result in ["4", "5", "6", "7"]:
desc = "exploits"
else:
desc = "unknown"
return f"{result} - {desc}"
return "exploits"
class EFNetRBL(DNSBL):
hostname = "rbl.efnetrbl.org"
def process(self, a_record, txt_record):
result = a_record.rsplit(".", 1)[1]
def process(self, result):
result = result.rsplit(".", 1)[1]
if result == "1":
desc = "proxy"
return "proxy"
elif result in ["2", "3"]:
desc = "spamtap"
return "spamtap"
elif result == "4":
desc = "tor"
return "tor"
elif result == "5":
desc = "flooding"
return f"{result} - {desc}"
return "flooding"
class DroneBL(DNSBL):
hostname = "dnsbl.dronebl.org"
def process(self, result):
result = result.rsplit(".", 1)[1]
if result in ["8", "9", "10", "11", "14"]:
return "proxy"
elif result in ["3", "6", "7"]:
return "flooding"
elif result in ["12", "13", "15", "16"]:
return "exploits"
class AbuseAtCBL(DNSBL):
hostname = "cbl.abuseat.org"
def process(self, a_record, txt_record):
result = a_record.rsplit(".", 1)[1]
def process(self, result):
result = result.rsplit(".", 1)[1]
if result == "2":
desc = "abuse"
else:
desc = "unknown"
return f"{result} - {desc}"
class TorExitDan(DNSBL):
hostname = "torexit.dan.me.uk"
def process(self, a_record, txt_record):
return "tor exit"
return "abuse"
DEFAULT_LISTS = [
ZenSpamhaus(),
EFNetRBL(),
DroneBL(),
AbuseAtCBL(),
TorExitDan()
AbuseAtCBL()
]
def default_lists():

View file

@ -98,10 +98,6 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("help", "Befriend a duck")
@utils.spec("!-channelonly")
def befriend(self, event):
if not event["target"].get_setting("ducks-enabled", False):
return event["stderr"].write(
"Ducks are not enabled in this channel"
)
if event["target"].duck_active:
action = self._duck_action(event["target"], event["user"],
"befriended", "ducks-befriended")
@ -113,10 +109,6 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("help", "Trap a duck")
@utils.spec("!-channelonly")
def trap(self, event):
if not event["target"].get_setting("ducks-enabled", False):
return event["stderr"].write(
"Ducks are not enabled in this channel"
)
if event["target"].duck_active:
action = self._duck_action(event["target"], event["user"],
"trapped", "ducks-shot")

View file

@ -31,7 +31,7 @@ class Module(ModuleManager.BaseModule):
_name = "Webhooks"
def on_load(self):
self._github = github.GitHub(self.log, self.exports)
self._github = github.GitHub(self.log)
self._gitea = gitea.Gitea()
self._gitlab = gitlab.GitLab()
@ -135,10 +135,6 @@ class Module(ModuleManager.BaseModule):
for output, url in outputs:
output = "(%s) %s" % (
utils.irc.color(source, colors.COLOR_REPO), output)
if channel.get_setting("git-prevent-highlight", False):
output = self._prevent_highlight(server, channel,
output)
if url:
if channel.get_setting("git-shorten-urls", False):
@ -146,6 +142,10 @@ class Module(ModuleManager.BaseModule):
context=channel) or url
output = "%s - %s" % (output, url)
if channel.get_setting("git-prevent-highlight", False):
output = self._prevent_highlight(server, channel,
output)
hide_prefix = channel.get_setting("git-hide-prefix", False)
self.events.on("send.stdout").call(target=channel,
module_name=webhook_name, server=server, message=output,
@ -228,9 +228,6 @@ class Module(ModuleManager.BaseModule):
if existing_hook:
raise utils.EventError("There's already a hook for %s" %
hook_name)
if hook_name == None:
command = "%s%s" % (event["command_prefix"], event["command"])
raise utils.EventError("Not enough arguments (Usage: %s add <hook>)" % command)
all_hooks[hook_name] = {
"events": DEFAULT_EVENT_CATEGORIES.copy(),

167
modules/git_webhooks/github.py Executable file → Normal file
View file

@ -5,7 +5,6 @@ COMMIT_URL = "https://github.com/%s/commit/%s"
COMMIT_RANGE_URL = "https://github.com/%s/compare/%s...%s"
CREATE_URL = "https://github.com/%s/tree/%s"
PR_URL = "https://github.com/%s/pull/%s"
PR_COMMIT_RANGE_URL = "https://github.com/%s/pull/%s/files/%s..%s"
PR_COMMIT_URL = "https://github.com/%s/pull/%s/commits/%s"
@ -78,19 +77,19 @@ COMMENT_ACTIONS = {
}
COMMENT_MAX = 100
CHECK_SUITE_CONCLUSION = {
"success": ("passed", colors.COLOR_POSITIVE),
"failure": ("failed", colors.COLOR_NEGATIVE),
"neutral": ("finished", colors.COLOR_NEUTRAL),
"cancelled": ("was cancelled", colors.COLOR_NEGATIVE),
"timed_out": ("timed out", colors.COLOR_NEGATIVE),
"action_required": ("requires action", colors.COLOR_NEUTRAL)
CHECK_RUN_CONCLUSION = {
"success": "passed",
"failure": "failed",
"neutral": "finished",
"cancelled": "was cancelled",
"timed_out": "timed out",
"action_required": "requires action"
}
CHECK_RUN_FAILURES = ["failure", "cancelled", "timed_out", "action_required"]
class GitHub(object):
def __init__(self, log, exports):
def __init__(self, log):
self.log = log
self.exports = exports
def is_private(self, data, headers):
if "repository" in data:
@ -126,8 +125,6 @@ class GitHub(object):
category_action = None
if "review" in data and "state" in data["review"]:
category = "%s+%s" % (event, data["review"]["state"])
elif "check_suite" in data and "conclusion" in data["check_suite"]:
category = "%s+%s" % (event, data["check_suite"]["conclusion"])
if action:
if category:
@ -162,8 +159,8 @@ class GitHub(object):
out = self.delete(full_name, data)
elif event == "release":
out = self.release(full_name, data)
elif event == "check_suite":
out = self.check_suite(full_name, data)
elif event == "check_run":
out = self.check_run(data)
elif event == "fork":
out = self.fork(full_name, data)
elif event == "ping":
@ -172,13 +169,24 @@ class GitHub(object):
out = self.membership(organisation, data)
elif event == "watch":
out = self.watch(data)
return out
return list(zip(out, [None]*len(out)))
def _short_url(self, url):
self.log.debug("git.io shortening: %s" % url)
try:
page = utils.http.request("https://git.io", method="POST",
post_data={"url": url})
return page.headers["Location"]
except utils.http.HTTPTimeoutException:
self.log.warn(
"HTTPTimeoutException while waiting for github short URL", [])
return url
def _iso8601(self, s):
return utils.datetime.parse.iso8601(s)
def ping(self, data):
return [("Received new webhook", None)]
return ["Received new webhook"]
def _change_count(self, n, symbol, color):
return utils.irc.color("%s%d" % (symbol, n), color)+utils.irc.bold("")
@ -222,20 +230,21 @@ class GitHub(object):
if len(commits) == 0 and forced:
outputs.append(
"%s %spushed to %s" % (author, forced_str, branch), None)
"%s %spushed to %s" % (author, forced_str, branch))
elif len(commits) <= 3:
for commit in commits:
hash = commit["id"]
hash_colored = utils.irc.color(self._short_hash(hash), colors.COLOR_ID)
message = commit["message"].split("\n")[0].strip()
url = single_url % hash
url = self._short_url(single_url % hash)
outputs.append((
"%s %spushed %s to %s: %s"
% (author, forced_str, hash_colored, branch, message), url))
outputs.append(
"%s %spushed %s to %s: %s - %s"
% (author, forced_str, hash_colored, branch, message, url))
else:
outputs.append(("%s %spushed %d commits to %s"
% (author, forced_str, len(commits), branch), url))
outputs.append("%s %spushed %d commits to %s - %s"
% (author, forced_str, len(commits), branch,
self._short_url(range_url)))
return outputs
@ -253,9 +262,9 @@ class GitHub(object):
action = data["action"]
commit = self._short_hash(data["comment"]["commit_id"])
commenter = utils.irc.bold(data["comment"]["user"]["login"])
url = data["comment"]["html_url"]
return [("[commit/%s] %s %s a comment" % (commit, commenter,
action), url)]
url = self._short_url(data["comment"]["html_url"])
return ["[commit/%s] %s %s a comment - %s" % (commit, commenter,
action, url)]
def pull_request(self, full_name, data):
raw_number = data["pull_request"]["number"]
@ -263,7 +272,7 @@ class GitHub(object):
colored_branch = utils.irc.color(branch, colors.COLOR_BRANCH)
sender = utils.irc.bold(data["sender"]["login"])
author = utils.irc.bold(data["pull_request"]["user"]["login"])
author = utils.irc.bold(data["sender"]["login"])
number = utils.irc.color("#%s" % data["pull_request"]["number"],
colors.COLOR_ID)
identifier = "%s by %s" % (number, author)
@ -319,9 +328,9 @@ class GitHub(object):
action_desc = "renamed %s" % identifier
pr_title = data["pull_request"]["title"]
url = data["pull_request"]["html_url"]
return [("[PR] %s %s: %s" % (
sender, action_desc, pr_title), url)]
url = self._short_url(data["pull_request"]["html_url"])
return ["[PR] %s %s: %s - %s" % (
sender, action_desc, pr_title, url)]
def pull_request_review(self, full_name, data):
if not data["action"] == "submitted":
@ -339,7 +348,7 @@ class GitHub(object):
action = data["action"]
pr_title = data["pull_request"]["title"]
reviewer = utils.irc.bold(data["sender"]["login"])
url = data["review"]["html_url"]
url = self._short_url(data["review"]["html_url"])
state_desc = state
if state == "approved":
@ -349,8 +358,8 @@ class GitHub(object):
elif state == "dismissed":
state_desc = "dismissed a review"
return [("[PR] %s %s on %s: %s" %
(reviewer, state_desc, number, pr_title), url)]
return ["[PR] %s %s on %s: %s - %s" %
(reviewer, state_desc, number, pr_title, url)]
def pull_request_review_comment(self, full_name, data):
number = utils.irc.color("#%s" % data["pull_request"]["number"],
@ -358,9 +367,9 @@ class GitHub(object):
action = data["action"]
pr_title = data["pull_request"]["title"]
sender = utils.irc.bold(data["sender"]["login"])
url = data["comment"]["html_url"]
return [("[PR] %s %s on a review on %s: %s" %
(sender, COMMENT_ACTIONS[action], number, pr_title), url)]
url = self._short_url(data["comment"]["html_url"])
return ["[PR] %s %s on a review on %s: %s - %s" %
(sender, COMMENT_ACTIONS[action], number, pr_title, url)]
def issues(self, full_name, data):
number = utils.irc.color("#%s" % data["issue"]["number"],
@ -374,9 +383,9 @@ class GitHub(object):
issue_title = data["issue"]["title"]
author = utils.irc.bold(data["sender"]["login"])
url = data["issue"]["html_url"]
return [("[issue] %s %s: %s" %
(author, action_str, issue_title), url)]
url = self._short_url(data["issue"]["html_url"])
return ["[issue] %s %s: %s - %s" %
(author, action_str, issue_title, url)]
def issue_comment(self, full_name, data):
if "changes" in data:
# don't show this event when nothing has actually changed
@ -389,29 +398,30 @@ class GitHub(object):
type = "PR" if "pull_request" in data["issue"] else "issue"
title = data["issue"]["title"]
commenter = utils.irc.bold(data["sender"]["login"])
url = data["comment"]["html_url"]
url = self._short_url(data["comment"]["html_url"])
body = ""
if not action == "deleted":
body = ": %s" % self._comment(data["comment"]["body"])
return [("[%s] %s %s on %s (%s)%s" %
(type, commenter, COMMENT_ACTIONS[action], number, title, body), url)]
return ["[%s] %s %s on %s (%s)%s - %s" %
(type, commenter, COMMENT_ACTIONS[action], number, title, body,
url)]
def create(self, full_name, data):
ref = data["ref"]
ref_color = utils.irc.color(ref, colors.COLOR_BRANCH)
type = data["ref_type"]
sender = utils.irc.bold(data["sender"]["login"])
url = CREATE_URL % (full_name, ref)
return [("%s created a %s: %s" % (sender, type, ref_color), url)]
url = self._short_url(CREATE_URL % (full_name, ref))
return ["%s created a %s: %s - %s" % (sender, type, ref_color, url)]
def delete(self, full_name, data):
ref = data["ref"]
ref_color = utils.irc.color(ref, colors.COLOR_BRANCH)
type = data["ref_type"]
sender = utils.irc.bold(data["sender"]["login"])
return [("%s deleted a %s: %s" % (sender, type, ref_color), None)]
return ["%s deleted a %s: %s" % (sender, type, ref_color)]
def release(self, full_name, data):
action = data["action"]
@ -420,47 +430,60 @@ class GitHub(object):
if name:
name = ": %s" % name
author = utils.irc.bold(data["release"]["author"]["login"])
url = data["release"]["html_url"]
return [("%s %s a release%s" % (author, action, name), url)]
url = self._short_url(data["release"]["html_url"])
return ["%s %s a release%s - %s" % (author, action, name, url)]
def check_suite(self, full_name, data):
suite = data["check_suite"]
commit = self._short_hash(suite["head_sha"])
def check_run(self, data):
name = data["check_run"]["name"]
commit = self._short_hash(data["check_run"]["head_sha"])
commit = utils.irc.color(commit, utils.consts.LIGHTBLUE)
pr = ""
url = ""
if suite["pull_requests"]:
pr_num = suite["pull_requests"][0]["number"]
pr = "/PR%s" % utils.irc.color("#%s" % pr_num, colors.COLOR_ID)
url = PR_URL % (full_name, pr_num)
if data["check_run"]["details_url"]:
url = data["check_run"]["details_url"]
url = " - %s" % self.exports.get("shorturl-any")(url)
name = suite["app"]["name"]
conclusion = suite["conclusion"]
conclusion, conclusion_color = CHECK_SUITE_CONCLUSION[conclusion]
conclusion = utils.irc.color(conclusion, conclusion_color)
duration = ""
if data["check_run"]["completed_at"]:
started_at = self._iso8601(data["check_run"]["started_at"])
completed_at = self._iso8601(data["check_run"]["completed_at"])
if completed_at > started_at:
seconds = (completed_at-started_at).total_seconds()
duration = " in %s" % utils.datetime.format.to_pretty_time(
seconds)
created_at = self._iso8601(suite["created_at"])
updated_at = self._iso8601(suite["updated_at"])
seconds = (updated_at-created_at).total_seconds()
duration = utils.datetime.format.to_pretty_time(seconds)
status = data["check_run"]["status"]
status_str = ""
if status == "queued":
status_str = utils.irc.bold("queued")
elif status == "in_progress":
status_str = utils.irc.bold("started")
elif status == "completed":
conclusion = data["check_run"]["conclusion"]
conclusion_color = colors.COLOR_POSITIVE
if conclusion in CHECK_RUN_FAILURES:
conclusion_color = colors.COLOR_NEGATIVE
if conclusion == "neutral":
conclusion_color = colors.COLOR_NEUTRAL
return [("[build @%s%s] %s: %s in %s" % (
commit, pr, name, conclusion, duration), url)]
status_str = utils.irc.color(
CHECK_RUN_CONCLUSION[conclusion], conclusion_color)
return ["[build @%s] %s: %s%s%s" % (
commit, name, status_str, duration, url)]
def fork(self, full_name, data):
forker = utils.irc.bold(data["sender"]["login"])
fork_full_name = utils.irc.color(data["forkee"]["full_name"],
utils.consts.LIGHTBLUE)
url = data["forkee"]["html_url"]
return [("%s forked into %s" %
(forker, fork_full_name), url)]
url = self._short_url(data["forkee"]["html_url"])
return ["%s forked into %s - %s" %
(forker, fork_full_name, url)]
def membership(self, organisation, data):
return [("%s %s %s to team %s" %
return ["%s %s %s to team %s" %
(data["sender"]["login"], data["action"], data["member"]["login"],
data["team"]["name"]), None)]
data["team"]["name"])]
def watch(self, data):
return [("%s starred the repository" % data["sender"]["login"], None)]
return ["%s starred the repository" % data["sender"]["login"]]

View file

@ -47,7 +47,14 @@ class Module(ModuleManager.BaseModule):
return org, repo, number
def _short_url(self, url):
return self.exports.get("shorturl")(self.bot, url) or url
try:
page = utils.http.request("https://git.io", method="POST",
post_data={"url": url})
return page.headers["Location"]
except utils.http.HTTPTimeoutException:
self.log.warn(
"HTTPTimeoutException while waiting for github short URL", [])
return url
def _change_count(self, n, symbol, color):
return utils.irc.color("%s%d" % (symbol, n), color)+utils.irc.bold("")

View file

@ -5,7 +5,6 @@ from src import ModuleManager, utils
import dns.resolver
URL_GEOIP = "http://ip-api.com/json/%s"
URL_IPINFO = "https://ipinfo.io/%s/json"
REGEX_IPv6 = r"(?:(?:[a-f0-9]{1,4}:){2,}|[a-f0-9:]*::)[a-f0-9:]*"
REGEX_IPv4 = r"(?:\d{1,3}\.){3}\d{1,3}"
REGEX_IP = re.compile("%s|%s" % (REGEX_IPv4, REGEX_IPv6), re.I)
@ -22,25 +21,6 @@ def _parse(value):
@utils.export("channelset", utils.FunctionSetting(_parse, "dns-nameserver",
"Set DNS nameserver", example="8.8.8.8"))
class Module(ModuleManager.BaseModule):
def _get_ip(self, event):
ip = event["args_split"][0] if event["args"] else ""
if not ip:
line = event["target"].buffer.find(REGEX_IP)
if line:
ip = line.match
if not ip:
raise utils.EventError("No IP provided")
return ip
def _ipinfo_get(self, url):
access_token = self.bot.config.get("ipinfo-token", None)
headers = {}
if not access_token == None:
headers["Authorization"] = "Bearer %s" % access_token
request = utils.http.Request(url, headers=headers)
return utils.http.request(request)
@utils.hook("received.command.dig", alias_of="dns")
@utils.hook("received.command.dns", min_args=1)
def dns(self, event):
"""
@ -75,7 +55,7 @@ class Module(ModuleManager.BaseModule):
for record_type in record_types:
record_type_strip = record_type.rstrip("?").upper()
try:
query_result = resolver.resolve(hostname, record_type_strip,
query_result = resolver.query(hostname, record_type_strip,
lifetime=4)
query_results = [q.to_text() for q in query_result]
results.append([record_type_strip, query_result.rrset.ttl,
@ -98,79 +78,27 @@ class Module(ModuleManager.BaseModule):
(t, ttl, ", ".join(r)) for t, ttl, r in results]
event["stdout"].write("(%s) %s" % (hostname, " | ".join(results_str)))
@utils.hook("received.command.geoip")
@utils.hook("received.command.geoip", min_args=1)
def geoip(self, event):
"""
:help: Get GeoIP data on a given IPv4/IPv6 address
:help: Get geoip data on a given IPv4/IPv6 address
:usage: <IP>
:prefix: GeoIP
"""
ip = self._get_ip(event)
page = utils.http.request(URL_GEOIP % ip).json()
page = utils.http.request(URL_GEOIP % event["args_split"][0]).json()
if page:
if page["status"] == "success":
hostname = None
try:
hostname, alias, ips = socket.gethostbyaddr(page["query"])
except (socket.herror, socket.gaierror):
pass
data = page["query"]
data += " (%s)" % hostname if hostname else ""
data += " | Organisation: %s" % page["org"]
data += " | City: %s" % page["city"]
data += " | Region: %s (%s)" % (
page["regionName"], page["countryCode"])
data += " | ISP: %s (%s)" % (page["isp"], page["as"])
data += " | ISP: %s" % page["isp"]
data += " | Lon/Lat: %s/%s" % (page["lon"], page["lat"])
data += " | Timezone: %s" % page["timezone"]
event["stdout"].write(data)
else:
event["stderr"].write("No GeoIP data found")
else:
raise utils.EventResultsError()
@utils.hook("received.command.ipinfo")
def ipinfo(self, event):
"""
:help: Get IPinfo.io data on a given IPv4/IPv6 address
:usage: <IP>
:prefix: IPinfo
"""
ip = self._get_ip(event)
page = self._ipinfo_get(URL_IPINFO % ip).json()
if page:
if page.get("error", False):
if isinstance(page["error"], (list, dict)):
event["stderr"].write(page["error"]["message"])
else:
event["stderr"].write(page["error"])
elif page.get("ip", False):
bogon = page.get("bogon", False)
hostname = page.get("hostname", None)
if not hostname and not bogon:
try:
hostname, alias, ips = socket.gethostbyaddr(page["ip"])
except (socket.herror, socket.gaierror):
pass
data = page["ip"]
if bogon:
data += " (Bogon)"
else:
data += " (%s)" % hostname if hostname else ""
data += " (Anycast)" if page.get("anycast", False) == True else ""
if page.get("country", False):
data += " | City: %s" % page["city"]
data += " | Region: %s (%s)" % (page["region"], page["country"])
data += " | ISP: %s" % page.get("org", "Unknown")
data += " | Lon/Lat: %s" % page["loc"]
data += " | Timezone: %s" % page["timezone"]
event["stdout"].write(data)
else:
event["stderr"].write("Unsupported endpoint")
event["stderr"].write("No geoip data found")
else:
raise utils.EventResultsError()
@ -181,7 +109,13 @@ class Module(ModuleManager.BaseModule):
:usage: <IP>
:prefix: rDNS
"""
ip = self._get_ip(event)
ip = event["args_split"][0] if event["args"] else ""
if not ip:
line = event["target"].buffer.find(REGEX_IP)
if line:
ip = line.match
if not ip:
raise utils.EventError("No IP provided")
try:
hostname, alias, ips = socket.gethostbyaddr(ip)

View file

@ -1,22 +1,11 @@
from src import EventManager, ModuleManager, utils
TAGS = {
utils.irc.MessageTag(None, "inspircd.org/bot"),
utils.irc.MessageTag(None, "draft/bot")
}
TAG = utils.irc.MessageTag(None, "inspircd.org/bot")
class Module(ModuleManager.BaseModule):
@utils.hook("received.376")
@utils.hook("received.422")
def botmode(self, event):
if "BOT" in event["server"].isupport:
botmode = event["server"].isupport["BOT"]
event["server"].send_raw("MODE %s +%s" % (event["server"].nickname, botmode))
@utils.hook("received.message.private")
@utils.hook("received.message.channel")
@utils.kwarg("priority", EventManager.PRIORITY_HIGH)
def message(self, event):
for tag in TAGS:
if tag.present(event["tags"]):
event.eat()
if TAG.present(event["tags"]):
event.eat()

View file

@ -14,18 +14,6 @@ REGEX_PARENS = re.compile(r"\(([^)]+)\)(\+\+|--)")
@utils.export("channelset", utils.BoolSetting("karma-pattern",
"Enable/disable parsing ++/-- karma format"))
class Module(ModuleManager.BaseModule):
def listify(self, items):
if type(items) != list:
items = list(items)
listified = ""
if len(items) > 2:
listified = ', '.join(items[:-1]) + ', and ' + items[-1]
elif len(items) > 1:
listified = items[0] + ' and ' + items[1]
elif items:
listified = items[0]
return listified
def _karma_str(self, karma):
karma_str = str(karma)
if karma < 0:
@ -78,8 +66,7 @@ class Module(ModuleManager.BaseModule):
self._set_throttle(sender, positive)
karma_str = self._karma_str(karma)
karma_total = sum(self._get_karma(server, target).values())
karma_total = self._karma_str(karma_total)
karma_total = self._karma_str(self._get_karma(server, target))
return True, "%s now has %s karma (%s from %s)" % (
target, karma_total, karma_str, sender.nickname)
@ -131,35 +118,18 @@ class Module(ModuleManager.BaseModule):
target = event["user"].nickname
target = self._get_target(event["server"], target)
karma = sum(self._get_karma(event["server"], target).values())
karma = self._karma_str(karma)
karma = self._karma_str(self._get_karma(event["server"], target))
event["stdout"].write("%s has %s karma" % (target, karma))
@utils.hook("received.command.karmawho")
@utils.spec("!<target>string")
def karmawho(self, event):
target = event["server"].irc_lower(event["spec"][0])
karma = self._get_karma(event["server"], target, True)
karma = sorted(list(karma.items()),
key=lambda k: abs(k[1]),
reverse=True)
parts = ["%s (%d)" % (n, v) for n, v in karma]
if len(parts) == 0:
event["stdout"].write("%s has no karma." % target)
return
event["stdout"].write("%s has karma from: %s" %
(target, self.listify(parts)))
def _get_karma(self, server, target, own=False):
def _get_karma(self, server, target):
settings = dict(server.get_all_user_settings("karma-%s" % target))
target_lower = server.irc_lower(target)
if target_lower in settings and not own:
if target_lower in settings:
del settings[target_lower]
return settings
return sum(settings.values())
@utils.hook("received.command.resetkarma")
@utils.kwarg("min_args", 2)

View file

@ -82,10 +82,7 @@ class Module(ModuleManager.BaseModule):
tags_str = ""
if "toptags" in track and track["toptags"]["tag"]:
tags_list = track["toptags"]["tag"]
if not type(tags_list) == list:
tags_list = [tags_list]
tags = [t["name"] for t in tags_list]
tags = [t["name"] for t in track["toptags"]["tag"]]
tags_str = " [%s]" % ", ".join(tags)
play_count_str = ""

View file

@ -2,7 +2,6 @@
#--require-config opencagedata-api-key
import typing
import pytz
from src import ModuleManager, utils
URL_OPENCAGE = "https://api.opencagedata.com/geocode/v1/json"
@ -20,11 +19,6 @@ class Module(ModuleManager.BaseModule):
if page and page["results"]:
result = page["results"][0]
timezone = result["annotations"]["timezone"]["name"]
try:
pytz.timezone(timezone)
except pytz.exceptions.UnknownTimeZoneError:
return None
lat = result["geometry"]["lat"]
lon = result["geometry"]["lng"]

View file

@ -15,10 +15,6 @@ class Module(ModuleManager.BaseModule):
@utils.hook("preprocess.send.privmsg")
@utils.hook("preprocess.send.notice")
def channel_message(self, event):
if event["line"].assured():
# don't run filters/replaces against assured lines
return
message = event["line"].args[1]
original_message = message
message_plain = utils.irc.strip_font(message)

518
modules/nr.py Normal file
View file

@ -0,0 +1,518 @@
#--depends-on commands
#--require-config nre-api-key
import collections, re, time
from datetime import datetime, date
from collections import Counter
from src import ModuleManager, utils
from suds.client import Client
from suds import WebFault
# Note that this module requires the open *Staff Version* of the Darwin API
# You can register for an API key here: http://openldbsv.nationalrail.co.uk/
# We use this instead of the 'regular' version because it offers a *lot* more
# information.
URL = 'https://lite.realtime.nationalrail.co.uk/OpenLDBSVWS/wsdl.aspx?ver=2016-02-16'
class Module(ModuleManager.BaseModule):
_name = "NR"
_client = None
PASSENGER_ACTIVITIES = ["U", "P", "R"]
COLOURS = [utils.consts.LIGHTBLUE, utils.consts.GREEN,
utils.consts.RED, utils.consts.CYAN, utils.consts.LIGHTGREY,
utils.consts.ORANGE]
@property
def client(self):
if self._client: return self._client
try:
token = self.bot.config["nre-api-key"]
client = Client(URL)
header_token = client.factory.create('ns2:AccessToken')
header_token.TokenValue = token
client.set_options(soapheaders=header_token)
self._client = client
except Exception as e:
pass
return self._client
def filter(self, args, defaults):
args = re.findall(r"[^\s,]+", args)
params = {}
for arg in args:
if ":" in arg:
params[arg.split(":", 1)[0]] = arg.split(":", 1)[1]
elif "=" in arg:
params[arg.split("=", 1)[0]] = arg.split("=", 1)[1]
else:
params[arg.replace("!", "")] = '!' not in arg
ret = {k: v[0] for k,v in defaults.items()}
ret["default"] = True
ret["errors"] = []
for k,v in params.items():
if not k in defaults.keys():
ret["errors"].append((k, "Invalid parameter"))
continue
if not defaults[k][1](v):
ret["errors"].append((v, 'Invalid value for "%s"' % k))
continue
ret["default"] = False
ret[k] = v if len(defaults[k]) == 2 else defaults[k][2](v)
ret["errors_summary"] = ", ".join(['"%s": %s' % (a[0], a[1]) for a in ret["errors"]])
return ret
def process(self, service):
ut_now = datetime.now().timestamp()
nonetime = {"orig": None, "datetime": None, "ut": 0,
"short": ' ', "prefix": '', "on_time": False,
"estimate": False, "status": 4, "schedule": False}
times = {}
a_types = ["eta", "ata", "sta"]
d_types = ["etd", "atd", "std"]
for a in a_types + d_types:
if a in service and service[a]:
times[a] = {"orig": service[a]}
if len(service[a]) > 5:
times[a]["datetime"] = datetime.strptime(service[a], "%Y-%m-%dT%H:%M:%S")
else:
times[a]["datetime"] = datetime.strptime(
datetime.now().date().isoformat() + "T" + service[a][:4],
"%Y-%m-%dT%H%M"
)
times[a]["ut"] = times[a]["datetime"].timestamp()
else:
times[a] = nonetime
for k, a in times.items():
if not a["orig"]: continue
a["short"] = a["datetime"].strftime("%H%M") if len(a["orig"]) > 5 else a["orig"]
a["shortest"] = "%02d" % a["datetime"].minute if -300 < a["ut"]-ut_now < 1800 else a["short"]
a["prefix"] = k[2] + ("s" if k[0] == "s" else "")
a["estimate"] = k[0] == "e"
a["schedule"] = k[0] == "s"
a["on_time"] = a["ut"] - times["s"+ k[1:]]["ut"] < 300
a["status"] = 1 if a["on_time"] else 2
if "a" + k[1:] in service: a["status"] = {"d": 0, "a": 3}[k[2]]
if k[0] == "s": a["status"] = 4
arr, dep = [times[a] for a in a_types if times[a]["ut"]], [times[a] for a in d_types if times[a]["ut"]]
times["arrival"] = (arr + dep + [nonetime])[0]
times["departure"] = (dep + arr + [nonetime])[0]
times["a"], times["d"] = (arr + [nonetime])[0], (dep + [nonetime])[0]
times["both"] = times["departure"]
times["max_sched"] = {"ut": max(times["sta"]["ut"], times["std"]["ut"])}
return times
def activities(self, string): return [a+b.strip() for a,b in list(zip(*[iter(string)]*2)) if (a+b).strip()]
def reduced_activities(self, string): return [a for a in self.activities(string) if a in self.PASSENGER_ACTIVITIES]
@utils.hook("received.command.nrtrains", min_args=1)
def trains(self, event):
"""
:help: Get train/bus services for a station (Powered by NRE)
:usage: <crs_id>
"""
client = self.client
colours = self.COLOURS
schedule = {}
location_code = event["args_split"][0].upper()
filter = self.filter(' '.join(event["args_split"][1:]) if len(event["args_split"]) > 1 else "", {
"dest": ('', lambda x: x.isalpha() and len(x)==3),
"origin":('', lambda x: x.isalpha() and len(x)==3),
"inter": ('', lambda x: x.isalpha() and len(x)==3, lambda x: x.upper()),
"toc": ('', lambda x: x.isalpha() and len(x) == 2),
"dedup": (False, lambda x: type(x)==type(True)),
"plat": ('', lambda x: len(x) <= 3),
"type": ("departure", lambda x: x in ["departure", "arrival", "both"]),
"terminating": (False, lambda x: type(x)==type(True)),
"period": (120, lambda x: x.isdigit() and 1 <= int(x) <= 480, lambda x: int(x)),
"nonpassenger": (False, lambda x: type(x)==type(True)),
"time": ("", lambda x: len(x)==4 and x.isdigit()),
"date": ("", lambda x: len(x)==10),
"tops": (None, lambda x: len(x)<4 and x.isdigit()),
"power": (None, lambda x: x.upper() in ["EMU", "DMU", "HST", "D", "E", "DEM"], lambda x: x.upper()),
"crs": (False, lambda x: type(x)==type(True)),
"st": (False, lambda x: type(x)==type(True))
})
if filter["errors"]:
raise utils.EventError("Filter: " + filter["errors_summary"])
if filter["inter"] and filter["type"]!="departure":
raise utils.EventError("Filtering by intermediate stations is only "
"supported for departures.")
nr_filterlist = client.factory.create("filterList")
if filter["inter"]: nr_filterlist.crs.append(filter["inter"])
now = datetime.now()
if filter["time"]:
now = now.replace(hour=int(filter["time"][:2]))
now = now.replace(minute=int(filter["time"][2:]))
if filter["date"]:
newdate = datetime.strptime(filter["date"], "%Y-%m-%d").date()
now = now.replace(day=newdate.day, month=newdate.month, year=newdate.year)
method = client.service.GetArrivalDepartureBoardByCRS if len(location_code) == 3 else client.service.GetArrivalDepartureBoardByTIPLOC
try:
query = method(100, location_code, now.isoformat().split(".")[0], filter["period"],
nr_filterlist, "to", '', "PBS", filter["nonpassenger"])
except WebFault as detail:
if str(detail) == "Server raised fault: 'Invalid crs code supplied'":
raise utils.EventError("Invalid CRS code.")
else:
raise utils.EventError("An error occurred.")
nrcc_severe = len([a for a in query["nrccMessages"][0] if a["severity"] == "Major"]) if "nrccMessages" in query else 0
if event.get("external"):
station_summary = "%s (%s) - %s (%s):\n" % (query["locationName"], query["crs"], query["stationManager"],
query["stationManagerCode"])
else:
severe_summary = ""
if nrcc_severe:
severe_summary += ", "
severe_summary += utils.irc.bold(utils.irc.color("%s severe messages" % nrcc_severe, utils.consts.RED))
station_summary = "%s (%s, %s%s)" % (query["locationName"], query["crs"], query["stationManagerCode"], severe_summary)
if not "trainServices" in query and not "busServices" in query and not "ferryServices" in query:
return event["stdout"].write("%s: No services for the next %s minutes" % (
station_summary, filter["period"]))
trains = []
services = []
if "trainServices" in query: services += query["trainServices"][0]
if "busServices" in query: services += query["busServices"][0]
if "ferryServices" in query: services += query["ferryServices"][0]
for t in services:
parsed = {
"rid" : t["rid"],
"uid" : t["uid"],
"head" : t["trainid"],
"platform": '?' if not "platform" in t else t["platform"],
"platform_hidden": "platformIsHidden" in t and t["platformIsHidden"],
"platform_prefix": "",
"toc": t["operatorCode"],
"cancelled" : t["isCancelled"] if "isCancelled" in t else False,
"delayed" : t["departureType"]=="Delayed" if "departureType" in t else None,
"cancel_reason" : t["cancelReason"]["value"] if "cancelReason" in t else "",
"delay_reason" : t["delayReason"]["value"] if "delayReason" in t else "",
"terminating" : not "std" in t and not "etd" in t and not "atd" in t,
"bus" : t["trainid"]=="0B00",
"times" : self.process(t),
"activity" : self.reduced_activities(t["activities"]),
}
parsed["destinations"] = [{"name": a["locationName"], "tiploc": a["tiploc"],
"crs": a["crs"] if "crs" in a else '', "code": a["crs"] if "crs"
in a else a["tiploc"], "via": a["via"] if "via" in a else ''}
for a in t["destination"][0]]
parsed["origins"] = [{"name": a["locationName"], "tiploc": a["tiploc"],
"crs": a["crs"] if "crs" in a else '', "code": a["crs"] if "crs"
in a else a["tiploc"], "via": a["via"] if "via" in a else ''}
for a in t["origin"][0]]
parsed["departure_only"] = location_code in [a["code"] for a in parsed["origins"]]
if parsed["cancelled"] or parsed["delayed"]:
for k, time in parsed["times"].items():
time["short"], time["on_time"], time["status"], time["prefix"] = (
"%s:%s" % ("C" if parsed["cancel_reason"] else "D", parsed["cancel_reason"] or parsed["delay_reason"] or "?"),
False, 2, ""
)
trains.append(parsed)
for t in trains:
t["dest_summary"] = "/".join(["%s%s" %(a["code"]*filter["crs"] or a["name"], " " + a["via"]
if a["via"] else '') for a in t["destinations"]])
t["origin_summary"] = "/".join(["%s%s" %(a["code"]*filter["crs"] or a["name"], " " + a["via"]
if a["via"] else '') for a in t["origins"]])
trains = sorted(trains, key=lambda t: t["times"]["max_sched"]["ut"] if filter["type"]=="both" else t["times"]["st" + filter["type"][0]]["ut"])
trains_filtered = []
train_locs_toc = []
for train in trains:
if not True in [
(train["destinations"], train["toc"]) in train_locs_toc and (filter["dedup"] or filter["default"]),
filter["dest"] and not filter["dest"].upper() in [a["code"] for a in train["destinations"]],
filter["origin"] and not filter["origin"].upper() in [a["code"] for a in train["origins"]],
filter["toc"] and not filter["toc"].upper() == train["toc"],
filter["plat"] and not filter["plat"] == train["platform"],
filter["type"] == "departure" and train["terminating"],
filter["type"] == "arrival" and train["departure_only"],
filter["terminating"] and not train["terminating"],
filter["tops"] and not filter["tops"] in train.get("tops_possible", []),
filter["power"] and not filter["power"]==train.get("power_type", None),
]:
train_locs_toc.append((train["destinations"], train["toc"]))
trains_filtered.append(train)
if event.get("external"):
trains_string = "\n".join(["%-6s %-4s %-2s %-3s %1s%-6s %1s %s" % (
t["uid"], t["head"], t["toc"], "bus" if t["bus"] else t["platform"],
"~" if t["times"]["both"]["estimate"] else '',
t["times"]["both"]["prefix"] + t["times"]["both"]["short"],
"" if t["terminating"] or filter["type"]=="arrival" else "",
t["origin_summary"] if t["terminating"] or filter["type"]=="arrival" else t["dest_summary"]
) for t in trains_filtered])
else:
trains_string = ", ".join(["%s%s (%s, %s%s%s%s, %s%s%s)" % (
"from " if not filter["type"][0] in "ad" and t["terminating"] else '',
t["origin_summary"] if t["terminating"] or filter["type"]=="arrival" else t["dest_summary"],
t["uid"],
t["platform_prefix"],
"bus" if t["bus"] else t["platform"],
"*" if t["platform_hidden"] else '',
"?" if "platformsAreUnreliable" in query and query["platformsAreUnreliable"] else '',
t["times"][filter["type"]]["prefix"].replace(filter["type"][0], '') if not t["cancelled"] else "",
utils.irc.bold(utils.irc.color(t["times"][filter["type"]]["shortest"*filter["st"] or "short"], colours[t["times"][filter["type"]]["status"]])),
bool(t["activity"])*", " + "+".join(t["activity"]),
) for t in trains_filtered])
if event.get("external"):
event["stdout"].write("%s%s\n%s" % (
station_summary, "\n calling at %s" % filter["inter"] if filter["inter"] else '', trains_string))
else:
event["stdout"].write("%s%s: %s" % (station_summary, " departures calling at %s" % filter["inter"] if filter["inter"] else '', trains_string))
@utils.hook("received.command.nrservice", min_args=1)
def service(self, event):
"""
:help: Get train service information for a UID, headcode or RID
(Powered by NRE)
:usage: <service_id>
"""
client = self.client
colours = self.COLOURS
external = event.get("external", False)
SCHEDULE_STATUS = {"B": "perm bus", "F": "freight train", "P": "train",
"S": "ship", "T": "trip", "1": "train", "2": "freight",
"3": "trip", "4": "ship", "5": "bus"}
schedule = {}
sources = []
service_id = event["args_split"][0]
filter = self.filter(' '.join(event["args_split"][1:]) if len(event["args_split"]) > 1 else "", {
"passing": (False, lambda x: type(x)==type(True)),
"associations": (False, lambda x: type(x)==type(True)),
"type": ("arrival", lambda x: x in ["arrival", "departure"])
})
if filter["errors"]:
raise utils.EventError("Filter: " + filter["errors_summary"])
rid = service_id
if len(service_id) <= 8:
query = client.service.QueryServices(service_id, datetime.utcnow().date().isoformat(),
datetime.utcnow().time().strftime("%H:%M:%S+0000"))
if not query and not schedule:
return event["stdout"].write("No service information is available for this identifier.")
if query and len(query["serviceList"][0]) > 1:
return event["stdout"].write("Identifier refers to multiple services: " +
", ".join(["%s (%s->%s)" % (a["uid"], a["originCrs"], a["destinationCrs"]) for a in query["serviceList"][0]]))
if query: rid = query["serviceList"][0][0]["rid"]
if query:
sources.append("LDBSVWS")
query = client.service.GetServiceDetailsByRID(rid)
if schedule:
sources.append("Eagle/SCHEDULE")
if not query: query = {"trainid": schedule["signalling_id"] or "0000", "operator": schedule["operator_name"] or schedule["atoc_code"]}
stype = "%s %s" % (schedule_query.data["tops_inferred"], schedule["power_type"]) if schedule_query.data["tops_inferred"] else schedule["power_type"]
for k,v in {
"operatorCode": schedule["atoc_code"],
"serviceType": stype if stype else SCHEDULE_STATUS[schedule["status"]],
}.items():
query[k] = v
disruptions = []
if "cancelReason" in query:
disruptions.append("Cancelled (%s%s)" % (query["cancelReason"]["value"], " at " + query["cancelReason"]["_tiploc"] if query["cancelReason"]["_tiploc"] else ""))
if "delayReason" in query:
disruptions.append("Delayed (%s%s)" % (query["delayReason"]["value"], " at " + query["delayReason"]["_tiploc"] if query["delayReason"]["_tiploc"] else ""))
if disruptions and not external:
disruptions = utils.irc.color(", ".join(disruptions), utils.consts.RED) + " "
elif disruptions and external:
disruptions = ", ".join(disruptions)
else: disruptions = ""
stations = []
for station in query["locations"][0] if "locations" in query else schedule["locations"]:
if "locations" in query:
parsed = {"name": station["locationName"],
"crs": (station["crs"] if "crs" in station else station["tiploc"]).rstrip(),
"tiploc": station["tiploc"].rstrip(),
"called": "atd" in station,
"passing": station["isPass"] if "isPass" in station else False,
"first": len(stations) == 0,
"last" : False,
"cancelled" : station["isCancelled"] if "isCancelled" in station else False,
"associations": [],
"length": station["length"] if "length" in station else None,
"times": self.process(station),
"platform": station["platform"] if "platform" in station else None,
"activity": self.activities(station["activities"]) if "activities" in station else [],
"activity_p": self.reduced_activities(station["activities"]) if "activities" in station else [],
}
if parsed["cancelled"]:
parsed["times"]["arrival"].update({"short": "Cancelled", "on_time": False, "status": 2})
parsed["times"]["departure"].update({"short": "Cancelled", "on_time": False, "status": 2})
associations = station["associations"][0] if "associations" in station else []
for assoc in associations:
parsed_assoc = {
"uid_assoc": assoc.uid,
"category": {"divide": "VV", "join": "JJ", "next": "NP"}[assoc["category"]],
"from": parsed["first"], "direction": assoc["destTiploc"].rstrip()==parsed["tiploc"],
"origin_name": assoc["origin"], "origin_tiploc": assoc["originTiploc"],
"origin_crs": assoc["originCRS"] if "originCRS" in assoc else None,
"dest_name": assoc["destination"], "dest_tiploc": assoc["destTiploc"],
"dest_crs": assoc["destCRS"] if "destCRS" in assoc else None,
"far_name": assoc["destination"], "far_tiploc": assoc["destTiploc"],
"far_crs": assoc["destCRS"] if "destCRS" in assoc else None,
}
if parsed_assoc["direction"]:
parsed_assoc.update({"far_name": parsed_assoc["origin_name"],
"far_tiploc": parsed_assoc["origin_tiploc"], "far_crs": parsed_assoc["origin_crs"]})
parsed["associations"].append(parsed_assoc)
else:
parsed = {"name": (station["name"] or "none"),
"crs": station["crs"] if station["crs"] else station["tiploc"],
"tiploc": station["tiploc"],
"called": False,
"passing": bool(station.get("pass")),
"first": len(stations) == 0,
"last" : False,
"cancelled" : False,
"length": None,
"times": self.process(station["dolphin_times"]),
"platform": station["platform"],
"associations": station["associations"] or [],
"activity": self.activities(station["activity"]),
"activity_p": self.reduced_activities(station["activity"]),
}
stations.append(parsed)
[a for a in stations if a["called"] or a["first"]][-1]["last"] = True
for station in stations[0:[k for k,v in enumerate(stations) if v["last"]][0]]:
if not station["first"]: station["called"] = True
for station in stations:
for assoc in station["associations"]:
assoc["summary"] = "{arrow} {assoc[category]} {assoc[uid_assoc]} {dir_arrow} {assoc[far_name]} ({code})".format(assoc=assoc, arrow=assoc["from"]*"<-" or "->", dir_arrow=(assoc["direction"])*"<-" or "->", code=assoc["far_crs"] or assoc["far_tiploc"])
if station["passing"]:
station["times"]["arrival"]["status"], station["times"]["departure"]["status"] = 5, 5
elif station["called"]:
station["times"]["arrival"]["status"], station["times"]["departure"]["status"] = 0, 0
station["summary"] = "%s%s (%s%s%s%s%s)%s" % (
"*" * station["passing"],
station["name"],
station["crs"] + ", " if station["name"] != station["crs"] else '',
station["length"] + " car, " if station["length"] and (station["first"] or station["associations"]) else '',
("~" if station["times"][filter["type"]]["estimate"] else '') +
station["times"][filter["type"]]["prefix"].replace(filter["type"][0], ""),
utils.irc.color(station["times"][filter["type"]]["short"], colours[station["times"][filter["type"]]["status"]]),
", "*bool(station["activity_p"]) + "+".join(station["activity_p"]),
", ".join([a["summary"] for a in station["associations"]] if filter["associations"] else ""),
)
station["summary_external"] = "%1s%-5s %1s%-5s %-3s %-3s %-3s %s%s" % (
"~"*station["times"]["a"]["estimate"] + "s"*(station["times"]["a"]["schedule"]),
station["times"]["a"]["short"],
"~"*station["times"]["d"]["estimate"] + "s"*(station["times"]["d"]["schedule"]),
station["times"]["d"]["short"],
station["platform"] or '',
",".join(station["activity"]) or '',
station["crs"] or station["tiploc"],
station["name"],
"\n" + "\n".join([a["summary"] for a in station["associations"]]) if station["associations"] else "",
)
stations_filtered = []
for station in stations:
if station["passing"] and not filter["passing"]: continue
if station["called"] and filter["default"] and not external:
if not station["first"] and not station["last"]:
continue
stations_filtered.append(station)
if station["first"] and not station["last"] and filter["default"] and not external:
stations_filtered.append({"summary": "(...)", "summary_external": "(...)"})
done_count = len([s for s in stations if s["called"]])
total_count = len(stations)
if external:
event["stdout"].write("%s: %s\n%s%s (%s) %s %s\n\n%s" % (
service_id, ", ".join(sources),
disruptions + "\n" if disruptions else '',
query["operator"], query["operatorCode"], query["trainid"], query["serviceType"],
"\n".join([s["summary_external"] for s in stations_filtered])
))
else:
event["stdout"].write("%s%s %s %s (%s/%s): %s" % (disruptions, query["operatorCode"],
query["trainid"], query["serviceType"],
done_count, total_count,
", ".join([s["summary"] for s in stations_filtered])))
@utils.hook("received.command.nrhead", min_args=1)
def head(self, event):
"""
:help: Get information for a given headcode/UID/RID (Powered by NRE)
:usage: <headcode>
"""
client = self.client
service_id = event["args_split"][0]
query = client.service.QueryServices(service_id, datetime.utcnow().date().isoformat(),
datetime.utcnow().time().strftime("%H:%M:%S+0000"))
if not query:
raise utils.EventError("No currently running services match this "
"identifier")
services = query["serviceList"][0]
if event.get("external"):
event["stdout"].write("\n".join(["{a.uid:6} {a.trainid:4} {a.originName} ({a.originCrs}) → {a.destinationName} ({a.destinationCrs})".format(a=a) for a in services]))
else:
event["stdout"].write(", ".join(["h/%s r/%s u/%s rs/%s %s (%s) -> %s (%s)" % (a["trainid"], a["rid"], a["uid"], a["rsid"], a["originName"], a["originCrs"], a["destinationName"], a["destinationCrs"]) for a in services]))
@utils.hook("received.command.nrcode", min_args=1)
def service_code(self, event):
"""
:help: Get the text for a given delay/cancellation code (Powered by NRE)
:usage: <code>
"""
client = self.client
if not event["args"].isnumeric():
raise utils.EventError("The delay/cancellation code must be a "
"number")
reasons = {a["code"]:(a["lateReason"], a["cancReason"]) for a in client.service.GetReasonCodeList()[0]}
if event["args"] in reasons:
event["stdout"].write("%s: %s" % (event["args"], " / ".join(reasons[event["args"]])))
else:
event["stdout"].write("This doesn't seem to be a valid reason code")

View file

@ -4,7 +4,7 @@
from src import ModuleManager, utils
@utils.export("set", utils.Setting("pronouns", "Set your pronouns",
example="they/them"))
example="she/her"))
class Module(ModuleManager.BaseModule):
@utils.hook("received.command.pronouns")
def pronouns(self, event):

View file

@ -5,9 +5,6 @@ from src import ModuleManager, utils
@utils.export("channelset", utils.BoolSetting("channel-quotes",
"Whether or not quotes added from this channel are kept in this channel"))
@utils.export("set", utils.BoolSetting("quotable",
"Whether or not you wish to be quoted"))
class Module(ModuleManager.BaseModule):
def category_and_quote(self, s):
category, sep, quote = s.partition("=")
@ -34,11 +31,6 @@ class Module(ModuleManager.BaseModule):
"channel-quotes", False):
target = event["target"]
if not event["server"].get_user(category).get_setting(
"quotable", True):
event["stderr"].write("%s does not wish to be quoted" % category)
return
quotes = self._get_quotes(target, category)
quotes.append([event["user"].name, int(time.time()), quote])
self._set_quotes(target, category, quotes)
@ -156,10 +148,6 @@ class Module(ModuleManager.BaseModule):
text = " ".join(lines_str)
quote_category = line.sender
if not event["server"].get_user(quote_category).get_setting(
"quotable", True):
event["stderr"].write("%s does not wish to be quoted" % quote_category)
return
if event["server"].has_user(quote_category):
quote_category = event["server"].get_user_nickname(
event["server"].get_user(quote_category).get_id())

View file

@ -1,7 +1,7 @@
#--depends-on config
#--depends-on shorturl
import difflib, hashlib, time, re
import difflib, hashlib, time
from src import ModuleManager, utils
import feedparser
@ -9,66 +9,37 @@ RSS_INTERVAL = 60 # 1 minute
SETTING_BIND = utils.Setting("rss-bindhost",
"Which local address to bind to for RSS requests", example="127.0.0.1")
@utils.export("botset", utils.IntSetting("rss-interval",
"Interval (in seconds) between RSS polls", example="120"))
@utils.export("channelset", utils.BoolSetting("rss-shorten",
"Whether or not to shorten RSS urls"))
@utils.export("channelset", utils.Setting("rss-format", "Format of RSS announcements", example="${longtitle}: ${title} - ${link} [${author}]"))
@utils.export("serverset", SETTING_BIND)
@utils.export("channelset", SETTING_BIND)
class Module(ModuleManager.BaseModule):
_name = "RSS"
def _migrate_formats(self):
count = 0
migration_re = re.compile(r"(?:\$|{)+(?P<variable>[^}:\s]+)(?:})?")
old_formats = self.bot.database.execute_fetchall("""
SELECT channel_id, value FROM channel_settings
WHERE setting = 'rss-format'
""")
for channel_id, format in old_formats:
new_format = migration_re.sub(r"${\1}", format)
self.bot.database.execute("""
UPDATE channel_settings SET value = ?
WHERE setting = 'rss-format'
AND channel_id = ?
""", [new_format, channel_id])
count += 1
self.log.info("Successfully migrated %d rss-format settings" % count)
def on_load(self):
if not self.bot.get_setting("rss-fmt-migration", False):
self.log.info("Attempting to migrate old rss-format settings")
self._migrate_formats()
self.bot.set_setting("rss-fmt-migration", True)
self.timers.add("rss-feeds", self._timer,
self.bot.get_setting("rss-interval", RSS_INTERVAL))
def _format_entry(self, server, channel, feed_title, entry, shorten):
def _format_entry(self, server, feed_title, entry, shorten):
title = utils.parse.line_normalise(utils.http.strip_html(
entry["title"]))
author = entry.get("author", None)
author = " by %s" % author if author else ""
link = entry.get("link", None)
if shorten:
try:
link = self.exports.get("shorturl")(server, link)
except:
pass
link = "%s" % link if link else ""
link = " - %s" % link if link else ""
variables = dict(
longtitle=feed_title or "",
title=utils.parse.line_normalise(utils.http.strip_html(
entry["title"])),
link=link or "",
author=entry.get("author", "unknown author") or "",
)
variables.update(entry)
# just in case the format starts keyerroring and you're not sure why
self.log.trace("RSS Entry: " + str(entry))
template = channel.get_setting("rss-format", "${longtitle}: ${title} by ${author} - ${link}")
_, formatted = utils.parse.format_token_replace(template, variables)
return formatted
feed_title_str = "%s: " % feed_title if feed_title else ""
return "%s%s%s%s" % (feed_title_str, title, author, link)
def _timer(self, timer):
start_time = time.monotonic()
@ -135,7 +106,7 @@ class Module(ModuleManager.BaseModule):
valid += 1
shorten = channel.get_setting("rss-shorten", False)
output = self._format_entry(server, channel, feed_title, entry,
output = self._format_entry(server, feed_title, entry,
shorten)
self.events.on("send.stdout").call(target=channel,
@ -229,10 +200,10 @@ class Module(ModuleManager.BaseModule):
title, entries = self._get_entries(url)
if not entries:
raise utils.EventError("%s has no entries" % url)
raise utils.EventError("Failed to get RSS entries")
shorten = event["target"].get_setting("rss-shorten", False)
out = self._format_entry(event["server"], event["target"], title, entries[0],
out = self._format_entry(event["server"], title, entries[0],
shorten)
event["stdout"].write(out)
else:

View file

@ -4,7 +4,7 @@
import re, traceback
from src import ModuleManager, utils
REGEX_SED = re.compile(r"^(?:(\S+)[:,] )?s([/,`#]).*\2")
REGEX_SED = re.compile("^(?:(\\S+)[:,] )?s/")
@utils.export("channelset",
utils.BoolSetting("sed","Disable/Enable sed in a channel"))
@ -35,7 +35,7 @@ class Module(ModuleManager.BaseModule):
sed.replace = utils.irc.bold(sed.replace)
if self._closest_setting(event, "sed-sender-only", False):
for_user = event["user"].nickname_lower
for_user = event["user"].nickname
match_line = None
match_message = None

View file

@ -4,7 +4,7 @@
import re
from src import ModuleManager, utils
URL_BITLYSHORTEN = "https://api-ssl.bitly.com/v4/shorten"
URL_BITLYSHORTEN = "https://api-ssl.bitly.com/v3/shorten"
class Module(ModuleManager.BaseModule):
def on_load(self):
@ -41,7 +41,7 @@ class Module(ModuleManager.BaseModule):
@utils.export("shorturl-any")
def _shorturl_any(self, url):
return self._call_shortener(None, None, "bitly", url) or url
return self._call_shortener(server, None, "bitly", url) or url
@utils.export("shorturl")
def _shorturl(self, server, url, context=None):
@ -66,16 +66,11 @@ class Module(ModuleManager.BaseModule):
access_token = self.bot.config.get("bitly-api-key", None)
if access_token:
resp = utils.http.request(
URL_BITLYSHORTEN,
method="POST",
post_data={"long_url": url},
json_body=True,
headers={"Authorization": f"Bearer {access_token}"}
)
page = utils.http.request(URL_BITLYSHORTEN, get_params={
"access_token": access_token, "longUrl": url}).json()
if resp.code == 200:
return resp.json()["link"]
if page["data"]:
return page["data"]["url"]
return None
def _find_url(self, target, args):
@ -117,4 +112,4 @@ class Module(ModuleManager.BaseModule):
event["stdout"].write("Unshortened: %s" %
response.headers["location"])
else:
event["stderr"].write("Failed to unshorten URL")
event["stderr"].write("Failed to unshorten URL")

View file

@ -2,7 +2,7 @@ import datetime, html, time
from src import utils
def _timestamp(dt):
seconds_since = time.time()-dt.replace(tzinfo=datetime.timezone.utc).timestamp()
seconds_since = time.time()-dt.timestamp()
timestamp = utils.datetime.format.to_pretty_since(
seconds_since, max_units=2)
return "%s ago" % timestamp

View file

@ -2,7 +2,7 @@
from src import ModuleManager, utils
URL_URBANDICTIONARY = "https://api.urbandictionary.com/v0/define"
URL_URBANDICTIONARY = "http://api.urbandictionary.com/v0/define"
class Module(ModuleManager.BaseModule):
_name = "UrbanDictionary"

View file

@ -69,7 +69,6 @@ class Module(ModuleManager.BaseModule):
celsius = "%dC" % page["main"]["temp"]
fahrenheit = "%dF" % ((page["main"]["temp"]*(9/5))+32)
kelvin = "%dK" % ((page["main"]["temp"])+273.15)
description = page["weather"][0]["description"].title()
humidity = "%s%%" % page["main"]["humidity"]
@ -82,11 +81,10 @@ class Module(ModuleManager.BaseModule):
location_str = "(%s) %s" % (nickname, location_str)
event["stdout"].write(
"%s | %s/%s/%s | %s | Humidity: %s | Wind: %s/%s" % (
location_str, celsius, fahrenheit, kelvin, description,
humidity, wind_speed_k, wind_speed_m))
"%s | %s/%s | %s | Humidity: %s | Wind: %s/%s" % (
location_str, celsius, fahrenheit, description, humidity,
wind_speed_k, wind_speed_m))
else:
event["stderr"].write("No weather information for this location")
else:
raise utils.EventResultsError()

View file

@ -32,7 +32,7 @@ class Module(ModuleManager.BaseModule):
for pod in page["queryresult"]["pods"]:
text = pod["subpods"][0]["plaintext"]
if pod["id"] == "Input" and text:
input = text.replace("\n", " | ")
input = text
elif pod.get("primary", False):
primaries.append(text)

View file

@ -36,49 +36,46 @@ class Module(ModuleManager.BaseModule):
def video_details(self, video_id):
page = self.get_video_page(video_id)
try:
if page["items"]:
item = page["items"][0]
snippet = item["snippet"]
statistics = item["statistics"]
content = item["contentDetails"]
if page["items"]:
item = page["items"][0]
snippet = item["snippet"]
statistics = item["statistics"]
content = item["contentDetails"]
video_uploaded_at = utils.datetime.parse.iso8601(
snippet["publishedAt"])
video_uploaded_at = utils.datetime.format.date_human(
video_uploaded_at)
video_uploaded_at = utils.datetime.parse.iso8601(
snippet["publishedAt"])
video_uploaded_at = utils.datetime.format.date_human(
video_uploaded_at)
video_uploader = snippet["channelTitle"]
video_title = utils.irc.bold(snippet["title"])
video_uploader = snippet["channelTitle"]
video_title = utils.irc.bold(snippet["title"])
video_views = self._number(statistics.get("viewCount"))
video_likes = self._number(statistics.get("likeCount"))
video_dislikes = self._number(statistics.get("dislikeCount"))
video_views = self._number(statistics.get("viewCount"))
video_likes = self._number(statistics.get("likeCount"))
video_dislikes = self._number(statistics.get("dislikeCount"))
video_opinions = ""
if video_likes and video_dislikes:
likes = utils.irc.color("%s%s" % (video_likes, ARROW_UP),
utils.consts.GREEN)
dislikes = utils.irc.color("%s%s" %
(ARROW_DOWN, video_dislikes), utils.consts.RED)
video_opinions = " (%s%s)" % (likes, dislikes)
video_opinions = ""
if video_likes and video_dislikes:
likes = utils.irc.color("%s%s" % (video_likes, ARROW_UP),
utils.consts.GREEN)
dislikes = utils.irc.color("%s%s" %
(ARROW_DOWN, video_dislikes), utils.consts.RED)
video_opinions = " (%s%s)" % (likes, dislikes)
video_views_str = ""
if video_views:
video_views_str = ", %s views" % video_views
video_views_str = ""
if video_views:
video_views_str = ", %s views" % video_views
td = utils.datetime.parse.iso8601_duration(content["duration"])
video_duration = utils.datetime.format.to_pretty_time(
td.total_seconds())
td = utils.datetime.parse.iso8601_duration(content["duration"])
video_duration = utils.datetime.format.to_pretty_time(
td.total_seconds())
url = URL_YOUTUBESHORT % video_id
url = URL_YOUTUBESHORT % video_id
return "%s (%s) uploaded by %s on %s%s%s" % (
video_title, video_duration, video_uploader, video_uploaded_at,
video_views_str, video_opinions), url
return None
except KeyError:
return None
return "%s (%s) uploaded by %s on %s%s%s" % (
video_title, video_duration, video_uploader, video_uploaded_at,
video_views_str, video_opinions), url
return None
def get_playlist_page(self, playlist_id):
self.log.debug("youtube API request: "
@ -89,19 +86,16 @@ class Module(ModuleManager.BaseModule):
"key": self.bot.config["google-api-key"]}).json()
def playlist_details(self, playlist_id):
page = self.get_playlist_page(playlist_id)
try:
if page["items"]:
item = page["items"][0]
snippet = item["snippet"]
content = item["contentDetails"]
if page["items"]:
item = page["items"][0]
snippet = item["snippet"]
content = item["contentDetails"]
count = content["itemCount"]
count = content["itemCount"]
return "%s - %s (%s %s)" % (snippet["channelTitle"],
snippet["title"], count, "video" if count == 1 else "videos"
), URL_PLAYLIST % playlist_id
except KeyError:
return None
return "%s - %s (%s %s)" % (snippet["channelTitle"],
snippet["title"], count, "video" if count == 1 else "videos"
), URL_PLAYLIST % playlist_id
def _from_url(self, url):
parsed = urllib.parse.urlparse(url)
@ -154,27 +148,23 @@ class Module(ModuleManager.BaseModule):
from_url = not url == None
if not url:
safe_setting = event["target"].get_setting("youtube-safesearch", True)
safe = "moderate" if safe_setting else "none"
try:
if not url:
safe_setting = event["target"].get_setting("youtube-safesearch", True)
safe = "moderate" if safe_setting else "none"
self.log.debug("youtube API request: search.list (B) [snippet]")
self.log.debug("youtube API request: search.list (B) [snippet]")
search_page = utils.http.request(URL_YOUTUBESEARCH,
get_params={"q": search, "part": "snippet", "maxResults": "1",
"type": "video", "key": self.bot.config["google-api-key"],
"safeSearch": safe}).json()
if search_page:
if search_page["pageInfo"]["totalResults"] > 0:
url = URL_VIDEO % search_page["items"][0]["id"]["videoId"]
else:
raise utils.EventError("No videos found")
search_page = utils.http.request(URL_YOUTUBESEARCH,
get_params={"q": search, "part": "snippet", "maxResults": "1",
"type": "video", "key": self.bot.config["google-api-key"],
"safeSearch": safe}).json()
if search_page:
if search_page["pageInfo"]["totalResults"] > 0:
url = URL_VIDEO % search_page["items"][0]["id"]["videoId"]
else:
raise utils.EventResultsError()
except KeyError:
raise utils.EventError("API error")
raise utils.EventError("No videos found")
else:
raise utils.EventResultsError()
if url:
out = self._from_url(url)

View file

@ -11,6 +11,8 @@ PySocks ==1.7.1
python-dateutil ==2.8.1
pytz ==2019.2
requests ==2.31.0
tornado ==6.3.2
scrypt ==0.8.13
suds-jurko ==0.6
tornado ==6.0.3
tweepy ==3.8.0
requests-toolbelt ==0.9.1

View file

@ -116,8 +116,7 @@ class Bot(object):
self._trigger_both()
return returned
func_queue: queue.Queue[typing.Tuple[TriggerResult, typing.Any]
] = queue.Queue(1)
func_queue = queue.Queue(1) # type: queue.Queue[str]
def _action():
try:
@ -135,8 +134,7 @@ class Bot(object):
if trigger_threads:
self._trigger_both()
if (type == TriggerResult.Exception and
isinstance(returned, Exception)):
if type == TriggerResult.Exception:
raise returned
elif type == TriggerResult.Return:
return returned

View file

@ -95,12 +95,8 @@ class Server(IRCObject.Object):
self.connection_params.bindhost,
self.connection_params.tls,
tls_verify=self.get_setting("ssl-verify", True),
cert=self.bot.config.get("tls-certificate", '').format(
DATA=self.bot.data_directory
) or None,
key=self.bot.config.get("tls-key", '').format(
DATA=self.bot.data_directory
))
cert=self.bot.config.get("tls-certificate", None),
key=self.bot.config.get("tls-key", None))
self.events.on("preprocess.connect").call(server=self)
self.socket.connect()

View file

@ -151,27 +151,6 @@ class Module(ModuleManager.BaseModule):
return
event["stdout"].write("Added server '%s'" % alias)
@utils.hook("received.command.delserver")
@utils.kwarg("help", "Delete a server")
@utils.kwarg("pemission", "delserver")
@utils.spec("!<alias>word")
def del_server(self, event):
alias = event["spec"][0]
sid = self.bot.database.servers.by_alias(alias)
if sid == None:
event["stderr"].write("Server '%s' does not exist" % alias)
return
if self._server_from_alias(alias):
event["stderr"].write("You must disconnect from %s before deleting it" % alias)
return
try:
self.bot.database.servers.delete(sid)
except Exception as e:
event["stderr"].write("Failed to delete server")
self.log.error("failed to add server \"%s\"", [alias], exc_info=True)
return
event["stderr"].write("Server '%s' has been deleted" % alias)
@utils.hook("received.command.editserver")
@utils.kwarg("help", "Edit server details")
@utils.kwarg("permission", "editserver")

View file

@ -12,23 +12,12 @@ class Module(ModuleManager.BaseModule):
return channel.get_user_setting(user.get_id(), "ignore", False)
def _server_command_ignored(self, server, command):
return server.get_setting("ignore-%s" % command, False)
def _channel_command_ignored(self, channel, command):
return channel.get_setting("ignore-command-%s" % command, False)
def _is_command_ignored(self, event):
if self._user_command_ignored(event["user"], event["command"]):
def _is_command_ignored(self, server, user, command):
if self._user_command_ignored(user, command):
return True
elif self._server_command_ignored(event["server"], event["command"]):
elif self._server_command_ignored(server, command):
return True
elif event["is_channel"] and self._channel_command_ignored(event["target"], event["command"]):
return True
def _is_valid_command(self, command):
hooks = self.events.on("received.command").on(command).get_hooks()
if hooks:
return True
else:
return False
@utils.hook("received.message.private")
@utils.hook("received.message.channel")
@ -49,7 +38,8 @@ class Module(ModuleManager.BaseModule):
elif event["is_channel"] and self._user_channel_ignored(event["target"],
event["user"]):
return utils.consts.PERMISSION_HARD_FAIL, None
elif self._is_command_ignored(event):
elif self._is_command_ignored(event["server"], event["user"],
event["command"]):
return utils.consts.PERMISSION_HARD_FAIL, None
@utils.hook("received.command.ignore", min_args=1)
@ -133,38 +123,6 @@ class Module(ModuleManager.BaseModule):
True)
event["stdout"].write("Ignoring %s" % target_user.nickname)
@utils.hook("received.command.ignorecommand",
help="Ignore a command in this channel")
@utils.hook("received.command.unignorecommand",
help="Unignore a command in this channel")
@utils.kwarg("channel_only", True)
@utils.kwarg("min_args", 1)
@utils.kwarg("usage", "<command>")
@utils.kwarg("permission", "cignore")
@utils.kwarg("require_mode", "o")
@utils.kwarg("require_access", "high,cignore")
def cignore_command(self, event):
remove = event["command"] == "unignorecommand"
command = event["args_split"][0]
if not self._is_valid_command(command):
raise utils.EventError("Unknown command '%s'" % command)
is_ignored = self._channel_command_ignored(event["target"], command)
if remove:
if not is_ignored:
raise utils.EventError("I'm not ignoring '%s' in this channel" %
target_user.nickname)
event["target"].del_setting("ignore-command-%s" % command)
event["stdout"].write("Unignored '%s' command" % command)
else:
if is_ignored:
raise utils.EventError("I'm already ignoring '%s' in this channel"
% command)
event["target"].set_setting("ignore-command-%s" % command, True)
event["stdout"].write("Ignoring '%s' command" % command)
@utils.hook("received.command.serverignore")
@utils.kwarg("help", "Ignore a command on the current server")
@utils.kwarg("permission", "serverignore")

View file

@ -5,15 +5,15 @@ def bool_input(s: str):
return not result or result[0].lower() in ["", "y"]
def add_server():
alias = input("alias (display name): ")
hostname = input("hostname (address of server): ")
alias = input("alias: ")
hostname = input("hostname: ")
port = int(input("port: "))
tls = bool_input("tls?")
password = input("password (optional, leave blank to skip): ")
password = input("password?: ")
nickname = input("nickname: ")
username = input("username (optional): ")
realname = input("realname (optional): ")
bindhost = input("bindhost (optional): ")
username = input("username: ")
realname = input("realname: ")
bindhost = input("bindhost?: ")
return irc.IRCConnectionParameters(-1, alias, hostname, port, password, tls,
bindhost, nickname, username, realname)

View file

@ -7,7 +7,7 @@ from requests_toolbelt.adapters import source
REGEX_URL = re.compile("https?://\S+", re.I)
PAIRED_CHARACTERS = [("<", ">"), ("(", ")")]
PAIRED_CHARACTERS = ["<>", "()"]
# best-effort tidying up of URLs
def url_sanitise(url: str):
@ -316,7 +316,7 @@ class Client(object):
request_many = request_many
def strip_html(s: str) -> str:
return bs4.BeautifulSoup(s, "lxml").get_text()
return bs4.BeautifulSoup(s, "html5lib").get_text()
def resolve_hostname(hostname: str) -> typing.List[str]:
try:

View file

@ -44,7 +44,7 @@ class SedMatch(Sed):
return None
def _sed_split(s: str) -> typing.List[str]:
tokens = _tokens(s, s[1])
tokens = _tokens(s, "/")
if tokens and (not tokens[-1] == (len(s)-1)):
tokens.append(len(s))

View file

@ -25,20 +25,13 @@ def ssl_wrap(sock: socket.socket, cert: str=None, key: str=None,
def constant_time_compare(a: typing.AnyStr, b: typing.AnyStr) -> bool:
return hmac.compare_digest(a, b)
import hashlib
import scrypt
def password(byte_n: int=32) -> str:
return binascii.hexlify(os.urandom(byte_n)).decode("utf8")
def salt(byte_n: int=64) -> str:
return base64.b64encode(os.urandom(byte_n)).decode("utf8")
def hash(given_salt: str, data: str):
hash = hashlib.scrypt(
data.encode("utf8"),
salt=given_salt.encode("utf8"),
n=1<<14,
r=8,
p=1
)
return base64.b64encode(hash).decode("ascii")
return base64.b64encode(scrypt.hash(data, given_salt)).decode("utf8")
def hash_verify(salt: str, data: str, compare: str):
given_hash = hash(salt, data)
return constant_time_compare(given_hash, compare)