Move src/Utils.py in to src/utils/, splitting functionality out in to modules of

related functionality
This commit is contained in:
jesopo 2018-10-03 13:22:37 +01:00
parent 760c9d8844
commit 69d58eede2
80 changed files with 870 additions and 853 deletions

View file

@ -1,5 +1,5 @@
import random
from src import ModuleManager, Utils
from src import ModuleManager, utils
CHOICES = [
"Definitely",
@ -19,7 +19,8 @@ CHOICES = [
"It is certain",
"Naturally",
"Reply hazy, try again later",
Utils.underline(Utils.color("DO NOT WASTE MY TIME", Utils.COLOR_RED)),
utils.irc.underline(utils.irc.color("DO NOT WASTE MY TIME",
utils.irc.COLOR_RED)),
"Hmm... Could be!",
"I'm leaning towards no",
"Without a doubt",
@ -29,11 +30,11 @@ CHOICES = [
]
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.8ball", min_args=1)
@utils.hook("received.command.8ball", min_args=1)
def decide(selfs, event):
"""
:help: Ask the mystic 8ball a question!
:usage: <question>
"""
event["stdout"].write("You shake the magic ball... it "
"says " + Utils.bold(random.choice(CHOICES)))
"says " + utils.irc.bold(random.choice(CHOICES)))

View file

@ -1,10 +1,10 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
@Utils.export("serverset", {"setting": "accept-invites",
@utils.export("serverset", {"setting": "accept-invites",
"help": "Set whether I accept invites on this server",
"validate": Utils.bool_or_none})
"validate": utils.bool_or_none})
class Module(ModuleManager.BaseModule):
@Utils.hook("received.invite")
@utils.hook("received.invite")
def on_invite(self, event):
if event["server"].is_own_nickname(event["target_user"].nickname):
if event["server"].get_setting("accept-invites", True):

View file

@ -1,7 +1,7 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.changenickname", min_args=1)
@utils.hook("received.command.changenickname", min_args=1)
def change_nickname(self, event):
"""
:help: Change my nickname
@ -11,7 +11,7 @@ class Module(ModuleManager.BaseModule):
nickname = event["args_split"][0]
event["server"].send_nick(nickname)
@Utils.hook("received.command.raw", min_args=1)
@utils.hook("received.command.raw", min_args=1)
def raw(self, event):
"""
:help: Send a line of raw IRC data
@ -20,7 +20,7 @@ class Module(ModuleManager.BaseModule):
"""
event["server"].send(event["args"])
@Utils.hook("received.command.part")
@utils.hook("received.command.part")
def part(self, event):
"""
:help: Part from the current or given channel
@ -35,7 +35,7 @@ class Module(ModuleManager.BaseModule):
event["stderr"].write("No channel provided")
event["server"].send_part(target)
@Utils.hook("received.command.reconnect")
@utils.hook("received.command.reconnect")
def reconnect(self, event):
"""
:help: Reconnect to the current network

View file

@ -1,7 +1,7 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
@Utils.export("channelset", {"setting": "automode",
"help": "Disable/Enable automode", "validate": Utils.bool_or_none})
@utils.export("channelset", {"setting": "automode",
"help": "Disable/Enable automode", "validate": utils.bool_or_none})
class Module(ModuleManager.BaseModule):
_name = "AutoMode"
@ -13,10 +13,10 @@ class Module(ModuleManager.BaseModule):
channel.send_mode("+%s" % "".join(modes),
" ".join([user.nickname for mode in modes]))
@Utils.hook("received.join")
@utils.hook("received.join")
def on_join(self, event):
self._check_modes(event["channel"], event["user"])
@Utils.hook("received.account")
@utils.hook("received.account")
def on_account(self, event):
for channel in event["user"].channels:
self._check_modes(channel, event["user"])
@ -51,7 +51,7 @@ class Module(ModuleManager.BaseModule):
event["stdout"].write("Removed automode %s from '%s'" % (
mode_name, target_user.nickname))
@Utils.hook("received.command.addop", min_args=1, channel_only=True)
@utils.hook("received.command.addop", min_args=1, channel_only=True)
def add_op(self, event):
"""
:help: Add a user to the auto-mode list as an op
@ -59,7 +59,7 @@ class Module(ModuleManager.BaseModule):
:require_mode: o
"""
self._add_mode(event, "o", "op")
@Utils.hook("received.command.removeop", min_args=1, channel_only=True)
@utils.hook("received.command.removeop", min_args=1, channel_only=True)
def remove_op(self, event):
"""
:help: Remove a user from the auto-mode list as an op
@ -68,7 +68,7 @@ class Module(ModuleManager.BaseModule):
"""
self._remove_mode(event, "o", "op")
@Utils.hook("received.command.addvoice", min_args=1, channel_only=True)
@utils.hook("received.command.addvoice", min_args=1, channel_only=True)
def add_voice(self, event):
"""
:help: Add a user to the auto-mode list as a voice
@ -76,7 +76,7 @@ class Module(ModuleManager.BaseModule):
:require_mode: o
"""
self._add_mode(event, "v", "voice")
@Utils.hook("received.command.removevoice", min_args=1, channel_only=True)
@utils.hook("received.command.removevoice", min_args=1, channel_only=True)
def remove_voice(self, event):
"""
:help: Remove a user from the auto-mode list as a voice

View file

@ -1,16 +1,16 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
_name = "BTC"
@Utils.hook("received.command.btc")
@utils.hook("received.command.btc")
def btc(self, event):
"""
:help: Get the exchange rate of bitcoins
:usage: [currency]
"""
currency = (event["args"] or "USD").upper()
page = Utils.get_url("https://blockchain.info/ticker",
page = utils.http.get_url("https://blockchain.info/ticker",
json=True)
if page:
if currency in page:

View file

@ -1,5 +1,5 @@
import json, re
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_GOOGLEBOOKS = "https://www.googleapis.com/books/v1/volumes"
URL_BOOKINFO = "https://books.google.co.uk/books?id=%s"
@ -9,7 +9,7 @@ class Module(ModuleManager.BaseModule):
_name = "ISBN"
def get_book(self, query, event):
page = Utils.get_url(URL_GOOGLEBOOKS, get_params={
page = utils.http.get_url(URL_GOOGLEBOOKS, get_params={
"q": query, "country": "us"}, json=True)
if page:
if page["totalItems"] > 0:
@ -36,7 +36,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("Failed to load results")
@Utils.hook("received.command.isbn", min_args=1)
@utils.hook("received.command.isbn", min_args=1)
def isbn(self, event):
"""
:help: Get book information from a provided ISBN
@ -48,7 +48,7 @@ class Module(ModuleManager.BaseModule):
isbn = isbn.replace("-", "")
self.get_book("isbn:%s" % isbn, event)
@Utils.hook("received.command.book", min_args=1)
@utils.hook("received.command.book", min_args=1)
def book(self, event):
"""
:help: Get book information from a provided title

View file

@ -1,9 +1,9 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
@Utils.export("serverset", {"setting": "bot-channel",
@utils.export("serverset", {"setting": "bot-channel",
"help": "Set main channel"})
class Module(ModuleManager.BaseModule):
@Utils.hook("received.numeric.001")
@utils.hook("received.numeric.001")
def do_join(self, event):
event["server"].send_join(event["server"].get_setting("bot-channel",
"#bitbot"))

View file

@ -1,25 +1,25 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
class UserNotFoundException(Exception):
pass
class InvalidTimeoutException(Exception):
pass
@Utils.export("channelset", {"setting": "highlight-spam-threshold",
@utils.export("channelset", {"setting": "highlight-spam-threshold",
"help": "Set the number of nicknames in a message that qualifies as spam",
"validate": Utils.int_or_none})
@Utils.export("channelset", {"setting": "highlight-spam-protection",
"validate": utils.int_or_none})
@utils.export("channelset", {"setting": "highlight-spam-protection",
"help": "Enable/Disable highlight spam protection",
"validate": Utils.bool_or_none})
@Utils.export("channelset", {"setting": "highlight-spam-ban",
"validate": utils.bool_or_none})
@utils.export("channelset", {"setting": "highlight-spam-ban",
"help": "Enable/Disable banning highlight spammers "
"instead of just kicking", "validate": Utils.bool_or_none})
@Utils.export("channelset", {"setting": "ban-format",
"instead of just kicking", "validate": utils.bool_or_none})
@utils.export("channelset", {"setting": "ban-format",
"help": "Set ban format ($n = nick, $u = username, $h = hostname)"})
class Module(ModuleManager.BaseModule):
_name = "Channel Op"
@Utils.hook("timer.unban")
@utils.hook("timer.unban")
def _timer_unban(self, event):
server = self.bot.get_server(event["server_id"])
if server.has_channel(event["channel_name"]):
@ -33,7 +33,7 @@ class Module(ModuleManager.BaseModule):
else:
raise UserNotFoundException("That user is not in this channel")
@Utils.hook("received.command.kick|k", channel_only=True, min_args=1)
@utils.hook("received.command.kick|k", channel_only=True, min_args=1)
def kick(self, event):
"""
:help: Kick a user from the current channel
@ -77,7 +77,7 @@ class Module(ModuleManager.BaseModule):
event["target"].send_unban(target)
return target
@Utils.hook("received.command.ban", channel_only=True, min_args=1)
@utils.hook("received.command.ban", channel_only=True, min_args=1)
def ban(self, event):
"""
:help: Ban a user/hostmask from the current channel
@ -88,7 +88,7 @@ class Module(ModuleManager.BaseModule):
event["args_split"][0])
def _temp_ban(self, event, accept_hostmask):
timeout = Utils.from_pretty_time(event["args_split"][1])
timeout = utils.from_pretty_time(event["args_split"][1])
if not timeout:
raise InvalidTimeoutException(
"Please provided a valid time above 0 seconds")
@ -103,7 +103,7 @@ class Module(ModuleManager.BaseModule):
self.bot.timers.add_persistent("unban", timeout,
server_id=event["server"].id,
channel_name=event["target"].name, hostmask=hostmask)
@Utils.hook("received.command.tempban|tb", channel_only=True, min_args=2)
@utils.hook("received.command.tempban|tb", channel_only=True, min_args=2)
def temp_ban(self, event):
"""
:help: Temporarily ban someone from the current channel
@ -115,7 +115,7 @@ class Module(ModuleManager.BaseModule):
except InvalidTimeoutException as e:
event["stderr"].set_prefix("Tempban")
event["stderr"].write(str(e))
@Utils.hook("received.command.tempkickban|tkb", channel_only=True,
@utils.hook("received.command.tempkickban|tkb", channel_only=True,
min_args=2)
def temp_kick_ban(self, event):
"""
@ -134,7 +134,7 @@ class Module(ModuleManager.BaseModule):
except UserNotFoundException as e:
event["stderr"].write(str(e))
@Utils.hook("received.command.unban", channel_only=True, min_args=1)
@utils.hook("received.command.unban", channel_only=True, min_args=1)
def unban(self, event):
"""
:help: Unban a user/hostmask from the current channel
@ -144,7 +144,7 @@ class Module(ModuleManager.BaseModule):
self._ban(event["server"], event["target"], False,
event["args_split"][0])
@Utils.hook("received.command.kickban|kb", channel_only=True, min_args=1)
@utils.hook("received.command.kickban|kb", channel_only=True, min_args=1)
def kickban(self, event):
"""
:help: Kick and ban a user from the current channel
@ -160,7 +160,7 @@ class Module(ModuleManager.BaseModule):
event["stderr"].set_prefix("Kickban")
event["stderr"].write(str(e))
@Utils.hook("received.command.op", channel_only=True)
@utils.hook("received.command.op", channel_only=True)
def op(self, event):
"""
:help: Op a user in the current channel
@ -170,7 +170,7 @@ class Module(ModuleManager.BaseModule):
target = event["user"].nickname if not event["args_split"] else event[
"args_split"][0]
event["target"].send_mode("+o", target)
@Utils.hook("received.command.deop", channel_only=True)
@utils.hook("received.command.deop", channel_only=True)
def deop(self, event):
"""
:help: Remove op from a user in the current channel
@ -181,7 +181,7 @@ class Module(ModuleManager.BaseModule):
"args_split"][0]
event["target"].send_mode("-o", target)
@Utils.hook("received.command.voice", channel_only=True)
@utils.hook("received.command.voice", channel_only=True)
def voice(self, event):
"""
:help: Voice a user in the current channel
@ -191,7 +191,7 @@ class Module(ModuleManager.BaseModule):
target = event["user"].nickname if not event["args_split"] else event[
"args_split"][0]
event["target"].send_mode("+v", target)
@Utils.hook("received.command.devoice", channel_only=True)
@utils.hook("received.command.devoice", channel_only=True)
def devoice(self, event):
"""
:help: Remove voice from a user in the current channel
@ -202,7 +202,7 @@ class Module(ModuleManager.BaseModule):
"args_split"][0]
event["target"].send_mode("-v", target)
@Utils.hook("received.command.topic", min_args=1, channel_only=True)
@utils.hook("received.command.topic", min_args=1, channel_only=True)
def topic(self, event):
"""
:help: Set the topic in the current channel
@ -210,7 +210,7 @@ class Module(ModuleManager.BaseModule):
:require_mode: o
"""
event["target"].send_topic(event["args"])
@Utils.hook("received.command.tappend", min_args=1, channel_only=True)
@utils.hook("received.command.tappend", min_args=1, channel_only=True)
def tappend(self, event):
"""
:help: Append to the topic in the current channel
@ -219,7 +219,7 @@ class Module(ModuleManager.BaseModule):
"""
event["target"].send_topic(event["target"].topic + event["args"])
@Utils.hook("received.message.channel")
@utils.hook("received.message.channel")
def highlight_spam(self, event):
if event["channel"].get_setting("highlight-spam-protection", False):
nicknames = list(map(lambda user: user.nickname,
@ -238,7 +238,7 @@ class Module(ModuleManager.BaseModule):
event["channel"].send_kick(event["user"].nickname,
"highlight spam detected")
@Utils.hook("received.command.leave", channel_only=True)
@utils.hook("received.command.leave", channel_only=True)
def leave(self, event):
"""
:help: Part me from the current channel

View file

@ -1,7 +1,7 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.numeric.001")
@utils.hook("received.numeric.001")
def on_connect(self, event):
channels = event["server"].get_setting("autojoin", [])
chan_keys = event["server"].get_setting("channel_keys", {})
@ -20,7 +20,7 @@ class Module(ModuleManager.BaseModule):
event["server"].send_join(
",".join(channels_sorted), ",".join(keys_sorted))
@Utils.hook("self.join")
@utils.hook("self.join")
def on_join(self, event):
channels = event["server"].get_setting("autojoin", [])
if not event["channel"].name in channels:
@ -33,10 +33,10 @@ class Module(ModuleManager.BaseModule):
channels.remove(channel_name)
server.set_setting("autojoin", channels)
@Utils.hook("self.part")
@utils.hook("self.part")
def on_part(self, event):
self._remove_channel(event["server"], event["channel"].name)
@Utils.hook("self.kick")
@utils.hook("self.kick")
def on_kick(self, event):
self._remove_channel(event["server"], event["channel"].name)

View file

@ -1,8 +1,8 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("preprocess.command")
@utils.hook("preprocess.command")
def preprocess_command(self, event):
require_mode = event["hook"].get_kwarg("require_mode")
if event["is_channel"] and require_mode:

View file

@ -1,28 +1,28 @@
#--require-config virustotal-api-key
import re
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_VIRUSTOTAL = "https://www.virustotal.com/vtapi/v2/url/report"
RE_URL = re.compile(r"https?://\S+", re.I)
@Utils.export("channelset", {"setting": "check-urls",
@utils.export("channelset", {"setting": "check-urls",
"help": "Enable/Disable automatically checking for malicious URLs",
"validate": Utils.bool_or_none})
@Utils.export("serverset", {"setting": "check-urls",
"validate": utils.bool_or_none})
@utils.export("serverset", {"setting": "check-urls",
"help": "Enable/Disable automatically checking for malicious URLs",
"validate": Utils.bool_or_none})
@Utils.export("channelset", {"setting": "check-urls-kick",
"validate": utils.bool_or_none})
@utils.export("channelset", {"setting": "check-urls-kick",
"help": "Enable/Disable automatically kicking users that "
"send malicious URLs", "validate": Utils.bool_or_none})
"send malicious URLs", "validate": utils.bool_or_none})
class Module(ModuleManager.BaseModule):
@Utils.hook("received.message.channel")
@utils.hook("received.message.channel")
def message(self, event):
match = RE_URL.search(event["message"])
if match and event["channel"].get_setting("check-urls",
event["server"].get_setting("check-urls", False)):
url = match.group(0)
page = Utils.get_url(URL_VIRUSTOTAL, get_params={
page = utils.http.get_url(URL_VIRUSTOTAL, get_params={
"apikey": self.bot.config["virustotal-api-key"],
"resource": url}, json=True)

View file

@ -1,5 +1,5 @@
import datetime, decimal, math, random, re, time
from src import Utils
from src import utils
SIDES = {"heads": 0, "tails": 1}
DEFAULT_REDEEM_DELAY = 600 # 600 seconds, 10 minutes
@ -36,7 +36,7 @@ class Module(object):
bot.timers.add("coin-interest", INTEREST_INTERVAL,
time.time()+until_next_hour)
@Utils.hook("received.command.coins")
@utils.hook("received.command.coins")
def coins(self, event):
"""
:help: Show how many coins you have
@ -49,7 +49,7 @@ class Module(object):
event["stdout"].write("%s has %s coin%s" % (target.nickname,
"{0:.2f}".format(coins), "" if coins == 1 else "s"))
@Utils.hook("received.command.resetcoins", min_args=1)
@utils.hook("received.command.resetcoins", min_args=1)
def reset_coins(self, event):
"""
:help: Reset a user's coins to 0
@ -65,7 +65,7 @@ class Module(object):
target.del_setting("coins")
event["stdout"].write("Reset coins for %s" % target.nickname)
@Utils.hook("received.command.givecoins", min_args=1)
@utils.hook("received.command.givecoins", min_args=1)
def give_coins(self, event):
"""
:help: Give coins to a user
@ -85,7 +85,7 @@ class Module(object):
event["stdout"].write("Gave '%s' %s coins" % (target.nickname,
str(coins)))
@Utils.hook("received.command.richest")
@utils.hook("received.command.richest")
def richest(self, event):
"""
:help: Show the top 10 richest users
@ -98,7 +98,7 @@ class Module(object):
top_10 = sorted(all_coins.keys())
top_10 = sorted(top_10, key=all_coins.get, reverse=True)[:10]
top_10 = ", ".join("%s (%s)" % (Utils.prevent_highlight(
top_10 = ", ".join("%s (%s)" % (utils.prevent_highlight(
event["server"].get_user(nickname).nickname), "{0:.2f}".format(
all_coins[nickname])) for nickname in top_10)
event["stdout"].write("Richest users: %s" % top_10)
@ -106,7 +106,7 @@ class Module(object):
def _redeem_cache(self, server, user):
return "redeem|%s|%s@%s" % (server.id, user.username, user.hostname)
@Utils.hook("received.command.redeemcoins")
@utils.hook("received.command.redeemcoins")
def redeem_coins(self, event):
"""
:help: Redeem your free coins
@ -129,12 +129,12 @@ class Module(object):
else:
time_left = self.bot.cache.until_expiration(cache)
event["stderr"].write("Please wait %s before redeeming" %
Utils.to_pretty_time(math.ceil(time_left)))
utils.to_pretty_time(math.ceil(time_left)))
else:
event["stderr"].write(
"You can only redeem coins when you have none")
@Utils.hook("received.command.flip", min_args=2, authenticated=True)
@utils.hook("received.command.flip", min_args=2, authenticated=True)
def flip(self, event):
"""
:help: Bet on a coin flip
@ -178,7 +178,7 @@ class Module(object):
event["user"].nickname, side_name, coin_bet_str,
"" if coin_bet == 1 else "s"))
@Utils.hook("received.command.sendcoins", min_args=2, authenticated=True)
@utils.hook("received.command.sendcoins", min_args=2, authenticated=True)
def send(self, event):
"""
:help: Send coins to another user
@ -230,7 +230,7 @@ class Module(object):
event["user"].nickname, "{0:.2f}".format(send_amount),
target_user.nickname))
@Utils.hook("timer.coin-interest")
@utils.hook("timer.coin-interest")
def interest(self, event):
for server in self.bot.servers.values():
all_coins = server.get_all_user_settings(
@ -247,7 +247,7 @@ class Module(object):
str(coins))
event["timer"].redo()
@Utils.hook("received.command.roulette", min_args=2, authenticated=True)
@utils.hook("received.command.roulette", min_args=2, authenticated=True)
def roulette(self, event):
"""
:help: Spin a roulette wheel

View file

@ -1,7 +1,7 @@
import re
from src import EventManager, ModuleManager, Utils
from src import EventManager, ModuleManager, utils
STR_MORE = "%s (more...)" % Utils.FONT_RESET
STR_MORE = "%s (more...)" % utils.irc.FONT_RESET
STR_CONTINUED = "(...continued) "
COMMAND_METHOD = "command-method"
@ -41,7 +41,7 @@ class Out(object):
if self._msgid:
tags["+draft/reply"] = self._msgid
prefix = Utils.FONT_RESET + "[%s] " % self.prefix()
prefix = utils.irc.FONT_RESET + "[%s] " % self.prefix()
method = self._get_method()
if method == "PRIVMSG":
self.target.send_message(text, prefix=prefix, tags=tags)
@ -60,29 +60,31 @@ class Out(object):
class StdOut(Out):
def prefix(self):
return Utils.color(Utils.bold(self.module_name), Utils.COLOR_GREEN)
return utils.irc.color(utils.irc.bold(self.module_name),
utils.irc.COLOR_GREEN)
class StdErr(Out):
def prefix(self):
return Utils.color(Utils.bold("!"+self.module_name), Utils.COLOR_RED)
return utils.irc.color(utils.irc.bold("!"+self.module_name),
utils.irc.COLOR_RED)
def _command_method_validate(s):
if s.upper() in COMMAND_METHODS:
return s.upper()
@Utils.export("channelset", {"setting": "command-prefix",
@utils.export("channelset", {"setting": "command-prefix",
"help": "Set the command prefix used in this channel"})
@Utils.export("serverset", {"setting": "command-prefix",
@utils.export("serverset", {"setting": "command-prefix",
"help": "Set the command prefix used on this server"})
@Utils.export("serverset", {"setting": "identity-mechanism",
@utils.export("serverset", {"setting": "identity-mechanism",
"help": "Set the identity mechanism for this server"})
@Utils.export("serverset", {"setting": "command-method",
@utils.export("serverset", {"setting": "command-method",
"help": "Set the method used to respond to commands",
"validate": _command_method_validate})
@Utils.export("channelset", {"setting": "command-method",
@utils.export("channelset", {"setting": "command-method",
"help": "Set the method used to respond to commands",
"validate": _command_method_validate})
class Module(ModuleManager.BaseModule):
@Utils.hook("new.user|channel")
@utils.hook("new.user|channel")
def new(self, event):
if "user" in event:
target = event["user"]
@ -165,7 +167,7 @@ class Module(ModuleManager.BaseModule):
target.buffer.skip_next()
event.eat()
@Utils.hook("received.message.channel", priority=EventManager.PRIORITY_LOW)
@utils.hook("received.message.channel", priority=EventManager.PRIORITY_LOW)
def channel_message(self, event):
command_prefix = event["channel"].get_setting("command-prefix",
event["server"].get_setting("command-prefix", "!"))
@ -178,7 +180,7 @@ class Module(ModuleManager.BaseModule):
command = event["message_split"][1].lower()
self.message(event, command, 2)
@Utils.hook("received.message.private", priority=EventManager.PRIORITY_LOW)
@utils.hook("received.message.private", priority=EventManager.PRIORITY_LOW)
def private_message(self, event):
if event["message_split"]:
command = event["message_split"][0].lower()
@ -189,7 +191,7 @@ class Module(ModuleManager.BaseModule):
def _get_usage(self, hook):
return hook.get_kwarg("usage", None)
@Utils.hook("received.command.help")
@utils.hook("received.command.help")
def help(self, event):
"""
:help: Show help for a given command
@ -218,7 +220,7 @@ class Module(ModuleManager.BaseModule):
help_available = sorted(help_available)
event["stdout"].write("Commands: %s" % ", ".join(help_available))
@Utils.hook("received.command.usage", min_args=1)
@utils.hook("received.command.usage", min_args=1)
def usage(self, event):
"""
:help: Show the usage for a given command
@ -243,7 +245,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("Unknown command '%s'" % command)
@Utils.hook("received.command.more", skip_out=True)
@utils.hook("received.command.more", skip_out=True)
def more(self, event):
"""
:help: Show more output from the last command
@ -251,7 +253,7 @@ class Module(ModuleManager.BaseModule):
if event["target"].last_stdout and event["target"].last_stdout.has_text():
event["target"].last_stdout.send()
@Utils.hook("received.command.ignore", min_args=1)
@utils.hook("received.command.ignore", min_args=1)
def ignore(self, event):
"""
:help: Ignore commands from a given user
@ -266,7 +268,7 @@ class Module(ModuleManager.BaseModule):
user.set_setting("ignore", True)
event["stdout"].write("Now ignoring '%s'" % user.nickname)
@Utils.hook("received.command.unignore", min_args=1)
@utils.hook("received.command.unignore", min_args=1)
def unignore(self, event):
"""
:help: Unignore commands from a given user
@ -280,14 +282,14 @@ class Module(ModuleManager.BaseModule):
user.set_setting("ignore", False)
event["stdout"].write("Removed ignore for '%s'" % user.nickname)
@Utils.hook("send.stdout")
@utils.hook("send.stdout")
def send_stdout(self, event):
stdout = StdOut(event["server"], event["module_name"],
event["target"], event.get("msgid", None))
stdout.write(event["message"]).send()
if stdout.has_text():
event["target"].last_stdout = stdout
@Utils.hook("send.stderr")
@utils.hook("send.stderr")
def send_stderr(self, event):
stderr = StdErr(event["server"], event["module_name"],
event["target"], event.get("msgid", None))

View file

@ -1,11 +1,11 @@
import datetime
from src import ModuleManager, Utils
from src import ModuleManager, utils
@Utils.export("serverset", {"setting": "ctcp-responses",
@utils.export("serverset", {"setting": "ctcp-responses",
"help": "Set whether I respond to CTCPs on this server",
"validate": Utils.bool_or_none})
"validate": utils.bool_or_none})
class Module(ModuleManager.BaseModule):
@Utils.hook("received.message.private")
@utils.hook("received.message.private")
def private_message(self, event):
if event["message"][0] == "\x01" and event["message"][-1] == "\x01":
if event["server"].get_setting("ctcp-responses", True):

View file

@ -1,5 +1,5 @@
import datetime, glob, os, shutil, time
from src import Utils
from src import utils
BACKUP_INTERVAL = 60*60 # 1 hour
BACKUP_COUNT = 5
@ -14,7 +14,7 @@ class Module(object):
bot.timers.add("database-backup", BACKUP_INTERVAL,
time.time()+until_next_hour)
@Utils.hook("timer.database-backup")
@utils.hook("timer.database-backup")
def backup(self, event):
location = self.bot.database.location
files = glob.glob("%s.*" % location)

View file

@ -1,7 +1,7 @@
#--require-config wordnik-api-key
import time
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_WORDNIK = "https://api.wordnik.com/v4/word.json/%s/definitions"
URL_WORDNIK_RANDOM = "https://api.wordnik.com/v4/words.json/randomWord"
@ -12,14 +12,14 @@ class Module(ModuleManager.BaseModule):
_last_called = 0
def _get_definition(self, word):
page = Utils.get_url(URL_WORDNIK % word, get_params={
page = utils.http.get_url(URL_WORDNIK % word, get_params={
"useCanonical": "true", "limit": 1,
"sourceDictionaries": "wiktionary", "api_key": self.bot.config[
"wordnik-api-key"]}, json=True)
return page
@Utils.hook("received.command.define")
@utils.hook("received.command.define")
def define(self, event):
"""
:help: Define a provided term
@ -40,7 +40,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("Failed to load results")
@Utils.hook("received.command.randomword")
@utils.hook("received.command.randomword")
def random_word(self, event):
"""
:help: Define a random word
@ -49,7 +49,7 @@ class Module(ModuleManager.BaseModule):
RANDOM_DELAY_SECONDS):
self._last_called = time.time()
page = Utils.get_url(URL_WORDNIK_RANDOM, get_params={
page = utils.http.get_url(URL_WORDNIK_RANDOM, get_params={
"api_key":self.bot.config["wordnik-api-key"],
"min_dictionary_count":1},json=True)
if page and len(page):

View file

@ -1,10 +1,10 @@
import random
from src import ModuleManager, Utils
from src import ModuleManager, utils
ERROR_FORMAT = "Incorrect format! Format must be [number]d[number], e.g. 1d20"
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.roll", min_args=1)
@utils.hook("received.command.roll", min_args=1)
def roll_dice(self, event):
"""
:help: Roll some dice, DND style!
@ -35,6 +35,5 @@ class Module(ModuleManager.BaseModule):
total = sum(results)
results = ', '.join(map(str, results))
event["stdout"].write("Rolled " + Utils.bold(str_roll) + " for a total "
+ "of " + Utils.bold(str(total))
+ ": " + results)
event["stdout"].write("Rolled %s for a total of %d: %s" % (
str_roll, str(total), results))

View file

@ -1,10 +1,10 @@
import socket
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
_name = "DNS"
@Utils.hook("received.command.dns", min_args=1)
@utils.hook("received.command.dns", min_args=1)
def dns(self, event):
"""
:help: Get all addresses for a given hostname (IPv4/IPv6)

View file

@ -1,29 +1,29 @@
import random
from operator import itemgetter
from time import time
from src import EventManager, Utils
from src import EventManager, utils
DUCK_TAIL = "・゜゜・。。・゜゜"
DUCK_HEAD = ["\_o< ", "\_O< ", "\_0< ", "\_\u00f6< ", "\_\u00f8< ",
"\_\u00f3< "]
DUCK_MESSAGE = ["QUACK!", "FLAP FLAP!", "quack!", "squawk!"]
DUCK_MESSAGE_RARE = ["beep boop!", "QUACK QUACK QUACK QUACK QUACK!!", "HONK!",
Utils.underline("I AM THE METAL DUCK")]
utils.irc.underline("I AM THE METAL DUCK")]
DUCK_MINIMUM_MESSAGES = 10
DUCK_MINIMUM_UNIQUE = 3
@Utils.export("channelset", {"setting": "ducks-enabled",
"help": "Toggle ducks!", "validate": Utils.bool_or_none})
@Utils.export("channelset", {"setting": "ducks-kick",
@utils.export("channelset", {"setting": "ducks-enabled",
"help": "Toggle ducks!", "validate": utils.bool_or_none})
@utils.export("channelset", {"setting": "ducks-kick",
"help": "Should the bot kick if there's no duck?",
"validate": Utils.bool_or_none})
@Utils.export("channelset", {"setting": "ducks-min-unique",
"validate": utils.bool_or_none})
@utils.export("channelset", {"setting": "ducks-min-unique",
"help": "Minimum unique users required to talk before a duck spawns.",
"validate": Utils.int_or_none})
@Utils.export("channelset", {"setting": "ducks-min-messages",
"validate": utils.int_or_none})
@utils.export("channelset", {"setting": "ducks-min-messages",
"help": "Minimum messages between ducks spawning.",
"validate": Utils.int_or_none})
"validate": utils.int_or_none})
class Module(object):
def __init__(self, bot, events, exports):
self.bot = bot
@ -32,7 +32,7 @@ class Module(object):
for channel in server.channels.values():
self.bootstrap(channel)
@Utils.hook("new.channel")
@utils.hook("new.channel")
def new_channel(self, event):
self.bootstrap(event["channel"])
@ -116,7 +116,7 @@ class Module(object):
channel.send_kick(target,
"You tried shooting a non-existent duck. Creepy!")
@Utils.hook("received.command.decoy")
@utils.hook("received.command.decoy")
def duck_decoy(self, event):
"""
:help: Prepare a decoy duck
@ -175,7 +175,8 @@ class Module(object):
if random.randint(1, 20) == 1:
# rare!
message = random.choice(DUCK_MESSAGE_RARE)
duck = Utils.color(Utils.bold(duck + message), Utils.COLOR_RED)
duck = utils.irc.color(utils.irc.bold(duck + message),
utils.irc.COLOR_RED)
else:
# not rare!
duck += random.choice(DUCK_MESSAGE)
@ -191,7 +192,7 @@ class Module(object):
else:
game["duck_spawned"] = 1
@Utils.hook("received.message.channel",
@utils.hook("received.message.channel",
priority=EventManager.PRIORITY_MONITOR)
def channel_message(self, event):
if not event["channel"].get_setting("ducks-enabled", False):
@ -222,7 +223,7 @@ class Module(object):
if self.should_generate_duck(event) == True:
self.show_duck(event)
@Utils.hook("received.command.bef")
@utils.hook("received.command.bef")
def befriend(self, event):
"""
:help: Befriend a duck
@ -252,15 +253,15 @@ class Module(object):
channel.set_user_setting(uid, "ducks-befriended", total_befriended)
msg = "Aww! %s befriended a duck! You've befriended %s ducks in %s!" \
% (Utils.bold(nick), Utils.bold(total_befriended),
Utils.bold(channel.name))
% (utils.irc.bold(nick), utils.irc.bold(total_befriended),
utils.irc.bold(channel.name))
event["stdout"].write(msg)
self.clear_ducks(channel)
event.eat()
@Utils.hook("received.command.bang")
@utils.hook("received.command.bang")
def shoot(self, event):
"""
:help: Shoot a duck
@ -291,15 +292,15 @@ class Module(object):
channel.set_user_setting(uid, "ducks-shot", total_shot)
msg = "Pow! %s shot a duck! You've shot %s ducks in %s!" \
% (Utils.bold(nick), Utils.bold(total_shot),
Utils.bold(channel.name))
% (utils.irc.bold(nick), utils.irc.bold(total_shot),
utils.irc.bold(channel.name))
event["stdout"].write(msg)
self.clear_ducks(channel)
event.eat()
@Utils.hook("received.command.duckstats")
@utils.hook("received.command.duckstats")
def duck_stats(self, event):
"""
:help: Show your duck stats
@ -338,13 +339,13 @@ class Module(object):
cf = channel_friends
msg = "%s ducks killed (%s in %s), and %s ducks befriended (%s in %s)" \
% (Utils.bold(tp), Utils.bold(cp), Utils.bold(channel),
Utils.bold(tf), Utils.bold(cf), Utils.bold(channel))
% (utils.irc.bold(tp), utils.irc.bold(cp), utils.irc.bold(channel),
utils.irc.bold(tf), utils.irc.bold(cf), utils.irc.bold(channel))
event["stdout"].write(Utils.bold(nick) + ": " + msg)
event["stdout"].write(utils.irc.bold(nick) + ": " + msg)
event.eat()
@Utils.hook("received.command.killers")
@utils.hook("received.command.killers")
def duck_enemies(self, event):
"""
:help: Show the top duck shooters
@ -366,15 +367,15 @@ class Module(object):
enemy_nicks.append(user)
enemy_ducks.append(enemies)
sentence = Utils.bold("Duck Wranglers: ")
sentence = utils.irc.bold("Duck Wranglers: ")
build = []
length = len(enemy_nicks) if len(enemy_nicks) < 8 else 8
for i in range(0, length):
nick = Utils.prevent_highlight(enemy_nicks[i])
nick = utils.prevent_highlight(enemy_nicks[i])
build.append("%s (%s)" \
% (Utils.bold(nick),
% (utils.irc.bold(nick),
enemy_ducks[i]))
sentence += ", ".join(build)
@ -382,7 +383,7 @@ class Module(object):
event["stdout"].write(sentence)
event.eat()
@Utils.hook("received.command.friends")
@utils.hook("received.command.friends")
def duck_friends(self, event):
"""
:help: Show the top duck friends
@ -405,15 +406,15 @@ class Module(object):
friend_nicks.append(user)
friend_ducks.append(friends)
sentence = Utils.bold("Duck Friends: ")
sentence = utils.irc.bold("Duck Friends: ")
length = len(friend_nicks) if len(friend_nicks) < 8 else 8
build = []
for i in range(0, length):
nick = Utils.prevent_highlight(friend_nicks[i])
nick = utils.prevent_highlight(friend_nicks[i])
build.append("%s (%s)" \
% (Utils.bold(nick),
% (utils.irc.bold(nick),
friend_ducks[i])
)

View file

@ -1,17 +1,17 @@
import socket
from src import ModuleManager, Utils
from src import ModuleManager, utils
EVAL_URL = "https://eval.appspot.com/eval"
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.eval", min_args=1)
@utils.hook("received.command.eval", min_args=1)
def eval(self, event):
"""
:help: Evaluate a python statement
:usage: <statement>
"""
try:
code, page = Utils.get_url(EVAL_URL, get_params={
code, page = utils.http.get_url(EVAL_URL, get_params={
"statement": event["args"]}, code=True)
except socket.timeout:
event["stderr"].write("%s: eval timed out" %

View file

@ -1,17 +1,17 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_GEOIP = "http://ip-api.com/json/%s"
class Module(ModuleManager.BaseModule):
_name = "GeoIP"
@Utils.hook("received.command.geoip", min_args=1)
@utils.hook("received.command.geoip", min_args=1)
def geoip(self, event):
"""
:help: Get geoip data on a given IPv4/IPv6 address
:usage: <IP>
"""
page = Utils.get_url(URL_GEOIP % event["args_split"][0],
page = utils.http.get_url(URL_GEOIP % event["args_split"][0],
json=True)
if page:
if page["status"] == "success":

View file

@ -2,13 +2,13 @@
#--require-config google-search-id
import json
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_GOOGLESEARCH = "https://www.googleapis.com/customsearch/v1"
URL_GOOGLESUGGEST = "http://google.com/complete/search"
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.google|g")
@utils.hook("received.command.google|g")
def google(self, event):
"""
:help: Get first Google result for a given search term
@ -16,15 +16,15 @@ class Module(ModuleManager.BaseModule):
"""
phrase = event["args"] or event["target"].buffer.get()
if phrase:
page = Utils.get_url(URL_GOOGLESEARCH, get_params={
page = utils.http.get_url(URL_GOOGLESEARCH, get_params={
"q": phrase, "key": self.bot.config[
"google-api-key"], "cx": self.bot.config[
"google-search-id"], "prettyPrint": "true",
"num": 1, "gl": "gb"}, json=True)
if page:
if "items" in page and len(page["items"]):
event["stdout"].write("(" + Utils.bold(phrase) + ") " \
+ page["items"][0]["link"])
event["stdout"].write(
"(%s) %s" % (phrase, page["items"][0]["link"]))
else:
event["stderr"].write("No results found")
else:
@ -32,21 +32,21 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("No phrase provided")
@Utils.hook("received.command.suggest", usage="[phrase]")
@utils.hook("received.command.suggest", usage="[phrase]")
def suggest(self, event):
"""
Get suggested phrases from Google
"""
phrase = event["args"] or event["target"].buffer.get()
if phrase:
page = Utils.get_url(URL_GOOGLESUGGEST, get_params={
page = utils.html.get_url(URL_GOOGLESUGGEST, get_params={
"output": "json", "client": "hp", "q": phrase})
if page:
# google gives us jsonp, so we need to unwrap it.
page = page.split("(", 1)[1][:-1]
page = json.loads(page)
suggestions = page[1]
suggestions = [Utils.strip_html(s[0]) for s in suggestions]
suggestions = [utils.html.strip_html(s[0]) for s in suggestions]
if suggestions:
event["stdout"].write("%s: %s" % (phrase,

View file

@ -1,9 +1,9 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
@Utils.export("channelset", {"setting": "greeting",
@utils.export("channelset", {"setting": "greeting",
"help": "Set a greeting to send to users when they join"})
class Module(ModuleManager.BaseModule):
@Utils.hook("received.join")
@utils.hook("received.join")
def join(self, event):
greeting = event["channel"].get_setting("greeting", None)
if greeting:

View file

@ -1,8 +1,8 @@
import hashlib
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.hash", min_args=2)
@utils.hook("received.command.hash", min_args=2)
def hash(self, event):
"""
:help: Hash a given string with a given algorithm

View file

@ -1,18 +1,18 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_HAVEIBEENPWNEDAPI = "https://haveibeenpwned.com/api/v2/breachedaccount/%s"
URL_HAVEIBEENPWNED = "https://haveibeenpwned.com/"
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.beenpwned", min_args=1)
@utils.hook("received.command.beenpwned", min_args=1)
def beenpwned(self, event):
"""
:help: Find out if a username, email or similar has appeared in any
hacked databases
:usage: <username/email>
"""
page = Utils.get_url(URL_HAVEIBEENPWNEDAPI % event["args"], json=True,
code=True)
page = utils.http.get_url(URL_HAVEIBEENPWNEDAPI % event["args"],
json=True, code=True)
if page:
code, page = page
if code == 200:

View file

@ -1,9 +1,9 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
_name = "IDs"
@Utils.hook("received.command.myid")
@utils.hook("received.command.myid")
def my_id(self, event):
"""
:help: Show your user ID
@ -11,7 +11,7 @@ class Module(ModuleManager.BaseModule):
event["stdout"].write("%s: %d" % (event["user"].nickname,
event["user"].get_id()))
@Utils.hook("received.command.channelid", channel_only=True)
@utils.hook("received.command.channelid", channel_only=True)
def channel_id(self, event):
"""
:help: Show the current channel's ID

View file

@ -1,7 +1,7 @@
#--require-config omdbapi-api-key
import json
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_OMDB = "http://www.omdbapi.com/"
URL_IMDBTITLE = "http://imdb.com/title/%s"
@ -9,13 +9,13 @@ URL_IMDBTITLE = "http://imdb.com/title/%s"
class Module(ModuleManager.BaseModule):
_name = "IMDb"
@Utils.hook("received.command.imdb", min_args=1)
@utils.hook("received.command.imdb", min_args=1)
def imdb(self, event):
"""
:help: Search for a given title on IMDb
:usage: <movie/tv title>
"""
page = Utils.get_url(URL_OMDB, get_params={
page = utils.http.get_url(URL_OMDB, get_params={
"t": event["args"],
"apikey": self.bot.config["omdbapi-api-key"]},
json=True)

View file

@ -1,17 +1,17 @@
import time
from src import ModuleManager, Utils
from src import ModuleManager, utils
SECONDS_MAX = Utils.SECONDS_WEEKS*8
SECONDS_MAX = utils.SECONDS_WEEKS*8
SECONDS_MAX_DESCRIPTION = "8 weeks"
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.in", min_args=2)
@utils.hook("received.command.in", min_args=2)
def in_command(self, event):
"""
:help: Set a reminder
:usage: <time> <message>
"""
seconds = Utils.from_pretty_time(event["args_split"][0])
seconds = utils.from_pretty_time(event["args_split"][0])
message = " ".join(event["args_split"][1:])
if seconds:
if seconds <= SECONDS_MAX:
@ -29,7 +29,7 @@ class Module(ModuleManager.BaseModule):
event["stderr"].write(
"Please provided a valid time above 0 seconds")
@Utils.hook("timer.in")
@utils.hook("timer.in")
def timer_due(self, event):
for server in self.bot.servers.values():
if event["server_id"] == server.id:

View file

@ -1,24 +1,23 @@
import re
from src import ModuleManager, Utils
from src import ModuleManager, utils
ISGD_API_URL = "https://is.gd/create.php"
REGEX_URL = re.compile("https?://", re.I)
class Module(ModuleManager.BaseModule):
@Utils.hook("get.shortlink")
@utils.hook("get.shortlink")
def shortlink(self, event):
url = event["url"]
if not re.match(REGEX_URL, url):
url = "http://%s" % url
data = Utils.get_url(ISGD_API_URL, get_params={
"format": "json",
"url": url
}, json=True)
data = utils.http.get_url(ISGD_API_URL, get_params=
{"format": "json", "url": url}, json=True)
if data and data["shorturl"]:
return data["shorturl"]
@Utils.hook("received.command.shorten", min_args=1)
@utils.hook("received.command.shorten", min_args=1)
def shorten(self, event):
"""
:help: Shorten a given URL using the is.gd service

View file

@ -1,21 +1,21 @@
import re, time
from src import EventManager, ModuleManager, Utils
from src import EventManager, ModuleManager, utils
REGEX_KARMA = re.compile("^(.*[^-+])[-+]*(\+{2,}|\-{2,})$")
KARMA_DELAY_SECONDS = 3
@Utils.export("channelset", {"setting": "karma-verbose",
@utils.export("channelset", {"setting": "karma-verbose",
"help": "Enable/disable automatically responding to karma changes",
"validate": Utils.bool_or_none})
@Utils.export("serverset", {"setting": "karma-nickname-only",
"validate": utils.bool_or_none})
@utils.export("serverset", {"setting": "karma-nickname-only",
"help": "Enable/disable karma being for nicknames only",
"validate": Utils.bool_or_none})
"validate": utils.bool_or_none})
class Module(ModuleManager.BaseModule):
@Utils.hook("new.user")
@utils.hook("new.user")
def new_user(self, event):
event["user"].last_karma = None
@Utils.hook("received.message.channel",
@utils.hook("received.message.channel",
priority=EventManager.PRIORITY_MONITOR)
def channel_message(self, event):
match = re.match(REGEX_KARMA, event["message"].strip())
@ -27,7 +27,7 @@ class Module(ModuleManager.BaseModule):
if not event["user"].last_karma or (time.time()-event["user"
].last_karma) >= KARMA_DELAY_SECONDS:
target = match.group(1).strip()
if Utils.irc_lower(event["server"], target
if utils.irc.lower(event["server"], target
) == event["user"].name:
if verbose:
self.events.on("send.stderr").call(
@ -63,7 +63,7 @@ class Module(ModuleManager.BaseModule):
target=event["channel"],
message="Try again in a couple of seconds")
@Utils.hook("received.command.karma")
@utils.hook("received.command.karma")
def karma(self, event):
"""
:help: Get your or someone else's karma
@ -81,7 +81,7 @@ class Module(ModuleManager.BaseModule):
karma = event["server"].get_setting("karma-%s" % target, 0)
event["stdout"].write("%s has %s karma" % (target, karma))
@Utils.hook("received.command.resetkarma", min_args=1)
@utils.hook("received.command.resetkarma", min_args=1)
def reset_karma(self, event):
"""
:help: Reset a specified karma to 0

View file

@ -1,15 +1,15 @@
#--require-config lastfm-api-key
from datetime import datetime, timezone
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_SCROBBLER = "http://ws.audioscrobbler.com/2.0/"
@Utils.export("set", {"setting": "lastfm", "help": "Set last.fm username"})
@utils.export("set", {"setting": "lastfm", "help": "Set last.fm username"})
class Module(ModuleManager.BaseModule):
_name = "last.fm"
@Utils.hook("received.command.np|listening|nowplaying")
@utils.hook("received.command.np|listening|nowplaying")
def np(self, event):
"""
:help: Get the last listened to track from a user
@ -22,7 +22,7 @@ class Module(ModuleManager.BaseModule):
lastfm_username = event["user"].get_setting("lastfm",
event["user"].nickname)
shown_username = event["user"].nickname
page = Utils.get_url(URL_SCROBBLER, get_params={
page = utils.http.get_url(URL_SCROBBLER, get_params={
"method": "user.getrecenttracks", "user": lastfm_username,
"api_key": self.bot.config["lastfm-api-key"],
"format": "json", "limit": "1"}, json=True)
@ -51,7 +51,7 @@ class Module(ModuleManager.BaseModule):
short_url = " -- " + short_url if short_url else ""
info_page = Utils.get_url(URL_SCROBBLER, get_params={
info_page = utils.http.get_url(URL_SCROBBLER, get_params={
"method": "track.getInfo", "artist": artist,
"track": track_name, "autocorrect": "1",
"api_key": self.bot.config["lastfm-api-key"],

View file

@ -1,5 +1,5 @@
import re, threading
from src import ModuleManager, Utils
from src import ModuleManager, utils
RE_PREFIXES = re.compile(r"\bPREFIX=\((\w+)\)(\W+)(?:\b|$)")
RE_CHANMODES = re.compile(
@ -27,41 +27,41 @@ class Module(ModuleManager.BaseModule):
self.events.on("raw").on(line.command).call(**kwargs)
if default_event or not hooks:
if command.isdigit():
if line.command.isdigit():
self.events.on("received.numeric").on(line.command).call(
**kwargs)
else:
self.events.on("received").on(line.command).call(**kwargs)
@Utils.hook("raw")
@utils.hook("raw")
def handle_raw(self, event):
line = Utils.parse_line(event["server"], event["line"])
line = utils.irc.parse_line(event["server"], event["line"])
if "batch" in line.tags and line.tags["batch"] in event[
"server"].batches:
server.batches[tag["batch"]].append(line)
else:
self._handle(line)
@Utils.hook("preprocess.send")
@utils.hook("preprocess.send")
def handle_send(self, event):
line = Utils.parse_line(event["server"], event["line"])
line = utils.irc.parse_line(event["server"], event["line"])
self.events.on("send").on(line.command).call(
args=line.args, arbitrary=line.arbitrary, tags=line.tags,
last=line.last, server=line.server)
# ping from the server
@Utils.hook("raw.ping")
@utils.hook("raw.ping")
def ping(self, event):
event["server"].send_pong(event["last"])
# first numeric line the server sends
@Utils.hook("raw.001", default_event=True)
@utils.hook("raw.001", default_event=True)
def handle_001(self, event):
event["server"].name = event["prefix"].nickname
event["server"].set_own_nickname(event["args"][0])
event["server"].send_whois(event["server"].nickname)
# server telling us what it supports
@Utils.hook("raw.005")
@utils.hook("raw.005")
def handle_005(self, event):
isupport_line = " ".join(event["args"][1:])
@ -91,7 +91,7 @@ class Module(ModuleManager.BaseModule):
isupport=isupport_line, server=event["server"])
# whois respose (nickname, username, realname, hostname)
@Utils.hook("raw.311", default_event=True)
@utils.hook("raw.311", default_event=True)
def handle_311(self, event):
nickname = event["args"][1]
if event["server"].is_own_nickname(nickname):
@ -103,7 +103,7 @@ class Module(ModuleManager.BaseModule):
target.realname = event["arbitrary"]
# on-join channel topic line
@Utils.hook("raw.332")
@utils.hook("raw.332")
def handle_332(self, event):
channel = event["server"].get_channel(event["args"][1])
@ -112,7 +112,7 @@ class Module(ModuleManager.BaseModule):
server=event["server"], topic=event["arbitrary"])
# channel topic changed
@Utils.hook("raw.topic")
@utils.hook("raw.topic")
def topic(self, event):
user = event["server"].get_user(event["prefix"].nickname)
channel = event["server"].get_channel(event["args"][0])
@ -121,12 +121,12 @@ class Module(ModuleManager.BaseModule):
server=event["server"], topic=event["arbitrary"], user=user)
# on-join channel topic set by/at
@Utils.hook("raw.333")
@utils.hook("raw.333")
def handle_333(self, event):
channel = event["server"].get_channel(event["args"][1])
topic_setter_hostmask = event["args"][2]
topic_setter = Utils.seperate_hostmask(topic_setter_hostmask)
topic_setter = utils.irc.seperate_hostmask(topic_setter_hostmask)
topic_time = int(event["args"][3]) if event["args"][3].isdigit(
) else None
@ -138,7 +138,7 @@ class Module(ModuleManager.BaseModule):
server=event["server"])
# /names response, also on-join user list
@Utils.hook("raw.353", default_event=True)
@utils.hook("raw.353", default_event=True)
def handle_353(self, event):
channel = event["server"].get_channel(event["args"][2])
nicknames = event["arbitrary"].split()
@ -150,7 +150,7 @@ class Module(ModuleManager.BaseModule):
nickname = nickname[1:]
if "userhost-in-names" in event["server"].capabilities:
hostmask = Utils.seperate_hostmask(nickname)
hostmask = utils.irc.seperate_hostmask(nickname)
nickname = hostmask.nickname
user = event["server"].get_user(hostmask.nickname)
user.username = hostmask.username
@ -164,12 +164,12 @@ class Module(ModuleManager.BaseModule):
channel.add_mode(mode, nickname)
# on-join user list has finished
@Utils.hook("raw.366", default_event=True)
@utils.hook("raw.366", default_event=True)
def handle_366(self, event):
event["server"].send_whox(event["args"][1], "n", "ahnrtu", "111")
# on user joining channel
@Utils.hook("raw.join")
@utils.hook("raw.join")
def join(self, event):
account = None
realname = None
@ -207,7 +207,7 @@ class Module(ModuleManager.BaseModule):
channel.send_mode()
# on user parting channel
@Utils.hook("raw.part")
@utils.hook("raw.part")
def part(self, event):
channel = event["server"].get_channel(event["args"][0])
reason = event["arbitrary"] or ""
@ -226,12 +226,12 @@ class Module(ModuleManager.BaseModule):
event["server"].remove_channel(channel)
# unknown command sent by us, oops!
@Utils.hook("raw.421", default_event=True)
@utils.hook("raw.421", default_event=True)
def handle_421(self, event):
print("warning: unknown command '%s'." % event["args"][1])
# a user has disconnected!
@Utils.hook("raw.quit")
@utils.hook("raw.quit")
def quit(self, event):
reason = event["arbitrary"] or ""
@ -244,7 +244,7 @@ class Module(ModuleManager.BaseModule):
event["server"].disconnect()
# the server is telling us about its capabilities!
@Utils.hook("raw.cap")
@utils.hook("raw.cap")
def cap(self, event):
capabilities_list = (event["arbitrary"] or "").split(" ")
capabilities = {}
@ -295,13 +295,13 @@ class Module(ModuleManager.BaseModule):
event["server"].send_capability_end()
# the server is asking for authentication
@Utils.hook("raw.authenticate")
@utils.hook("raw.authenticate")
def authenticate(self, event):
self.events.on("received.authenticate").call(
message=event["args"][0], server=event["server"])
# someone has changed their nickname
@Utils.hook("raw.nick")
@utils.hook("raw.nick")
def nick(self, event):
new_nickname = event["last"]
if not event["server"].is_own_nickname(event["prefix"].nickname):
@ -320,7 +320,7 @@ class Module(ModuleManager.BaseModule):
new_nickname=new_nickname, old_nickname=old_nickname)
# something's mode has changed
@Utils.hook("raw.mode")
@utils.hook("raw.mode")
def mode(self, event):
user = event["server"].get_user(event["prefix"].nickname)
target = event["args"][0]
@ -354,7 +354,7 @@ class Module(ModuleManager.BaseModule):
server=event["server"])
# someone (maybe me!) has been invited somewhere
@Utils.hook("raw.invite")
@utils.hook("raw.invite")
def invite(self, event):
target_channel = event["last"]
user = event["server"].get_user(event["prefix"].nickname)
@ -364,7 +364,7 @@ class Module(ModuleManager.BaseModule):
target_user=target_user)
# we've received a message
@Utils.hook("raw.privmsg")
@utils.hook("raw.privmsg")
def privmsg(self, event):
user = event["server"].get_user(event["prefix"].nickname)
message = event["arbitrary"] or ""
@ -398,7 +398,7 @@ class Module(ModuleManager.BaseModule):
event["tags"])
# we've received a notice
@Utils.hook("raw.notice")
@utils.hook("raw.notice")
def notice(self, event):
message = event["arbitrary"] or ""
message_split = message.split(" ")
@ -427,7 +427,7 @@ class Module(ModuleManager.BaseModule):
server=event["server"], tags=event["tags"])
# IRCv3 TAGMSG, used to send tags without any other information
@Utils.hook("raw.tagmsg")
@utils.hook("raw.tagmsg")
def tagmsg(self, event):
user = event["server"].get_user(event["prefix"].nickname)
target = event["args"][0]
@ -441,7 +441,7 @@ class Module(ModuleManager.BaseModule):
user=user, tags=event["tags"], server=event["server"])
# IRCv3 AWAY, used to notify us that a client we can see has changed /away
@Utils.hook("raw.away")
@utils.hook("raw.away")
def away(self, event):
user = event["server"].get_user(event["prefix"].nickname)
message = event["arbitrary"]
@ -454,7 +454,7 @@ class Module(ModuleManager.BaseModule):
self.events.on("received.away.off").call(user=user,
server=event["server"])
@Utils.hook("raw.batch")
@utils.hook("raw.batch")
def batch(self, event):
identifier = event["args"][0]
modifier, identifier = identifier[0], identifier[1:]
@ -467,7 +467,7 @@ class Module(ModuleManager.BaseModule):
self._handle(line)
# IRCv3 CHGHOST, a user's username and/or hostname has changed
@Utils.hook("raw.chghost")
@utils.hook("raw.chghost")
def chghost(self, event):
username = event["args"][0]
hostname = event["args"][1]
@ -479,7 +479,7 @@ class Module(ModuleManager.BaseModule):
target.username = username
target.hostname = hostname
@Utils.hook("raw.account")
@utils.hook("raw.account")
def account(self, event):
user = event["server"].get_user(event["prefix"].nickname)
@ -496,13 +496,13 @@ class Module(ModuleManager.BaseModule):
server=event["server"])
# response to a WHO command for user information
@Utils.hook("raw.352", default_event=True)
@utils.hook("raw.352", default_event=True)
def handle_352(self, event):
user = event["server"].get_user(event["args"][5])
user.username = event["args"][2]
user.hostname = event["args"][3]
# response to a WHOX command for user information, including account name
@Utils.hook("raw.354", default_event=True)
@utils.hook("raw.354", default_event=True)
def handle_354(self, event):
if event["args"][1] == "111":
username = event["args"][2]
@ -519,7 +519,7 @@ class Module(ModuleManager.BaseModule):
user.identified_account = account
# response to an empty mode command
@Utils.hook("raw.324", default_event=True)
@utils.hook("raw.324", default_event=True)
def handle_324(self, event):
channel = event["server"].get_channel(event["args"][1])
modes = event["args"][2]
@ -529,27 +529,27 @@ class Module(ModuleManager.BaseModule):
channel.add_mode(mode)
# channel creation unix timestamp
@Utils.hook("raw.329", default_event=True)
@utils.hook("raw.329", default_event=True)
def handle_329(self, event):
channel = event["server"].get_channel(event["args"][1])
channel.creation_timestamp = int(event["args"][2])
# nickname already in use
@Utils.hook("raw.433", default_event=True)
@utils.hook("raw.433", default_event=True)
def handle_433(self, event):
pass
# we need a registered nickname for this channel
@Utils.hook("raw.477", default_event=True)
@utils.hook("raw.477", default_event=True)
def handle_477(self, event):
channel_name = Utils.irc_lower(event["server"], event["args"][1])
channel_name = utils.irc.lower(event["server"], event["args"][1])
if channel_name in event["server"]:
key = event["server"].attempted_join[channel_name]
self.timers.add("rejoin", 5, channel_name=channe_name, key=key,
server_id=event["server"].id)
# someone's been kicked from a channel
@Utils.hook("raw.kick")
@utils.hook("raw.kick")
def kick(self, event):
user = event["server"].get_user(event["prefix"].nickname)
target = event["args"][1]

View file

@ -1,7 +1,7 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.loadmodule", min_args=1)
@utils.hook("received.command.loadmodule", min_args=1)
def load(self, event):
"""
:help: Load a module
@ -15,7 +15,7 @@ class Module(ModuleManager.BaseModule):
self.bot.modules.load_module(self.bot, name)
event["stdout"].write("Loaded '%s'" % name)
@Utils.hook("received.command.unloadmodule", min_args=1)
@utils.hook("received.command.unloadmodule", min_args=1)
def unload(self, event):
"""
:help: Unload a module
@ -33,7 +33,7 @@ class Module(ModuleManager.BaseModule):
self.bot.modules.unload_module(name)
self.bot.modules.load_module(self.bot, name)
@Utils.hook("received.command.reloadmodule", min_args=1)
@utils.hook("received.command.reloadmodule", min_args=1)
def reload(self, event):
"""
:help: Reload a module
@ -56,7 +56,7 @@ class Module(ModuleManager.BaseModule):
return
event["stdout"].write("Reloaded '%s'" % name)
@Utils.hook("received.command.reloadallmodules")
@utils.hook("received.command.reloadallmodules")
def reload_all(self, event):
"""
:help: Reload all modules
@ -82,7 +82,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stdout"].write("Reloaded %d modules" % len(reloaded))
@Utils.hook("received.command.enablemodule", min_args=1)
@utils.hook("received.command.enablemodule", min_args=1)
def enable(self, event):
"""
:help: Remove a module from the module blacklist
@ -99,7 +99,7 @@ class Module(ModuleManager.BaseModule):
event["stdout"].write("Module '%s' has been enabled and can now "
"be loaded" % name)
@Utils.hook("received.command.disablemodule", min_args=1)
@utils.hook("received.command.disablemodule", min_args=1)
def disable(self, event):
"""
:help: Add a module to the module blacklist

View file

@ -1,6 +1,6 @@
#--ignore
import types, json
from src import ModuleManager, Utils
from src import ModuleManager, utils
def get_target(user):
return user.alias or user.nickname
@ -24,7 +24,7 @@ def del_setting(user, setting):
class Module(ModuleManager.BaseModule):
_name = "Aliases"
@Utils.hook("new.user")
@utils.hook("new.user")
def new_user(self, event):
method_type = types.MethodType
user = event["user"]
@ -36,7 +36,7 @@ class Module(ModuleManager.BaseModule):
event["user"].find_settings = method_type(find_settings, user)
event["user"].del_setting = method_type(del_setting, user)
@Utils.hook("received.nick")
@utils.hook("received.nick")
def nickname_change(self, event):
old_nickname = event["old_nickname"]
new_nickname = event["new_nickname"]
@ -60,7 +60,7 @@ class Module(ModuleManager.BaseModule):
SET nickname=? WHERE nickname=?""", [new_nickname.lower(),
old_nickname.lower()])
@Utils.hook("received.command.alias")
@utils.hook("received.command.alias")
def alias(self, event):
if event["args"]:
target = event["args_split"][0]

View file

@ -1,10 +1,10 @@
import base64
from src import EventManager, ModuleManager, Utils
from src import EventManager, ModuleManager, utils
@Utils.export("serverset", {"setting": "nickserv-password",
@utils.export("serverset", {"setting": "nickserv-password",
"help": "Set the nickserv password for this server"})
class Module(ModuleManager.BaseModule):
@Utils.hook("received.numeric.001", priority=EventManager.PRIORITY_URGENT)
@utils.hook("received.numeric.001", priority=EventManager.PRIORITY_URGENT)
def on_connect(self, event):
nickserv_password = event["server"].get_setting(
"nickserv-password")

View file

@ -2,7 +2,7 @@ import collections, re, time
from datetime import datetime, date
from collections import Counter
from src import ModuleManager, Utils
from src import ModuleManager, utils
from suds.client import Client
from suds import WebFault
@ -19,7 +19,9 @@ class Module(ModuleManager.BaseModule):
_client = None
PASSENGER_ACTIVITIES = ["U", "P", "R"]
COLOURS = [Utils.COLOR_LIGHTBLUE, Utils.COLOR_GREEN, Utils.COLOR_RED, Utils.COLOR_CYAN, Utils.COLOR_LIGHTGREY, Utils.COLOR_ORANGE]
COLOURS = [utils.irc.COLOR_LIGHTBLUE, utils.irc.COLOR_GREEN,
utils.irc.COLOR_RED, utils.irc.COLOR_CYAN, utils.irc.COLOR_LIGHTGREY,
utils.irc.COLOR_ORANGE]
@property
def client(self):
@ -111,8 +113,8 @@ class Module(ModuleManager.BaseModule):
def reduced_activities(self, string): return [a for a in self.activities(string) if a in self.PASSENGER_ACTIVITIES]
@Utils.hook("telegram.command.nrtrains")
@Utils.hook("received.command.nrtrains", min_args=1)
@utils.hook("telegram.command.nrtrains")
@utils.hook("received.command.nrtrains", min_args=1)
def trains(self, event):
"""
:help: Get train/bus services for a station (Powered by NRE)
@ -181,7 +183,7 @@ class Module(ModuleManager.BaseModule):
severe_summary = ""
if nrcc_severe:
severe_summary += ", "
severe_summary += Utils.bold(Utils.color("%s severe messages" % nrcc_severe, Utils.COLOR_RED))
severe_summary += utils.irc.bold(utils.irc.color("%s severe messages" % nrcc_severe, utils.irc.COLOR_RED))
station_summary = "%s (%s, %s%s)" % (query["locationName"], query["crs"], query["stationManagerCode"], severe_summary)
if not "trainServices" in query and not "busServices" in query and not "ferryServices" in query:
@ -233,7 +235,7 @@ class Module(ModuleManager.BaseModule):
trains.append(parsed)
if eagle_url:
summary_query = Utils.get_url("%s/json/summaries/%s?uids=%s" % (eagle_url, now.date().isoformat(), "%20".join([a["uid"] for a in trains])), json=True, headers={"x-eagle-key": self.bot.config["eagle-api-key"]})
summary_query = utils.http.get_url("%s/json/summaries/%s?uids=%s" % (eagle_url, now.date().isoformat(), "%20".join([a["uid"] for a in trains])), json=True, headers={"x-eagle-key": self.bot.config["eagle-api-key"]})
if summary_query:
for t in trains:
summary = summary_query[t["uid"]]
@ -286,7 +288,7 @@ class Module(ModuleManager.BaseModule):
"*" if t["platform_hidden"] else '',
"?" if "platformsAreUnreliable" in query and query["platformsAreUnreliable"] else '',
t["times"][filter["type"]]["prefix"].replace(filter["type"][0], '') if not t["cancelled"] else "",
Utils.bold(Utils.color(t["times"][filter["type"]]["shortest"*filter["st"] or "short"], colours[t["times"][filter["type"]]["status"]])),
utils.irc.bold(utils.irc.color(t["times"][filter["type"]]["shortest"*filter["st"] or "short"], colours[t["times"][filter["type"]]["status"]])),
bool(t["activity"])*", " + "+".join(t["activity"]),
) for t in trains_filtered])
if event.get("external"):
@ -295,8 +297,8 @@ class Module(ModuleManager.BaseModule):
else:
event["stdout"].write("%s%s: %s" % (station_summary, " departures calling at %s" % filter["inter"] if filter["inter"] else '', trains_string))
@Utils.hook("telegram.command.nrservice")
@Utils.hook("received.command.nrservice", min_args=1)
@utils.hook("telegram.command.nrservice")
@utils.hook("received.command.nrservice", min_args=1)
def service(self, event):
"""
:help: Get train service information for a UID, headcode or RID
@ -332,7 +334,7 @@ class Module(ModuleManager.BaseModule):
query = client.service.QueryServices(service_id, datetime.utcnow().date().isoformat(),
datetime.utcnow().time().strftime("%H:%M:%S+0000"))
if eagle_url:
schedule_query = Utils.get_url("%s/json/schedule/%s/%s" % (eagle_url, service_id, datetime.now().date().isoformat()), json=True, headers={"x-eagle-key": eagle_key})
schedule_query = utils.http.get_url("%s/json/schedule/%s/%s" % (eagle_url, service_id, datetime.now().date().isoformat()), json=True, headers={"x-eagle-key": eagle_key})
if schedule_query:
schedule = schedule_query["current"]
if not query and not schedule:
@ -362,7 +364,7 @@ class Module(ModuleManager.BaseModule):
if "delayReason" in query:
disruptions.append("Delayed (%s%s)" % (query["delayReason"]["value"], " at " + query["delayReason"]["_tiploc"] if query["delayReason"]["_tiploc"] else ""))
if disruptions and not external:
disruptions = Utils.color(", ".join(disruptions), Utils.COLOR_RED) + " "
disruptions = utils.irc.color(", ".join(disruptions), utils.irc.COLOR_RED) + " "
elif disruptions and external:
disruptions = ", ".join(disruptions)
else: disruptions = ""
@ -448,7 +450,7 @@ class Module(ModuleManager.BaseModule):
station["length"] + " cars, " if station["length"] and (station["first"] or (station["last"]) or station["associations"]) else '',
("~" if station["times"][filter["type"]]["estimate"] else '') +
station["times"][filter["type"]]["prefix"].replace(filter["type"][0], ""),
Utils.color(station["times"][filter["type"]]["short"], colours[station["times"][filter["type"]]["status"]]),
utils.irc.color(station["times"][filter["type"]]["short"], colours[station["times"][filter["type"]]["status"]]),
", "*bool(station["activity_p"]) + "+".join(station["activity_p"]),
", ".join([a["summary"] for a in station["associations"]]),
)
@ -487,12 +489,12 @@ class Module(ModuleManager.BaseModule):
else:
event["stdout"].write("%s%s %s %s (%s%s%s/%s/%s): %s" % (disruptions, query["operatorCode"],
query["trainid"], query["serviceType"],
Utils.color(done_count, Utils.COLOR_LIGHTBLUE),
utils.irc.color(done_count, utils.irc.COLOR_LIGHTBLUE),
len(stations_filtered), total_count,
", ".join([s["summary"] for s in stations_filtered])))
@Utils.hook("telegram.command.nrhead")
@Utils.hook("received.command.nrhead", min_args=1)
@utils.hook("telegram.command.nrhead")
@utils.hook("received.command.nrhead", min_args=1)
def head(self, event):
"""
:help: Get information for a given headcode/UID/RID (Powered by NRE)
@ -513,8 +515,8 @@ class Module(ModuleManager.BaseModule):
else:
event["stdout"].write(", ".join(["h/%s r/%s u/%s rs/%s %s (%s) -> %s (%s)" % (a["trainid"], a["rid"], a["uid"], a["rsid"], a["originName"], a["originCrs"], a["destinationName"], a["destinationCrs"]) for a in services]))
@Utils.hook("telegram.command.nrcode")
@Utils.hook("received.command.nrcode", min_args=1)
@utils.hook("telegram.command.nrcode")
@utils.hook("received.command.nrcode", min_args=1)
def service_code(self, event):
"""
:help: Get the text for a given delay/cancellation code (Powered by NRE)

View file

@ -1,7 +1,7 @@
from src import EventManager, ModuleManager, Utils
from src import EventManager, ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.numeric.001", priority=EventManager.PRIORITY_URGENT)
@utils.hook("received.numeric.001", priority=EventManager.PRIORITY_URGENT)
def on_connect(self, event):
commands = event["server"].get_setting("perform", [])
for i, command in enumerate(commands):

View file

@ -1,16 +1,16 @@
import base64, os
import scrypt
from src import ModuleManager, Utils
from src import ModuleManager, utils
REQUIRES_IDENTIFY = ("You need to be identified to use that command "
"(/msg %s register | /msg %s identify)")
class Module(ModuleManager.BaseModule):
@Utils.hook("new.user")
@utils.hook("new.user")
def new_user(self, event):
self._logout(event["user"])
@Utils.hook("received.part")
@utils.hook("received.part")
def on_part(self, event):
if len(event["user"].channels) == 1 and event["user"
].identified_account_override:
@ -38,7 +38,7 @@ class Module(ModuleManager.BaseModule):
user.identified_account_override = None
user.identified_account_id_override = None
@Utils.hook("received.command.identify", private_only=True, min_args=1)
@utils.hook("received.command.identify", private_only=True, min_args=1)
def identify(self, event):
"""
:help: Identify yourself
@ -80,7 +80,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("You are already identified")
@Utils.hook("received.command.register", private_only=True, min_args=1)
@utils.hook("received.command.register", private_only=True, min_args=1)
def register(self, event):
"""
:help: Register yourself
@ -104,7 +104,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("This nickname is already registered")
@Utils.hook("received.command.setpassword", authenticated=True, min_args=1)
@utils.hook("received.command.setpassword", authenticated=True, min_args=1)
def set_password(self, event):
"""
:help: Change your password
@ -114,7 +114,7 @@ class Module(ModuleManager.BaseModule):
event["user"].set_setting("authentication", [hash, salt])
event["stdout"].write("Set your password")
@Utils.hook("received.command.logout", private_only=True)
@utils.hook("received.command.logout", private_only=True)
def logout(self, event):
"""
:help: Logout from your identified account
@ -125,7 +125,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("You are not logged in")
@Utils.hook("received.command.resetpassword", private_only=True,
@utils.hook("received.command.resetpassword", private_only=True,
min_args=2)
def reset_password(self, event):
"""
@ -145,7 +145,7 @@ class Module(ModuleManager.BaseModule):
event["stdout"].write("Reset password for '%s'" %
target.nickname)
@Utils.hook("preprocess.command")
@utils.hook("preprocess.command")
def preprocess_command(self, event):
permission = event["hook"].get_kwarg("permission", None)
authenticated = event["hook"].kwargs.get("authenticated", False)
@ -175,7 +175,7 @@ class Module(ModuleManager.BaseModule):
return REQUIRES_IDENTIFY % (event["server"].nickname,
event["server"].nickname)
@Utils.hook("received.command.mypermissions", authenticated=True)
@utils.hook("received.command.mypermissions", authenticated=True)
def my_permissions(self, event):
"""
:help: Show your permissions
@ -189,7 +189,7 @@ class Module(ModuleManager.BaseModule):
permissions = target.get_setting("permissions", [])
return [target, registered, permissions]
@Utils.hook("received.command.givepermission", min_args=2)
@utils.hook("received.command.givepermission", min_args=2)
def give_permission(self, event):
"""
:help: Give a given permission to a given user
@ -212,7 +212,7 @@ class Module(ModuleManager.BaseModule):
target.set_setting("permissions", permissions)
event["stdout"].write("Gave permission '%s' to %s" % (
permission, target.nickname))
@Utils.hook("received.command.removepermission", min_args=2)
@utils.hook("received.command.removepermission", min_args=2)
def remove_permission(self, event):
"""
:help: Remove a given permission from a given user

View file

@ -1,7 +1,7 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.ping")
@utils.hook("received.command.ping")
def pong(self, event):
"""
:help: Ping pong!

View file

@ -1,5 +1,5 @@
import datetime
from src import EventManager, ModuleManager, Utils
from src import EventManager, ModuleManager, utils
class Module(ModuleManager.BaseModule):
def print_line(self, event, line, channel=None):
@ -16,35 +16,35 @@ class Module(ModuleManager.BaseModule):
else:
self.print_line(event, "<%s> %s" % (nickname, event["message"]),
channel=event["channel"].name)
@Utils.hook("received.message.channel",
@utils.hook("received.message.channel",
priority=EventManager.PRIORITY_HIGH)
def channel_message(self, event):
self._on_message(event, event["user"].nickname)
@Utils.hook("self.message.channel")
@utils.hook("self.message.channel")
def self_channel_message(self, event):
self._on_message(event, event["server"].nickname)
def _on_notice(self, event, target):
self.print_line(event, "(notice->%s) <%s> %s" % (
target, event["user"].nickname, event["message"]))
@Utils.hook("received.notice.channel",
@utils.hook("received.notice.channel",
priority=EventManager.PRIORITY_HIGH)
def channel_notice(self, event):
self._on_notice(event, event["channel"].name)
@Utils.hook("received.notice.private", priority=EventManager.PRIORITY_HIGH)
@utils.hook("received.notice.private", priority=EventManager.PRIORITY_HIGH)
def private_notice(self, event):
self._on_notice(event, event["server"].nickname)
@Utils.hook("received.server-notice", priority=EventManager.PRIORITY_HIGH)
@utils.hook("received.server-notice", priority=EventManager.PRIORITY_HIGH)
def server_notice(self, event):
self.print_line(event, "(server notice) %s" % event["message"])
def _on_join(self, event, nickname):
self.print_line(event, "%s joined %s" % (nickname,
event["channel"].name))
@Utils.hook("received.join")
@utils.hook("received.join")
def join(self, event):
self._on_join(event, event["user"].nickname)
@Utils.hook("self.join")
@utils.hook("self.join")
def self_join(self, event):
self._on_join(event, event["server"].nickname)
@ -53,20 +53,20 @@ class Module(ModuleManager.BaseModule):
nickname,
event["channel"].name,
"" if not event["reason"] else " (%s)" % event["reason"]))
@Utils.hook("received.part")
@utils.hook("received.part")
def part(self, event):
self._on_part(event, event["user"].nickname)
@Utils.hook("self.part")
@utils.hook("self.part")
def self_part(self, event):
self._on_part(event, event["server"].nickname)
@Utils.hook("received.nick")
@Utils.hook("self.nick")
@utils.hook("received.nick")
@utils.hook("self.nick")
def on_nick(self, event):
self.print_line(event, "%s changed nickname to %s" % (
event["old_nickname"], event["new_nickname"]))
@Utils.hook("received.quit")
@utils.hook("received.quit")
def on_quit(self, event):
self.print_line(event, "%s quit%s" % (event["user"].nickname,
"" if not event["reason"] else " (%s)" % event["reason"]))
@ -75,26 +75,26 @@ class Module(ModuleManager.BaseModule):
self.print_line(event, "%s kicked %s from %s%s" % (
event["user"].nickname, nickname, event["channel"].name,
"" if not event["reason"] else " (%s)" % event["reason"]))
@Utils.hook("received.kick")
@utils.hook("received.kick")
def kick(self, event):
self._on_kick(event, event["target_user"].nickname)
@Utils.hook("self.kick")
@utils.hook("self.kick")
def self_kick(self, event):
self._on_kick(event, event["server"].nickname)
def _on_topic(self, event, setter, action, topic, channel):
self.print_line(event, "topic %s by %s: %s" % (action, setter,
topic), channel=channel.name)
@Utils.hook("received.topic")
@utils.hook("received.topic")
def on_topic(self, event):
self._on_topic(event, event["user"].nickname, "changed",
event["topic"], event["channel"])
@Utils.hook("received.numeric.333")
@utils.hook("received.numeric.333")
def on_333(self, event):
self._on_topic(event, event["setter"], "set",
event["channel"].topic, event["channel"])
@Utils.hook("received.mode.channel")
@utils.hook("received.mode.channel")
def mode(self, event):
args = " ".join(event["mode_args"])
if args:

View file

@ -1,5 +1,5 @@
import random
from src import ModuleManager, Utils
from src import ModuleManager, utils
QUOTES = {
"You can build a throne with bayonets, but it's difficult to sit on it." : "Boris Yeltsin",
@ -56,7 +56,7 @@ QUOTES = {
}
class Module(ModuleManager.BaseModule):
@Utils.hook("get.quit-quote")
@utils.hook("get.quit-quote")
def quote(self, event):
quote = random.choice(list(QUOTES.items()))
return (" - " if quote[1] else "").join(quote)

View file

@ -1,5 +1,5 @@
import random, time
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
def category_and_quote(self, s):
@ -7,7 +7,7 @@ class Module(ModuleManager.BaseModule):
return [part.strip() for part in s.split("=", 1)]
return None, None
@Utils.hook("received.command.quoteadd|qadd", min_args=1)
@utils.hook("received.command.quoteadd|qadd", min_args=1)
def quote_add(self, event):
"""
:help: Add a quote to a category
@ -23,7 +23,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("Please provide a category AND quote")
@Utils.hook("received.command.quoteget|qget", min_args=1)
@utils.hook("received.command.quoteget|qget", min_args=1)
def quote_get(self, event):
"""
:help: Get a quote from a ccategory
@ -46,7 +46,7 @@ class Module(ModuleManager.BaseModule):
event["stderr"].write("Please provide a category and a "
"part of a quote to find")
@Utils.hook("received.command.quotedel|qdel", min_args=1)
@utils.hook("received.command.quotedel|qdel", min_args=1)
def quote_del(self, event):
"""
:help: Delete a quote from a category
@ -71,7 +71,7 @@ class Module(ModuleManager.BaseModule):
event["stderr"].write("Please provide a category and a quote "
"to remove")
@Utils.hook("received.command.quote|q", min_args=1)
@utils.hook("received.command.quote|q", min_args=1)
def quote(self, event):
"""
:help: Get a random quote from a category

View file

@ -1,10 +1,10 @@
import random, uuid
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
_name = "Random"
@Utils.hook("received.command.random|rand")
@utils.hook("received.command.random|rand")
def random(self, event):
"""
:help: Get a random number
@ -28,7 +28,7 @@ class Module(ModuleManager.BaseModule):
event["stderr"].write(
"Both start and end must be valid integers")
@Utils.hook("received.command.guid")
@utils.hook("received.command.guid")
def guid(self, event):
"""
:help: Get a random guid

View file

@ -1,5 +1,5 @@
import base64
from src import ModuleManager, Utils
from src import ModuleManager, utils
def _validate(self, s):
mechanism = s
@ -7,11 +7,11 @@ def _validate(self, s):
mechanism, arguments = s.split(" ", 1)
return {"mechanism": mechanism, "args": arguments}
@Utils.export("serverset", {"setting": "sasl",
@utils.export("serverset", {"setting": "sasl",
"help": "Set the sasl username/password for this server",
"validate": _validate})
class Module(ModuleManager.BaseModule):
@Utils.hook("received.cap.ls")
@utils.hook("received.cap.ls")
def on_cap(self, event):
has_sasl = "sasl" in event["capabilities"]
our_sasl = event["server"].get_setting("sasl", None)
@ -28,14 +28,14 @@ class Module(ModuleManager.BaseModule):
if do_sasl:
event["server"].queue_capability("sasl")
@Utils.hook("received.cap.ack")
@utils.hook("received.cap.ack")
def on_cap_ack(self, event):
if "sasl" in event["capabilities"]:
sasl = event["server"].get_setting("sasl")
event["server"].send_authenticate(sasl["mechanism"].upper())
event["server"].wait_for_capability("sasl")
@Utils.hook("received.authenticate")
@utils.hook("received.authenticate")
def on_authenticate(self, event):
if event["message"] != "+":
event["server"].send_authenticate("*")
@ -58,9 +58,9 @@ class Module(ModuleManager.BaseModule):
def _end_sasl(self, server):
server.capability_done("sasl")
@Utils.hook("received.numeric.903")
@utils.hook("received.numeric.903")
def sasl_success(self, event):
self._end_sasl(event["server"])
@Utils.hook("received.numeric.904")
@utils.hook("received.numeric.904")
def sasl_failure(self, event):
self._end_sasl(event["server"])

View file

@ -1,6 +1,6 @@
import glob, json, os, subprocess
from src import IRCObject, Utils
from src import IRCObject, utils
class Module(object):
def __init__(self, bot, events, exports):
@ -12,7 +12,7 @@ class Module(object):
def _load_scripts(self):
for filename in glob.glob(os.path.join(self._directory, "*")):
name = os.path.basename(filename)
for hashflag, value in Utils.get_hashflags(filename):
for hashflag, value in utils.get_hashflags(filename):
if hashflag == "name" and value:
name = value
elif hashflag == "hook" and value:
@ -20,7 +20,7 @@ class Module(object):
lambda x: self.call(x, filename, name))
self._hooks.append([value, hook])
@Utils.hook("received.command.reloadscripts", permission="reloadscripts")
@utils.hook("received.command.reloadscripts", permission="reloadscripts")
def reload(self, event):
for event_name, hook in self._hooks:
self.events.on(event_name).unhook(hook)

View file

@ -1,21 +1,21 @@
import re, traceback
from src import ModuleManager, Utils
from src import ModuleManager, utils
REGEX_SPLIT = re.compile("(?<!\\\\)/")
REGEX_SED = re.compile("^s/")
@Utils.export("channelset", {"setting": "sed",
@utils.export("channelset", {"setting": "sed",
"help": "Disable/Enable sed in a channel",
"validate": Utils.bool_or_none})
@Utils.export("channelset", {"setting": "sed-sender-only",
"validate": utils.bool_or_none})
@utils.export("channelset", {"setting": "sed-sender-only",
"help": "Disable/Enable sed only looking at the messages sent by the user",
"validate": Utils.bool_or_none})
"validate": utils.bool_or_none})
class Module(ModuleManager.BaseModule):
@Utils.hook("received.message.channel")
@utils.hook("received.message.channel")
def channel_message(self, event):
sed_split = re.split(REGEX_SPLIT, event["message"], 3)
if event["message"].startswith("s/") and len(sed_split) > 2:
if event["action"] or not Utils.get_closest_setting(
if event["action"] or not utils.get_closest_setting(
event, "sed", False):
return
@ -48,7 +48,7 @@ class Module(ModuleManager.BaseModule):
return
replace = sed_split[2].replace("\\/", "/")
for_user = event["user"].nickname if Utils.get_closest_setting(
for_user = event["user"].nickname if utils.get_closest_setting(
event, "sed-sender-only", False
) else None
line = event["channel"].buffer.find(pattern, from_self=False,

View file

@ -1,13 +1,13 @@
import time
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.message.channel")
@utils.hook("received.message.channel")
def channel_message(self, event):
seen_seconds = time.time()
event["user"].set_setting("seen", seen_seconds)
@Utils.hook("received.command.seen", min_args=1)
@utils.hook("received.command.seen", min_args=1)
def seen(self, event):
"""
:help: Find out when a user was last seen
@ -16,7 +16,7 @@ class Module(ModuleManager.BaseModule):
seen_seconds = event["server"].get_user(event["args_split"][0]
).get_setting("seen")
if seen_seconds:
since = Utils.to_pretty_time(time.time()-seen_seconds,
since = utils.to_pretty_time(time.time()-seen_seconds,
max_units=2)
event["stdout"].write("%s was last seen %s ago" % (
event["args_split"][0], since))

View file

@ -1,4 +1,4 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
def _set(self, settings, event, target):
@ -22,7 +22,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stdout"].write("Available settings: %s" % (
", ".join(settings_dict.keys())))
@Utils.hook("received.command.set")
@utils.hook("received.command.set")
def set(self, event):
"""
:help: Set a specified user setting
@ -30,9 +30,9 @@ class Module(ModuleManager.BaseModule):
"""
self._set(self.exports.get_all("set"), event, event["user"])
@Utils.hook("received.command.channelset", channel_only=True,
@utils.hook("received.command.channelset", channel_only=True,
require_mode="o")
@Utils.hook("received.command.channelsetoverride", channel_only=True,
@utils.hook("received.command.channelsetoverride", channel_only=True,
permission="channelsetoverride")
def channel_set(self, event):
"""
@ -41,7 +41,7 @@ class Module(ModuleManager.BaseModule):
"""
self._set(self.exports.get_all("channelset"), event, event["target"])
@Utils.hook("received.command.serverset")
@utils.hook("received.command.serverset")
def server_set(self, event):
"""
:help: Set a specified server setting for the current server
@ -57,7 +57,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stdout"].write("'%s' has no value set" % setting)
@Utils.hook("received.command.get", min_args=1)
@utils.hook("received.command.get", min_args=1)
def get(self, event):
"""
:help: Get a specified user setting
@ -67,7 +67,7 @@ class Module(ModuleManager.BaseModule):
self._get(event, setting, "", event["user"].get_setting(
setting, None))
@Utils.hook("received.command.channelget", channel_only=True, min_args=1)
@utils.hook("received.command.channelget", channel_only=True, min_args=1)
def channel_get(self, event):
"""
:help: Get a specified channel setting for the current channel
@ -78,7 +78,7 @@ class Module(ModuleManager.BaseModule):
self._get(event, setting, " for %s" % event["target"].name,
event["target"].get_setting(setting, None))
@Utils.hook("received.command.serverget", min_args=1)
@utils.hook("received.command.serverget", min_args=1)
def server_get(self, event):
"""
:help: Get a specified server setting for the current server

View file

@ -1,5 +1,5 @@
import random
from src import ModuleManager, Utils
from src import ModuleManager, utils
INSULT_INTRO = ["Thou art a", "Ye", "Thou", "Thy", "Thee"]
@ -52,7 +52,7 @@ INSULT_PART_3 = ["apple-john", "baggage", "barnacle", "bladder", "boar-pig",
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.insult")
@utils.hook("received.command.insult")
def dispense_insult(self, event):
insult = [random.choice(INSULT_INTRO), random.choice(INSULT_PART_1),
random.choice(INSULT_PART_2), random.choice(INSULT_PART_3)]
@ -61,7 +61,9 @@ class Module(ModuleManager.BaseModule):
target = ""
if event["args_split"]:
target = Utils.bold(event["server"].get_user(
event["args_split"][0]).nickname) + ", "
target = event["args_split"][0]
if event["server"].has_user(target):
target= event["server"].get_user(target).nickname
target = "%s, " % target
event["stdout"].write(target + insult + "!")

View file

@ -1,7 +1,7 @@
#--require-config soundcloud-api-key
import json, re, time
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_SOUNDCLOUD_TRACK = "http://api.soundcloud.com/tracks"
URL_SOUNDCLOUD_RESOLVE = "http://api.soundcloud.com/resolve"
@ -10,7 +10,7 @@ REGEX_SOUNDCLOUD = "https?://soundcloud.com/([^/]+)/([^/]+)"
class Module(ModuleManager.BaseModule):
_name = "SoundCloud"
@Utils.hook("received.command.soundcloud|sc")
@utils.hook("received.command.soundcloud|sc")
def soundcloud(self, event):
"""
:help: Search SoundCloud
@ -43,7 +43,7 @@ class Module(ModuleManager.BaseModule):
else:
get_params["url"] = url
page = Utils.get_url(
page = utils.http.get_url(
URL_SOUNDCLOUD_TRACK if has_query else URL_SOUNDCLOUD_RESOLVE,
get_params=get_params, json=True)

View file

@ -1,17 +1,18 @@
import json
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_SPOTIFY = "https://api.spotify.com/v1/search"
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.spotify", min_args=1)
@utils.hook("received.command.spotify", min_args=1)
def spotify(self, event):
"""
:help: Search for a track on spotify
:usage: <term>
"""
page = Utils.get_url(URL_SPOTIFY, get_params={"type": "track",
"limit": 1, "q": event["args"]}, json=True)
page = utils.http.get_url(URL_SPOTIFY, get_params=
{"type": "track", "limit": 1, "q": event["args"]},
json=True)
if page:
if len(page["tracks"]["items"]):
item = page["tracks"]["items"][0]

View file

@ -1,17 +1,17 @@
import time
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.uptime")
@utils.hook("received.command.uptime")
def uptime(self, event):
"""
:help: Show my uptime
"""
seconds = int(time.time()-self.bot.start_time)
event["stdout"].write("Uptime: %s" % Utils.to_pretty_time(
event["stdout"].write("Uptime: %s" % utils.to_pretty_time(
seconds))
@Utils.hook("received.command.stats")
@utils.hook("received.command.stats")
def stats(self, event):
"""
:help: Show my network/channel/user stats

View file

@ -1,8 +1,8 @@
import random
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.strax")
@utils.hook("received.command.strax")
def strax(self, event):
"""
:help: Suggests a glorious method of battle for the glory of the

View file

@ -1,10 +1,10 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
@Utils.export("serverset", {"setting": "strip-color",
@utils.export("serverset", {"setting": "strip-color",
"help": "Set whether I strip colors from my messages on this server",
"validate": Utils.bool_or_none})
"validate": utils.bool_or_none})
class Module(ModuleManager.BaseModule):
@Utils.hook("preprocess.send")
@utils.hook("preprocess.send")
def preprocess(self, event):
if event["server"].get_setting("strip-color", False):
return Utils.strip_font(event["line"])
return utils.irc.strip_font(event["line"])

View file

@ -1,5 +1,6 @@
#--ignore
import telegram, telegram.ext
from src import utils
import json
from datetime import datetime
@ -48,7 +49,7 @@ class Module(Thread):
}
self.events.on("telegram.command").on(command).call(**data)
@Utils.hook("signal.interrupt")
@utils.hook("signal.interrupt")
def sigint(self, event):
self.updater.stop()

View file

@ -1,5 +1,5 @@
import collections, datetime, re
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_BUS = "https://api.tfl.gov.uk/StopPoint/%s/Arrivals"
URL_BUS_SEARCH = "https://api.tfl.gov.uk/StopPoint/Search/%s"
@ -46,7 +46,7 @@ class Module(ModuleManager.BaseModule):
platform = m.group(2)
return platform
@Utils.hook("received.command.tflbus", min_args=1)
@utils.hook("received.command.tflbus", min_args=1)
def bus(self, event):
"""
:help: Get bus due times for a TfL bus stop
@ -63,21 +63,24 @@ class Module(ModuleManager.BaseModule):
real_stop_id = ""
stop_name = ""
if stop_id.isdigit():
bus_search = Utils.get_url(URL_BUS_SEARCH % stop_id, get_params={
"app_id": app_id, "app_key": app_key}, json=True)
bus_search = utils.http.get_url(URL_BUS_SEARCH % stop_id,
get_params={"app_id": app_id, "app_key": app_key},
json=True)
bus_stop = bus_search["matches"][0]
real_stop_id = bus_stop["id"]
stop_name = bus_stop["name"]
else:
bus_stop = Utils.get_url(URL_STOP % stop_id, get_params={
"app_id": app_id, "app_key": app_key}, json=True)
bus_stop = utils.http.get_url(URL_STOP % stop_id,
get_params={"app_id": app_id, "app_key": app_key},
json=True)
if bus_stop:
real_stop_id = stop_id
stop_name = bus_stop["commonName"]
if real_stop_id:
bus_stop = Utils.get_url(URL_BUS % real_stop_id, get_params={
"app_id": app_id, "app_key": app_key}, json=True)
bus_stop = utils.http.get_url(URL_BUS % real_stop_id,
get_params={"app_id": app_id, "app_key": app_key},
json=True)
busses = []
for bus in bus_stop:
bus_number = bus["lineName"]
@ -123,7 +126,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("Bus ID '%s' unknown" % stop_id)
@Utils.hook("received.command.tflline")
@utils.hook("received.command.tflline")
def line(self, event):
"""
:help: Get line status for TfL underground lines
@ -132,7 +135,7 @@ class Module(ModuleManager.BaseModule):
app_id = self.bot.config["tfl-api-id"]
app_key = self.bot.config["tfl-api-key"]
lines = Utils.get_url(URL_LINE, get_params={
lines = utils.http.get_url(URL_LINE, get_params={
"app_id": app_id, "app_key": app_key}, json=True)
statuses = []
for line in lines:
@ -167,7 +170,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("No results")
@Utils.hook("received.command.tflsearch", min_args=1)
@utils.hook("received.command.tflsearch", min_args=1)
def search(self, event):
"""
:help: Get a list of TfL stop IDs for a given name
@ -179,7 +182,7 @@ class Module(ModuleManager.BaseModule):
#As awful as this is, it also makes it ~work~.
stop_name = event["args"].replace(" ", "%20")
stop_search = Utils.get_url(URL_STOP_SEARCH % stop_name, get_params={
stop_search = utils.http.get_url(URL_STOP_SEARCH % stop_name, get_params={
"app_id": app_id, "app_key": app_key, "maxResults": "6", "faresOnly": "False"}, json=True)
if stop_search:
for stop in stop_search["matches"]:
@ -189,7 +192,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("No results")
@Utils.hook("received.command.tflvehicle", min_args=1)
@utils.hook("received.command.tflvehicle", min_args=1)
def vehicle(self, event):
"""
:help: Get information for a given vehicle
@ -200,7 +203,7 @@ class Module(ModuleManager.BaseModule):
vehicle_id = event["args_split"][0]
vehicle = Utils.get_url(URL_VEHICLE % vehicle_id, get_params={
vehicle = utils.http.get_url(URL_VEHICLE % vehicle_id, get_params={
"app_id": app_id, "app_key": app_key}, json=True)[0]
arrival_time = self.vehicle_span(vehicle["expectedArrival"], human=False)
@ -210,7 +213,7 @@ class Module(ModuleManager.BaseModule):
vehicle["vehicleId"], vehicle["lineName"], vehicle["destinationName"], vehicle["currentLocation"],
vehicle["stationName"], vehicle["naptanId"], arrival_time, platform))
@Utils.hook("received.command.tflservice", min_args=1)
@utils.hook("received.command.tflservice", min_args=1)
def service(self, event):
"""
:help: Get service information and arrival estimates
@ -230,8 +233,8 @@ class Module(ModuleManager.BaseModule):
event["stdout"].write("%s is too high. Remember that the first arrival is 0" % service_id)
return
service = results[int(service_id)]
arrivals = Utils.get_url(URL_LINE_ARRIVALS % service["route"], get_params={
"app_id": app_id, "app_key": app_key}, json=True)
arrivals = utils.http.get_url(URL_LINE_ARRIVALS % service["route"],
get_params={"app_id": app_id, "app_key": app_key}, json=True)
arrivals = [a for a in arrivals if a["vehicleId"] == service["id"]]
arrivals = sorted(arrivals, key=lambda b: b["timeToStation"])
@ -243,7 +246,7 @@ class Module(ModuleManager.BaseModule):
a["expectedArrival"][11:16]
) for a in arrivals]))
@Utils.hook("received.command.tflstop", min_args=1)
@utils.hook("received.command.tflstop", min_args=1)
def stop(self, event):
"""
:help: Get information for a given stop
@ -254,7 +257,7 @@ class Module(ModuleManager.BaseModule):
stop_id = event["args_split"][0]
stop = Utils.get_url(URL_STOP % stop_id, get_params={
stop = utils.http.get_url(URL_STOP % stop_id, get_params={
"app_id": app_id, "app_key": app_key}, json=True)
def route(self, event):
@ -263,7 +266,7 @@ class Module(ModuleManager.BaseModule):
route_id = event["args_split"][0]
route = Utils.get_url(URL_ROUTE % route_id, get_params={
route = utils.http.get_url(URL_ROUTE % route_id, get_params={
"app_id": app_id, "app_key": app_key}, json=True)
event["stdout"].write("")

View file

@ -1,18 +1,18 @@
#--require-config bighugethesaurus-api-key
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_THESAURUS = "http://words.bighugelabs.com/api/2/%s/%s/json"
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.synonym|antonym", min_args=1)
@utils.hook("received.command.synonym|antonym", min_args=1)
def thesaurus(self, event):
"""
:help: Get synonyms/antonyms for a provided phrase
:usage: <word> [type]
"""
phrase = event["args_split"][0]
page = Utils.get_url(URL_THESAURUS % (self.bot.config[
page = utils.http.get_url(URL_THESAURUS % (self.bot.config[
"bighugethesaurus-api-key"], phrase), json=True)
syn_ant = event["command"][:3]
if page:

View file

@ -1,10 +1,10 @@
import re
from src import ModuleManager, Utils
from src import ModuleManager, utils
REGEX_URL = re.compile("https?://\S+", re.I)
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.title|t", usage="[URL]")
@utils.hook("received.command.title|t", usage="[URL]")
def title(self, event):
"""
:help: Get the title of a URL
@ -20,7 +20,7 @@ class Module(ModuleManager.BaseModule):
if not url:
event["stderr"].write("No URL provided/found.")
return
soup = Utils.get_url(url, soup=True)
soup = utils.http.get_url(url, soup=True)
if not soup:
event["stderr"].write("Failed to get URL.")
return

View file

@ -1,7 +1,7 @@
from src import EventManager, ModuleManager, Utils
from src import EventManager, ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.message.channel", priority=EventManager.PRIORITY_HIGH)
@utils.hook("received.message.channel", priority=EventManager.PRIORITY_HIGH)
def channel_message(self, event):
messages = event["channel"].get_user_setting(event["user"].get_id(),
"to", [])
@ -11,7 +11,7 @@ class Module(ModuleManager.BaseModule):
if messages:
event["channel"].del_user_setting(event["user"].get_id(), "to")
@Utils.hook("received.command.to", min_args=2, channel_only=True)
@utils.hook("received.command.to", min_args=2, channel_only=True)
def to(self, event):
"""
:help: Relay a message to a user the next time they talk in this

View file

@ -1,7 +1,7 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.todo")
@utils.hook("received.command.todo")
def todo(self, event):
"""
:help: Find out what's in your todo list
@ -21,7 +21,7 @@ class Module(ModuleManager.BaseModule):
todo_count = len(todo)
event["stdout"].write("There are %d items in your todo" % todo_count)
@Utils.hook("received.command.todoadd", min_args=1)
@utils.hook("received.command.todoadd", min_args=1)
def todo_add(self, event):
"""
:help: Add something to your todo list
@ -38,7 +38,7 @@ class Module(ModuleManager.BaseModule):
event["user"].set_setting("todo", todo)
event["stdout"].write("Saved")
@Utils.hook("received.command.tododel", min_args=1)
@utils.hook("received.command.tododel", min_args=1)
def todo_del(self, event):
"""
:help: Remove something from your todo list

View file

@ -1,13 +1,13 @@
#--require-config trakt-api-key
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_TRAKT = "https://api-v2launch.trakt.tv/users/%s/watching"
URL_TRAKTSLUG = "https://trakt.tv/%s/%s"
@Utils.export("set", {"setting": "trakt", "help": "Set username on trakt.tv"})
@utils.export("set", {"setting": "trakt", "help": "Set username on trakt.tv"})
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.nowwatching|nw")
@utils.hook("received.command.nowwatching|nw")
def now_watching(self, event):
"""
:help: Get what you or another user is now watching on trakt.tv
@ -18,7 +18,7 @@ class Module(ModuleManager.BaseModule):
else:
username = event["user"].get_setting("trakt",
event["user"].nickname)
page = Utils.get_url(URL_TRAKT % username, headers={
page = utils.http.get_url(URL_TRAKT % username, headers={
"Content-Type": "application/json",
"trakt-api-version": "2", "trakt-api-key":
self.bot.config["trakt-api-key"]}, json=True,

View file

@ -1,12 +1,12 @@
import json, re
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_TRANSLATE = "http://translate.googleapis.com/translate_a/single"
URL_LANGUAGES = "https://cloud.google.com/translate/docs/languages"
REGEX_LANGUAGES = re.compile("(\w+)?:(\w+)? ")
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.translate|tr")
@utils.hook("received.command.translate|tr")
def translate(self, event):
"""
:help: Translate the provided phrase or the last line in thie current
@ -32,7 +32,7 @@ class Module(ModuleManager.BaseModule):
target_language = language_match.group(2)
phrase = phrase.split(" ", 1)[1]
data = Utils.get_url(URL_TRANSLATE, get_params={
data = utils.http.get_url(URL_TRANSLATE, get_params={
"client": "gtx", "sl": source_language,
"tl": target_language, "dt": "t", "q": phrase})

View file

@ -5,7 +5,7 @@
import datetime, re, time, traceback
import twitter
from src import ModuleManager, Utils
from src import ModuleManager, utils
REGEX_TWITTERURL = re.compile(
"https?://(?:www\.)?twitter.com/[^/]+/status/(\d+)", re.I)
@ -16,10 +16,10 @@ class Module(ModuleManager.BaseModule):
def make_timestamp(self, s):
seconds_since = time.time() - datetime.datetime.strptime(s,
"%a %b %d %H:%M:%S %z %Y").timestamp()
since, unit = Utils.time_unit(seconds_since)
since, unit = utils.time_unit(seconds_since)
return "%s %s ago" % (since, unit)
@Utils.hook("received.command.tweet|tw")
@utils.hook("received.command.tweet|tw")
def tweet(self, event):
"""
:help: Get/find a tweet

View file

@ -1,11 +1,11 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
UPCITEMDB_URL = "https://api.upcitemdb.com/prod/trial/lookup"
class Module(ModuleManager.BaseModule):
_name = "UPC"
@Utils.hook("received.command.upc|ean|gtin", min_args=1)
@utils.hook("received.command.upc|ean|gtin", min_args=1)
def upc(self, event):
"""
:help: Look up a product by UPC, EAN or GTIN
@ -16,7 +16,7 @@ class Module(ModuleManager.BaseModule):
event["stderr"].write("Invalid UPC/EAN provided")
return
page = Utils.get_url(UPCITEMDB_URL,
page = utils.http.get_url(UPCITEMDB_URL,
get_params={"upc": event["args_split"][0]},
json=True)
if page:

View file

@ -1,11 +1,11 @@
import json, re
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_URBANDICTIONARY = "http://api.urbandictionary.com/v0/define"
REGEX_DEFNUMBER = re.compile("-n(\d+) \S+")
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.urbandictionary|ud", min_args=1)
@utils.hook("received.command.urbandictionary|ud", min_args=1)
def ud(self, event):
"""
:help: Get the definition of a provided term from Urban Dictionary
@ -17,8 +17,8 @@ class Module(ModuleManager.BaseModule):
if match:
number = int(match.group(1))
term = term.split(" ", 1)[1]
page = Utils.get_url(URL_URBANDICTIONARY, get_params={"term": term},
json=True)
page = utils.http.get_url(URL_URBANDICTIONARY,
get_params={"term": term}, json=True)
if page:
if len(page["list"]):
if number > 0 and len(page["list"]) > number-1:

View file

@ -1,18 +1,18 @@
#--require-config openweathermap-api-key
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_WEATHER = "http://api.openweathermap.org/data/2.5/weather"
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.weather", min_args=1, usage="<location>")
@utils.hook("received.command.weather", min_args=1, usage="<location>")
def weather(self, event):
"""
:help: Get current weather data for a provided location
:usage: <location>
"""
api_key = self.bot.config["openweathermap-api-key"]
page = Utils.get_url(URL_WEATHER, get_params={
page = utils.http.get_url(URL_WEATHER, get_params={
"q": event["args"], "units": "metric",
"APPID": api_key},
json=True)

View file

@ -1,15 +1,15 @@
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_WIKIPEDIA = "https://en.wikipedia.org/w/api.php"
class Module(ModuleManager.BaseModule):
@Utils.hook("received.command.wiki|wi", min_args=1)
@utils.hook("received.command.wiki|wi", min_args=1)
def wikipedia(self, event):
"""
:help: Get information from wikipedia
:usage: <term>
"""
page = Utils.get_url(URL_WIKIPEDIA, get_params={
page = utils.http.get_url(URL_WIKIPEDIA, get_params={
"action": "query", "prop": "extracts",
"titles": event["args"], "exintro": "",
"explaintext": "", "exchars": "500",

View file

@ -1,19 +1,20 @@
#--require-config wolframalpha-api-key
import json
from src import ModuleManager, Utils
from src import ModuleManager, utils
URL_WA = "https://api.wolframalpha.com/v1/result"
class Module(ModuleManager.BaseModule):
_name = "Wolfram|Alpha"
@Utils.hook("received.command.wolframalpha|wa", min_args=1)
@utils.hook("received.command.wolframalpha|wa", min_args=1)
def wa(self, event):
"""
:help: Evauate a given string on Wolfram|Alpha
:usage: <query>
"""
code, result = Utils.get_url(URL_WA, get_params={"i": event["args"],
code, result = utils.http.get_url(URL_WA,
get_params={"i": event["args"],
"appid": self.bot.config["wolframalpha-api-key"],
"reinterpret": "true", "units": "metric"}, code=True)

View file

@ -1,5 +1,5 @@
import time
from src import EventManager, ModuleManager, Utils
from src import EventManager, ModuleManager, utils
class Module(ModuleManager.BaseModule):
def _channel_message(self, user, event):
@ -23,17 +23,17 @@ class Module(ModuleManager.BaseModule):
word_count = user.get_setting(setting, 0)
word_count += 1
user.set_setting(setting, word_count)
@Utils.hook("received.message.channel",
@utils.hook("received.message.channel",
priority=EventManager.PRIORITY_MONITOR)
def channel_message(self, event):
self._channel_message(event["user"], event)
@Utils.hook("self.message.channel",
@utils.hook("self.message.channel",
priority=EventManager.PRIORITY_MONITOR)
def self_channel_message(self, event):
self._channel_message(event["server"].get_user(
event["server"].nickname), event)
@Utils.hook("received.command.words", channel_only=True)
@utils.hook("received.command.words", channel_only=True)
def words(self, event):
"""
:help: See how many words you or the given nickname have used
@ -54,7 +54,7 @@ class Module(ModuleManager.BaseModule):
event["stdout"].write("%s has used %d words (%d in %s)" % (
target.nickname, total, this_channel, event["target"].name))
@Utils.hook("received.command.trackword", min_args=1)
@utils.hook("received.command.trackword", min_args=1)
def track_word(self, event):
"""
:help: Start tracking a word
@ -70,7 +70,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("Already tracking '%s'" % word)
@Utils.hook("received.command.wordusers", min_args=1)
@utils.hook("received.command.wordusers", min_args=1)
def word_users(self, event):
"""
:help: Show who has used a tracked word the most
@ -85,7 +85,7 @@ class Module(ModuleManager.BaseModule):
top_10 = sorted(word_users.keys())
top_10 = sorted(top_10, key=word_users.get, reverse=True)[:10]
top_10 = ", ".join("%s (%d)" % (Utils.prevent_highlight(event[
top_10 = ", ".join("%s (%d)" % (utils.prevent_highlight(event[
"server"].get_user(nickname).nickname), word_users[nickname]
) for nickname in top_10)
event["stdout"].write("Top '%s' users: %s" % (word, top_10))

View file

@ -1,7 +1,7 @@
#--require-config google-api-key
import re
from src import ModuleManager, Utils
from src import ModuleManager, utils
REGEX_YOUTUBE = re.compile(
"https?://(?:www.)?(?:youtu.be/|youtube.com/watch\?[\S]*v=)([\w\-]{11})",
@ -16,12 +16,12 @@ URL_YOUTUBESHORT = "https://youtu.be/%s"
ARROW_UP = ""
ARROW_DOWN = ""
@Utils.export("channelset", {"setting": "auto-youtube",
@utils.export("channelset", {"setting": "auto-youtube",
"help": "Disable/Enable automatically getting info from youtube URLs",
"validate": Utils.bool_or_none})
"validate": utils.bool_or_none})
class Module(ModuleManager.BaseModule):
def get_video_page(self, video_id, part):
return Utils.get_url(URL_YOUTUBEVIDEO, get_params={"part": part,
return utils.http.get_url(URL_YOUTUBEVIDEO, get_params={"part": part,
"id": video_id, "key": self.bot.config["google-api-key"]},
json=True)
def video_details(self, video_id):
@ -55,12 +55,12 @@ class Module(ModuleManager.BaseModule):
video_title, video_duration, video_uploader, "{:,}".format(
int(video_views)), video_opinions, URL_YOUTUBESHORT % video_id)
@Utils.hook("get.searchyoutube")
@utils.hook("get.searchyoutube")
def search_video(self, event):
search = event["query"]
video_id = ""
search_page = Utils.get_url(URL_YOUTUBESEARCH,
search_page = utils.http.get_url(URL_YOUTUBESEARCH,
get_params={"q": search, "part": "snippet",
"maxResults": "1", "type": "video",
"key": self.bot.config["google-api-key"]},
@ -71,7 +71,7 @@ class Module(ModuleManager.BaseModule):
video_id = search_page["items"][0]["id"]["videoId"]
return "https://youtu.be/%s" % video_id
@Utils.hook("received.command.yt|youtube")
@utils.hook("received.command.yt|youtube")
def yt(self, event):
"""
:help: Find a video on youtube
@ -87,7 +87,7 @@ class Module(ModuleManager.BaseModule):
video_id = re.search(REGEX_YOUTUBE, last_youtube.message).group(1)
if search or video_id:
if not video_id:
search_page = Utils.get_url(URL_YOUTUBESEARCH,
search_page = utils.http.get_url(URL_YOUTUBESEARCH,
get_params={"q": search, "part": "snippet",
"maxResults": "1", "type": "video",
"key": self.bot.config["google-api-key"]},
@ -106,7 +106,7 @@ class Module(ModuleManager.BaseModule):
else:
event["stderr"].write("No search phrase provided")
@Utils.hook("received.message.channel")
@utils.hook("received.message.channel")
def channel_message(self, event):
match = re.search(REGEX_YOUTUBE, event["message"])
if match and event["channel"].get_setting("auto-youtube", False):

View file

@ -1,5 +1,5 @@
import itertools, time, traceback
from src import Utils
from src import utils
PRIORITY_URGENT = 0
PRIORITY_HIGH = 1
@ -30,7 +30,7 @@ class EventCallback(object):
self.function = function
self.priority = priority
self.kwargs = kwargs
self.docstring = Utils.parse_docstring(function.__doc__)
self.docstring = utils.parse_docstring(function.__doc__)
def call(self, event):
return self.function(event)

View file

@ -1,5 +1,5 @@
import re
from . import Utils
from src import utils
class BufferLine(object):
def __init__(self, sender, message, action, tags, from_self, method):
@ -39,7 +39,7 @@ class Buffer(object):
def find(self, pattern, **kwargs):
from_self = kwargs.get("from_self", True)
for_user = kwargs.get("for_user", "")
for_user = Utils.irc_lower(self.server, for_user
for_user = utils.irc.lower(self.server, for_user
) if for_user else None
not_pattern = kwargs.get("not_pattern", None)
for line in self.lines:
@ -48,7 +48,7 @@ class Buffer(object):
elif re.search(pattern, line.message):
if not_pattern and re.search(not_pattern, line.message):
continue
if for_user and not Utils.irc_lower(self.server, line.sender
if for_user and not utils.irc.lower(self.server, line.sender
) == for_user:
continue
return line

View file

@ -1,9 +1,9 @@
import uuid
from . import IRCBuffer, IRCObject, Utils
from src import IRCBuffer, IRCObject, utils
class Channel(IRCObject.Object):
def __init__(self, name, id, server, bot):
self.name = Utils.irc_lower(server, name)
self.name = utils.irc.lower(server, name)
self.id = id
self.server = server
self.bot = bot

View file

@ -1,5 +1,5 @@
import collections, socket, ssl, sys, time
from . import IRCChannel, IRCObject, IRCUser, Utils
from src import IRCChannel, IRCObject, IRCUser, utils
THROTTLE_LINES = 4
THROTTLE_SECONDS = 1
@ -142,9 +142,9 @@ class Server(IRCObject.Object):
def set_own_nickname(self, nickname):
self.nickname = nickname
self.nickname_lower = Utils.irc_lower(self, nickname)
self.nickname_lower = utils.irc.lower(self, nickname)
def is_own_nickname(self, nickname):
return Utils.irc_equals(self, nickname, self.nickname)
return utils.irc.equals(self, nickname, self.nickname)
def add_own_mode(self, mode, arg=None):
self.own_modes[mode] = arg
@ -157,7 +157,7 @@ class Server(IRCObject.Object):
self.add_own_mode(mode, arg)
def has_user(self, nickname):
return Utils.irc_lower(self, nickname) in self.users
return utils.irc.lower(self, nickname) in self.users
def get_user(self, nickname, create=True):
if not self.has_user(nickname) and create:
user_id = self.get_user_id(nickname)
@ -165,7 +165,7 @@ class Server(IRCObject.Object):
self.events.on("new.user").call(user=new_user, server=self)
self.users[new_user.nickname_lower] = new_user
self.new_users.add(new_user)
return self.users.get(Utils.irc_lower(self, nickname), None)
return self.users.get(utils.irc.lower(self, nickname), None)
def get_user_id(self, nickname):
self.bot.database.users.add(self.id, nickname)
return self.bot.database.users.get_id(self.id, nickname)
@ -175,11 +175,11 @@ class Server(IRCObject.Object):
channel.remove_user(user)
def change_user_nickname(self, old_nickname, new_nickname):
user = self.users.pop(Utils.irc_lower(self, old_nickname))
user = self.users.pop(utils.irc.lower(self, old_nickname))
user._id = self.get_user_id(new_nickname)
self.users[Utils.irc_lower(self, new_nickname)] = user
self.users[utils.irc.lower(self, new_nickname)] = user
def has_channel(self, channel_name):
return channel_name[0] in self.channel_types and Utils.irc_lower(
return channel_name[0] in self.channel_types and utils.irc.lower(
self, channel_name) in self.channels
def get_channel(self, channel_name):
if not self.has_channel(channel_name):
@ -189,7 +189,7 @@ class Server(IRCObject.Object):
self.events.on("new.channel").call(channel=new_channel,
server=self)
self.channels[new_channel.name] = new_channel
return self.channels[Utils.irc_lower(self, channel_name)]
return self.channels[utils.irc.lower(self, channel_name)]
def get_channel_id(self, channel_name):
self.bot.database.channels.add(self.id, channel_name)
return self.bot.database.channels.get_id(self.id, channel_name)

View file

@ -1,5 +1,5 @@
import uuid
from . import IRCBuffer, IRCObject, Utils
from src import IRCBuffer, IRCObject, utils
class User(IRCObject.Object):
def __init__(self, nickname, id, server, bot):
@ -33,7 +33,7 @@ class User(IRCObject.Object):
def set_nickname(self, nickname):
self.nickname = nickname
self.nickname_lower = Utils.irc_lower(self.server, nickname)
self.nickname_lower = utils.irc.lower(self.server, nickname)
self.name = self.nickname_lower
def join_channel(self, channel):
self.channels.add(channel)

View file

@ -1,5 +1,5 @@
import gc, glob, imp, io, inspect, os, sys, uuid
from src import Utils
from . import utils
BITBOT_HOOKS_MAGIC = "__bitbot_hooks"
BITBOT_EXPORTS_MAGIC = "__bitbot_exports"
@ -54,7 +54,7 @@ class ModuleManager(object):
def _load_module(self, bot, name):
path = self._module_path(name)
for hashflag, value in Utils.get_hashflags(path):
for hashflag, value in utils.get_hashflags(path):
if hashflag == "ignore":
# nope, ignore this module.
raise ModuleNotLoadedWarning("module ignored")
@ -92,7 +92,7 @@ class ModuleManager(object):
attribute = getattr(module_object, attribute_name)
for hook in self._get_magic(attribute, BITBOT_HOOKS_MAGIC, []):
context_events.on(hook["event"]).hook(attribute,
docstring=attribute.__doc__, **hook["kwargs"])
**hook["kwargs"])
for export in self._get_magic(module_object, BITBOT_EXPORTS_MAGIC, []):
context_exports.add(export["setting"], export["value"])

View file

@ -1,378 +0,0 @@
import io, json, re, traceback, urllib.request, urllib.parse, urllib.error
import ssl, string
import bs4
from . import ModuleManager
USER_AGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36")
REGEX_HTTP = re.compile("https?://", re.I)
ASCII_UPPER = string.ascii_uppercase
ASCII_LOWER = string.ascii_lowercase
STRICT_RFC1459_UPPER = ASCII_UPPER+r'\[]'
STRICT_RFC1459_LOWER = ASCII_LOWER+r'|{}'
RFC1459_UPPER = STRICT_RFC1459_UPPER+"^"
RFC1459_LOWER = STRICT_RFC1459_LOWER+"~"
def remove_colon(s):
if s.startswith(":"):
s = s[1:]
return s
def arbitrary(s, n):
return remove_colon(" ".join(s[n:]))
# case mapping lowercase/uppcase logic
def _multi_replace(s, chars1, chars2):
for char1, char2 in zip(chars1, chars2):
s = s.replace(char1, char2)
return s
def irc_lower(server, s):
if server.case_mapping == "ascii":
return _multi_replace(s, ASCII_UPPER, ASCII_LOWER)
elif server.case_mapping == "rfc1459":
return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER)
elif server.case_mapping == "strict-rfc1459":
return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER)
else:
raise ValueError("unknown casemapping '%s'" % server.case_mapping)
# compare a string while respecting case mapping
def irc_equals(server, s1, s2):
return irc_lower(server, s1) == irc_lower(server, s2)
class IRCHostmask(object):
def __init__(self, nickname, username, hostname, hostmask):
self.nickname = nickname
self.username = username
self.hostname = hostname
self.hostmask = hostmask
def __repr__(self):
return "Utils.IRCHostmask(%s)" % self.__str__()
def __str__(self):
return self.hostmask
def seperate_hostmask(hostmask):
hostmask = remove_colon(hostmask)
nickname, _, username = hostmask.partition("!")
username, _, hostname = username.partition("@")
return IRCHostmask(nickname, username, hostname, hostmask)
def get_url(url, **kwargs):
if not urllib.parse.urlparse(url).scheme:
url = "http://%s" % url
url_parsed = urllib.parse.urlparse(url)
method = kwargs.get("method", "GET")
get_params = kwargs.get("get_params", "")
post_params = kwargs.get("post_params", None)
headers = kwargs.get("headers", {})
if get_params:
get_params = "?%s" % urllib.parse.urlencode(get_params)
if post_params:
post_params = urllib.parse.urlencode(post_params).encode("utf8")
url = "%s%s" % (url, get_params)
try:
url.encode("latin-1")
except UnicodeEncodeError:
if kwargs.get("code"):
return 0, False
return False
request = urllib.request.Request(url, post_params)
request.add_header("Accept-Language", "en-US")
request.add_header("User-Agent", USER_AGENT)
for header, value in headers.items():
request.add_header(header, value)
request.method = method
try:
response = urllib.request.urlopen(request, timeout=5)
except urllib.error.HTTPError as e:
traceback.print_exc()
if kwargs.get("code"):
return e.code, False
return False
except urllib.error.URLError as e:
traceback.print_exc()
if kwargs.get("code"):
return -1, False
return False
except ssl.CertificateError as e:
traceback.print_exc()
if kwargs.get("code"):
return -1, False,
return False
response_content = response.read()
encoding = response.info().get_content_charset()
if kwargs.get("soup"):
return bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml"))
if not encoding:
soup = bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml"))
metas = soup.find_all("meta")
for meta in metas:
if "charset=" in meta.get("content", ""):
encoding = meta.get("content").split("charset=", 1)[1
].split(";", 1)[0]
elif meta.get("charset", ""):
encoding = meta.get("charset")
else:
continue
break
if not encoding:
for item in soup.contents:
if isinstance(item, bs4.Doctype):
if item == "html":
encoding = "utf8"
else:
encoding = "latin-1"
break
response_content = response_content.decode(encoding or "utf8")
data = response_content
if kwargs.get("json") and data:
try:
data = json.loads(response_content)
except json.decoder.JSONDecodeError:
traceback.print_exc()
return False
if kwargs.get("code"):
return response.code, data
else:
return data
COLOR_WHITE, COLOR_BLACK, COLOR_BLUE, COLOR_GREEN = 0, 1, 2, 3
COLOR_RED, COLOR_BROWN, COLOR_PURPLE, COLOR_ORANGE = 4, 5, 6, 7
COLOR_YELLOW, COLOR_LIGHTGREEN, COLOR_CYAN, COLOR_LIGHTCYAN = (8, 9,
10, 11)
COLOR_LIGHTBLUE, COLOR_PINK, COLOR_GREY, COLOR_LIGHTGREY = (12, 13,
14, 15)
FONT_BOLD, FONT_ITALIC, FONT_UNDERLINE, FONT_INVERT = ("\x02", "\x1D",
"\x1F", "\x16")
FONT_COLOR, FONT_RESET = "\x03", "\x0F"
REGEX_COLOR = re.compile("%s\d\d(?:,\d\d)?" % FONT_COLOR)
def color(s, foreground, background=None):
foreground = str(foreground).zfill(2)
if background:
background = str(background).zfill(2)
return "%s%s%s%s%s" % (FONT_COLOR, foreground,
"" if not background else ",%s" % background, s, FONT_COLOR)
def bold(s):
return "%s%s%s" % (FONT_BOLD, s, FONT_BOLD)
def underline(s):
return "%s%s%s" % (FONT_UNDERLINE, s, FONT_UNDERLINE)
def strip_font(s):
s = s.replace(FONT_BOLD, "")
s = s.replace(FONT_ITALIC, "")
s = REGEX_COLOR.sub("", s)
s = s.replace(FONT_COLOR, "")
return s
TIME_SECOND = 1
TIME_MINUTE = TIME_SECOND*60
TIME_HOUR = TIME_MINUTE*60
TIME_DAY = TIME_HOUR*24
TIME_WEEK = TIME_DAY*7
def time_unit(seconds):
since = None
unit = None
if seconds >= TIME_WEEK:
since = seconds/TIME_WEEK
unit = "week"
elif seconds >= TIME_DAY:
since = seconds/TIME_DAY
unit = "day"
elif seconds >= TIME_HOUR:
since = seconds/TIME_HOUR
unit = "hour"
elif seconds >= TIME_MINUTE:
since = seconds/TIME_MINUTE
unit = "minute"
else:
since = seconds
unit = "second"
since = int(since)
if since > 1:
unit = "%ss" % unit # pluralise the unit
return [since, unit]
REGEX_PRETTYTIME = re.compile("\d+[wdhms]", re.I)
SECONDS_MINUTES = 60
SECONDS_HOURS = SECONDS_MINUTES*60
SECONDS_DAYS = SECONDS_HOURS*24
SECONDS_WEEKS = SECONDS_DAYS*7
def from_pretty_time(pretty_time):
seconds = 0
for match in re.findall(REGEX_PRETTYTIME, pretty_time):
number, unit = int(match[:-1]), match[-1].lower()
if unit == "m":
number = number*SECONDS_MINUTES
elif unit == "h":
number = number*SECONDS_HOURS
elif unit == "d":
number = number*SECONDS_DAYS
elif unit == "w":
number = number*SECONDS_WEEKS
seconds += number
if seconds > 0:
return seconds
UNIT_SECOND = 5
UNIT_MINUTE = 4
UNIT_HOUR = 3
UNIT_DAY = 2
UNIT_WEEK = 1
def to_pretty_time(total_seconds, minimum_unit=UNIT_SECOND, max_units=6):
minutes, seconds = divmod(total_seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
weeks, days = divmod(days, 7)
out = ""
units = 0
if weeks and minimum_unit >= UNIT_WEEK and units < max_units:
out += "%dw" % weeks
units += 1
if days and minimum_unit >= UNIT_DAY and units < max_units:
out += "%dd" % days
units += 1
if hours and minimum_unit >= UNIT_HOUR and units < max_units:
out += "%dh" % hours
units += 1
if minutes and minimum_unit >= UNIT_MINUTE and units < max_units:
out += "%dm" % minutes
units += 1
if seconds and minimum_unit >= UNIT_SECOND and units < max_units:
out += "%ds" % seconds
units += 1
return out
IS_TRUE = ["true", "yes", "on", "y"]
IS_FALSE = ["false", "no", "off", "n"]
def bool_or_none(s):
s = s.lower()
if s in IS_TRUE:
return True
elif s in IS_FALSE:
return False
def int_or_none(s):
stripped_s = s.lstrip("0")
if stripped_s.isdigit():
return int(stripped_s)
def get_closest_setting(event, setting, default=None):
server = event["server"]
if "channel" in event:
closest = event["channel"]
elif "target" in event and "is_channel" in event and event["is_channel"]:
closest = event["target"]
else:
closest = event["user"]
return closest.get_setting(setting, server.get_setting(setting, default))
def prevent_highlight(nickname):
return nickname[0]+"\u200c"+nickname[1:]
def _set_get_append(obj, setting, item):
if not hasattr(obj, setting):
setattr(obj, setting, [])
getattr(obj, setting).append(item)
def hook(event, **kwargs):
def _hook_func(func):
_set_get_append(func, ModuleManager.BITBOT_HOOKS_MAGIC,
{"event": event, "kwargs": kwargs})
return func
return _hook_func
def export(setting, value):
def _export_func(module):
_set_get_append(module, ModuleManager.BITBOT_EXPORTS_MAGIC,
{"setting": setting, "value": value})
return module
return _export_func
def strip_html(s):
return bs4.BeautifulSoup(s, "lxml").get_text()
def get_hashflags(filename):
hashflags = {}
with io.open(filename, mode="r", encoding="utf8") as f:
for line in f:
line = line.strip("\n")
if not line.startswith("#"):
break
elif line.startswith("#--"):
line_split = line.split(" ", 1)
hashflag = line_split[0][3:]
value = None
if len(line_split) > 1:
value = line_split[1]
hashflags[hashflag] = value
return hashflags.items()
class Docstring(object):
def __init__(self, description, items):
self.description = description
self.items = items
def parse_docstring(s):
description = ""
last_item = None
items = {}
if s:
for line in s.split("\n"):
line = line.strip()
if line:
if line[0] == ":":
key, _, value = line.partition(": ")
last_item = value
items[value] = value
else:
if last_item:
items[last_item] += " %s" % line
else:
if description:
description += " "
description += line
return Docstring(description, items)
class IRCLine(object):
def __init__(self, tags, prefix, command, args, arbitrary, last, server):
self.tags = tags
self.prefix = prefix
self.command = command
self.args = args
self.arbitrary = arbitrary
self.last = last
self.server = server
def parse_line(server, line):
tags = {}
prefix = None
command = None
if line[0] == "@":
tags_prefix, line = line[1:].split(" ", 1)
for tag in filter(None, tags_prefix.split(";")):
tag, _, value = tag.partition("=")
tags[tag] = value
line, _, arbitrary = line.partition(" :")
arbitrary = arbitrary or None
if line[0] == ":":
prefix, line = line[1:].split(" ", 1)
prefix = seperate_hostmask(prefix)
command, _, line = line.partition(" ")
args = line.split(" ")
last = arbitrary or args[-1]
return IRCLine(tags, prefix, command, args, arbitrary, last, server)

173
src/utils/__init__.py Normal file
View file

@ -0,0 +1,173 @@
from . import irc, http
import io, re
from src import ModuleManager
TIME_SECOND = 1
TIME_MINUTE = TIME_SECOND*60
TIME_HOUR = TIME_MINUTE*60
TIME_DAY = TIME_HOUR*24
TIME_WEEK = TIME_DAY*7
def time_unit(seconds):
since = None
unit = None
if seconds >= TIME_WEEK:
since = seconds/TIME_WEEK
unit = "week"
elif seconds >= TIME_DAY:
since = seconds/TIME_DAY
unit = "day"
elif seconds >= TIME_HOUR:
since = seconds/TIME_HOUR
unit = "hour"
elif seconds >= TIME_MINUTE:
since = seconds/TIME_MINUTE
unit = "minute"
else:
since = seconds
unit = "second"
since = int(since)
if since > 1:
unit = "%ss" % unit # pluralise the unit
return [since, unit]
REGEX_PRETTYTIME = re.compile("\d+[wdhms]", re.I)
SECONDS_MINUTES = 60
SECONDS_HOURS = SECONDS_MINUTES*60
SECONDS_DAYS = SECONDS_HOURS*24
SECONDS_WEEKS = SECONDS_DAYS*7
def from_pretty_time(pretty_time):
seconds = 0
for match in re.findall(REGEX_PRETTYTIME, pretty_time):
number, unit = int(match[:-1]), match[-1].lower()
if unit == "m":
number = number*SECONDS_MINUTES
elif unit == "h":
number = number*SECONDS_HOURS
elif unit == "d":
number = number*SECONDS_DAYS
elif unit == "w":
number = number*SECONDS_WEEKS
seconds += number
if seconds > 0:
return seconds
UNIT_SECOND = 5
UNIT_MINUTE = 4
UNIT_HOUR = 3
UNIT_DAY = 2
UNIT_WEEK = 1
def to_pretty_time(total_seconds, minimum_unit=UNIT_SECOND, max_units=6):
minutes, seconds = divmod(total_seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
weeks, days = divmod(days, 7)
out = ""
units = 0
if weeks and minimum_unit >= UNIT_WEEK and units < max_units:
out += "%dw" % weeks
units += 1
if days and minimum_unit >= UNIT_DAY and units < max_units:
out += "%dd" % days
units += 1
if hours and minimum_unit >= UNIT_HOUR and units < max_units:
out += "%dh" % hours
units += 1
if minutes and minimum_unit >= UNIT_MINUTE and units < max_units:
out += "%dm" % minutes
units += 1
if seconds and minimum_unit >= UNIT_SECOND and units < max_units:
out += "%ds" % seconds
units += 1
return out
IS_TRUE = ["true", "yes", "on", "y"]
IS_FALSE = ["false", "no", "off", "n"]
def bool_or_none(s):
s = s.lower()
if s in IS_TRUE:
return True
elif s in IS_FALSE:
return False
def int_or_none(s):
stripped_s = s.lstrip("0")
if stripped_s.isdigit():
return int(stripped_s)
def get_closest_setting(event, setting, default=None):
server = event["server"]
if "channel" in event:
closest = event["channel"]
elif "target" in event and "is_channel" in event and event["is_channel"]:
closest = event["target"]
else:
closest = event["user"]
return closest.get_setting(setting, server.get_setting(setting, default))
def prevent_highlight(nickname):
return nickname[0]+"\u200c"+nickname[1:]
def _set_get_append(obj, setting, item):
if not hasattr(obj, setting):
setattr(obj, setting, [])
getattr(obj, setting).append(item)
def hook(event, **kwargs):
def _hook_func(func):
_set_get_append(func, ModuleManager.BITBOT_HOOKS_MAGIC,
{"event": event, "kwargs": kwargs})
return func
return _hook_func
def export(setting, value):
def _export_func(module):
_set_get_append(module, ModuleManager.BITBOT_EXPORTS_MAGIC,
{"setting": setting, "value": value})
return module
return _export_func
def get_hashflags(filename):
hashflags = {}
with io.open(filename, mode="r", encoding="utf8") as f:
for line in f:
line = line.strip("\n")
if not line.startswith("#"):
break
elif line.startswith("#--"):
line_split = line.split(" ", 1)
hashflag = line_split[0][3:]
value = None
if len(line_split) > 1:
value = line_split[1]
hashflags[hashflag] = value
return hashflags.items()
class Docstring(object):
def __init__(self, description, items):
self.description = description
self.items = items
def parse_docstring(s):
description = ""
last_item = None
items = {}
if s:
for line in s.split("\n"):
line = line.strip()
if line:
if line[0] == ":":
key, _, value = line.partition(": ")
last_item = value
items[value] = value
else:
if last_item:
items[last_item] += " %s" % line
else:
if description:
description += " "
description += line
return Docstring(description, items)

94
src/utils/http.py Normal file
View file

@ -0,0 +1,94 @@
import re, traceback, urllib.error, urllib.parse, urllib.request
import json, ssl
import bs4
USER_AGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36")
REGEX_HTTP = re.compile("https?://", re.I)
def get_url(url, **kwargs):
if not urllib.parse.urlparse(url).scheme:
url = "http://%s" % url
url_parsed = urllib.parse.urlparse(url)
method = kwargs.get("method", "GET")
get_params = kwargs.get("get_params", "")
post_params = kwargs.get("post_params", None)
headers = kwargs.get("headers", {})
if get_params:
get_params = "?%s" % urllib.parse.urlencode(get_params)
if post_params:
post_params = urllib.parse.urlencode(post_params).encode("utf8")
url = "%s%s" % (url, get_params)
try:
url.encode("latin-1")
except UnicodeEncodeError:
if kwargs.get("code"):
return 0, False
return False
request = urllib.request.Request(url, post_params)
request.add_header("Accept-Language", "en-US")
request.add_header("User-Agent", USER_AGENT)
for header, value in headers.items():
request.add_header(header, value)
request.method = method
try:
response = urllib.request.urlopen(request, timeout=5)
except urllib.error.HTTPError as e:
traceback.print_exc()
if kwargs.get("code"):
return e.code, False
return False
except urllib.error.URLError as e:
traceback.print_exc()
if kwargs.get("code"):
return -1, False
return False
except ssl.CertificateError as e:
traceback.print_exc()
if kwargs.get("code"):
return -1, False,
return False
response_content = response.read()
encoding = response.info().get_content_charset()
if kwargs.get("soup"):
return bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml"))
if not encoding:
soup = bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml"))
metas = soup.find_all("meta")
for meta in metas:
if "charset=" in meta.get("content", ""):
encoding = meta.get("content").split("charset=", 1)[1
].split(";", 1)[0]
elif meta.get("charset", ""):
encoding = meta.get("charset")
else:
continue
break
if not encoding:
for item in soup.contents:
if isinstance(item, bs4.Doctype):
if item == "html":
encoding = "utf8"
else:
encoding = "latin-1"
break
response_content = response_content.decode(encoding or "utf8")
data = response_content
if kwargs.get("json") and data:
try:
data = json.loads(response_content)
except json.decoder.JSONDecodeError:
traceback.print_exc()
return False
if kwargs.get("code"):
return response.code, data
else:
return data
def strip_html(s):
return bs4.BeautifulSoup(s, "lxml").get_text()

116
src/utils/irc.py Normal file
View file

@ -0,0 +1,116 @@
import string, re
ASCII_UPPER = string.ascii_uppercase
ASCII_LOWER = string.ascii_lowercase
STRICT_RFC1459_UPPER = ASCII_UPPER+r'\[]'
STRICT_RFC1459_LOWER = ASCII_LOWER+r'|{}'
RFC1459_UPPER = STRICT_RFC1459_UPPER+"^"
RFC1459_LOWER = STRICT_RFC1459_LOWER+"~"
def remove_colon(s):
if s.startswith(":"):
s = s[1:]
return s
# case mapping lowercase/uppcase logic
def _multi_replace(s, chars1, chars2):
for char1, char2 in zip(chars1, chars2):
s = s.replace(char1, char2)
return s
def lower(server, s):
if server.case_mapping == "ascii":
return _multi_replace(s, ASCII_UPPER, ASCII_LOWER)
elif server.case_mapping == "rfc1459":
return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER)
elif server.case_mapping == "strict-rfc1459":
return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER)
else:
raise ValueError("unknown casemapping '%s'" % server.case_mapping)
# compare a string while respecting case mapping
def equals(server, s1, s2):
return lower(server, s1) == lower(server, s2)
class IRCHostmask(object):
def __init__(self, nickname, username, hostname, hostmask):
self.nickname = nickname
self.username = username
self.hostname = hostname
self.hostmask = hostmask
def __repr__(self):
return "IRCHostmask(%s)" % self.__str__()
def __str__(self):
return self.hostmask
def seperate_hostmask(hostmask):
hostmask = remove_colon(hostmask)
nickname, _, username = hostmask.partition("!")
username, _, hostname = username.partition("@")
return IRCHostmask(nickname, username, hostname, hostmask)
class IRCLine(object):
def __init__(self, tags, prefix, command, args, arbitrary, last, server):
self.tags = tags
self.prefix = prefix
self.command = command
self.args = args
self.arbitrary = arbitrary
self.last = last
self.server = server
def parse_line(server, line):
tags = {}
prefix = None
command = None
if line[0] == "@":
tags_prefix, line = line[1:].split(" ", 1)
for tag in filter(None, tags_prefix.split(";")):
tag, _, value = tag.partition("=")
tags[tag] = value
line, _, arbitrary = line.partition(" :")
arbitrary = arbitrary or None
if line[0] == ":":
prefix, line = line[1:].split(" ", 1)
prefix = seperate_hostmask(prefix)
command, _, line = line.partition(" ")
args = line.split(" ")
last = arbitrary or args[-1]
return IRCLine(tags, prefix, command, args, arbitrary, last, server)
COLOR_WHITE, COLOR_BLACK, COLOR_BLUE, COLOR_GREEN = 0, 1, 2, 3
COLOR_RED, COLOR_BROWN, COLOR_PURPLE, COLOR_ORANGE = 4, 5, 6, 7
COLOR_YELLOW, COLOR_LIGHTGREEN, COLOR_CYAN, COLOR_LIGHTCYAN = (8, 9,
10, 11)
COLOR_LIGHTBLUE, COLOR_PINK, COLOR_GREY, COLOR_LIGHTGREY = (12, 13,
14, 15)
FONT_BOLD, FONT_ITALIC, FONT_UNDERLINE, FONT_INVERT = ("\x02", "\x1D",
"\x1F", "\x16")
FONT_COLOR, FONT_RESET = "\x03", "\x0F"
REGEX_COLOR = re.compile("%s\d\d(?:,\d\d)?" % FONT_COLOR)
def color(s, foreground, background=None):
foreground = str(foreground).zfill(2)
if background:
background = str(background).zfill(2)
return "%s%s%s%s%s" % (FONT_COLOR, foreground,
"" if not background else ",%s" % background, s, FONT_COLOR)
def bold(s):
return "%s%s%s" % (FONT_BOLD, s, FONT_BOLD)
def underline(s):
return "%s%s%s" % (FONT_UNDERLINE, s, FONT_UNDERLINE)
def strip_font(s):
s = s.replace(FONT_BOLD, "")
s = s.replace(FONT_ITALIC, "")
s = REGEX_COLOR.sub("", s)
s = s.replace(FONT_COLOR, "")
return s