Compare commits
11 commits
49aa61417a
...
b720b5edf8
Author | SHA1 | Date | |
---|---|---|---|
b720b5edf8 | |||
ce6dc597ef | |||
1feb1e2813 | |||
aa0dd3cc22 | |||
152c87a8db | |||
b695b0e676 | |||
8b41a048c5 | |||
58060a5249 | |||
fdd490518c | |||
8dc9755c46 | |||
971a889cc1 |
6 changed files with 289 additions and 69 deletions
2
bare.py
2
bare.py
|
@ -43,6 +43,8 @@ class bot:
|
|||
markov: MarkovBot
|
||||
autoMethod: str
|
||||
dnsblMode: str
|
||||
statuses: dict[str, dict[str, str]]
|
||||
ops: dict[str, bool]
|
||||
|
||||
def __init__(self, server: str): ...
|
||||
|
||||
|
|
14
bot.py
14
bot.py
|
@ -14,6 +14,7 @@ import handlers
|
|||
import bare
|
||||
from threading import Thread
|
||||
from markov import MarkovBot
|
||||
from traceback import format_exc
|
||||
|
||||
|
||||
def mfind(message: str, find: list, usePrefix: bool = True) -> bool:
|
||||
|
@ -52,6 +53,8 @@ class bot(bare.bot):
|
|||
else "FireBot"
|
||||
)
|
||||
self.queue: list[bbytes] = [] # pyright: ignore [reportInvalidTypeForm]
|
||||
self.statuses = {"firepup": {}}
|
||||
self.ops = {}
|
||||
self.sock = socket(AF_INET, SOCK_STREAM)
|
||||
self.current = "user"
|
||||
self.threads = (
|
||||
|
@ -233,6 +236,8 @@ class bot(bare.bot):
|
|||
if self.queue:
|
||||
return bytes(self.queue.pop(0))
|
||||
data = bytes(self.sock.recv(2048))
|
||||
if data.lazy_decode() == "":
|
||||
return data
|
||||
while not data.endswith(b"\r\n"):
|
||||
data += bytes(self.sock.recv(2048))
|
||||
data = bytes(data.strip(b"\r\n"))
|
||||
|
@ -313,6 +318,7 @@ class bot(bare.bot):
|
|||
if action in handlers.handles:
|
||||
res, chan = handlers.handles[action](self, ircmsg)
|
||||
if res == "reload" and type(chan) == str:
|
||||
try:
|
||||
reload(conf)
|
||||
self.adminnames = (
|
||||
conf.servers[self.server]["admins"]
|
||||
|
@ -339,6 +345,14 @@ class bot(bare.bot):
|
|||
reload(cmds)
|
||||
reload(handlers)
|
||||
self.msg("Reloaded successfully", chan)
|
||||
except Exception:
|
||||
Err = format_exc()
|
||||
for line in Err.split("\n"):
|
||||
self.log(line, "ERROR")
|
||||
self.msg(
|
||||
"Reload failed, likely partially reloaded. Please check error logs.",
|
||||
chan,
|
||||
)
|
||||
else:
|
||||
if ircmsg.startswith("PING "):
|
||||
self.ping(ircmsg)
|
||||
|
|
100
commands.py
100
commands.py
|
@ -152,6 +152,26 @@ def debug(bot: bare.bot, chan: str, name: str, message: str) -> None:
|
|||
bot.msg(f"[DEBUG] {dbg_out}", chan)
|
||||
|
||||
|
||||
def debugInternal(bot: bare.bot, chan: str, name: str, message: str) -> None:
|
||||
things = dir(bot)
|
||||
try:
|
||||
thing = message.split(" ", 1)[1]
|
||||
except IndexError:
|
||||
bot.msg("You can't just ask me to lookup nothing.", chan)
|
||||
return
|
||||
if thing in things:
|
||||
bot.msg(f"self.{thing} = {getattr(bot, thing)}", chan)
|
||||
else:
|
||||
bot.msg(f'I have nothing called "{thing}"', chan)
|
||||
|
||||
|
||||
def debugEval(bot: bare.bot, chan: str, name: str, message: str) -> None:
|
||||
try:
|
||||
bot.msg(str(eval(message.split(" ", 1)[1])), chan)
|
||||
except Exception as E:
|
||||
bot.msg(f"Exception: {E}", chan)
|
||||
|
||||
|
||||
def raw(bot: bare.bot, chan: str, name: str, message: str) -> None:
|
||||
bot.sendraw(message.split(" ", 1)[1])
|
||||
|
||||
|
@ -213,6 +233,70 @@ def markov(bot: bare.bot, chan: str, name: str, message: str) -> None:
|
|||
bot.msg(proposed, chan)
|
||||
|
||||
|
||||
def setStatus(bot: bare.bot, chan: str, name: str, message: str) -> None:
|
||||
user, stat, reas = ("", 0, "")
|
||||
try:
|
||||
if message.split(" ")[1] == "help":
|
||||
bot.msg(
|
||||
"Assuming you want help with status codes. 1 is Available, 2 is Busy, 3 is Unavailable, anything else is Unknown.",
|
||||
chan,
|
||||
)
|
||||
return
|
||||
message = message.split(" ", 1)[1]
|
||||
user = message.split(" ")[0].lower()
|
||||
stat = int(message.split(" ")[1])
|
||||
reas = message.split(" ", 2)[2]
|
||||
except IndexError:
|
||||
bot.msg(
|
||||
f"Insufficent information to set a status. Only got {len(message.split(' ')) - (1 if '.sS' in message else 0)}/3 expected args.",
|
||||
chan,
|
||||
)
|
||||
return
|
||||
except ValueError:
|
||||
bot.msg("Status parameter must be an int.", chan)
|
||||
return
|
||||
match stat:
|
||||
case 1:
|
||||
stat = "Available"
|
||||
case 2:
|
||||
stat = "Busy"
|
||||
case 3:
|
||||
stat = "Unavailable"
|
||||
case _:
|
||||
stat = "Unknown"
|
||||
if user in ["me", "my", "I"]:
|
||||
user = "firepup"
|
||||
bot.statuses[user] = {"status": stat, "reason": reas}
|
||||
bot.msg(f"Status set for '{user}'. Raw data: {bot.statuses[user]}", chan)
|
||||
|
||||
|
||||
def getStatus(bot: bare.bot, chan: str, name: str, message: str) -> None:
|
||||
user = ""
|
||||
try:
|
||||
user = message.split(" ")[1]
|
||||
except IndexError:
|
||||
user = "firepup"
|
||||
if bot.statuses.get(user) is None:
|
||||
bot.msg("You've gotta provide a nick I actually recognize.", chan)
|
||||
return
|
||||
bot.msg(
|
||||
f"{user}'s status: {'Unknown' if not bot.statuses[user].get('status') else bot.statuses[user]['status']} - {'Reason unset' if not bot.statuses[user].get('reason') else bot.statuses[user]['reason']}",
|
||||
chan,
|
||||
)
|
||||
|
||||
|
||||
def check(bot: bare.bot, chan: str, name: str, message: str) -> None:
|
||||
try:
|
||||
msg = message.split(" ", 1)[1]
|
||||
nick = msg.split("!")[0]
|
||||
host = msg.split("@", 1)[1]
|
||||
dnsbl, raws = conf.dnsblHandler(bot, nick, host, chan)
|
||||
bot.msg(f"Blacklist check: {dnsbl if dnsbl else 'Safe.'} ({raws})", chan)
|
||||
except Exception as E:
|
||||
bot.msg("Blacklist lookup failed. Error recorded to bot logs.", chan)
|
||||
bot.log(str(E), "FATAL")
|
||||
|
||||
|
||||
data: dict[str, dict[str, Any]] = {
|
||||
"!botlist": {"prefix": False, "aliases": []},
|
||||
"bugs bugs bugs": {"prefix": False, "aliases": []},
|
||||
|
@ -226,8 +310,14 @@ data: dict[str, dict[str, Any]] = {
|
|||
"uptime": {"prefix": True, "aliases": []},
|
||||
"raw": {"prefix": True, "aliases": ["cmd "], "check": checks.admin},
|
||||
"debug": {"prefix": True, "aliases": ["dbg", "d"], "check": checks.admin},
|
||||
"debugInternal": {
|
||||
"prefix": True,
|
||||
"aliases": ["dbgInt", "dI"],
|
||||
"check": checks.admin,
|
||||
},
|
||||
"debugEval": {"prefix": True, "aliases": ["dbgEval", "dE"], "check": checks.admin},
|
||||
"8ball": {"prefix": True, "aliases": ["eightball", "8b"]},
|
||||
"join ": {"prefix": True, "aliases": [], "check": checks.admin},
|
||||
"join": {"prefix": True, "aliases": ["j"], "check": checks.admin},
|
||||
"quote": {"prefix": True, "aliases": ["q"]},
|
||||
"goat.mode.activate": {"prefix": True, "aliases": ["g.m.a"], "check": checks.admin},
|
||||
"goat.mode.deactivate": {
|
||||
|
@ -244,6 +334,9 @@ data: dict[str, dict[str, Any]] = {
|
|||
"version": {"prefix": True, "aliases": ["ver", "v"]},
|
||||
"np": {"prefix": True, "aliases": []},
|
||||
"markov": {"prefix": True, "aliases": ["m"]},
|
||||
"setStatus": {"prefix": True, "aliases": ["sS"], "check": checks.admin},
|
||||
"getStatus": {"prefix": True, "aliases": ["gS"]},
|
||||
"check": {"prefix": True, "aliases": [], "check": checks.admin},
|
||||
}
|
||||
regexes: list[str] = [conf.npbase, conf.su]
|
||||
call: dict[str, Callable[[bare.bot, str, str, str], None]] = {
|
||||
|
@ -256,6 +349,8 @@ call: dict[str, Callable[[bare.bot, str, str, str], None]] = {
|
|||
"uptime": uptime,
|
||||
"raw": raw,
|
||||
"debug": debug,
|
||||
"debugInternal": debugInternal,
|
||||
"debugEval": debugEval,
|
||||
"8ball": eball,
|
||||
"join": join,
|
||||
"quote": quote,
|
||||
|
@ -270,4 +365,7 @@ call: dict[str, Callable[[bare.bot, str, str, str], None]] = {
|
|||
"version": version,
|
||||
"np": fmpull,
|
||||
"markov": markov,
|
||||
"setStatus": setStatus,
|
||||
"getStatus": getStatus,
|
||||
"check": check,
|
||||
}
|
||||
|
|
108
config.py
108
config.py
|
@ -4,13 +4,35 @@ from dotenv import load_dotenv # type: ignore
|
|||
import re, codecs
|
||||
from typing import Optional, Any, Union
|
||||
import bare, pylast
|
||||
from pydnsbl import DNSBLIpChecker, DNSBLDomainChecker
|
||||
from pydnsbl import DNSBLIpChecker, DNSBLDomainChecker, providers as BL
|
||||
|
||||
ipbl = DNSBLIpChecker()
|
||||
hsbl = DNSBLDomainChecker()
|
||||
|
||||
class droneBL(BL.Provider):
|
||||
def process_response(self, response):
|
||||
reasons = set()
|
||||
for result in response:
|
||||
reason = result.host
|
||||
if reason in ["127.0.0.3"]:
|
||||
reasons.add("IRC Spambot")
|
||||
elif reason in ["127.0.0.19"]:
|
||||
reasons.add("Abused VPN")
|
||||
elif reason in ["127.0.0.9", "127.0.0.8"]:
|
||||
reasons.add("Open Proxy")
|
||||
elif reason in ["127.0.0.13"]:
|
||||
reasons.add("Automated Attacks")
|
||||
else:
|
||||
print("Unknown dnsbl reason: " + reason, flush=True)
|
||||
reasons.add("unknown")
|
||||
return reasons
|
||||
|
||||
|
||||
providers = BL.BASE_PROVIDERS + [droneBL("dnsbl.dronebl.org")]
|
||||
|
||||
ipbl = DNSBLIpChecker(providers=providers)
|
||||
hsbl = DNSBLDomainChecker(providers=providers)
|
||||
|
||||
load_dotenv()
|
||||
__version__ = "v3.0.14"
|
||||
__version__ = "v3.0.16"
|
||||
npbase: str = (
|
||||
"\[\x0303last\.fm\x03\] [A-Za-z0-9_[\]{}\\|\-^]{1,$MAX} (is listening|last listened) to: \x02.+ - .*\x02( \([0-9]+ plays\)( \[.*\])?)?" # pyright: ignore [reportInvalidStringEscapeSequence]
|
||||
)
|
||||
|
@ -24,7 +46,7 @@ servers: dict[str, dict[str, Any]] = {
|
|||
"channels": {"#random": 0, "#dice": 0, "#offtopic": 0, "#main/replirc": 0},
|
||||
"ignores": ["#main/replirc"],
|
||||
"hosts": ["9pfs.repl.co"],
|
||||
"dnsblMode": "kickban"
|
||||
"dnsblMode": "kickban",
|
||||
},
|
||||
"efnet": {
|
||||
"address": "irc.underworld.no",
|
||||
|
@ -46,6 +68,7 @@ servers: dict[str, dict[str, Any]] = {
|
|||
"#fp-radio": 0,
|
||||
"#fp-radio-debug": 0,
|
||||
"#hardfork": 0,
|
||||
"#opers": 0,
|
||||
},
|
||||
"ignores": ["#fp-radio"],
|
||||
"admins": ["h-tl"],
|
||||
|
@ -102,6 +125,20 @@ def decode_escapes(s: str) -> str:
|
|||
return ESCAPE_SEQUENCE_RE.sub(decode_match, s)
|
||||
|
||||
|
||||
def cmdFind(message: str, find: list, usePrefix: bool = True) -> bool:
|
||||
cmd = None
|
||||
try:
|
||||
cmd = message.split(" ", 1)[0]
|
||||
except IndexError:
|
||||
...
|
||||
if not cmd:
|
||||
return False
|
||||
if usePrefix:
|
||||
return any(cmd == prefix + match for match in find)
|
||||
else:
|
||||
return any(cmd == match for match in find)
|
||||
|
||||
|
||||
def mfind(message: str, find: list, usePrefix: bool = True) -> bool:
|
||||
if usePrefix:
|
||||
return any(message[: len(match) + 1] == prefix + match for match in find)
|
||||
|
@ -123,22 +160,67 @@ def sub(
|
|||
return result
|
||||
|
||||
|
||||
def dnsbl(hostname: str) -> Union[str, None]:
|
||||
def dnsbl(hostname: str) -> tuple[str, dict[str, list[str]]]:
|
||||
hosts = []
|
||||
hstDT = None
|
||||
hstDT = {}
|
||||
try:
|
||||
hstDT = ipbl.check(hostname).detected_by
|
||||
except ValueError:
|
||||
except ValueError: # It's not an IP
|
||||
try:
|
||||
hstDT = hsbl.check(hostname).detected_by
|
||||
except ValueError: # It's also not a hostname
|
||||
hstDT = {}
|
||||
for host in hstDT:
|
||||
if hstDT[host] != ["unknown"]:
|
||||
hosts.append(host)
|
||||
print(f'DEBUG: {host} - {hstDT[host]}')
|
||||
if not hosts:
|
||||
return
|
||||
return "", hstDT
|
||||
hostStr = None
|
||||
if len(hosts) >= 3:
|
||||
hostStr = ', and '.join((', '.join(hosts)).rsplit(", ", 1))
|
||||
hostStr = ", and ".join((", ".join(hosts)).rsplit(", ", 1))
|
||||
else:
|
||||
hostStr = ' and '.join(hosts)
|
||||
return hostStr
|
||||
hostStr = " and ".join(hosts)
|
||||
return hostStr, hstDT
|
||||
|
||||
|
||||
def dnsblHandler(
|
||||
bot: bare.bot, nick: str, hostname: str, chan: str
|
||||
) -> tuple[str, dict[str, list[str]]]:
|
||||
dnsblStatus = "Not enabled"
|
||||
dnsblResps = {}
|
||||
if bot.dnsblMode != "none":
|
||||
dnsblStatus, dnsblResps = dnsbl(hostname)
|
||||
if dnsblStatus:
|
||||
match bot.dnsblMode:
|
||||
case "kickban":
|
||||
bot.sendraw(
|
||||
f"KICK #{chan} {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
|
||||
)
|
||||
bot.sendraw(f"MODE #{chan} +b *!*@{hostname}")
|
||||
case "akill":
|
||||
bot.sendraw(
|
||||
f"OS AKILL ADD *@{hostname} !P Sorry, but you're on the {dnsblStatus} blacklist(s)."
|
||||
)
|
||||
case "kline":
|
||||
bot.sendraw(
|
||||
f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
|
||||
)
|
||||
bot.sendraw(
|
||||
f"KLINE 524160 *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
|
||||
)
|
||||
bot.sendraw(
|
||||
f"KLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
|
||||
)
|
||||
case "gline":
|
||||
bot.sendraw(
|
||||
f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
|
||||
)
|
||||
bot.sendraw(
|
||||
f"GLINE *@{hostname} 524160 :Sorry, but you're on the {dnsblStatus} blacklist(s)."
|
||||
)
|
||||
bot.sendraw(
|
||||
f"GLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s)."
|
||||
)
|
||||
case _:
|
||||
bot.log(f'Unknown dnsbl Mode "{bot.dnsblMode}"!', "WARN")
|
||||
return dnsblStatus, dnsblResps
|
||||
|
|
68
handlers.py
68
handlers.py
|
@ -6,6 +6,7 @@ from typing import Union, Callable
|
|||
from overrides import bytes, bbytes
|
||||
from importlib import reload
|
||||
import bare, re, checks
|
||||
from traceback import format_exc
|
||||
|
||||
|
||||
def CTCP(bot: bare.bot, msg: str) -> bool:
|
||||
|
@ -103,16 +104,34 @@ def PRIVMSG(bot: bare.bot, msg: str) -> Union[tuple[None, None], tuple[str, str]
|
|||
triggers = [cmd]
|
||||
triggers.extend(cmds.data[cmd]["aliases"])
|
||||
triggers = list(conf.sub(call, bot, chan, name).lower() for call in triggers)
|
||||
if conf.mfind(
|
||||
if conf.cmdFind(
|
||||
conf.sub(message, bot, chan, name).lower(),
|
||||
triggers,
|
||||
cmds.data[cmd]["prefix"],
|
||||
):
|
||||
if "check" in cmds.data[cmd] and cmds.data[cmd]["check"]:
|
||||
if cmds.data[cmd]["check"](bot, name, host, chan, cmd):
|
||||
try:
|
||||
cmds.call[cmd](bot, chan, name, message)
|
||||
except Exception:
|
||||
Err = format_exc()
|
||||
for line in Err.split("\n"):
|
||||
bot.log(line, "ERROR")
|
||||
bot.msg(
|
||||
"Sorry, I had an error trying to execute that command. Please check error logs.",
|
||||
chan,
|
||||
)
|
||||
else:
|
||||
try:
|
||||
cmds.call[cmd](bot, chan, name, message)
|
||||
except Exception:
|
||||
Err = format_exc()
|
||||
for line in Err.split("\n"):
|
||||
bot.log(line, "ERROR")
|
||||
bot.msg(
|
||||
"Sorry, I had an error trying to execute that command. Please check error logs.",
|
||||
chan,
|
||||
)
|
||||
handled = True
|
||||
break
|
||||
if not handled:
|
||||
|
@ -124,7 +143,7 @@ def PRIVMSG(bot: bare.bot, msg: str) -> Union[tuple[None, None], tuple[str, str]
|
|||
cmds.call[check](bot, chan, name, message)
|
||||
handled = True
|
||||
break
|
||||
if not handled and conf.mfind(message, ["reload", "r"]):
|
||||
if not handled and conf.cmdFind(message, ["reload", "r"]):
|
||||
if checks.admin(bot, name, host, chan, "reload"):
|
||||
return "reload", chan
|
||||
handled = True
|
||||
|
@ -189,27 +208,30 @@ def JOIN(bot: bare.bot, msg: str) -> tuple[None, None]:
|
|||
nick = msg.split("!", 1)[0][1:]
|
||||
hostname = msg.split("@", 1)[1].split(" ", 1)[0].strip()
|
||||
chan = msg.split("#")[-1].strip()
|
||||
if bot.dnsblMode != "none":
|
||||
dnsblStatus = conf.dnsbl(hostname)
|
||||
if dnsblStatus:
|
||||
match bot.dnsblMode:
|
||||
case "kickban":
|
||||
bot.sendraw(f"KICK #{chan} {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
|
||||
bot.sendraw(f"MODE #{chan} +b *!*@{hostname}")
|
||||
case "akill":
|
||||
bot.sendraw(f"OS AKILL ADD *@{hostname} !P Sorry, but you're on the {dnsblStatus} blacklists(s).")
|
||||
case "kline":
|
||||
bot.sendraw(f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
|
||||
bot.sendraw(f"KLINE 524160 *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
|
||||
bot.sendraw(f"KLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
|
||||
case "gline":
|
||||
bot.sendraw(f"KILL {nick} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
|
||||
bot.sendraw(f"GLINE *@{hostname} 524160 :Sorry, but you're on the {dnsblStatus} blacklist(s).")
|
||||
bot.sendraw(f"GLINE *@{hostname} :Sorry, but you're on the {dnsblStatus} blacklist(s).")
|
||||
case _:
|
||||
bot.log(f'Unknown dnsbl Mode "{bot.dnsblMode}"!', "WARN")
|
||||
conf.dnsblHandler(bot, nick, hostname, chan)
|
||||
return None, None
|
||||
|
||||
|
||||
def MODE(bot: bare.bot, msg: str) -> tuple[None, None]:
|
||||
chan = msg.split("#", 1)[1].split(" ", 1)[0]
|
||||
add = True if msg.split("#", 1)[1].split(" ", 2)[1][0] == "+" else False
|
||||
modes = msg.split("#", 1)[1].split(" ", 2)[1][1:]
|
||||
users = ""
|
||||
try:
|
||||
users = msg.split("#", 1)[1].split(" ", 2)[2].split()
|
||||
except IndexError:
|
||||
...
|
||||
if len(modes) != len(users):
|
||||
bot.log("Refusing to handle modes that do not have corresponding users.")
|
||||
return None, None
|
||||
for i in range(len(modes)):
|
||||
if users[i] == bot.nick:
|
||||
if modes[i] == "o":
|
||||
bot.ops[chan] = add
|
||||
bot.log(f"{'Got' if add else 'Lost'} ops in {chan}")
|
||||
return None, None
|
||||
|
||||
|
||||
def NULL(bot: bare.bot, msg: str) -> tuple[None, None]:
|
||||
return None, None
|
||||
|
||||
|
@ -221,8 +243,10 @@ handles: dict[
|
|||
"NICK": NICK,
|
||||
"KICK": KICK,
|
||||
"PART": PART,
|
||||
"MODE": NULL,
|
||||
"MODE": MODE,
|
||||
"TOPIC": NULL,
|
||||
"QUIT": QUIT,
|
||||
"JOIN": JOIN,
|
||||
"NOTICE": NULL,
|
||||
"INVITE": NULL,
|
||||
}
|
||||
|
|
2
logs.py
2
logs.py
|
@ -10,7 +10,7 @@ def log(
|
|||
level: str = "LOG",
|
||||
time: Union[dt, str] = "now",
|
||||
) -> None:
|
||||
if level in ["EXIT", "CRASH", "FATAL"]:
|
||||
if level in ["EXIT", "CRASH", "FATAL", "ERROR"]:
|
||||
stream = stderr
|
||||
else:
|
||||
stream = stdout
|
||||
|
|
Loading…
Reference in a new issue