Re-work to reloadable handler system

This commit is contained in:
Firepup Sixfifty 2023-11-08 21:20:50 -06:00
parent 1f4adf21cc
commit 9bd2ea7e29
Signed by: Firepup650
GPG key ID: 7C92E2ABBBFAB9BA
8 changed files with 194 additions and 22 deletions

20
bot.py
View file

@ -36,7 +36,7 @@ class bot:
self.__version__ = conf.__version__
self.nick = "FireBot"
self.adminnames = conf.servers[server]["admins"]
self.queue = []
self.queue: list[bbytes] = []
self.sock = socket(AF_INET, SOCK_STREAM)
self.npallowed = ["FireBitBot"]
self.log(f"Start init for {self.server}")
@ -53,11 +53,16 @@ class bot:
):
ircmsg = self.recv().decode()
if ircmsg != "":
code = 0
try:
code = int(ircmsg.split(" ", 2)[1].strip())
except (IndexError, ValueError):
pass
print(bytes(ircmsg).lazy_decode())
if ircmsg.find("NICKLEN=") != -1:
self.nicklen = int(ircmsg.split("NICKLEN=")[1].split(" ")[0])
self.log(f"NICKLEN set to {self.nicklen}")
elif ircmsg.find("Nickname") != -1:
elif code == 433:
self.log("Nickname in use", "WARN")
self.nick = f"{self.nick}{r.randint(0,1000)}"
self.send(f"NICK {self.nick}\n")
@ -151,10 +156,10 @@ class bot:
return bytes(self.queue.pop(0))
return data
def log(self, message: object, level: str = "LOG") -> None:
def log(self, message: str, level: str = "LOG") -> None:
logs.log(message, self.server)
def exit(self, message: object) -> NoReturn:
def exit(self, message: str) -> NoReturn:
logs.log(message, self.server, "EXIT")
exit(1)
@ -203,6 +208,7 @@ class bot:
if name != "":
self.log(f"Attempting op of {name} in {chan}...")
return self.send(f"MODE {chan} +o {name}\n")
return None
def notice(self, msg: str, target: str, silent: bool = False) -> int:
if not silent:
@ -240,8 +246,8 @@ class bot:
# Format of ":[Nick]![ident]@[host|vhost] PRIVMSG [channel] :[message]”
name = ircmsg.split("!", 1)[0][1:]
helpErr = False
if (name.startswith("saxjax") and server == "efnet") or (
name == "ReplIRC" and server == "replirc"
if (name.startswith("saxjax") and self.server == "efnet") or (
name == "ReplIRC" and self.server == "replirc"
):
if ircmsg.find("<") != -1 and ircmsg.find(">") != -1:
Nname = ircmsg.split("<", 1)[1].split(">", 1)[0].strip()
@ -278,7 +284,7 @@ class bot:
else:
self.channels[chan] += 1
if "goat" in name.lower() and self.gmode == True:
cmds.goat(self, chan)
cmds.goat(self, chan, name, message)
handled = False
for cmd in cmds.data:
triggers = [cmd]

View file

@ -1,6 +1,7 @@
from subprocess import run, PIPE
from config import npbase, su, decode_escapes
import random as r
from typing import Any
def goat(bot, chan: str, name: str, message: str) -> None:
@ -54,6 +55,14 @@ def isAdmin(bot, chan: str, name: str, message: str) -> None:
def help(bot, chan: str, name: str, message: str) -> None:
helpErr = False
if (name.startswith("saxjax") and bot.server == "efnet") or (
name == "ReplIRC" and bot.server == "replirc"
):
if message.find("<") != -1 and message.find(">") != -1:
helpErr = True
elif name.endswith("dsc"):
helpErr = True
if not helpErr:
bot.msg("Command list needs rework", name)
return
@ -65,7 +74,7 @@ def help(bot, chan: str, name: str, message: str) -> None:
f"{bot.prefix}(eightball,8ball,8b) [question]? - Asks the magic eightball a question",
name,
)
bot.msg(f"(hi,hello) {botnick} - The bot says hi to you", name)
bot.msg(f"(hi,hello) {bot.nick} - The bot says hi to you", name)
if name.lower() in bot.adminnames:
bot.msg(f"reboot {bot.rebt} - Restarts the bot", name)
bot.msg("op me - Makes the bot try to op you", name)
@ -150,7 +159,7 @@ def nowplaying(bot, chan: str, name: str, message: str) -> None:
)
data = {
data: dict[str, dict[str, Any]] = {
"!botlist": {"prefix": False, "aliases": []},
"bugs bugs bugs": {"prefix": False, "aliases": []},
"hi $BOTNICK": {"prefix": False, "aliases": ["hello $BOTNICK"]},

View file

@ -1,13 +1,14 @@
#!/usr/bin/python3
from os import environ as env
from dotenv import load_dotenv
import re
import re, codecs
from typing import Union, Any
load_dotenv()
__version__ = "v2.0.3"
npbase = "\[\x0303last\.fm\x03\] [A-Za-z0-9_[\]{}\\|^]{1,$MAX} (is listening|last listened) to: \x02.+ - .*\x02( \([0-9]+ plays\)( \[.*\])?)?"
su = "^(su|sudo|(su .*|sudo .*))$"
servers = {
servers: dict[str, dict[str, Any]] = {
"ircnow": {
"address": "localhost",
"port": 6601,
@ -47,3 +48,10 @@ def decode_escapes(s: str) -> str:
return codecs.decode(match.group(0), "unicode-escape")
return ESCAPE_SEQUENCE_RE.sub(decode_match, s)
def mfind(message: str, find: list, usePrefix: bool = True) -> bool:
if usePrefix:
return any(message[: len(match) + 1] == prefix + match for match in find)
else:
return any(message[: len(match)] == match for match in find)

149
handlers.py Normal file
View file

@ -0,0 +1,149 @@
import random as r
import config as conf
import re
import commands as cmds
from typing import Union
from overrides import bytes, bbytes
from importlib import reload
def CTCP(bot, msg: str, sender: str = "", isRaw: bool = False) -> bool:
if isRaw:
sender = msg.split("!", 1)[0][1:]
message = msg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
kind = msg.split("\x01")[1].split(" ", 1)[0]
bot.log(f'Responding to CTCP "{kind}" from {sender}')
if kind == "VERSION":
bot.notice(
f"\x01VERSION FireBot {conf.__version__} (https://git.amcforum.wiki/Firepup650/fire-ircbot)\x01",
sender,
True,
)
return True
elif kind == "USERINFO":
bot.notice("\x01USERINFO FireBot (Firepup's bot)\x01", sender, True)
return True
elif kind == "SOURCE":
bot.notice(
"\x01SOURCE https://git.amcforum.wiki/Firepup650/fire-ircbot\x01",
sender,
True,
)
return True
elif kind == "FINGER":
bot.notice("\x01FINGER Firepup's bot\x01", sender, True)
return True
elif kind == "CLIENTINFO":
bot.notice(
"\x01CLIENTINFO ACTION VERSION USERINFO SOURCE FINGER\x01", sender, True
)
return True
bot.log(f'Unknown CTCP "{kind}"', "WARN")
return False
def PRIVMSG(bot, msg: str) -> Union[None, str]:
# Format of ":[Nick]![ident]@[host|vhost] PRIVMSG [channel] :[message]”
name = msg.split("!", 1)[0][1:]
if (name.startswith("saxjax") and bot.server == "efnet") or (
name == "ReplIRC" and bot.server == "replirc"
):
if msg.find("<") != -1 and msg.find(">") != -1:
Nname = msg.split("<", 1)[1].split(">", 1)[0].strip()
if name == "ReplIRC":
name = Nname[4:]
else:
name = Nname
message = msg.split(">", 1)[1].strip()
else:
message = (
msg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
)
elif name == bot.nick:
return # type: ignore
else:
message = msg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
chan = msg.split("PRIVMSG", 1)[1].split(":", 1)[0].strip()
bot.log(
f'Got "{bytes(message).lazy_decode()}" from "{name}" in "{chan}"',
)
if len(name) > bot.nicklen:
bot.log(f"Name too long ({len(name)} > {bot.nicklen})")
return # type: ignore
elif chan not in bot.channels:
bot.log(
f"Channel not in channels ({chan} not in {bot.channels})",
"WARN",
)
if not chan.startswith(("#", "+", "&")):
chan = name
else:
bot.channels[chan] += 1
if "goat" in name.lower() and bot.gmode == True:
cmds.goat(bot, chan, name, message)
handled = False
for cmd in cmds.data:
triggers = [cmd]
triggers.extend(cmds.data[cmd]["aliases"])
triggers = list(
call.replace("$BOTNICK", bot.nick.lower())
for call in triggers
)
if conf.mfind(
message.lower(),
triggers,
cmds.data[cmd]["prefix"],
):
if (
"admin" in cmds.data[cmd] and cmds.data[cmd]["admin"]
) and name not in bot.adminnames:
bot.msg(
f"Sorry {name}, you don't have permission to use {cmd.strip()}.",
chan,
)
else:
cmds.call[cmd](bot, chan, name, message)
handled = True
break
if not handled:
for check in cmds.checks:
if re.search(
check.replace("$MAX", str(bot.nicklen)).replace(
"$BOTNICK", bot.nick
),
message,
):
cmds.call[check](bot, chan, name, message)
handled = True
break
if not handled and conf.mfind(message, ["reload"]):
if name in bot.adminnames:
return "reload"
else:
bot.msg(
f"Sorry {name}, you don't have permission to use reload.",
chan,
)
handled = True
if not handled and len(message.split("\x01")) == 3:
if not bot.CTCP(message, name):
CTCP = message.split("\x01")[1]
if CTCP == "ACTION ducks":
bot.msg("\x01ACTION gets hit by a duck\x01", chan)
elif CTCP.startswith("ACTION ducks"):
bot.msg(
f"\x01ACTION gets hit by {CTCP.split(' ', 2)[2]}\x01",
chan,
)
if chan in bot.channels and bot.channels[chan] >= bot.interval:
r.seed()
bot.channels[chan] = 0
mm = open("mastermessages.txt", "r")
q = mm.readlines()
sel = conf.decode_escapes(
str(r.sample(q, 1))
.strip("[]'")
.replace("\\n", "")
.strip('"')
)
bot.msg(f"[QUOTE] {sel}", chan)
mm.close()
return # type: ignore

View file

@ -5,7 +5,6 @@ import re, random as r, codecs
from sys import argv as args, exit as xit, stdout, stderr
from socket import socket, AF_INET, SOCK_STREAM
from dotenv import load_dotenv
from pythonlangutil.overload import Overload, signature
from datetime import datetime as dt
from logs import log
from subprocess import run, PIPE
@ -16,7 +15,7 @@ botnick = "FireBot"
server = args[1] if args else "UNSTABLE BOT MODE"
def exit(message: object) -> None:
def exit(message: str) -> None:
log(message, server, "EXIT")
xit(1)
@ -34,7 +33,7 @@ if __name__ == "__main__":
adminnames = servers[server]["admins"]
exitcode = f"bye {botnick.lower()}"
np = re.compile(npbase.replace("MAX", f"{nicklen}"))
queue = []
queue: list[bbytes] = []
log(f"Start init for {server}", server)
npallowed = ["FireBitBot"]
ESCAPE_SEQUENCE_RE = re.compile(

View file

@ -4,7 +4,7 @@ from sys import argv as args
from traceback import format_exc
from logs import log
server = args[1] if args else "UNSTABLE BOT MODE"
server = args[1] if args else "UNSTABLE"
if __name__ == "__main__":
@ -15,4 +15,4 @@ if __name__ == "__main__":
Err = format_exc()
for line in Err.split("\n"):
log(line, server, "CRASH")
exit(-1)
exit(1)

View file

@ -1,10 +1,11 @@
#!/usr/bin/python3
from datetime import datetime as dt
from sys import stdout, stderr
from typing import Union
def log(
message: str, origin: str = "Unknown", level: str = "LOG", time: dt = "now"
message: str, origin: str = "Unknown", level: str = "LOG", time: Union[dt, str] = "now"
) -> None:
if level in ["EXIT", "CRASH"]:
stream = stderr

View file

@ -28,14 +28,14 @@ class bytes(bbytes):
errors: str = "strict",
) -> _T:
if type(thing) == str:
cls.value = super().__new__(cls, thing, encoding, errors)
cls.value = super().__new__(cls, thing, encoding, errors) # type: ignore
elif thing == None:
cls.value = super().__new__(cls)
cls.value = super().__new__(cls) # type: ignore
elif thing != None:
cls.value = super().__new__(cls, thing)
cls.value = super().__new__(cls, thing) # type: ignore
else:
raise AttributeError("wtf")
return cls.value
raise AttributeError("This shouldn't happen")
return cls.value # type: ignore
@classmethod
def lazy_decode(self):