first commit.

This commit is contained in:
jesopo 2016-03-29 12:56:58 +01:00
parent f59005171b
commit f943d63098
No known key found for this signature in database
GPG key ID: 0BBDEB2AEFCFFCB3
47 changed files with 2555 additions and 0 deletions

5
.gitignore vendored
View file

@ -60,3 +60,8 @@ target/
#Ipython Notebook
.ipynb_checkpoints
# custom
*.db
*.json
modules/nongit-*.py

15
Config.py Normal file
View file

@ -0,0 +1,15 @@
import json, os
class Config(object):
def __init__(self, bot, location="bot.json"):
self.bot = bot
self.location = location
self.full_location = os.path.join(bot.bot_directory,
self.location)
self.bot.config = {}
self.load_config()
def load_config(self):
if os.path.isfile(self.full_location):
with open(self.full_location) as config_file:
self.bot.config = json.loads(config_file.read())

152
Database.py Normal file
View file

@ -0,0 +1,152 @@
import json, os, sqlite3, threading
class Database(object):
def __init__(self, bot, location="bot.db"):
self.location = location
self.full_location = os.path.join(bot.bot_directory,
self.location)
self.database = sqlite3.connect(self.full_location,
check_same_thread=False, isolation_level=None)
self.database.execute("PRAGMA foreign_keys")
self.cursors = {}
self.make_servers_table()
self.make_server_settings_table()
self.make_channel_settings_table()
self.make_user_settings_table()
def cursor(self):
id = threading.current_thread().ident
if not id in self.cursors:
self.cursors[id] = self.database.cursor()
return self.cursors[id]
def make_servers_table(self):
try:
self.cursor().execute("""CREATE TABLE servers
(server_id INTEGER PRIMARY KEY,
hostname TEXT, port INTEGER, password TEXT,
ipv4 BOOLEAN, tls BOOLEAN, nickname TEXT,
username TEXT, realname TEXT)""")
except sqlite3.Error as e:
pass
def make_server_settings_table(self):
try:
self.cursor().execute("""CREATE TABLE server_settings
(server_id INTEGER, setting TEXT, value TEXT,
FOREIGN KEY(server_id) REFERENCES
servers(server_id) ON DELETE CASCADE,
PRIMARY KEY (server_id, setting))""")
except sqlite3.Error as e:
pass
def make_channel_settings_table(self):
try:
self.cursor().execute("""CREATE TABLE channel_settings
(server_id INTEGER, channel TEXT, setting TEXT,
value TEXT, FOREIGN KEY (server_id) REFERENCES
servers(server_id) ON DELETE CASCADE,
PRIMARY KEY (server_id, channel, setting))""")
except sqlite3.Error as e:
pass
def make_user_settings_table(self):
try:
self.cursor().execute("""CREATE TABLE user_settings
(server_id INTEGER, nickname TEXT, setting TEXT,
value TEXT, FOREIGN KEY (server_id) REFERENCES
servers(server_id) ON DELETE CASCADE,
PRIMARY KEY (server_id, nickname, setting))""")
except sqlite3.Error as e:
pass
def set_server_setting(self, server_id, setting, value):
self.cursor().execute("""INSERT OR REPLACE INTO server_settings
VALUES (?, ?, ?)""", [server_id, setting.lower(),
json.dumps(value)])
def get_server_setting(self, server_id, setting, default=None):
self.cursor().execute("""SELECT value FROM server_settings
WHERE server_id=? AND setting=?""", [server_id,
setting.lower()])
value = self.cursor().fetchone()
if value:
return json.loads(value[0])
return default
def find_server_settings(self, server_id, pattern, default=[]):
self.cursor().execute("""SELECT setting, value FROM server_settings
WHERE server_id=? AND setting LIKE ?""", [server_id,
pattern.lower()])
values = self.cursor().fetchall()
if values:
for i, value in enumerate(values):
values[i] = value[0], json.loads(value[1])
return values
return default
def del_server_setting(self, server_id, setting):
self.cursor().execute("""DELETE FROM server_settings WHERE
server_id=? AND setting=?""", [server_id, setting.lower()])
def set_channel_setting(self, server_id, channel, setting, value):
self.cursor().execute("""INSERT OR REPLACE INTO channel_settings
VALUES (?, ?, ?, ?)""", [server_id, channel.lower(),
setting.lower(), json.dumps(value)])
def get_channel_setting(self, server_id, channel, setting, default=None):
self.cursor().execute("""SELECT value FROM channel_settings
WHERE server_id=? AND channel=? AND setting=?""",
[server_id, channel.lower(), setting.lower()])
value = self.cursor().fetchone()
if value:
return json.loads(value[0])
return default
def find_channel_settings(self, server_id, channel, pattern, default=[]):
self.cursor().execute("""SELECT setting, value FROM channel_settings
WHERE server_id=? AND channel=? setting LIKE '?'""", [server_id,
channel.lower(), pattern.lower()])
values = self.cursor().fetchall()
if values:
for i, value in enumerate(values):
values[i] = json.loads(value)
return values
return default
def del_channel_setting(self, server_id, channel, setting):
self.cursor().execute("""DELETE FROM channel_settings WHERE
server_id=? AND channel=? AND setting=?""", [server_id,
channel.lower(), setting.lower()])
def set_user_setting(self, server_id, nickname, setting, value):
self.cursor().execute("""INSERT OR REPLACE INTO user_settings
VALUES (?, ?, ?, ?)""", [server_id, nickname.lower(),
setting.lower(), json.dumps(value)])
def get_user_setting(self, server_id, nickname, setting, default=None):
self.cursor().execute("""SELECT value FROM user_settings
WHERE server_id=? AND nickname=? and setting=?""",
[server_id, nickname.lower(), setting.lower()])
value = self.cursor().fetchone()
if value:
return json.loads(value[0])
return default
def find_user_settings(self, server_id, nickname, pattern, default=[]):
self.cursor().execute("""SELECT setting, value FROM user_settings
WHERE server_id=? AND nickname=? setting LIKE '?'""", [server_id,
nickname.lower(), pattern.lower()])
values = self.cursor().fetchall()
if values:
for i, value in enumerate(values):
values[i] = json.loads(value)
return values
return default
def del_user_setting(self, server_id, nickname, setting):
self.cursor().execute("""DELETE FROM user_settings WHERE
server_id=? AND nickname=? AND setting=?""", [server_id,
nickname.lower(), setting.lower()])
def add_server(self, hostname, port, password, ipv4, tls, nickname,
username=None, realname=None):
username = username or nickname
realname = realname or nickname
self.cursor().execute("""INSERT INTO servers (hostname, port,
password, ipv4, tls, nickname, username, realname) VALUES (
?, ?, ?, ?, ?, ?, ?, ?)""", [hostname, port, password, ipv4,
tls, nickname, username, realname])
def get_servers(self):
self.cursor().execute("""SELECT server_id, hostname, port, password,
ipv4, tls, nickname, username, realname FROM servers""")
return self.cursor().fetchall()

87
EventManager.py Normal file
View file

@ -0,0 +1,87 @@
class Event(object):
def __init__(self, bot, **kwargs):
self.bot = bot
self.kwargs = kwargs
self.eaten = False
def __getitem__(self, key):
return self.kwargs[key]
def get(self, key, default=None):
return self.kwargs.get(key, default)
def __contains__(self, key):
return key in self.kwargs
def eat(self):
self.eaten = True
class EventCallback(object):
def __init__(self, function, bot, **kwargs):
self.function = function
self.bot = bot
self.kwargs = kwargs
def call(self, event):
return self.function(event)
class MultipleEventHook(object):
def __init__(self):
self._event_hooks = set([])
def _add(self, event_hook):
self._event_hooks.add(event_hook)
def hook(self, function, **kwargs):
for event_hook in self._event_hooks:
event_hook.hook(function, **kwargs)
def call(self, max=None, **kwargs):
for event_hook in self._event_hooks:
event_hook.call(max, **kwargs)
class EventHook(object):
def __init__(self, bot):
self.bot = bot
self._children = {}
self._hooks = []
self._hook_notify = None
self._child_notify = None
self._call_notify = None
def hook(self, function, **kwargs):
callback = EventCallback(function, self.bot, **kwargs)
if self._hook_notify:
self._hook_notify(self, callback)
self._hooks.append(callback)
def on(self, subevent, *extra_subevents):
if extra_subevents:
multiple_event_hook = MultipleEventHook()
for extra_subevent in (subevent,)+extra_subevents:
multiple_event_hook._add(self.get_child(extra_subevent))
return multiple_event_hook
return self.get_child(subevent)
def call(self, max=None, **kwargs):
event = Event(self.bot, **kwargs)
if self._call_notify:
self._call_notify(self, event)
called = 0
returns = []
for hook in self._hooks:
if max and called == max:
break
if event.eaten:
break
returns.append(hook.call(event))
called += 1
return returns
def get_child(self, child_name):
child_name_lower = child_name.lower()
if not child_name_lower in self._children:
self._children[child_name_lower] = EventHook(self.bot)
if self._child_notify:
self._child_notify(self, self._children[
child_name_lower])
return self._children[child_name_lower]
def get_hooks(self):
return self._hooks
def get_children(self):
return self._children.keys()
def set_hook_notify(self, handler):
self._hook_notify = handler
def set_child_notify(self, handler):
self._child_notify = handler
def set_call_notify(self, handler):
self._call_notify = handler

101
IRCBot.py Normal file
View file

@ -0,0 +1,101 @@
import os, select, sys, threading, time, traceback
import EventManager, IRCServer, ModuleManager, Timer
class Bot(object):
def __init__(self):
self.lock = threading.Lock()
self.database = None
self.config = None
self.bot_directory = os.path.dirname(os.path.realpath(__file__))
self.servers = {}
self.running = True
self.poll = select.epoll()
self.modules = ModuleManager.ModuleManager(self)
self.events = EventManager.EventHook(self)
self.timers = []
self.last_ping = None
def add_server(self, id, hostname, port, password, ipv4, tls,
nickname, username, realname, connect=False):
new_server = IRCServer.Server(id, hostname, port, password,
ipv4, tls, nickname, username, realname, self)
self.servers[new_server.fileno()] = new_server
if connect:
self.connect(new_server)
def connect(self, server):
try:
server.connect()
except:
sys.stderr.write("Failed to connect to %s\n" % str(server))
traceback.print_exc()
return False
return True
def connect_all(self):
for server in self.servers.values():
if self.connect(server):
self.poll.register(server.fileno(), select.EPOLLOUT)
def add_timer(self, function, delay, *args, **kwargs):
timer = Timer.Timer(function, delay, *args, **kwargs)
timer.set_started_time()
self.timers.append(timer)
def next_timer(self):
next = None
for timer in self.timers:
time_left = timer.time_left()
if not next or time_left < next:
next = time_left
return next or 30
def call_timers(self):
for timer in self.timers[:]:
if timer.due():
timer.call()
if timer.done():
self.timers.remove(timer)
def register_read(self, server):
self.poll.modify(server.fileno(), select.EPOLLIN)
def register_write(self, server):
self.poll.modify(server.fileno(), select.EPOLLOUT)
def register_both(self, server):
self.poll.modify(server.fileno(),
select.EPOLLIN|select.EPOLLOUT)
def since_last_read(self, server):
return time.time()-server.last_read
def run(self):
while self.running:
self.lock.acquire()
events = self.poll.poll(self.next_timer())
self.call_timers()
for fd, event in events:
if fd in self.servers:
server = self.servers[fd]
if event & select.EPOLLIN:
lines = server.read()
for line in lines:
print(line)
server.parse_line(line)
elif event & select.EPOLLOUT:
server._send()
self.register_read(server)
elif event & select.EPULLHUP:
print("hangup")
if not self.last_ping or time.time()-self.last_ping >= 60:
for server in self.servers.values():
server.send_ping()
self.last_ping = time.time()
for server in list(self.servers.values()):
if server.last_read and self.since_last_read(server
) > 160:
print("pingout from %s" % str(server))
server.disconnect()
if not server.connected:
self.poll.unregister(server.fileno())
del self.servers[server.fileno()]
# add reconnect here
print("disconnected from %s" % str(server))
elif server.waiting_send():
self.register_both(server)
self.lock.release()

64
IRCChannel.py Normal file
View file

@ -0,0 +1,64 @@
import IRCLog
class Channel(object):
def __init__(self, name, server, bot):
self.name = name.lower()
self.server = server
self.bot = bot
self.topic = ""
self.topic_setter_nickname = None
self.topic_setter_username = None
self.topic_setter_hostname = None
self.topic_time = 0
self.users = set([])
self.modes = {}
self.created_timestamp = None
self.log = IRCLog.Log(bot)
def set_topic(self, topic):
self.topic = topic
def set_topic_setter(self, nickname, username=None, hostname=None):
self.topic_setter_nickname = nickname
self.topic_setter_username = username
self.topic_setter_hostname = hostname
def set_topic_time(self, unix_timestamp):
self.topic_time = unix_timestamp
def add_user(self, user):
self.users.add(user)
def remove_user(self, user):
self.users.remove(user)
def add_mode(self, mode, args=None):
if not mode in self.modes:
self.modes[mode] = set([])
if args:
self.modes[mode].add(args.lower())
def remove_mode(self, mode, args=None):
if not args:
del self.modes[mode]
else:
self.modes[mode].remove(args.lower())
if not len(self.modes[mode]):
del self.modes[mode]
def set_setting(self, setting, value):
self.bot.database.set_channel_setting(self.server.id,
self.name, setting, value)
def get_setting(self, setting, default=None):
return self.bot.database.get_channel_setting(
self.server.id, self.name, setting, default)
def find_settings(self, pattern, default=[]):
return self.bot.database.find_channel_setting(
self.server.id, self.name, pattern, default)
def del_setting(self, setting):
self.bot.database.del_channel_setting(self.server.id,
self.name, setting)
def send_message(self, text):
self.server.send_message(self.name, text)
def send_mode(self, mode=None, target=None):
self.server.send_mode(self.name, mode, target)
def mode_or_above(self, nickname, mode):
mode_orders = list(self.server.mode_prefixes.values())
mode_index = mode_orders.index(mode)
for mode in mode_orders[:mode_index+1]:
if nickname.lower() in self.modes.get(mode, []):
return True
return False

270
IRCLineHandler.py Normal file
View file

@ -0,0 +1,270 @@
import re
import Utils
RE_PREFIXES = re.compile(r"\bPREFIX=\((\w+)\)(\W+)(?:\b|$)")
RE_CHANMODES = re.compile(
r"\bCHANMODES=(\w*),(\w*),(\w*),(\w*)(?:\b|$)")
RE_CHANTYPES = re.compile(r"\bCHANTYPES=(\W+)(?:\b|$)")
handlers = {}
descriptions = {}
current_description = None
def handler(f=None, description=None):
global current_description
if description:
current_description = description
return handler
name = f.__name__.split("handle_")[1].upper()
handlers[name] = f
if current_description:
descriptions[name] = current_description
current_description = None
def handle(line, line_split, bot, server):
handler_function = None
if len(line_split) > 1:
if line_split[0][0] == ":":
if line_split[1] in handlers:
handler_function = handlers[line_split[1]]
elif line_split[1].isdigit():
bot.events.on("received").on("numeric").on(
line_split[1]).call(line=line,
line_split=line_split, server=server,
number=line_split[1])
elif line_split[0] in handlers:
handler_function = handlers[line_split[0]]
if handler_function:
handler_function(line, line_split, bot, server)
@handler(description="reply to a ping")
def handle_PING(line, line_split, bot, server):
nonce = Utils.remove_colon(line_split[1])
server.send_pong(Utils.remove_colon(line_split[1]))
bot.events.on("received").on("ping").call(line=line,
line_split=line_split, server=server, nonce=nonce)
@handler(description="the first line sent to a registered client")
def handle_001(line, line_split, bot, server):
server.set_own_nickname(line_split[2])
server.send_whois(server.nickname)
bot.events.on("received").on("numeric").on("001").call(
line=line, line_split=line_split, server=server)
@handler(description="the extra supported things line")
def handle_005(line, line_split, bot, server):
isupport_line = Utils.arbitrary(line_split, 3)
if "NAMESX" in line:
server.send("PROTOCTL NAMESX")
match = re.search(RE_PREFIXES, isupport_line)
if match:
modes = match.group(1)
prefixes = match.group(2)
for i, prefix in enumerate(prefixes):
if i < len(modes):
server.mode_prefixes[prefix] = modes[i]
match = re.search(RE_CHANMODES, isupport_line)
if match:
server.channel_modes = list(match.group(4))
match = re.search(RE_CHANTYPES, isupport_line)
if match:
server.channel_types = list(match.group(1))
bot.events.on("received").on("numeric").on("005").call(
line=line, line_split=line_split, server=server,
isupport=isupport_line)
@handler(description="whois respose (nickname, username, realname, hostname)")
def handle_311(line, line_split, bot, server):
nickname = line_split[2]
if server.is_own_nickname(nickname):
target = server
else:
target = server.get_user(nickname)
target.username = line_split[4]
target.realname = Utils.arbitrary(line_split, 7)
target.hostname = line_split[5]
@handler(description="on-join channel topic line")
def handle_332(line, line_split, bot, server):
channel = server.get_channel(line_split[3])
topic = Utils.arbitrary(line_split, 4)
channel.set_topic(topic)
@handler(description="on-join channel topic set by/at")
def handle_333(line, line_split, bot, server):
channel = server.get_channel(line_split[3])
topic_setter_hostmask = line_split[4]
nickname, username, hostname = Utils.seperate_hostmask(
topic_setter_hostmask)
topic_time = int(line_split[5]) if line_split[5].isdigit(
) else None
channel.set_topic_setter(nickname, username, hostname)
channel.set_topic_time(topic_time)
@handler(description="on-join user list with status symbols")
def handle_353(line, line_split, bot, server):
channel = server.get_channel(line_split[4])
nicknames = line_split[5:]
nicknames[0] = Utils.remove_colon(nicknames[0])
for nickname in nicknames:
if nickname.strip():
modes = set([])
while nickname[0] in server.mode_prefixes:
modes.add(server.mode_prefixes[nickname[0]])
nickname = nickname[1:]
user = server.get_user(nickname)
user.join_channel(channel)
channel.add_user(user)
for mode in modes:
channel.add_mode(mode, nickname)
@handler(description="on user joining channel")
def handle_JOIN(line, line_split, bot, server):
nickname, username, realname = Utils.seperate_hostmask(line_split[0])
channel = server.get_channel(Utils.remove_colon(line_split[2]))
if not server.is_own_nickname(nickname):
user = server.get_user(nickname)
channel.add_user(user)
user.join_channel(channel)
bot.events.on("received").on("join").call(line=line,
line_split=line_split, server=server, channel=channel,
user=user)
else:
bot.events.on("self").on("join").call(line=line,
line_split=line_split, server=server, channel=channel)
server.send_who(channel.name)
channel.send_mode()
@handler(description="on user parting channel")
def handle_PART(line, line_split, bot, server):
nickname, username, hostname = Utils.seperate_hostmask(line_split[0])
channel = server.get_channel(line_split[2])
reason = Utils.arbitrary(line_split, 3)
if not server.is_own_nickname(nickname):
user = server.get_user(nickname)
bot.events.on("received").on("part").call(line=line,
line_split=line_split, server=server, channel=channel,
reason=reason, user=user)
channel.remove_user(user)
if not len(user.channels):
server.remove_user(user)
else:
server.remove_channel(channel)
bot.events.on("self").on("part").call(line=line,
line_split=line_split, server=server, channel=channel,
reason=reason)
@handler(description="unknown command sent by us, oops!")
def handle_421(line, line_split, bot, server):
print("warning: unknown command '%s'." % line_split[3])
@handler(description="a user has disconnected!")
def handle_QUIT(line, line_split, bot, server):
nickname, username, hostname = Utils.seperate_hostmask(line_split[0])
reason = Utils.arbitrary(line_split, 2)
if not server.is_own_nickname(nickname):
user = server.get_user(nickname)
server.remove_user(user)
bot.events.on("received").on("quit").call(line=line,
line_split=line_split, server=server, reason=reason,
user=user)
else:
server.disconnect()
@handler(description="someone has changed their nickname")
def handle_NICK(line, line_split, bot, server):
nickname, username, hostname = Utils.seperate_hostmask(line_split[0])
new_nickname = Utils.remove_colon(line_split[2])
if not server.is_own_nickname(nickname):
user = server.get_user(nickname)
old_nickname = user.nickname
user.set_nickname(new_nickname)
server.change_user_nickname(old_nickname, new_nickname)
bot.events.on("received").on("nick").call(line=line,
line_split=line_split, server=server,
new_nickname=new_nickname, old_nickname=old_nickname,
user=user)
else:
old_nickname = server.nickname
server.set_own_nickname(new_nickname)
bot.events.on("self").on("nick").call(line=line,
line_split=line_split, server=server,
new_nickname=new_nickname, old_nickname=old_nickname)
@handler(description="something's mode has changed")
def handle_MODE(line, line_split, bot, server):
nickname, username, hostname = Utils.seperate_hostmask(line_split[0])
target = line_split[2]
is_channel = target[0] in server.channel_types
if is_channel:
channel = server.get_channel(target)
remove = False
args = line_split[4:]
for i, char in enumerate(line_split[3]):
if char == "+":
remove = False
elif char == "-":
remove = True
else:
if char in server.channel_modes:
if remove:
channel.remove_mode(char)
else:
channel.add_mode(char)
elif char in server.mode_prefixes.values():
nickname = args.pop(0)
if remove:
channel.remove_mode(char, nickname)
else:
channel.add_mode(char, nickname)
else:
args.pop(0)
elif server.is_own_nickname(target):
modes = Utils.remove_colon(line_split[3])
remove = False
for i, char in enumerate(modes):
if char == "+":
remove = False
elif char == "-":
remove = True
else:
if remove:
server.remove_own_mode(char)
else:
server.add_own_mode(char)
@handler(description="I've been invited somewhere")
def handle_INVITE(line, line_split, bot, server):
nickname, username, hostname = Utils.seperate_hostmask(line_split[0])
target_channel = Utils.remove_colon(line_split[3])
user = server.get_user(nickname)
bot.events.on("received").on("invite").call(
line=line, line_split=line_split, server=server,
user=user, target_channel=target_channel)
@handler(description="we've received a message")
def handle_PRIVMSG(line, line_split, bot, server):
nickname, username, hostname = Utils.seperate_hostmask(line_split[0])
user = server.get_user(nickname)
message = Utils.arbitrary(line_split, 3)
message_split = message.split(" ")
target = line_split[2]
action = message.startswith("\01ACTION ") and message.endswith("\01")
if action:
message = message.replace("\01ACTION ", "", 1)[:-1]
if target[0] in server.channel_types:
channel = server.get_channel(line_split[2])
bot.events.on("received").on("message").on("channel").call(
line=line, line_split=line_split, server=server,
user=user, message=message, message_split=message_split,
channel=channel, action=action)
channel.log.add_line(user.nickname, message, action)
elif server.is_own_nickname(target):
bot.events.on("received").on("message").on("private").call(
line=line, line_split=line_split, server=server,
user=user, message=message, message_split=message_split,
action=action)
@handler(description="response to a WHO command for user information")
def handle_352(line, line_split, bot, server):
user = server.get_user(line_split[7])
user.username = line_split[4]
user.realname = Utils.arbitrary(line_split, 10)
user.hostname = line_split[5]
@handler(description="response to an empty mode command")
def handle_324(line, line_split, bot, server):
channel = server.get_channel(line_split[3])
modes = line_split[4]
if modes[0] == "+" and modes[1:]:
for mode in modes[1:]:
if mode in server.channel_modes:
channel.add_mode(mode)
@handler(description="channel creation unix timestamp")
def handle_329(line, line_split, bot, server):
channel = server.get_channel(line_split[3])
channel.creation_timestamp = int(line_split[4])
@handler(description="nickname already in use")
def handle_433(line, line_split, bot, server):
pass

39
IRCLog.py Normal file
View file

@ -0,0 +1,39 @@
import re
class Line(object):
def __init__(self, sender, message, action, from_self):
self.sender = sender
self.message = message
self.action = action
self.from_self = from_self
class Log(object):
def __init__(self, bot):
self.lines = []
self.max_lines = 64
self._skip_next = False
def add_line(self, sender, message, action, from_self=False):
if not self._skip_next:
line = Line(sender, message, action, from_self)
self.lines.insert(0, line)
if len(self.lines) > self.max_lines:
self.lines.pop()
self._skip_next = False
def get(self, index=0, **kwargs):
from_self = kwargs.get("from_self", True)
for line in self.lines:
if line.from_self and not from_self:
continue
return line
def find(self, pattern, **kwargs):
from_self = kwargs.get("from_self", True)
not_pattern = kwargs.get("not_pattern", None)
for line in self.lines:
if line.from_self and not from_self:
continue
elif re.search(pattern, line.message):
if not_pattern and re.search(not_pattern, line.message):
continue
return line
def skip_next(self):
self._skip_next = True

216
IRCServer.py Normal file
View file

@ -0,0 +1,216 @@
import collections, socket, ssl, sys, time
import IRCChannel, IRCLineHandler, IRCUser
class Server(object):
def __init__(self, id, hostname, port, password, ipv4, tls,
nickname, username, realname, bot):
self.connected = False
self.bot = bot
self.id = id
self.target_hostname = hostname
self.port = port
self.tls = tls
self.password = password
self.ipv4 = ipv4
self.nickname = nickname
self.username = username or nickname
self.realname = realname or nickname
self.write_buffer = b""
self.read_buffer = b""
self.users = {}
self.new_users = set([])
self.nickname_ids = {}
self.channels = {}
self.own_modes = set([])
self.mode_prefixes = collections.OrderedDict()
self.channel_modes = []
self.channel_types = []
self.last_read = None
if ipv4:
self.socket = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
else:
self.socket = socket.socket(socket.AF_INET6,
socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if self.tls:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
self.socket = context.wrap_socket(self.socket)
self.cached_fileno = self.socket.fileno()
def __repr__(self):
return "%s:%s%s" % (self.target_hostname, "+" if self.tls else "",
self.port)
def __str__(self):
return repr(self)
def fileno(self):
fileno = self.socket.fileno()
return self.cached_fileno if fileno == -1 else fileno
def connect(self):
self.socket.connect((self.target_hostname, self.port))
if self.password:
self.send_pass(self.password)
self.send_user(self.username, self.realname)
self.send_nick(self.nickname)
self.connected = True
def disconnect(self):
self.connected = False
try:
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass
try:
self.socket.close()
except:
pass
def set_setting(self, setting, value):
self.bot.database.set_server_setting(self.id, setting,
value)
def get_setting(self, setting, default=None):
return self.bot.database.get_server_setting(self.id,
setting, default)
def find_settings(self, pattern, default=[]):
return self.bot.database.find_server_settings(self.id,
pattern, default)
def del_setting(self, setting):
self.bot.database.del_server_setting(self.id, setting)
def set_own_nickname(self, nickname):
self.nickname = nickname
self.nickname_lower = nickname.lower()
def is_own_nickname(self, nickname):
return nickname.lower() == self.nickname_lower
def add_own_mode(self, mode):
self.own_modes.add(mode)
def remove_own_mode(self, mode):
self.own_modes.remove(mode)
def has_user(self, nickname):
return nickname.lower() in self.nickname_ids
def get_user(self, nickname):
if not self.has_user(nickname):
new_user = IRCUser.User(nickname, self, self.bot)
self.bot.events.on("new").on("user").call(
user=new_user, server=self)
self.users[new_user.id] = new_user
self.nickname_ids[nickname.lower()] = new_user.id
self.new_users.add(new_user)
return self.users[self.nickname_ids[nickname.lower()]]
def remove_user(self, user):
print("removing %s" % user.nickname)
del self.users[user.id]
del self.nickname_ids[user.nickname_lower]
for channel in user.channels:
channel.remove_user(user)
def change_user_nickname(self, old_nickname, new_nickname):
self.nickname_ids[new_nickname.lower()] = self.nickname_ids.pop(old_nickname.lower())
def has_channel(self, channel_name):
return channel_name[0] in self.channel_types and channel_name.lower(
) in self.channels
def get_channel(self, channel_name):
if not self.has_channel(channel_name):
new_channel = IRCChannel.Channel(channel_name, self,
self.bot)
self.bot.events.on("new").on("channel").call(
channel=new_channel, server=self)
self.channels[new_channel.name] = new_channel
return self.channels[channel_name.lower()]
def remove_channel(self, channel):
for users in channel.users:
user.part_channel(channel)
del self.channels[channel.name]
def parse_line(self, line):
if line:
line_split = line.split(" ")
IRCLineHandler.handle(line, line_split, self.bot, self)
self.check_users()
def check_users(self):
for user in self.new_users:
if not len(user.channels):
self.remove_user(user)
self.new_users.clear()
def read(self):
encoding = self.bot.database.get_server_setting(self.id,
"encoding", "utf8")
fallback_encoding = self.bot.database.get_server_setting(
self.id, "fallback-encoding", "latin-1")
data = self.read_buffer + self.socket.recv(4096)
self.read_buffer = b""
data_lines = [line.strip(b"\r") for line in data.split(b"\n")]
if data_lines[-1]:
self.read_buffer = data_lines[-1]
data_lines.pop(-1)
decoded_lines = []
for line in data_lines:
try:
line = line.decode(encoding)
except:
try:
line = line.decode(fallback_encoding)
except:
continue
decoded_lines.append(line)
if not decoded_lines:
self.disconnect()
self.last_read = time.time()
return decoded_lines
def send(self, data):
encoded = data.encode("utf8")
if len(encoded) > 450:
encoded = encoded[:450]
self.write_buffer += b"%s\r\n" % encoded
print(encoded.decode("utf8"))
def _send(self):
self.write_buffer = self.write_buffer[self.socket.send(
self.write_buffer):]
def waiting_send(self):
return bool(len(self.write_buffer))
def send_user(self, username, realname):
self.send("USER %s - - :%s" % (username, realname))
def send_nick(self, nickname):
self.send("NICK %s" % nickname)
def send_pass(self, password):
self.send("PASS %s" % password)
def send_ping(self, nonce="hello"):
self.send("PING :%s" % nonce)
def send_pong(self, nonce="hello"):
self.send("PONG :%s" % nonce)
def send_join(self, channel_name, key=None):
self.send("JOIN %s%s" % (channel_name,
"" if key == None else " %s" % key))
def send_part(self, channel_name, reason=None):
self.send("PART %s%s" % (channel_name,
"" if key == None else " %s" % reason))
def send_quit(self, reason="Leaving"):
self.send("QUIT :%s" % reason)
def send_message(self, target, message):
self.send("PRIVMSG %s :%s" % (target, message))
if self.has_channel(target):
action = message.startswith("\01ACTION ") and message.endswith(
"\01")
if action:
message = message.split("\01ACTION ", 1)[1][:-1]
self.get_channel(target).log.add_line(None, message, action, True)
def send_notice(self, target, message):
self.send("NOTICE %s :%s" % (target, message))
def send_mode(self, target, mode=None, args=None):
self.send("MODE %s%s%s" % (target, "" if mode == None else " %s" % mode,
"" if args == None else " %s" % args))
def send_topic(self, channel_name, topic):
self.send("TOPIC %s :%s" % (channel_name, topic))
def send_kick(self, channel_name, target, reason=None):
self.send("KICK %s %s%s" % (channel_name, target,
"" if reason == None else " :%s" % reason))
def send_names(self, channel_name):
self.send("NAMES %s" % channel_name)
def send_list(self, search_for=None):
self.send(
"LIST%s" % "" if search_for == None else " %s" % search_for)
def send_invite(self, target, channel_name):
self.send("INVITE %s %s" % (target, channel_name))
def send_whois(self, target):
self.send("WHOIS %s" % target)
def send_whowas(self, target, amount=None, server=None):
self.send("WHOWAS %s%s%s" % (target,
"" if amount == None else " %s" % amount,
"" if server == None else " :%s" % server))
def send_who(self, filter=None):
self.send("WHO%s" % ("" if filter == None else " %s" % filter))

41
IRCUser.py Normal file
View file

@ -0,0 +1,41 @@
import uuid
class User(object):
def __init__(self, nickname, server, bot):
self.set_nickname(nickname)
self.username = None
self.realname = None
self.hostname = None
self.server = server
self.bot = bot
self.channels = set([])
self.id = None
while self.id == None or self.id in server.users:
self.id = uuid.uuid1().hex
def set_nickname(self, nickname):
self.nickname = nickname
self.nickname_lower = nickname.lower()
self.name = self.nickname_lower
def join_channel(self, channel):
self.channels.add(channel)
def part_channel(self, channel):
self.channels.remove(channel)
def set_setting(self, setting, value):
self.bot.database.set_user_setting(self.server.id, self.nickname,
setting, value)
def get_setting(self, setting, default=None):
return self.bot.database.get_user_setting(self.server.id,
self.nickname, setting, default)
def find_settings(self, pattern, default=[]):
return self.bot.database.find_user_settings(self.server.id,
self.nickname, pattern, default)
def del_setting(self, setting):
self.bot.database.del_user_setting(self.server.id, self.nickname,
setting)
def send_message(self, message):
self.server.send_message(self.nickname, message)
def send_notice(self, message):
self.server.send_notice(self.nickname, message)
def send_ctcp_response(self, command, args):
self.send_notice("\x01%s %s\x01" % (command, args))

37
ModuleManager.py Normal file
View file

@ -0,0 +1,37 @@
import glob, imp, inspect, os
class ModuleManager(object):
def __init__(self, bot, directory="modules"):
self.bot = bot
self.directory = directory
self.modules = {}
def list_modules(self):
return sorted(glob.glob(os.path.join(self.directory, "*.py")))
def load_module(self, filename):
name = os.path.basename(filename).rsplit(".py", 1)[0]
with open(filename) as module_file:
while True:
line = module_file.readline().strip()
if line and line.startswith("#--"):
# this is a hashflag
if line == "#--ignore":
# nope, ignore this module.
return None
else:
break
module = imp.load_source("bitbot_%s" % name, filename)
assert hasattr(module, "Module"
), "module '%s' doesn't have a Module class."
assert inspect.isclass(module.Module
), "module '%s' has a Module attribute but it is not a class."
module_object = module.Module(self.bot)
if not hasattr(module_object, "_name"):
module_object._name = name.title()
return module_object
def load_modules(self):
for filename in self.list_modules():
module = self.load_module(filename)
if module:
assert not module._name in self.modules, (
"module name '%s' attempted to be used twice.")
self.modules[module._name] = module

17
README.md Normal file
View file

@ -0,0 +1,17 @@
# BitBot
Python3 event-driven modular IRC bot!
## Dependencies
* [BeautifulSoup4](https://pypi.python.org/pypi/beautifulsoup4/4.3.2)
* [twitter](https://pypi.python.org/pypi/twitter)
## Configurating
To get BitBot off the ground, there's some API-keys and the like in bot.json.example. move it to bot.json, fill in the API keys you want (and remove the modules that rely on those configs.)
## Running
Just run `./start.py`
On first boot, it'll ask for a first server to connect to then exit. do `./start.py` again and it'll connect to that server and join #bitbot.
## Data storage
The main data storage for Bitbot is done in his sqlite3 database, bot.db.

29
Timer.py Normal file
View file

@ -0,0 +1,29 @@
import time
class Timer(object):
def __init__(self, function, delay, *args, **kwargs):
self.function = function
self.delay = delay
self.kwargs = kwargs
self.args = args
self._done = False
def set_started_time(self):
self.started_time = time.time()
def due(self):
return self.time_left() <= 0
def time_left(self):
return (self.started_time+self.delay)-time.time()
def call(self):
self._done = True
self.function(self, *self.args, **self.kwargs)
def redo(self):
self.done = False
self.set_started_time()
def done(self):
return self._done

158
Utils.py Normal file
View file

@ -0,0 +1,158 @@
import json, re, traceback, urllib.request, urllib.parse
import bs4
USER_AGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36")
REGEX_HTTP = re.compile("https?://", re.I)
def remove_colon(s):
if s.startswith(":"):
s = s[1:]
return s
def arbitrary(s, n):
return remove_colon(" ".join(s[n:]))
def seperate_hostmask(hostmask):
hostmask = remove_colon(hostmask)
first_delim = hostmask.find("!")
second_delim = hostmask.find("@")
nickname = username = hostname = hostmask
if first_delim > -1 and second_delim > first_delim:
nickname, username = hostmask.split("!", 1)
username, hostname = hostmask.split("@", 1)
return nickname, username, hostname
def get_url(url, **kwargs):
scheme = urllib.parse.urlparse(url).scheme
if not scheme:
url = "http://%s" % url
method = kwargs.get("method", "GET")
get_params = kwargs.get("get_params", "")
post_params = kwargs.get("post_params", None)
headers = kwargs.get("headers", {})
if get_params:
get_params = "?%s" % urllib.parse.urlencode(get_params)
if post_params:
post_params = urllib.parse.urlencode(post_params).encode("utf8")
url = "%s%s" % (url, get_params)
request = urllib.request.Request(url, post_params)
request.add_header("Accept-Language", "en-US")
request.add_header("User-Agent", USER_AGENT)
for header, value in headers.items():
request.add_header(header, value)
request.method = method
try:
response = urllib.request.urlopen(request)
except:
traceback.print_exc()
return False
response_content = response.read()
encoding = response.info().get_content_charset()
if kwargs.get("soup"):
return bs4.BeautifulSoup(response_content, "lxml")
if not encoding:
soup = bs4.BeautifulSoup(response_content, "lxml")
metas = soup.find_all("meta")
for meta in metas:
if "charset=" in meta.get("content", ""):
encoding = meta.get("content").split("charset=", 1)[1
].split(";", 1)[0]
elif meta.get("charset", ""):
encoding = meta.get("charset")
else:
continue
break
if not encoding:
for item in soup.contents:
if isinstance(item, bs4.Doctype):
if item == "html":
encoding = "utf8"
else:
encoding = "latin-1"
break
response_content = response_content.decode(encoding or "utf8")
data = response_content
if kwargs.get("json"):
try:
data = json.loads(response_content)
except json.decoder.JSONDecodeError:
traceback.print_exc()
return False
if kwargs.get("code"):
return response.code, data
else:
return data
COLOR_WHITE, COLOR_BLACK, COLOR_BLUE, COLOR_GREEN = 0, 1, 2, 3
COLOR_RED, COLOR_BROWN, COLOR_PURPLE, COLOR_ORANGE = 4, 5, 6, 7
COLOR_YELLOW, COLOR_LIGHTGREEN, COLOR_CYAN, COLOR_LIGHTCYAN = (8, 9,
10, 11)
COLOR_LIGHTBLUE, COLOR_PINK, COLOR_GREY, COLOR_LIGHTGREY = (12, 13,
14, 15)
FONT_BOLD, FONT_ITALIC, FONT_UNDERLINE, FONT_INVERT = ("\x02", "\x1D",
"\x1F", "\x16")
FONT_COLOR, FONT_RESET = "\x03", "\x0F"
def color(foreground, background=None):
foreground = str(foreground).zfill(2)
if background:
background = str(backbround).zfill(2)
return "%s%s%s" % (FONT_COLOR, foreground,
"" if not background else ",%s" % background)
TIME_SECOND = 1
TIME_MINUTE = TIME_SECOND*60
TIME_HOUR = TIME_MINUTE*60
TIME_DAY = TIME_HOUR*24
TIME_WEEK = TIME_DAY*7
def time_unit(seconds):
since = None
unit = None
if seconds >= TIME_WEEK:
since = seconds/TIME_WEEK
unit = "week"
elif seconds >= TIME_DAY:
since = seconds/TIME_DAY
unit = "day"
elif seconds >= TIME_HOUR:
since = seconds/TIME_HOUR
unit = "hour"
elif seconds >= TIME_MINUTE:
since = seconds/TIME_MINUTE
unit = "minute"
else:
since = seconds
unit = "second"
since = int(since)
if since > 1:
unit = "%ss" % unit # pluralise the unit
return [since, unit]
REGEX_PRETTYTIME = re.compile("\d+[wdhms]", re.I)
SECONDS_MINUTES = 60
SECONDS_HOURS = SECONDS_MINUTES*60
SECONDS_DAYS = SECONDS_HOURS*24
SECONDS_WEEKS = SECONDS_DAYS*7
def from_pretty_time(pretty_time):
seconds = 0
for match in re.findall(REGEX_PRETTYTIME, pretty_time):
number, unit = int(match[:-1]), match[-1].lower()
if unit == "m":
number = number*SECONDS_MINUTES
elif unit == "h":
number = number*SECONDS_HOURS
elif unit == "d":
number = number*SECONDS_DAYS
elif unit == "w":
number = number*SECONDS_WEEKS
seconds += number
if seconds > 0:
return seconds

14
bot.json.example Normal file
View file

@ -0,0 +1,14 @@
{
"openweathermap-api-key" : "",
"wolframalpha-api-key" : "",
"google-api-key" : "",
"google-search-id" : "",
"bighugethesaurus-api-key" : "",
"wordnik-api-key" : "",
"lastfm-api-key" : "",
"twitter-api-key" : "",
"twitter-api-secret" : "",
"twitter-access-token" : "",
"twitter-access-secret" : "",
"trakt-api-key" : ""
}

9
modules/accept_invite.py Normal file
View file

@ -0,0 +1,9 @@
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("invite").hook(
self.on_invite)
def on_invite(self, event):
event["server"].send_join(event["target_channel"])

18
modules/channel_save.py Normal file
View file

@ -0,0 +1,18 @@
class Module(object):
def __init__(self, bot):
bot.events.on("self").on("join").hook(self.on_join)
bot.events.on("received").on("numeric").on("366").hook(
self.on_connect)
def on_join(self, event):
channels = set(event["server"].get_setting("autojoin", []))
channels.add(event["channel"].name)
event["server"].set_setting("autojoin", list(channels))
def on_connect(self, event):
if event["line_split"][3].lower() == "#bitbot":
channels = event["server"].get_setting("autojoin", [])
for channel in channels:
event["server"].send_join(channel)

142
modules/commands.py Normal file
View file

@ -0,0 +1,142 @@
STR_MORE = " (more...)"
STR_CONTINUED = "(...continued) "
OUT_CUTOFF = 400
class ChannelOut(object):
def __init__(self, module_name, channel):
self.module_name = module_name
self.channel = channel
self._text = ""
self.written = False
def write(self, text):
self._text += text
self.written = True
return self
def send(self):
if self.has_text():
text = "[%s] %s" % (self.prefix(), self._text)
text_encoded = text.encode("utf8")
if len(text_encoded) > OUT_CUTOFF:
text = "%s%s" % (text_encoded[:OUT_CUTOFF].decode("utf8"
).rstrip(), STR_MORE)
self._text = "%s%s" % (STR_CONTINUED, text_encoded[OUT_CUTOFF:
].decode("utf8").lstrip())
self.channel.send_message(text)
def set_prefix(self, prefix):
self.module_name = prefix
def has_text(self):
return bool(self._text)
class ChannelStdOut(ChannelOut):
def prefix(self):
return self.module_name
class ChannelStdErr(ChannelOut):
def prefix(self):
return "!%s" % self.module_name
class Module(object):
def __init__(self, bot):
self.bot = bot
bot.events.on("received").on("message").on("channel").hook(
self.channel_message)
bot.events.on("received").on("message").on("private").hook(
self.private_message)
bot.events.on("received").on("command").on("help").hook(self.help)
bot.events.on("new").on("user", "channel").hook(self.new)
bot.events.on("received").on("command").on("more").hook(self.more)
bot.events.on("send").on("stdout").hook(self.send_stdout)
bot.events.on("send").on("stderr").hook(self.send_stderr)
def new(self, event):
if "user" in event:
target = event["user"]
else:
target = event["channel"]
target.last_stdout = None
target.last_stderr = None
def has_command(self, command):
return command.lower() in self.bot.events.on("received").on(
"command").get_children()
def get_hook(self, command):
return self.bot.events.on("received").on("command").on(command
).get_hooks()[0]
def channel_message(self, event):
command_prefix = event["channel"].get_setting("command_prefix",
event["server"].get_setting("command_prefix", "!"))
if event["message_split"][0].startswith(command_prefix):
command = event["message_split"][0].replace(
command_prefix, "", 1).lower()
if self.has_command(command):
hook = self.get_hook(command)
module_name = hook.function.__self__._name
stdout = ChannelStdOut(module_name, event["channel"])
stderr = ChannelStdErr(module_name, event["channel"])
returns = self.bot.events.on("preprocess").on("command"
).call(hook=hook, user=event["user"], server=event[
"server"])
for returned in returns:
if returned:
event["stderr"].write(returned).send()
return
min_args = hook.kwargs.get("min_args")
args_split = event["message_split"][1:]
if min_args and len(args_split) < min_args:
ChannelStdErr("Error", event["channel"]
).write("Not enough arguments ("
"minimum: %d)" % min_args).send()
else:
args = " ".join(args_split)
self.bot.events.on("received").on("command").on(
command).call(1, user=event["user"],
channel=event["channel"], args=args,
args_split=args_split, server=event["server"
], stdout=stdout, stderr=stderr, command=command,
log=event["channel"].log, target=event["channel"])
stdout.send()
stderr.send()
if stdout.has_text():
event["channel"].last_stdout = stdout
if stderr.has_text():
event["channel"].last_stderr = stderr
event["channel"].log.skip_next()
def private_message(self, event):
pass
def help(self, event):
if event["args"]:
command = event["args_split"][0].lower()
if command in self.bot.events.on("received").on(
"command").get_children():
hooks = self.bot.events.on("received").on("command").on(command).get_hooks()
if hooks and "help" in hooks[0].kwargs:
event["stdout"].write("%s: %s" % (command, hooks[0].kwargs["help"]))
else:
help_available = []
for child in self.bot.events.on("received").on("command").get_children():
hooks = self.bot.events.on("received").on("command").on(child).get_hooks()
if hooks and "help" in hooks[0].kwargs:
help_available.append(child)
help_available = sorted(help_available)
event["stdout"].write("Commands: %s" % ", ".join(help_available))
def more(self, event):
if event["target"].last_stdout:
event["target"].last_stdout.send()
def send_stdout(self, event):
if event["target"].name[0] in event["server"].channel_types:
stdout = ChannelStdOut(event["module_name"], event["target"])
stdout.write(event["message"]).send()
if stdout.has_text():
event["target"].last_stdout = stdout
def send_stderr(self, event):
if event["target"].name[0] in event["server"].channel_types:
stderr = ChannelStdErr(event["module_name"], event["target"])
stderr.write(event["message"]).send()
if stderr.has_text():
event["target"].last_stderr = stderr

26
modules/ctcp.py Normal file
View file

@ -0,0 +1,26 @@
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("message").on("private").hook(
self.private_message)
self.bot = bot
def private_message(self, event):
if event["message"][0] == "\x01" and event["message"][-1] == "\x01":
ctcp_command = event["message_split"][0][1:].upper()
if ctcp_command.endswith("\x01"):
ctcp_command = ctcp_command[:-1]
ctcp_args = " ".join(event["message_split"][1:])[:-1]
ctcp_args_split = ctcp_args.split(" ")
ctcp_response = None
if ctcp_command == "VERSION":
ctcp_response = self.bot.config.get("ctcp-version",
"BitBot")
elif ctcp_command == "PING":
ctcp_response = " ".join(ctcp_args_split)
if ctcp_response:
event["user"].send_ctcp_response(ctcp_command,
ctcp_response)

27
modules/define.py Normal file
View file

@ -0,0 +1,27 @@
import Utils
URL_WORDNIK = "http://api.wordnik.com:80/v4/word.json/%s/definitions"
class Module(object):
def __init__(self, bot):
self.bot = bot
bot.events.on("received").on("command").on("define").hook(
self.define, help="Define a provided term")
def define(self, event):
if event["args"]:
word = event["args"]
else:
word = event["log"].get(from_self=False)
page = Utils.get_url(URL_WORDNIK % event["args"], get_params={
"useCanonical": "true", "limit": 1,
"sourceDictionaries": "wiktionary", "api_key": self.bot.config[
"wordnik-api-key"]}, json=True)
if page:
if len(page):
event["stdout"].write("%s: %s" % (page[0]["word"],
page[0]["text"]))
else:
event["stderr"].write("No definitions found")
else:
event["stderr"].write("Failed to load results")

21
modules/dns.py Normal file
View file

@ -0,0 +1,21 @@
import socket
class Module(object):
_name = "DNS"
def __init__(self, bot):
bot.events.on("received").on("command").on("dns").hook(
self.dns, min_args=1,
help="Get all addresses for a given hostname (IPv4/IPv6)")
def dns(self, event):
hostname = event["args_split"][0]
try:
address_info = socket.getaddrinfo(hostname, 1, 0,
socket.SOCK_DGRAM)
except socket.gaierror:
event["stderr"].write("Failed to find hostname")
return
ips = []
for _, _, _, _, address in address_info:
ips.append(address[0])
event["stdout"].write("%s: %s" % (hostname, ", ".join(ips)))

30
modules/geoip.py Normal file
View file

@ -0,0 +1,30 @@
import Utils
URL_GEOIP = "http://ip-api.com/json/%s"
class Module(object):
_name = "GeoIP"
def __init__(self, bot):
bot.events.on("received").on("command").on("geoip").hook(
self.geoip, min_args=1,
help="Get geoip data on a given IPv4/IPv6 address")
def geoip(self, event):
page = Utils.get_url(URL_GEOIP % event["args_split"][0],
json=True)
if page:
if page["status"] == "success":
data = page["query"]
data += " | Organisation: %s" % page["org"]
data += " | City: %s" % page["city"]
data += " | Region: %s (%s)" % (page["regionName"],
page["countryCode"])
data += " | ISP: %s" % page["isp"]
data += " | Lon/Lat: %s/%s" % (page["lon"],
page["lat"])
data += " | Timezone: %s" % page["timezone"]
event["stdout"].write(data)
else:
event["stderr"].write("No geoip data found")
else:
event["stderr"].write("Failed to load results")

28
modules/google.py Normal file
View file

@ -0,0 +1,28 @@
import Utils
URL_GOOGLESEARCH = "https://www.googleapis.com/customsearch/v1"
class Module(object):
def __init__(self, bot):
self.bot = bot
bot.events.on("received").on("command").on("google",
"g").hook(self.google, help="Google feeling lucky")
def google(self, event):
phrase = event["args"] or event["log"].get()
if phrase:
page = Utils.get_url(URL_GOOGLESEARCH, get_params={
"q": phrase, "key": self.bot.config[
"google-api-key"], "cx": self.bot.config[
"google-search-id"], "prettyPrint": "true",
"num": 1, "gl": "us"}, json=True)
if page:
if "items" in page and len(page["items"]):
event["stdout"].write(page["items"][0][
"link"])
else:
event["stderr"].write("No results found")
else:
event["stderr"].write("Failed to load results")
else:
event["stderr"].write("No phrase provided")

26
modules/imdb.py Normal file
View file

@ -0,0 +1,26 @@
import json
import Utils
URL_OMDB = "http://www.omdbapi.com/"
URL_IMDBTITLE = "http://imdb.com/title/%s"
class Module(object):
_name = "IMDb"
def __init__(self, bot):
bot.events.on("received").on("command").on("imdb").hook(
self.imdb, min_args=1,
help="Search for a given title on IMDb")
def imdb(self, event):
page = Utils.get_url(URL_OMDB, get_params={"t": event["args"]},
json=True)
if page:
if "Title" in page:
event["stdout"].write("%s, %s (%s) %s (%s/10.0) %s" % (
page["Title"], page["Year"], page["Runtime"],
page["Plot"], page["imdbRating"],
URL_IMDBTITLE % page["imdbID"]))
else:
event["stderr"].write("Title not found")
else:
event["stderr"].write("Failed to load results")

79
modules/in.py Normal file
View file

@ -0,0 +1,79 @@
import time
import Utils
SECONDS_MAX = Utils.SECONDS_WEEKS*8
SECONDS_MAX_DESCRIPTION = "8 weeks"
class Module(object):
def __init__(self, bot):
self.bot = bot
bot.events.on("received").on("command").on("in").hook(
self.in_command, min_args=2)
bot.events.on("received").on("numeric").on("001").hook(
self.on_connect)
def on_connect(self, event):
self.load_reminders(event["server"])
def remove_timer(self, target, due_time, server_id, nickname, message):
setting = "in-%s" % nickname
reminders = self.bot.database.get_server_setting(server_id, setting, [])
try:
reminders.remove([target, due_time, server_id, nickname, message])
except:
print("failed to remove a reminder. huh.")
if reminders:
self.bot.database.set_server_setting(server_id,
setting, reminders)
else:
self.bot.database.del_server_setting(server_id,
setting)
def load_reminders(self, server):
reminders = server.find_settings("in-%")
for user_reminders in reminders:
for target, due_time, server_id, nickname, message in user_reminders[1]:
time_left = due_time-time.time()
if time_left > 0:
self.bot.add_timer(self.timer_due, time_left, target=target,
due_time=due_time, server_id=server_id, nickname=nickname,
message=message)
else:
self.remove_timer(target, due_time, server_id, nickname,
message)
def in_command(self, event):
seconds = Utils.from_pretty_time(event["args_split"][0])
message = " ".join(event["args_split"][1:])
if seconds:
if seconds <= SECONDS_MAX:
due_time = int(time.time())+seconds
setting = "in-%s" % event["user"].nickname
reminders = event["server"].get_setting(setting, [])
reminders.append([event["target"].name, due_time,
event["server"].id, event["user"].nickname, message])
event["server"].set_setting(setting, reminders)
self.bot.add_timer(self.timer_due, seconds,
target=event["target"].name, due_time=due_time,
server_id=event["server"].id, nickname=event["user"].nickname,
message=message)
else:
event["stderr"].write(
"The given time is above the max (%s)" % (
SECONDS_MAX_DESCRIPTION))
else:
event["stderr"].write(
"Please provided a valid time above 0 seconds")
def timer_due(self, timer, **kwargs):
for server in self.bot.servers.values():
if kwargs["server_id"] == server.id:
server.send_message(kwargs["target"],
"%s, this is your reminder: %s" % (
kwargs["nickname"], kwargs["message"]))
break
setting = "in-%s" % kwargs["nickname"]
self.remove_timer(kwargs["target"], kwargs["due_time"],
kwargs["server_id"], kwargs["nickname"], kwargs["message"])

35
modules/isbn.py Normal file
View file

@ -0,0 +1,35 @@
import json
import Utils
URL_GOOGLEBOOKS = "https://www.googleapis.com/books/v1/volumes"
class Module(object):
_name = "ISBN"
def __init__(self, bot):
self.bot = bot
bot.events.on("received").on("command").on("isbn").hook(
self.isbn, help="Get book information from a provided ISBN",
min_args=1)
def isbn(self, event):
isbn = event["args_split"][0]
if len(isbn) == 10:
isbn = "978%s" % isbn
isbn = isbn.replace("-", "")
page = Utils.get_url(URL_GOOGLEBOOKS, get_params={
"q": "isbn:%s" % isbn, "country": "us"}, json=True)
if page:
if page["totalItems"] > 0:
book = page["items"][0]["volumeInfo"]
title = book["title"]
sub_title = book["subtitle"]
authors = ", ".join(book["authors"])
date = book["publishedDate"]
rating = book["averageRating"]
#language = book["language"]
event["stdout"].write("%s - %s (%s), %s (%s/5.0)" % (
title, authors, date, sub_title, rating))
else:
event["stderr"].write("Unable to find book")
else:
event["stderr"].write("Failed to load results")

8
modules/join.py Normal file
View file

@ -0,0 +1,8 @@
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("numeric").on("001").hook(self.do_join)
def do_join(self, event):
event["server"].send_join("#bitbot")

41
modules/karma.py Normal file
View file

@ -0,0 +1,41 @@
import re, time
REGEX_KARMA = re.compile("(.*)(\+{2,}|\-{2,})")
KARMA_DELAY_SECONDS = 3
class Module(object):
def __init__(self, bot):
bot.events.on("new").on("user").hook(self.new_user)
bot.events.on("received").on("message").on("channel").hook(
self.channel_message)
bot.events.on("received").on("command").on("karma").hook(
self.karma)
def new_user(self, event):
event["user"].last_karma = None
def channel_message(self, event):
match = re.match(REGEX_KARMA, event["message"])
if match:
if not event["user"].last_karma or (time.time()-event["user"
].last_karma) >= KARMA_DELAY_SECONDS:
positive = match.group(2)[0] == "+"
setting = "karma-%s" % match.group(1).strip()
karma = event["server"].get_setting(setting, 0)
if positive:
karma += 1
else:
karma -= 1
if karma:
event["server"].set_setting(setting, karma)
else:
event["server"].del_setting(setting)
event["user"].last_karma = time.time()
def karma(self, event):
if event["args"]:
target = event["args"]
else:
target = event["user"].nickname
karma = event["server"].get_setting("karma-%s" % target, 0)
event["stdout"].write("%s has %s karma" % (target, karma))

56
modules/lastfm.py Normal file
View file

@ -0,0 +1,56 @@
import Utils
URL_SCROBBLER = "http://ws.audioscrobbler.com/2.0/"
class Module(object):
def __init__(self, bot):
self.bot = bot
bot.events.on("boot").on("done").hook(self.boot_done)
bot.events.on("received").on("command").on("np",
"listening", "nowplaying").hook(self.np,
help="Get the last listen to track from a user")
def boot_done(self, event):
self.bot.events.on("postboot").on("configure").on(
"set").call(setting="lastfm",
help="Set username on last.fm")
def np(self, event):
if event["args_split"]:
username = event["args_split"][0]
else:
username = event["user"].get_setting("lastfm",
event["user"].nickname)
page = Utils.get_url(URL_SCROBBLER, get_params={
"method": "user.getrecenttracks", "user": username,
"api_key": self.bot.config["lastfm-api-key"],
"format": "json", "limit": "1"}, json=True)
if page:
if "recenttracks" in page and len(page["recenttracks"
]["track"]):
now_playing = page["recenttracks"]["track"][0]
track_name = now_playing["name"]
artist = now_playing["artist"]["#text"]
tags_page = Utils.get_url(URL_SCROBBLER, get_params={
"method": "track.getTags", "artist": artist,
"track": track_name, "autocorrect": "1",
"api_key": self.bot.config["lastfm-api-key"],
"user": username, "format": "json"}, json=True)
tags = []
if tags_page.get("tags", {}).get("tag"):
for tag in tags_page["tags"]["tag"]:
tags.append(tag["name"])
if tags:
tags = " (%s)" % ", ".join(tags)
else:
tags = ""
event["stdout"].write("%s is now playing: %s - %s%s" % (
username, artist, track_name, tags))
else:
event["stderr"].write(
"The user '%s' has never scrobbled before" % (
username))
else:
event["stderr"].write("Failed to load results")

13
modules/nickserv.py Normal file
View file

@ -0,0 +1,13 @@
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("numeric").on("001"
).hook(self.on_connect)
def on_connect(self, event):
nickserv_password = event["server"].get_setting(
"nickserv_password")
if nickserv_password:
event["server"].send_message("nickserv",
"identify %s" % nickserv_password)

16
modules/perform.py Normal file
View file

@ -0,0 +1,16 @@
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("numeric").on("001").hook(
self.on_connect)
def on_connect(self, event):
commands = event["server"].get_setting("perform", [])
for i, command in enumerate(commands):
command = command.split("%%")
for j, part in enumerate(command[:]):
command[j] = part.replace("%nick%", event["server"
].nickname)
command = "%".join(command)
event["server"].send(command)

9
modules/pong.py Normal file
View file

@ -0,0 +1,9 @@
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("command").on("ping"
).hook(self.pong, help="Ping pong!")
def pong(self, event):
event["stdout"].write("Pong!")

64
modules/sed.py Normal file
View file

@ -0,0 +1,64 @@
import re, traceback
REGEX_SPLIT = re.compile("(?<!\\\\)/")
REGEX_SED = re.compile("^s/")
class Module(object):
def __init__(self, bot):
self.bot = bot
bot.events.on("boot").on("done").hook(self.boot_done)
bot.events.on("received").on("message").on("channel").hook(
self.channel_message)
def validate_setchannel(self, s):
return s.lower() == "true"
def boot_done(self, event):
self.bot.events.on("postboot").on("configure").on(
"channelset").call(setting="sed",
help="Disable/Enable sed in a channel",
validate=self.validate_setchannel)
def channel_message(self, event):
if event["action"] or not event["channel"].get_setting("sed", True):
return
sed_split = re.split(REGEX_SPLIT, event["message"], 3)
if event["message"].startswith("s/") and len(sed_split) > 2:
regex_flags = 0
flags = (sed_split[3:] or [""])[0]
count = None
last_flag = ""
for flag in flags:
if flag.isdigit():
if last_flag.isdigit():
count = int(str(count) + flag)
elif not count:
count = int(flag)
elif flag == "i":
regex_flags |= re.I
elif flag == "g":
count = 0
last_flag = flag
if count == None:
count = 1
try:
pattern = re.compile(sed_split[1], regex_flags)
except:
traceback.print_exc()
self.bot.events.on("send").on("stderr").call(target=event[
"channel"], module_name="Sed", server=event["server"],
message="Invalid regex in pattern")
return
replace = sed_split[2].replace("\\/", "/")
line = event["channel"].log.find(pattern, from_self=False, not_pattern=REGEX_SED)
if line:
new_message = re.sub(pattern, replace, line.message, count)
if line.action:
prefix = "* %s" % line.sender
else:
prefix = "<%s>" % line.sender
self.bot.events.on("send").on("stdout").call(target=event[
"channel"], module_name="Sed", server=event["server"],
message="%s %s" % (prefix, new_message))

25
modules/seen.py Normal file
View file

@ -0,0 +1,25 @@
import time
import Utils
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("message").on("channel"
).hook(self.channel_message)
bot.events.on("received").on("command").on("seen").hook(
self.seen, min_args=1,
help="Find out when a user was last seen")
def channel_message(self, event):
seen_seconds = time.time()
event["user"].set_setting("seen", seen_seconds)
def seen(self, event):
seen_seconds = event["server"].get_user(event["args_split"][0]
).get_setting("seen")
if seen_seconds:
since, unit = Utils.time_unit(time.time()-seen_seconds)
event["stdout"].write("%s was last seen %s %s ago" % (
event["args_split"][0], since, unit))
else:
event["stderr"].write("I have never seen %s before." % (
event["args_split"][0]))

54
modules/set.py Normal file
View file

@ -0,0 +1,54 @@
class Module(object):
def __init__(self, bot):
self.bot = bot
self.settings = {}
self.channel_settings = {}
bot.events.on("postboot").on("configure").on("set").hook(
self.postboot_set)
bot.events.on("postboot").on("configure").on("channelset"
).hook(self.postboot_channelset)
bot.events.on("received").on("command").on("set").hook(
self.set, help="Set a specified user setting")
bot.events.on("received").on("command").on("channelset"
).hook(self.channel_set, channel_only=True)
def _postboot_set(self, settings, event):
settings[event["setting"]] = {}
settings[event["setting"]]["validate"] = event.get(
"validate", lambda s: s)
settings[event["setting"]]["help"] = event.get("help",
"")
def postboot_set(self, event):
self._postboot_set(self.settings, event)
def postboot_channelset(self, event):
self._postboot_set(self.channel_settings, event)
def _set(self, settings, target, event):
if len(event["args_split"]) > 1:
setting = event["args_split"][0].lower()
if setting in settings:
value = " ".join(event["args_split"][1:])
value = settings[setting]["validate"](value)
if not value == None:
target.set_setting(setting, value)
event["stdout"].write("Saved setting")
else:
event["stderr"].write("Invalid value")
else:
event["stderr"].write("Unknown setting")
elif len(event["args_split"]) == 1:
event["stderr"].write("Please provide a value")
else:
event["stdout"].write("Available settings: %s" % (
", ".join(settings.keys())))
def set(self, event):
self._set(self.settings, event["user"], event)
def channel_set(self, event):
if event["channel"].mode_or_above(event["user"].nickname,
"o"):
self._set(self.channel_settings, event["channel"], event)
else:
event["stderr"].write("You do not have the modes required")

48
modules/signals.py Normal file
View file

@ -0,0 +1,48 @@
import signal
from random import randint
QUOTES = {
"You can build a throne with bayonets, but it's difficult to sit on it." : "Boris Yeltsin",
"We don't appreciate what we have until it's gone. Freedom is like that. It's like air. When you have it, you don't notice it." : "Boris Yeltsin",
"Storm clouds of terror and dictatorship are gathering over the whole country... They must not be allowed to bring eternal night." : "Boris Yeltsin",
"They accused us of suppressing freedom of expression. This was a lie and we could not let them publish it." : "Nelba Blandon, as director of censorship, Nicaragua",
"Death solves all problems - no man, no problem." : "Joseph Stalin",
"Make the lie big, make it simple, keep saying it, and eventually they will believe it." : "Adolf Hitler",
"If we don't end war, war will end us." : "H.G. Wells",
"All that is necessary for evil to succeed is for good men to do nothing." : "Edmund Burke",
"Live well. It is the greatest revenge." : "The Talmud",
"To improve is to change, so to be perfect is to have changed often." : "Winston Churchill",
"I believe it is peace for our time." : "Neville Chamberlain",
"Orbiting Earth in the spaceship, I saw how beautiful our planet is. People, let us preserve and increase this beauty, not destroy it!" : "Yuri Gagarin",
"Science is not everything, but science is very beautiful." : "Robert Oppenheimer",
"We choose to go to the Moon! We choose to go to the Moon in this decade and do the other things, not because they are easy, but because they are hard." : "",
"You teach a child to read, and he or her will be able to pass a literacy test." : "George W Bush",
"I know the human being and fish can coexist peacefully." : "George W Bush",
"They misunderestimated me." : "George W Bush",
"I'm an Internet expert too." : "Kim Jong Il",
"So long, and thanks for all the fish" : "",
"It is a lie that I made the people starve." : "Nicolae Ceaușescu",
"As far as I know - effective immediately, without delay." : "Günter Schabowski"
}
class Module(object):
def __init__(self, bot):
self.bot = bot
signal.signal(signal.SIGINT, self.SIGINT)
signal.signal(signal.SIGUSR1, self.SIGUSR1)
def SIGINT(self, signum, frame):
print()
for server in self.bot.servers.values():
server.send_quit(self.random_quote())
self.bot.register_write(server)
self.bot.running = False
def SIGUSR1(self, signum, frame):
print("Reloading config file")
self.bot.config_object.load_config()
def random_quote(self):
quote = list(QUOTES)[randint(0, len(QUOTES)-1)]
return (" - " if QUOTES[quote] else "").join([quote,
QUOTES[quote]])

43
modules/thesaurus.py Normal file
View file

@ -0,0 +1,43 @@
import Utils
URL_THESAURUS = "http://words.bighugelabs.com/api/2/%s/%s/json"
class Module(object):
def __init__(self, bot):
self.bot = bot
bot.events.on("received").on("command").on("synonym",
"antonym").hook(self.thesaurus, min_args=1,
help="Get synonyms/antonyms for a provided phrase")
def thesaurus(self, event):
phrase = event["args_split"][0]
page = Utils.get_url(URL_THESAURUS % (self.bot.config[
"bighugethesaurus-api-key"], phrase), json=True)
syn_ant = event["command"][:3]
if page:
if not len(event["args_split"]) > 1:
word_types = []
for word_type in page.keys():
if syn_ant in page[word_type]:
word_types.append(word_type)
if word_types:
word_types = sorted(word_types)
event["stdout"].write(
"Available categories for %s: %s" % (
phrase, ", ".join(word_types)))
else:
event["stderr"].write("No categories available")
else:
category = event["args_split"][1].lower()
if category in page:
if syn_ant in page[category]:
event["stdout"].write("%ss for %s: %s" % (
event["command"].title(), phrase, ", ".join(
page[category][syn_ant])))
else:
event["stderr"].write("No %ss for %s" % (
event["command"], phrase))
else:
event["stderr"].write("Category not found")
else:
event["stderr"].write("Failed to load results")

33
modules/title.py Normal file
View file

@ -0,0 +1,33 @@
import re
import Utils
REGEX_URL = re.compile("https?://\S+", re.I)
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("command").on("title", "t").hook(
self.title, help="Get the title of the provided or most "
"recent URL.")
def title(self, event):
url = None
if len(event["args"]) > 0:
url = event["args_split"][0]
else:
url = event["channel"].log.find(REGEX_URL)
if url:
url = re.search(REGEX_URL, url.message).group(0)
if not url:
event["stderr"].write("No URL provided/found.")
return
soup = Utils.get_url(url, soup=True)
if not soup:
event["stderr"].write("Failed to get URL.")
return
title = soup.title
if title:
title = title.text.replace("\n", " ").replace("\r", ""
).replace(" ", " ").strip()
event["stdout"].write(title)
else:
event["stderr"].write("No title found.")

26
modules/to.py Normal file
View file

@ -0,0 +1,26 @@
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("message").on("channel"
).hook(self.channel_message)
bot.events.on("received").on("command").on("to").hook(
self.to, min_args=2, help=("Relay a message to a "
"user the next time they talk in a channel"),
channel_only=True)
def channel_message(self, event):
setting = "to-%s" % event["user"].nickname
messages = event["channel"].get_setting(setting, [])
for nickname, message in messages:
event["channel"].send_message("%s: <%s> %s" % (
event["user"].nickname, nickname, message))
event["channel"].del_setting(setting)
def to(self, event):
setting = "to-%s" % event["args_split"][0]
messages = event["channel"].get_setting(setting, [])
messages.append([event["user"].nickname,
" ".join(event["args_split"][1:])])
event["channel"].set_setting(setting, messages)
event["stdout"].write("Message saved")

58
modules/trakt.py Normal file
View file

@ -0,0 +1,58 @@
import Utils
URL_TRAKT = "https://api-v2launch.trakt.tv/users/%s/watching"
URL_TRAKTSLUG = "https://trakt.tv/%s/%s"
class Module(object):
def __init__(self, bot):
self.bot = bot
bot.events.on("boot").on("done").hook(self.boot_done)
bot.events.on("received").on("command").on("nowwatching",
"nw").hook(self.now_watching)
def boot_done(self, event):
self.bot.events.on("postboot").on("configure").on("set"
).call(setting="trakt", help="Set username on trakt.tv")
def now_watching(self, event):
if event["args"]:
username = event["args_split"][0]
else:
username = event["user"].get_setting("trakt",
event["user"].nickname)
page = Utils.get_url(URL_TRAKT % username, headers={
"Content-Type": "application/json",
"trakt-api-version": "2", "trakt-api-key":
self.bot.config["trakt-api-key"]}, json=True,
code=True)
if page:
code, page = page
if code == 204:
event["stderr"].write(
"%s is not watching anything" % username)
else:
type = page["type"]
if type == "movie":
title = page["movie"]["title"]
year = page["movie"]["year"]
slug = page["movie"]["ids"]["slug"]
event["stdout"].write(
"%s is now watching %s (%s) %s" % (
username, title, year,
URL_TRAKTSLUG % ("movie", slug)))
elif type == "episode":
season = page["episode"]["season"]
episode_number = page["episode"]["number"]
episode_title = page["episode"]["title"]
show_title = page["show"]["title"]
show_year = page["show"]["year"]
slug = page["show"]["ids"]["slug"]
event["stdout"].write(
"%s is now watching %s s%se%s - %s %s" % (
username, show_title, str(season).zfill(2),
str(episode_number).zfill(2), episode_title,
URL_TRAKTSLUG % ("shows", slug)))
else:
print("ack! unknown trakt media type!")
else:
event["stderr"].write("Failed to load results")

51
modules/translate.py Normal file
View file

@ -0,0 +1,51 @@
import re
import Utils
URL_TRANSLATE = "https://translate.google.com/"
REGEX_LANGUAGES = re.compile("(\w{2})?:(\w{2})? ")
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("command").on("translate", "tr").hook(
self.translate, help="Translate the provided phrase or the "
"last line seen.")
def translate(self, event):
phrase = event["args"]
if not phrase:
phrase = event["channel"].log.get()
if phrase:
phrase = phrase.message
if not phrase:
event["stderr"].write("No phrase provided.")
return
source_language = "auto"
target_language = "en"
language_match = re.match(REGEX_LANGUAGES, phrase)
if language_match:
if language_match.group(1):
source_language = language_match.group(1)
if language_match.group(2):
target_language = language_match.group(2)
phrase = phrase.split(" ", 1)[1]
soup = Utils.get_url(URL_TRANSLATE, post_params={
"sl": source_language, "tl": target_language, "js": "n",
"prev": "_t", "hl": "en", "ie": "UTF-8", "text": phrase,
"file": "", "edit-text": ""}, method="POST", soup=True)
if soup:
languages_element = soup.find(id="gt-otf-switch")
translated_element = soup.find(id="result_box").find("span")
if languages_element and translated_element:
source_language, target_language = languages_element.attrs[
"href"].split("&sl=", 1)[1].split("&tl=", 1)
target_language = target_language.split("&", 1)[0]
translated = translated_element.text
event["stdout"].write("(%s > %s) %s" % (source_language,
target_language, translated))
return
event["stderr"].write("Failed to translate, try checking "
"source/target languages (https://cloud.google.com/translate/"
"v2/using_rest#language-params")

69
modules/twitter.py Normal file
View file

@ -0,0 +1,69 @@
import datetime, re, time, traceback
import twitter
import Utils
REGEX_TWITTERURL = re.compile(
"https?://(?:www\.)?twitter.com/[^/]+/status/(\d+)", re.I)
class Module(object):
def __init__(self, bot):
self.bot = bot
bot.events.on("received").on("command").on("twitter", "tw"
).hook(self.twitter)
def make_timestamp(self, s):
seconds_since = time.time()-datetime.datetime.strptime(s,
"%a %b %d %H:%M:%S %z %Y").timestamp()
since, unit = Utils.time_unit(seconds_since)
return "%s %s ago" % (since, unit)
def twitter(self, event):
api_key = self.bot.config["twitter-api-key"]
api_secret = self.bot.config["twitter-api-secret"]
access_token = self.bot.config["twitter-access-token"]
access_secret = self.bot.config["twitter-access-secret"]
if event["args"]:
target = event["args"]
else:
target = event["log"].find(REGEX_TWITTERURL)
if target:
target = target.message
if target:
twitter_object = twitter.Twitter(auth=twitter.OAuth(
access_token, access_secret, api_key, api_secret))
url_match = re.search(REGEX_TWITTERURL, target)
if url_match or target.isdigit():
tweet_id = url_match.group(1) if url_match else target
try:
tweet = twitter_object.statuses.show(id=tweet_id)
except:
traceback.print_exc()
tweet = None
elif target.startswith("@"):
try:
tweet = twitter_object.statuses.user_timeline(
screen_name=target[1:], count=1)[0]
except:
traceback.print_exc()
tweet = None
if tweet:
username = "@%s" % tweet["user"]["screen_name"]
if "retweeted_status" in tweet:
original_username = "@%s" % tweet["retweeted_status"
]["user"]["screen_name"]
original_text = tweet["retweeted_status"]["text"]
retweet_timestamp = self.make_timestamp(tweet[
"created_at"])
original_timestamp = self.make_timestamp(tweet[
"retweeted_status"]["created_at"])
event["stdout"].write("(%s (%s) retweeted %s (%s)) %s" % (
username, retweet_timestamp,
original_username, original_timestamp, original_text))
else:
event["stdout"].write("(%s, %s) %s" % (username,
self.make_timestamp(tweet["created_at"]), tweet["text"]))
else:
event["stderr"].write("Invalid tweet identifiers provided")
else:
event["stderr"].write("No tweet provided to get information about")

View file

@ -0,0 +1,34 @@
import json, re
import Utils
URL_URBANDICTIONARY = "http://api.urbandictionary.com/v0/define"
REGEX_DEFNUMBER = re.compile("-n(\d+) \S+")
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("command").on("urbandictionary", "ud"
).hook(self.ud, min_args=1,
help="Get the definition of a provided term")
def ud(self, event):
term = event["args"]
number = 1
match = re.match(REGEX_DEFNUMBER, term)
if match:
number = int(match.group(1))
term = term.split(" ", 1)[1]
page = Utils.get_url(URL_URBANDICTIONARY, get_params={"term": term},
json=True)
if page:
if len(page["list"]):
if number > 0 and len(page["list"]) > number-1:
definition = page["list"][number-1]
event["stdout"].write("%s: %s" % (definition["word"],
definition["definition"].replace("\n", " ").replace(
"\r", "").replace(" ", " ")))
else:
event["stderr"].write("Definition number does not exist")
else:
event["stderr"].write("No results found")
else:
event["stderr"].write("Failed to load results")

35
modules/weather.py Normal file
View file

@ -0,0 +1,35 @@
import Utils
URL_WEATHER = "http://api.openweathermap.org/data/2.5/weather"
class Module(object):
def __init__(self, bot):
bot.events.on("received").on("command").on("weather").hook(
self.weather, min_args=1,
help="Get current weather data for a provided location")
self.bot = bot
def weather(self, event):
api_key = self.bot.config["openweathermap-api-key"]
page = Utils.get_url(URL_WEATHER, get_params={
"q": event["args"], "units": "metric",
"APPID": api_key},
json=True)
if page:
if "weather" in page:
location = "%s, %s" % (page["name"], page["sys"][
"country"])
celsius = "%dC" % page["main"]["temp"]
fahrenheit = "%dF" % ((page["main"]["temp"]*(9/5))+32)
description = page["weather"][0]["description"].title()
humidity = "%s%%" % page["main"]["humidity"]
wind_speed = "%sKM/h" % page["wind"]["speed"]
event["stdout"].write(
"(%s) %s/%s | %s | Humidity: %s | Wind: %s" % (
location, celsius, fahrenheit, description, humidity,
wind_speed))
else:
event["stderr"].write("No weather information for this location")
else:
event["stderr"].write("Failed to load results")

42
modules/wolframalpha.py Normal file
View file

@ -0,0 +1,42 @@
import re
import Utils
URL_WA = "http://api.wolframalpha.com/v2/query"
REGEX_CHARHEX = re.compile("\\\\:(\S{4})")
class Module(object):
_name = "Wolfram|Alpha"
def __init__(self, bot):
bot.events.on("received").on("command").on("wolframalpha", "wa"
).hook(self.wa, min_args=1, help=
"Evauate a given string on Wolfram|Alpha")
self.bot = bot
def wa(self, event):
soup = Utils.get_url(URL_WA, get_params={"input": event["args"],
"appid": self.bot.config["wolframalpha-api-key"],
"format": "plaintext", "reinterpret": "true"}, soup=True)
if soup:
if int(soup.find("queryresult").get("numpods")) > 0:
input = soup.find(id="Input").find("subpod").find("plaintext"
).text
for pod in soup.find_all("pod"):
if pod.get("primary") == "true":
answer = pod.find("subpod").find("plaintext")
text = "(%s) %s" % (input.replace(" | ", ": "),
answer.text.strip().replace(" | ", ": "
).replace("\n", " | ").replace("\r", ""))
while True:
match = re.search(REGEX_CHARHEX, text)
if match:
text = re.sub(REGEX_CHARHEX, chr(int(
match.group(1), 16)), text)
else:
break
event["stdout"].write(text)
break
else:
event["stderr"].write("No results found")
else:
event["stderr"].write("Failed to load results")

105
modules/youtube.py Normal file
View file

@ -0,0 +1,105 @@
import re
import Utils
REGEX_YOUTUBE = re.compile(
"https?://(?:www.)?(?:youtu.be/|youtube.com/watch\?[^\S]*v=)(\w{11})",
re.I)
REGEX_ISO8601 = re.compile("PT(\d+H)?(\d+M)?(\d+S)?", re.I)
URL_YOUTUBESEARCH = "https://www.googleapis.com/youtube/v3/search"
URL_YOUTUBEVIDEO = "https://www.googleapis.com/youtube/v3/videos"
URL_YOUTUBESHORT = "https://youtu.be/%s"
ARROW_UP = ""
ARROW_DOWN = ""
class Module(object):
def __init__(self, bot):
self.bot = bot
bot.events.on("received").on("command").on("yt", "youtube"
).hook(self.yt,
help="Find a video on youtube")
bot.events.on("received").on("message").on("channel").hook(
self.channel_message)
bot.events.on("boot").on("done").hook(self.boot_done)
def validate_setchannel(self, s):
return s.lower() == "true"
def boot_done(self, event):
self.bot.events.on("postboot").on("configure").on(
"channelset").call(setting="autoyoutube",
help="Disable/Enable automatically getting info from youtube URLs",
validate=self.validate_setchannel)
def get_video_page(self, video_id, part):
return Utils.get_url(URL_YOUTUBEVIDEO, get_params={"part": part,
"id": video_id, "key": self.bot.config["google-api-key"]},
json=True)
def video_details(self, video_id):
snippet = self.get_video_page(video_id, "snippet")
if snippet["items"]:
snippet = snippet["items"][0]["snippet"]
statistics = self.get_video_page(video_id, "statistics")[
"items"][0]["statistics"]
content = self.get_video_page(video_id, "contentDetails")[
"items"][0]["contentDetails"]
video_uploader = snippet["channelTitle"]
video_title = snippet["title"]
video_views = statistics["viewCount"]
video_likes = statistics["likeCount"]
video_dislikes = statistics["dislikeCount"]
video_duration = content["duration"]
match = re.match(REGEX_ISO8601, video_duration)
video_duration = ""
video_duration += "%s:" % match.group(1)[:-1].zfill(2
) if match.group(1) else ""
video_duration += "%s:" % match.group(2)[:-1].zfill(2
) if match.group(2) else ""
video_duration += "%s" % match.group(3)[:-1].zfill(2
) if match.group(3) else ""
return "%s (%s) uploaded by %s, %s views (%s%s%s%s) %s" % (
video_title, video_duration, video_uploader, "{:,}".format(
int(video_views)), video_likes, ARROW_UP, ARROW_DOWN, video_dislikes,
URL_YOUTUBESHORT % video_id)
def yt(self, event):
video_id = None
search = None
if event["args"]:
search = event["args"]
else:
last_youtube = event["channel"].log.find(REGEX_YOUTUBE)
if last_youtube:
video_id = re.search(REGEX_YOUTUBE, last_youtube.message).group(1)
if search or video_id:
if not video_id:
search_page = Utils.get_url(URL_YOUTUBESEARCH,
get_params={"q": search, "part": "snippet",
"maxResults": "1", "type": "video",
"key": self.bot.config["google-api-key"]},
json=True)
if search_page:
if search_page["pageInfo"]["totalResults"] > 0:
video_id = search_page["items"][0]["id"]["videoId"]
else:
event["stderr"].write("No videos found")
else:
event["stderr"].write("Failed to load results")
if video_id:
event["stdout"].write(self.video_details(video_id))
else:
event["stderr"].write("No search phrase provided")
else:
event["stderr"].write("No search phrase provided")
def channel_message(self, event):
match = re.search(REGEX_YOUTUBE, event["message"])
if match and event["channel"].get_setting("autoyoutube", False):
youtube_id = match.group(1)
video_details = self.video_details(youtube_id)
if video_details:
self.bot.events.on("send").on("stdout").call(target=event[
"channel"], message=video_details, module_name="Youtube",
server=event["server"])

39
start.py Executable file
View file

@ -0,0 +1,39 @@
#!/usr/bin/env python3
import argparse
import IRCBot, Config, Database
def bool_input(s):
result = input("%s (Y/n): " % s)
return not result or result[0].lower() in ["", "y"]
bot = IRCBot.Bot()
database = Database.Database(bot)
config_object = Config.Config(bot)
bot.database = database
bot.config_object = config_object
servers = database.get_servers()
for server in servers:
bot.add_server(*server)
if len(bot.servers):
bot.modules.load_modules()
bot.events.on("boot").on("done").call()
bot.connect_all()
bot.run()
else:
try:
if bool_input("no servers found, add one?"):
hostname = input("hostname: ")
port = int(input("port: "))
tls = bool_input("tls?")
password = input("password?: ")
ipv4 = bool_input("ipv4?")
nickname = input("nickname: ")
username = input("username: ")
realname = input("realname: ")
database.add_server(hostname, port, password, ipv4,
tls, nickname, username, realname)
except KeyboardInterrupt:
print()
pass