Compare commits

...

11 commits

6 changed files with 289 additions and 69 deletions

View file

@ -43,6 +43,8 @@ class bot:
markov: MarkovBot markov: MarkovBot
autoMethod: str autoMethod: str
dnsblMode: str dnsblMode: str
statuses: dict[str, dict[str, str]]
ops: dict[str, bool]
def __init__(self, server: str): ... def __init__(self, server: str): ...

14
bot.py
View file

@ -14,6 +14,7 @@ import handlers
import bare import bare
from threading import Thread from threading import Thread
from markov import MarkovBot from markov import MarkovBot
from traceback import format_exc
def mfind(message: str, find: list, usePrefix: bool = True) -> bool: def mfind(message: str, find: list, usePrefix: bool = True) -> bool:
@ -52,6 +53,8 @@ class bot(bare.bot):
else "FireBot" else "FireBot"
) )
self.queue: list[bbytes] = [] # pyright: ignore [reportInvalidTypeForm] self.queue: list[bbytes] = [] # pyright: ignore [reportInvalidTypeForm]
self.statuses = {"firepup": {}}
self.ops = {}
self.sock = socket(AF_INET, SOCK_STREAM) self.sock = socket(AF_INET, SOCK_STREAM)
self.current = "user" self.current = "user"
self.threads = ( self.threads = (
@ -233,6 +236,8 @@ class bot(bare.bot):
if self.queue: if self.queue:
return bytes(self.queue.pop(0)) return bytes(self.queue.pop(0))
data = bytes(self.sock.recv(2048)) data = bytes(self.sock.recv(2048))
if data.lazy_decode() == "":
return data
while not data.endswith(b"\r\n"): while not data.endswith(b"\r\n"):
data += bytes(self.sock.recv(2048)) data += bytes(self.sock.recv(2048))
data = bytes(data.strip(b"\r\n")) data = bytes(data.strip(b"\r\n"))
@ -313,6 +318,7 @@ class bot(bare.bot):
if action in handlers.handles: if action in handlers.handles:
res, chan = handlers.handles[action](self, ircmsg) res, chan = handlers.handles[action](self, ircmsg)
if res == "reload" and type(chan) == str: if res == "reload" and type(chan) == str:
try:
reload(conf) reload(conf)
self.adminnames = ( self.adminnames = (
conf.servers[self.server]["admins"] conf.servers[self.server]["admins"]
@ -339,6 +345,14 @@ class bot(bare.bot):
reload(cmds) reload(cmds)
reload(handlers) reload(handlers)
self.msg("Reloaded successfully", chan) self.msg("Reloaded successfully", chan)
except Exception:
Err = format_exc()
for line in Err.split("\n"):
self.log(line, "ERROR")
self.msg(
"Reload failed, likely partially reloaded. Please check error logs.",
chan,
)
else: else:
if ircmsg.startswith("PING "): if ircmsg.startswith("PING "):
self.ping(ircmsg) self.ping(ircmsg)

View file

@ -152,6 +152,26 @@ def debug(bot: bare.bot, chan: str, name: str, message: str) -> None:
bot.msg(f"[DEBUG] {dbg_out}", chan) bot.msg(f"[DEBUG] {dbg_out}", chan)
def debugInternal(bot: bare.bot, chan: str, name: str, message: str) -> None:
things = dir(bot)
try:
thing = message.split(" ", 1)[1]
except IndexError:
bot.msg("You can't just ask me to lookup nothing.", chan)
return
if thing in things:
bot.msg(f"self.{thing} = {getattr(bot, thing)}", chan)
else:
bot.msg(f'I have nothing called "{thing}"', chan)
def debugEval(bot: bare.bot, chan: str, name: str, message: str) -> None:
try:
bot.msg(str(eval(message.split(" ", 1)[1])), chan)
except Exception as E:
bot.msg(f"Exception: {E}", chan)
def raw(bot: bare.bot, chan: str, name: str, message: str) -> None: def raw(bot: bare.bot, chan: str, name: str, message: str) -> None:
bot.sendraw(message.split(" ", 1)[1]) bot.sendraw(message.split(" ", 1)[1])
@ -213,6 +233,70 @@ def markov(bot: bare.bot, chan: str, name: str, message: str) -> None:
bot.msg(proposed, chan) bot.msg(proposed, chan)
def setStatus(bot: bare.bot, chan: str, name: str, message: str) -> None:
user, stat, reas = ("", 0, "")
try:
if message.split(" ")[1] == "help":
bot.msg(
"Assuming you want help with status codes. 1 is Available, 2 is Busy, 3 is Unavailable, anything else is Unknown.",
chan,
)
return
message = message.split(" ", 1)[1]
user = message.split(" ")[0].lower()
stat = int(message.split(" ")[1])
reas = message.split(" ", 2)[2]
except IndexError:
bot.msg(
f"Insufficent information to set a status. Only got {len(message.split(' ')) - (1 if '.sS' in message else 0)}/3 expected args.",
chan,
)
return
except ValueError:
bot.msg("Status parameter must be an int.", chan)
return
match stat:
case 1:
stat = "Available"
case 2:
stat = "Busy"
case 3:
stat = "Unavailable"
case _:
stat = "Unknown"
if user in ["me", "my", "I"]:
user = "firepup"
bot.statuses[user] = {"status": stat, "reason": reas}
bot.msg(f"Status set for '{user}'. Raw data: {bot.statuses[user]}", chan)
def getStatus(bot: bare.bot, chan: str, name: str, message: str) -> None:
user = ""
try:
user = message.split(" ")[1]
except IndexError:
user = "firepup"
if bot.statuses.get(user) is None:
bot.msg("You've gotta provide a nick I actually recognize.", chan)
return
bot.msg(
f"{user}'s status: {'Unknown' if not bot.statuses[user].get('status') else bot.statuses[user]['status']} - {'Reason unset' if not bot.statuses[user].get('reason') else bot.statuses[user]['reason']}",
chan,
)
def check(bot: bare.bot, chan: str, name: str, message: str) -> None:
try:
msg = message.split(" ", 1)[1]
nick = msg.split("!")[0]
host = msg.split("@", 1)[1]
dnsbl, raws = conf.dnsblHandler(bot, nick, host, chan)
bot.msg(f"Blacklist check: {dnsbl if dnsbl else 'Safe.'} ({raws})", chan)
except Exception as E:
bot.msg("Blacklist lookup failed. Error recorded to bot logs.", chan)
bot.log(str(E), "FATAL")
data: dict[str, dict[str, Any]] = { data: dict[str, dict[str, Any]] = {
"!botlist": {"prefix": False, "aliases": []}, "!botlist": {"prefix": False, "aliases": []},
"bugs bugs bugs": {"prefix": False, "aliases": []}, "bugs bugs bugs": {"prefix": False, "aliases": []},
@ -224,10 +308,16 @@ data: dict[str, dict[str, Any]] = {
"check": checks.admin, "check": checks.admin,
}, },
"uptime": {"prefix": True, "aliases": []}, "uptime": {"prefix": True, "aliases": []},
"raw ": {"prefix": True, "aliases": ["cmd "], "check": checks.admin}, "raw": {"prefix": True, "aliases": ["cmd "], "check": checks.admin},
"debug": {"prefix": True, "aliases": ["dbg", "d"], "check": checks.admin}, "debug": {"prefix": True, "aliases": ["dbg", "d"], "check": checks.admin},
"debugInternal": {
"prefix": True,
"aliases": ["dbgInt", "dI"],
"check": checks.admin,
},
"debugEval": {"prefix": True, "aliases": ["dbgEval", "dE"], "check": checks.admin},
"8ball": {"prefix": True, "aliases": ["eightball", "8b"]}, "8ball": {"prefix": True, "aliases": ["eightball", "8b"]},
"join ": {"prefix": True, "aliases": [], "check": checks.admin}, "join": {"prefix": True, "aliases": ["j"], "check": checks.admin},
"quote": {"prefix": True, "aliases": ["q"]}, "quote": {"prefix": True, "aliases": ["q"]},
"goat.mode.activate": {"prefix": True, "aliases": ["g.m.a"], "check": checks.admin}, "goat.mode.activate": {"prefix": True, "aliases": ["g.m.a"], "check": checks.admin},
"goat.mode.deactivate": { "goat.mode.deactivate": {
@ -244,6 +334,9 @@ data: dict[str, dict[str, Any]] = {
"version": {"prefix": True, "aliases": ["ver", "v"]}, "version": {"prefix": True, "aliases": ["ver", "v"]},
"np": {"prefix": True, "aliases": []}, "np": {"prefix": True, "aliases": []},
"markov": {"prefix": True, "aliases": ["m"]}, "markov": {"prefix": True, "aliases": ["m"]},
"setStatus": {"prefix": True, "aliases": ["sS"], "check": checks.admin},
"getStatus": {"prefix": True, "aliases": ["gS"]},
"check": {"prefix": True, "aliases": [], "check": checks.admin},
} }
regexes: list[str] = [conf.npbase, conf.su] regexes: list[str] = [conf.npbase, conf.su]
call: dict[str, Callable[[bare.bot, str, str, str], None]] = { call: dict[str, Callable[[bare.bot, str, str, str], None]] = {
@ -254,10 +347,12 @@ call: dict[str, Callable[[bare.bot, str, str, str], None]] = {
conf.su: sudo, conf.su: sudo,
"restart": reboot, "restart": reboot,
"uptime": uptime, "uptime": uptime,
"raw ": raw, "raw": raw,
"debug": debug, "debug": debug,
"debugInternal": debugInternal,
"debugEval": debugEval,
"8ball": eball, "8ball": eball,
"join ": join, "join": join,
"quote": quote, "quote": quote,
"goat.mode.activate": goatOn, "goat.mode.activate": goatOn,
"goat.mode.decativate": goatOff, "goat.mode.decativate": goatOff,
@ -270,4 +365,7 @@ call: dict[str, Callable[[bare.bot, str, str, str], None]] = {
"version": version, "version": version,
"np": fmpull, "np": fmpull,
"markov": markov, "markov": markov,
"setStatus": setStatus,
"getStatus": getStatus,
"check": check,
} }

108
config.py
View file

@ -4,13 +4,35 @@ from dotenv import load_dotenv # type: ignore
import re, codecs import re, codecs
from typing import Optional, Any, Union from typing import Optional, Any, Union
import bare, pylast import bare, pylast
from pydnsbl import DNSBLIpChecker, DNSBLDomainChecker from pydnsbl import DNSBLIpChecker, DNSBLDomainChecker, providers as BL
ipbl = DNSBLIpChecker()
hsbl = DNSBLDomainChecker() class droneBL(BL.Provider):
def process_response(self, response):
reasons = set()
for result in response:
reason = result.host
if reason in ["127.0.0.3"]:
reasons.add("IRC Spambot")
elif reason in ["127.0.0.19"]:
reasons.add("Abused VPN")
elif reason in ["127.0.0.9", "127.0.0.8"]:
reasons.add("Open Proxy")
elif reason in ["127.0.0.13"]:
reasons.add("Automated Attacks")
else:
print("Unknown dnsbl reason: " + reason, flush=True)
reasons.add("unknown")
return reasons
providers = BL.BASE_PROVIDERS + [droneBL("dnsbl.dronebl.org")]
ipbl = DNSBLIpChecker(providers=providers)
hsbl = DNSBLDomainChecker(providers=providers)
load_dotenv() load_dotenv()
__version__ = "v3.0.14" __version__ = "v3.0.16"
npbase: str = ( npbase: str = (
"\[\x0303last\.fm\x03\] [A-Za-z0-9_[\]{}\\|\-^]{1,$MAX} (is listening|last listened) to: \x02.+ - .*\x02( \([0-9]+ plays\)( \[.*\])?)?" # pyright: ignore [reportInvalidStringEscapeSequence] "\[\x0303last\.fm\x03\] [A-Za-z0-9_[\]{}\\|\-^]{1,$MAX} (is listening|last listened) to: \x02.+ - .*\x02( \([0-9]+ plays\)( \[.*\])?)?" # pyright: ignore [reportInvalidStringEscapeSequence]
) )
@ -24,7 +46,7 @@ servers: dict[str, dict[str, Any]] = {
"channels": {"#random": 0, "#dice": 0, "#offtopic": 0, "#main/replirc": 0}, "channels": {"#random": 0, "#dice": 0, "#offtopic": 0, "#main/replirc": 0},
"ignores": ["#main/replirc"], "ignores": ["#main/replirc"],
"hosts": ["9pfs.repl.co"], "hosts": ["9pfs.repl.co"],
"dnsblMode": "kickban" "dnsblMode": "kickban",
}, },
"efnet": { "efnet": {
"address": "irc.underworld.no", "address": "irc.underworld.no",
@ -46,6 +68,7 @@ servers: dict[str, dict[str, Any]] = {
"#fp-radio": 0, "#fp-radio": 0,
"#fp-radio-debug": 0, "#fp-radio-debug": 0,
"#hardfork": 0, "#hardfork": 0,
"#opers": 0,
}, },
"ignores": ["#fp-radio"], "ignores": ["#fp-radio"],
"admins": ["h-tl"], "admins": ["h-tl"],
@ -102,6 +125,20 @@ def decode_escapes(s: str) -> str:
return ESCAPE_SEQUENCE_RE.sub(decode_match, s) return ESCAPE_SEQUENCE_RE.sub(decode_match, s)
def cmdFind(message: str, find: list, usePrefix: bool = True) -> bool:
cmd = None
try:
cmd = message.split(" ", 1)[0]
except IndexError:
...
if not cmd:
return False
if usePrefix:
return any(cmd == prefix + match for match in find)
else:
return any(cmd == match for match in find)
def mfind(message: str, find: list, usePrefix: bool = True) -> bool: def mfind(message: str, find: list, usePrefix: bool = True) -> bool:
if usePrefix: if usePrefix:
return any(message[: len(match) + 1] == prefix + match for match in find) return any(message[: len(match) + 1] == prefix + match for match in find)
@ -123,22 +160,67 @@ def sub(
return result return result
def dnsbl(hostname: str) -> Union[str, None]: def dnsbl(hostname: str) -> tuple[str, dict[str, list[str]]]:
hosts = [] hosts = []
hstDT = None hstDT = {}
try: try:
hstDT = ipbl.check(hostname).detected_by hstDT = ipbl.check(hostname).detected_by
except ValueError: except ValueError: # It's not an IP
try:
hstDT = hsbl.check(hostname).detected_by hstDT = hsbl.check(hostname).detected_by
except ValueError: # It's also not a hostname
hstDT = {}
for host in hstDT: for host in hstDT:
if hstDT[host] != ["unknown"]: if hstDT[host] != ["unknown"]:
hosts.append(host) hosts.append(host)
print(f'DEBUG: {host} - {hstDT[host]}')
if not hosts: if not hosts:
return return "", hstDT
hostStr = None hostStr = None
if len(hosts) >= 3: if len(hosts) >= 3:
hostStr = ', and '.join((', '.join(hosts)).rsplit(", ", 1)) hostStr = ", and ".join((", ".join(hosts)).rsplit(", ", 1))
else: else:
hostStr = ' and '.join(hosts) hostStr = " and ".join(hosts)
return hostStr return hostStr, hstDT
def dnsblHandler(
bot: bare.bot, nick: str, hostname: str, chan: str
) -> tuple[str, dict[str, list[str]]]:
dnsblStatus = "Not enabled"
dnsblResps = {}
if bot.dnsblMode != "none":
dnsblStatus, dnsblResps = dnsbl(hostname)
if dnsblStatus:
match bot.dnsblMode:
case "kickban":
bot.sendraw(
f"KICK #{chan} {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
bot.sendraw(f"MODE #{chan} +b *!*@{hostname}")
case "akill":
bot.sendraw(
f"OS AKILL ADD *@{hostname} !P Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
case "kline":
bot.sendraw(
f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
bot.sendraw(
f"KLINE 524160 *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
bot.sendraw(
f"KLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
case "gline":
bot.sendraw(
f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
bot.sendraw(
f"GLINE *@{hostname} 524160 :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
bot.sendraw(
f"GLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
case _:
bot.log(f'Unknown dnsbl Mode "{bot.dnsblMode}"!', "WARN")
return dnsblStatus, dnsblResps

View file

@ -6,6 +6,7 @@ from typing import Union, Callable
from overrides import bytes, bbytes from overrides import bytes, bbytes
from importlib import reload from importlib import reload
import bare, re, checks import bare, re, checks
from traceback import format_exc
def CTCP(bot: bare.bot, msg: str) -> bool: def CTCP(bot: bare.bot, msg: str) -> bool:
@ -103,16 +104,34 @@ def PRIVMSG(bot: bare.bot, msg: str) -> Union[tuple[None, None], tuple[str, str]
triggers = [cmd] triggers = [cmd]
triggers.extend(cmds.data[cmd]["aliases"]) triggers.extend(cmds.data[cmd]["aliases"])
triggers = list(conf.sub(call, bot, chan, name).lower() for call in triggers) triggers = list(conf.sub(call, bot, chan, name).lower() for call in triggers)
if conf.mfind( if conf.cmdFind(
conf.sub(message, bot, chan, name).lower(), conf.sub(message, bot, chan, name).lower(),
triggers, triggers,
cmds.data[cmd]["prefix"], cmds.data[cmd]["prefix"],
): ):
if "check" in cmds.data[cmd] and cmds.data[cmd]["check"]: if "check" in cmds.data[cmd] and cmds.data[cmd]["check"]:
if cmds.data[cmd]["check"](bot, name, host, chan, cmd): if cmds.data[cmd]["check"](bot, name, host, chan, cmd):
try:
cmds.call[cmd](bot, chan, name, message) cmds.call[cmd](bot, chan, name, message)
except Exception:
Err = format_exc()
for line in Err.split("\n"):
bot.log(line, "ERROR")
bot.msg(
"Sorry, I had an error trying to execute that command. Please check error logs.",
chan,
)
else: else:
try:
cmds.call[cmd](bot, chan, name, message) cmds.call[cmd](bot, chan, name, message)
except Exception:
Err = format_exc()
for line in Err.split("\n"):
bot.log(line, "ERROR")
bot.msg(
"Sorry, I had an error trying to execute that command. Please check error logs.",
chan,
)
handled = True handled = True
break break
if not handled: if not handled:
@ -124,7 +143,7 @@ def PRIVMSG(bot: bare.bot, msg: str) -> Union[tuple[None, None], tuple[str, str]
cmds.call[check](bot, chan, name, message) cmds.call[check](bot, chan, name, message)
handled = True handled = True
break break
if not handled and conf.mfind(message, ["reload", "r"]): if not handled and conf.cmdFind(message, ["reload", "r"]):
if checks.admin(bot, name, host, chan, "reload"): if checks.admin(bot, name, host, chan, "reload"):
return "reload", chan return "reload", chan
handled = True handled = True
@ -189,27 +208,30 @@ def JOIN(bot: bare.bot, msg: str) -> tuple[None, None]:
nick = msg.split("!", 1)[0][1:] nick = msg.split("!", 1)[0][1:]
hostname = msg.split("@", 1)[1].split(" ", 1)[0].strip() hostname = msg.split("@", 1)[1].split(" ", 1)[0].strip()
chan = msg.split("#")[-1].strip() chan = msg.split("#")[-1].strip()
if bot.dnsblMode != "none": conf.dnsblHandler(bot, nick, hostname, chan)
dnsblStatus = conf.dnsbl(hostname)
if dnsblStatus:
match bot.dnsblMode:
case "kickban":
bot.sendraw(f"KICK #{chan} {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
bot.sendraw(f"MODE #{chan} +b *!*@{hostname}")
case "akill":
bot.sendraw(f"OS AKILL ADD *@{hostname} !P Sorry, but you're on the {dnsblStatus} blacklists(s).")
case "kline":
bot.sendraw(f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
bot.sendraw(f"KLINE 524160 *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
bot.sendraw(f"KLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
case "gline":
bot.sendraw(f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
bot.sendraw(f"GLINE *@{hostname} 524160 :Sorry, but you're on the {dnsblStatus} blacklist(s).")
bot.sendraw(f"GLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
case _:
bot.log(f'Unknown dnsbl Mode "{bot.dnsblMode}"!', "WARN")
return None, None return None, None
def MODE(bot: bare.bot, msg: str) -> tuple[None, None]:
chan = msg.split("#", 1)[1].split(" ", 1)[0]
add = True if msg.split("#", 1)[1].split(" ", 2)[1][0] == "+" else False
modes = msg.split("#", 1)[1].split(" ", 2)[1][1:]
users = ""
try:
users = msg.split("#", 1)[1].split(" ", 2)[2].split()
except IndexError:
...
if len(modes) != len(users):
bot.log("Refusing to handle modes that do not have corresponding users.")
return None, None
for i in range(len(modes)):
if users[i] == bot.nick:
if modes[i] == "o":
bot.ops[chan] = add
bot.log(f"{'Got' if add else 'Lost'} ops in {chan}")
return None, None
def NULL(bot: bare.bot, msg: str) -> tuple[None, None]: def NULL(bot: bare.bot, msg: str) -> tuple[None, None]:
return None, None return None, None
@ -221,8 +243,10 @@ handles: dict[
"NICK": NICK, "NICK": NICK,
"KICK": KICK, "KICK": KICK,
"PART": PART, "PART": PART,
"MODE": NULL, "MODE": MODE,
"TOPIC": NULL, "TOPIC": NULL,
"QUIT": QUIT, "QUIT": QUIT,
"JOIN": JOIN, "JOIN": JOIN,
"NOTICE": NULL,
"INVITE": NULL,
} }

View file

@ -10,7 +10,7 @@ def log(
level: str = "LOG", level: str = "LOG",
time: Union[dt, str] = "now", time: Union[dt, str] = "now",
) -> None: ) -> None:
if level in ["EXIT", "CRASH", "FATAL"]: if level in ["EXIT", "CRASH", "FATAL", "ERROR"]:
stream = stderr stream = stderr
else: else:
stream = stdout stream = stdout