Compare commits

...

11 commits

6 changed files with 289 additions and 69 deletions

View file

@ -43,6 +43,8 @@ class bot:
markov: MarkovBot
autoMethod: str
dnsblMode: str
statuses: dict[str, dict[str, str]]
ops: dict[str, bool]
def __init__(self, server: str): ...

14
bot.py
View file

@ -14,6 +14,7 @@ import handlers
import bare
from threading import Thread
from markov import MarkovBot
from traceback import format_exc
def mfind(message: str, find: list, usePrefix: bool = True) -> bool:
@ -52,6 +53,8 @@ class bot(bare.bot):
else "FireBot"
)
self.queue: list[bbytes] = [] # pyright: ignore [reportInvalidTypeForm]
self.statuses = {"firepup": {}}
self.ops = {}
self.sock = socket(AF_INET, SOCK_STREAM)
self.current = "user"
self.threads = (
@ -233,6 +236,8 @@ class bot(bare.bot):
if self.queue:
return bytes(self.queue.pop(0))
data = bytes(self.sock.recv(2048))
if data.lazy_decode() == "":
return data
while not data.endswith(b"\r\n"):
data += bytes(self.sock.recv(2048))
data = bytes(data.strip(b"\r\n"))
@ -313,6 +318,7 @@ class bot(bare.bot):
if action in handlers.handles:
res, chan = handlers.handles[action](self, ircmsg)
if res == "reload" and type(chan) == str:
try:
reload(conf)
self.adminnames = (
conf.servers[self.server]["admins"]
@ -339,6 +345,14 @@ class bot(bare.bot):
reload(cmds)
reload(handlers)
self.msg("Reloaded successfully", chan)
except Exception:
Err = format_exc()
for line in Err.split("\n"):
self.log(line, "ERROR")
self.msg(
"Reload failed, likely partially reloaded. Please check error logs.",
chan,
)
else:
if ircmsg.startswith("PING "):
self.ping(ircmsg)

View file

@ -152,6 +152,26 @@ def debug(bot: bare.bot, chan: str, name: str, message: str) -> None:
bot.msg(f"[DEBUG] {dbg_out}", chan)
def debugInternal(bot: bare.bot, chan: str, name: str, message: str) -> None:
things = dir(bot)
try:
thing = message.split(" ", 1)[1]
except IndexError:
bot.msg("You can't just ask me to lookup nothing.", chan)
return
if thing in things:
bot.msg(f"self.{thing} = {getattr(bot, thing)}", chan)
else:
bot.msg(f'I have nothing called "{thing}"', chan)
def debugEval(bot: bare.bot, chan: str, name: str, message: str) -> None:
try:
bot.msg(str(eval(message.split(" ", 1)[1])), chan)
except Exception as E:
bot.msg(f"Exception: {E}", chan)
def raw(bot: bare.bot, chan: str, name: str, message: str) -> None:
bot.sendraw(message.split(" ", 1)[1])
@ -213,6 +233,70 @@ def markov(bot: bare.bot, chan: str, name: str, message: str) -> None:
bot.msg(proposed, chan)
def setStatus(bot: bare.bot, chan: str, name: str, message: str) -> None:
user, stat, reas = ("", 0, "")
try:
if message.split(" ")[1] == "help":
bot.msg(
"Assuming you want help with status codes. 1 is Available, 2 is Busy, 3 is Unavailable, anything else is Unknown.",
chan,
)
return
message = message.split(" ", 1)[1]
user = message.split(" ")[0].lower()
stat = int(message.split(" ")[1])
reas = message.split(" ", 2)[2]
except IndexError:
bot.msg(
f"Insufficent information to set a status. Only got {len(message.split(' ')) - (1 if '.sS' in message else 0)}/3 expected args.",
chan,
)
return
except ValueError:
bot.msg("Status parameter must be an int.", chan)
return
match stat:
case 1:
stat = "Available"
case 2:
stat = "Busy"
case 3:
stat = "Unavailable"
case _:
stat = "Unknown"
if user in ["me", "my", "I"]:
user = "firepup"
bot.statuses[user] = {"status": stat, "reason": reas}
bot.msg(f"Status set for '{user}'. Raw data: {bot.statuses[user]}", chan)
def getStatus(bot: bare.bot, chan: str, name: str, message: str) -> None:
user = ""
try:
user = message.split(" ")[1]
except IndexError:
user = "firepup"
if bot.statuses.get(user) is None:
bot.msg("You've gotta provide a nick I actually recognize.", chan)
return
bot.msg(
f"{user}'s status: {'Unknown' if not bot.statuses[user].get('status') else bot.statuses[user]['status']} - {'Reason unset' if not bot.statuses[user].get('reason') else bot.statuses[user]['reason']}",
chan,
)
def check(bot: bare.bot, chan: str, name: str, message: str) -> None:
try:
msg = message.split(" ", 1)[1]
nick = msg.split("!")[0]
host = msg.split("@", 1)[1]
dnsbl, raws = conf.dnsblHandler(bot, nick, host, chan)
bot.msg(f"Blacklist check: {dnsbl if dnsbl else 'Safe.'} ({raws})", chan)
except Exception as E:
bot.msg("Blacklist lookup failed. Error recorded to bot logs.", chan)
bot.log(str(E), "FATAL")
data: dict[str, dict[str, Any]] = {
"!botlist": {"prefix": False, "aliases": []},
"bugs bugs bugs": {"prefix": False, "aliases": []},
@ -224,10 +308,16 @@ data: dict[str, dict[str, Any]] = {
"check": checks.admin,
},
"uptime": {"prefix": True, "aliases": []},
"raw ": {"prefix": True, "aliases": ["cmd "], "check": checks.admin},
"raw": {"prefix": True, "aliases": ["cmd "], "check": checks.admin},
"debug": {"prefix": True, "aliases": ["dbg", "d"], "check": checks.admin},
"debugInternal": {
"prefix": True,
"aliases": ["dbgInt", "dI"],
"check": checks.admin,
},
"debugEval": {"prefix": True, "aliases": ["dbgEval", "dE"], "check": checks.admin},
"8ball": {"prefix": True, "aliases": ["eightball", "8b"]},
"join ": {"prefix": True, "aliases": [], "check": checks.admin},
"join": {"prefix": True, "aliases": ["j"], "check": checks.admin},
"quote": {"prefix": True, "aliases": ["q"]},
"goat.mode.activate": {"prefix": True, "aliases": ["g.m.a"], "check": checks.admin},
"goat.mode.deactivate": {
@ -244,6 +334,9 @@ data: dict[str, dict[str, Any]] = {
"version": {"prefix": True, "aliases": ["ver", "v"]},
"np": {"prefix": True, "aliases": []},
"markov": {"prefix": True, "aliases": ["m"]},
"setStatus": {"prefix": True, "aliases": ["sS"], "check": checks.admin},
"getStatus": {"prefix": True, "aliases": ["gS"]},
"check": {"prefix": True, "aliases": [], "check": checks.admin},
}
regexes: list[str] = [conf.npbase, conf.su]
call: dict[str, Callable[[bare.bot, str, str, str], None]] = {
@ -254,10 +347,12 @@ call: dict[str, Callable[[bare.bot, str, str, str], None]] = {
conf.su: sudo,
"restart": reboot,
"uptime": uptime,
"raw ": raw,
"raw": raw,
"debug": debug,
"debugInternal": debugInternal,
"debugEval": debugEval,
"8ball": eball,
"join ": join,
"join": join,
"quote": quote,
"goat.mode.activate": goatOn,
"goat.mode.decativate": goatOff,
@ -270,4 +365,7 @@ call: dict[str, Callable[[bare.bot, str, str, str], None]] = {
"version": version,
"np": fmpull,
"markov": markov,
"setStatus": setStatus,
"getStatus": getStatus,
"check": check,
}

108
config.py
View file

@ -4,13 +4,35 @@ from dotenv import load_dotenv # type: ignore
import re, codecs
from typing import Optional, Any, Union
import bare, pylast
from pydnsbl import DNSBLIpChecker, DNSBLDomainChecker
from pydnsbl import DNSBLIpChecker, DNSBLDomainChecker, providers as BL
ipbl = DNSBLIpChecker()
hsbl = DNSBLDomainChecker()
class droneBL(BL.Provider):
def process_response(self, response):
reasons = set()
for result in response:
reason = result.host
if reason in ["127.0.0.3"]:
reasons.add("IRC Spambot")
elif reason in ["127.0.0.19"]:
reasons.add("Abused VPN")
elif reason in ["127.0.0.9", "127.0.0.8"]:
reasons.add("Open Proxy")
elif reason in ["127.0.0.13"]:
reasons.add("Automated Attacks")
else:
print("Unknown dnsbl reason: " + reason, flush=True)
reasons.add("unknown")
return reasons
providers = BL.BASE_PROVIDERS + [droneBL("dnsbl.dronebl.org")]
ipbl = DNSBLIpChecker(providers=providers)
hsbl = DNSBLDomainChecker(providers=providers)
load_dotenv()
__version__ = "v3.0.14"
__version__ = "v3.0.16"
npbase: str = (
"\[\x0303last\.fm\x03\] [A-Za-z0-9_[\]{}\\|\-^]{1,$MAX} (is listening|last listened) to: \x02.+ - .*\x02( \([0-9]+ plays\)( \[.*\])?)?" # pyright: ignore [reportInvalidStringEscapeSequence]
)
@ -24,7 +46,7 @@ servers: dict[str, dict[str, Any]] = {
"channels": {"#random": 0, "#dice": 0, "#offtopic": 0, "#main/replirc": 0},
"ignores": ["#main/replirc"],
"hosts": ["9pfs.repl.co"],
"dnsblMode": "kickban"
"dnsblMode": "kickban",
},
"efnet": {
"address": "irc.underworld.no",
@ -46,6 +68,7 @@ servers: dict[str, dict[str, Any]] = {
"#fp-radio": 0,
"#fp-radio-debug": 0,
"#hardfork": 0,
"#opers": 0,
},
"ignores": ["#fp-radio"],
"admins": ["h-tl"],
@ -102,6 +125,20 @@ def decode_escapes(s: str) -> str:
return ESCAPE_SEQUENCE_RE.sub(decode_match, s)
def cmdFind(message: str, find: list, usePrefix: bool = True) -> bool:
cmd = None
try:
cmd = message.split(" ", 1)[0]
except IndexError:
...
if not cmd:
return False
if usePrefix:
return any(cmd == prefix + match for match in find)
else:
return any(cmd == match for match in find)
def mfind(message: str, find: list, usePrefix: bool = True) -> bool:
if usePrefix:
return any(message[: len(match) + 1] == prefix + match for match in find)
@ -123,22 +160,67 @@ def sub(
return result
def dnsbl(hostname: str) -> Union[str, None]:
def dnsbl(hostname: str) -> tuple[str, dict[str, list[str]]]:
hosts = []
hstDT = None
hstDT = {}
try:
hstDT = ipbl.check(hostname).detected_by
except ValueError:
except ValueError: # It's not an IP
try:
hstDT = hsbl.check(hostname).detected_by
except ValueError: # It's also not a hostname
hstDT = {}
for host in hstDT:
if hstDT[host] != ["unknown"]:
hosts.append(host)
print(f'DEBUG: {host} - {hstDT[host]}')
if not hosts:
return
return "", hstDT
hostStr = None
if len(hosts) >= 3:
hostStr = ', and '.join((', '.join(hosts)).rsplit(", ", 1))
hostStr = ", and ".join((", ".join(hosts)).rsplit(", ", 1))
else:
hostStr = ' and '.join(hosts)
return hostStr
hostStr = " and ".join(hosts)
return hostStr, hstDT
def dnsblHandler(
bot: bare.bot, nick: str, hostname: str, chan: str
) -> tuple[str, dict[str, list[str]]]:
dnsblStatus = "Not enabled"
dnsblResps = {}
if bot.dnsblMode != "none":
dnsblStatus, dnsblResps = dnsbl(hostname)
if dnsblStatus:
match bot.dnsblMode:
case "kickban":
bot.sendraw(
f"KICK #{chan} {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
bot.sendraw(f"MODE #{chan} +b *!*@{hostname}")
case "akill":
bot.sendraw(
f"OS AKILL ADD *@{hostname} !P Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
case "kline":
bot.sendraw(
f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
bot.sendraw(
f"KLINE 524160 *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
bot.sendraw(
f"KLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
case "gline":
bot.sendraw(
f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
bot.sendraw(
f"GLINE *@{hostname} 524160 :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
bot.sendraw(
f"GLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
)
case _:
bot.log(f'Unknown dnsbl Mode "{bot.dnsblMode}"!', "WARN")
return dnsblStatus, dnsblResps

View file

@ -6,6 +6,7 @@ from typing import Union, Callable
from overrides import bytes, bbytes
from importlib import reload
import bare, re, checks
from traceback import format_exc
def CTCP(bot: bare.bot, msg: str) -> bool:
@ -103,16 +104,34 @@ def PRIVMSG(bot: bare.bot, msg: str) -> Union[tuple[None, None], tuple[str, str]
triggers = [cmd]
triggers.extend(cmds.data[cmd]["aliases"])
triggers = list(conf.sub(call, bot, chan, name).lower() for call in triggers)
if conf.mfind(
if conf.cmdFind(
conf.sub(message, bot, chan, name).lower(),
triggers,
cmds.data[cmd]["prefix"],
):
if "check" in cmds.data[cmd] and cmds.data[cmd]["check"]:
if cmds.data[cmd]["check"](bot, name, host, chan, cmd):
try:
cmds.call[cmd](bot, chan, name, message)
except Exception:
Err = format_exc()
for line in Err.split("\n"):
bot.log(line, "ERROR")
bot.msg(
"Sorry, I had an error trying to execute that command. Please check error logs.",
chan,
)
else:
try:
cmds.call[cmd](bot, chan, name, message)
except Exception:
Err = format_exc()
for line in Err.split("\n"):
bot.log(line, "ERROR")
bot.msg(
"Sorry, I had an error trying to execute that command. Please check error logs.",
chan,
)
handled = True
break
if not handled:
@ -124,7 +143,7 @@ def PRIVMSG(bot: bare.bot, msg: str) -> Union[tuple[None, None], tuple[str, str]
cmds.call[check](bot, chan, name, message)
handled = True
break
if not handled and conf.mfind(message, ["reload", "r"]):
if not handled and conf.cmdFind(message, ["reload", "r"]):
if checks.admin(bot, name, host, chan, "reload"):
return "reload", chan
handled = True
@ -189,27 +208,30 @@ def JOIN(bot: bare.bot, msg: str) -> tuple[None, None]:
nick = msg.split("!", 1)[0][1:]
hostname = msg.split("@", 1)[1].split(" ", 1)[0].strip()
chan = msg.split("#")[-1].strip()
if bot.dnsblMode != "none":
dnsblStatus = conf.dnsbl(hostname)
if dnsblStatus:
match bot.dnsblMode:
case "kickban":
bot.sendraw(f"KICK #{chan} {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
bot.sendraw(f"MODE #{chan} +b *!*@{hostname}")
case "akill":
bot.sendraw(f"OS AKILL ADD *@{hostname} !P Sorry, but you're on the {dnsblStatus} blacklists(s).")
case "kline":
bot.sendraw(f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
bot.sendraw(f"KLINE 524160 *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
bot.sendraw(f"KLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
case "gline":
bot.sendraw(f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
bot.sendraw(f"GLINE *@{hostname} 524160 :Sorry, but you're on the {dnsblStatus} blacklist(s).")
bot.sendraw(f"GLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
case _:
bot.log(f'Unknown dnsbl Mode "{bot.dnsblMode}"!', "WARN")
conf.dnsblHandler(bot, nick, hostname, chan)
return None, None
def MODE(bot: bare.bot, msg: str) -> tuple[None, None]:
chan = msg.split("#", 1)[1].split(" ", 1)[0]
add = True if msg.split("#", 1)[1].split(" ", 2)[1][0] == "+" else False
modes = msg.split("#", 1)[1].split(" ", 2)[1][1:]
users = ""
try:
users = msg.split("#", 1)[1].split(" ", 2)[2].split()
except IndexError:
...
if len(modes) != len(users):
bot.log("Refusing to handle modes that do not have corresponding users.")
return None, None
for i in range(len(modes)):
if users[i] == bot.nick:
if modes[i] == "o":
bot.ops[chan] = add
bot.log(f"{'Got' if add else 'Lost'} ops in {chan}")
return None, None
def NULL(bot: bare.bot, msg: str) -> tuple[None, None]:
return None, None
@ -221,8 +243,10 @@ handles: dict[
"NICK": NICK,
"KICK": KICK,
"PART": PART,
"MODE": NULL,
"MODE": MODE,
"TOPIC": NULL,
"QUIT": QUIT,
"JOIN": JOIN,
"NOTICE": NULL,
"INVITE": NULL,
}

View file

@ -10,7 +10,7 @@ def log(
level: str = "LOG",
time: Union[dt, str] = "now",
) -> None:
if level in ["EXIT", "CRASH", "FATAL"]:
if level in ["EXIT", "CRASH", "FATAL", "ERROR"]:
stream = stderr
else:
stream = stdout