Partway through MAJOR rework

This commit is contained in:
Firepup Sixfifty 2023-11-04 20:37:35 -05:00
parent d2bd195764
commit aedffec6ac
5 changed files with 156 additions and 89 deletions

43
bot.py Normal file
View file

@ -0,0 +1,43 @@
#!/usr/bin/python3
from socket import socket, AF_INET, SOCK_STREAM
from overrides import bytes, bbytes
from logs import log
import re
class bot:
def __init__(server: str):
self.gmode = False
self.server = server
self.nicklen = 30
self.address = servers[server]["address"]
self.port = servers[server]["port"] if "port" in servers[server] else 6667
self.channels = servers[server]["channels"]
self.interval = (
servers[server]["interval"] if "interval" in servers[server] else 50
)
self.prefix = "."
self.rebt = "fire"
self.gblrebt = "all"
self.lrebt = 7 + len(rebt)
self.lgblrebt = 7 + len(gblrebt)
self.adminnames = servers[server]["admins"]
self.exitcode = f"bye {botnick.lower()}"
self.np = re.compile(npbase.replace("MAX", f"{nicklen}"))
self.queue = []
self.__version__ = "v1.0.5"
self.socket = socket(AF_INET, SOCK_STREAM)
log(f"Start init for {server}", self.server)
def send(self, command: str) -> int:
pass
def recv(self) -> bytes:
pass
def log(self, message: object) -> None:
log(message, self.server)
def exit(message: object) -> None:
log(message, self.server, "EXIT")
exit(1)

14
core.py
View file

@ -4,6 +4,8 @@ from time import sleep
from sys import argv as args
from threading import Thread
from datetime import datetime as dt
from logs import log
def launch(server: str) -> None:
system(f"python3 -u ircbot.py {server}")
@ -17,12 +19,12 @@ servers = [
]
def is_dead(thr):
def is_dead(thr: Thread) -> bool:
thr.join(timeout=0)
return not thr.is_alive()
def start(server):
def start(server: str) -> Thread:
t = Thread(target=launch, args=(server,))
t.daemon = True
t.start()
@ -30,15 +32,15 @@ def start(server):
if __name__ == "__main__":
print(f"[LOG][CORE][{dt.now()}] Begin initialization")
log("Begin initialization", "CORE")
for server in servers:
threads[server] = start(server)
print(f"[LOG][CORE][{dt.now()}] Started all instances. Idling...")
log("Started all instances. Idling...", "CORE")
while 1:
sleep(60)
print(f"[LOG][CORE][{dt.now()}] Running a checkup on all running instances")
log("Running a checkup on all running instances", "CORE")
for server in threads:
t = threads[server]
if is_dead(t):
print(f"[WARN][CORE][{dt.now()}] The thread for {server} died, restarting it...")
log(f"The thread for {server} died, restarting it...", "CORE", "WARN")
threads[server] = start(server)

142
ircbot.py
View file

@ -1,38 +1,17 @@
#!/usr/bin/python3
from time import sleep
from builtins import bytes as bbytes
from overrides import bytes, bbytes
import re, random as r, codecs
from sys import argv as args, exit as xit
from sys import argv as args, exit as xit, stdout, stderr
from socket import socket, AF_INET, SOCK_STREAM
from os import environ as env
from dotenv import load_dotenv
from pythonlangutil.overload import Overload, signature
from typing import Iterable, Type, TypeVar, Any
from datetime import datetime as dt
from logs import log
_T = TypeVar("_T")
load_dotenv()
class bytes(bbytes):
"""Local override of builtin bytes class to add "lazy_decode\" """
self = b''
def __new__(cls: Type[_T], thing: Any = None, encoding: str = "UTF-8", errors: str="strict") -> _T:
if type(thing) == str:
cls.self = super().__new__(cls, thing, encoding, errors)
return cls.self
elif thing == None:
cls.self = super().__new__(cls)
return cls.self
else:
cls.self = super().__new__(cls, thing)
return cls.self
@classmethod
def lazy_decode(cls):
return str(cls.self)[2:-1]
__version__ = "v1.0.5"
ircsock = socket(AF_INET, SOCK_STREAM)
botnick = "FireBot"
@ -42,35 +21,30 @@ servers = {
"port": 6601,
"interval": 200,
"pass": env["ircnow_pass"],
"channels": {"#random": 0, "#dice": 0, "#offtopic": 0, botnick: 0},
"channels": {"#random": 0, "#dice": 0, "#offtopic": 0},
"admins": ["firepup", "h|thelounge"],
},
"efnet": {
"address": "irc.mzima.net",
"channels": {"#random": 0, "#dice": 0, botnick: 0},
"channels": {"#random": 0, "#dice": 0},
"admins": ["firepup", "h|tl"],
},
"replirc": {
"address": "localhost",
"pass": env["replirc_pass"],
"channels": {"#random": 0, "#dice": 0, "#main": 0, "#bots": 0, botnick: 0},
"channels": {"#random": 0, "#dice": 0, "#main": 0, "#bots": 0},
"admins": ["firepup", "firepup|lounge", "h|tl"],
},
}
server = ""
def log(message: str, origin: str = "serv", time: dt = dt.now(), level: str = "LOG") -> None:
if origin == "serv":
origin = args[1]
print(f"[{level}][{origin}][{time}] {message}")
server = args[1] if args else "UNSTABLE BOT MODE"
def exit(message: object) -> None:
log(message, level = "EXIT")
log(message, server, "EXIT")
xit(1)
if __name__ == "__main__":
gmode = False
server = args[1]
nicklen = 30
address = servers[server]["address"]
port = servers[server]["port"] if "port" in servers[server] else 6667
@ -85,12 +59,10 @@ if __name__ == "__main__":
exitcode = f"bye {botnick.lower()}"
ircmsg = ""
blanks = 0
npbase = "\[\x0303last\.fm\x03\] [A-Za-z0-9_[\]{}\\|^]{1,MAX} (is listening|last listened) to: \x02.+ - .+\x02 \([0-9]+ plays\)( \[.*\])?"
np = re.compile(
npbase.replace("MAX", f"{nicklen}")
)
npbase = "\[\x0303last\.fm\x03\] [A-Za-z0-9_[\]{}\\|^]{1,MAX} (is listening|last listened) to: \x02.+ - .*\x02 \([0-9]+ plays\)( \[.*\])?"
np = re.compile(npbase.replace("MAX", f"{nicklen}"))
queue = []
log(f"Start init for {server}")
log(f"Start init for {server}", server)
npallowed = ["FireBitBot"]
ESCAPE_SEQUENCE_RE = re.compile(
r"""
@ -104,6 +76,7 @@ ESCAPE_SEQUENCE_RE = re.compile(
re.UNICODE | re.VERBOSE,
)
def decode_escapes(s: str) -> str:
def decode_match(match):
return codecs.decode(match.group(0), "unicode-escape")
@ -115,7 +88,7 @@ def sucheck(message: str):
return re.search("^(su|sudo|(su .*|sudo .*))$", message)
def send(command: str, encoding: str = "UTF-8") -> int:
def send(command: str) -> int:
return ircsock.send(bytes(command))
@ -124,8 +97,8 @@ def recv() -> bytes:
if queue:
return bytes(queue.pop(0))
data = bytes(ircsock.recv(2048).strip(b"\r\n"))
if b'\r\n' in data:
queue.extend(data.split(b'\r\n'))
if b"\r\n" in data:
queue.extend(data.split(b"\r\n"))
return bytes(queue.pop(0))
return data
@ -137,31 +110,31 @@ def ping(ircmsg: str) -> int:
def sendraw(command: str) -> int:
log(f"RAW sending {command}")
log(f"RAW sending {command}", server)
command = f"{command}\n"
return send(command.replace("$BOTNICK", botnick))
def sendmsg(msg: str, target: str) -> None:
if target != "NickServ" and not mfind(msg, ["IDENTIFY"], False):
log(f"Sending {bytes(msg).lazy_decode()} to {target}")
log(f"Sending {bytes(msg).lazy_decode()} to {target}", server)
else:
log("Identifying myself...")
log("Identifying myself...", server)
send(f"PRIVMSG {target} :{msg}\n")
def notice(msg, target, silent: bool = False):
if not silent:
log(f"Sending {bytes(msg).lazy_decode()} to {target} (NOTICE)")
log(f"Sending {bytes(msg).lazy_decode()} to {target} (NOTICE)", server)
send(f"NOTICE {target} :{msg}\n")
"{fg"
def CTCPHandler(msg: str, sender: str = "", isRaw: bool = False):
if isRaw:
sender = msg.split("!", 1)[0][1:]
message = msg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
CTCP = msg.split("\x01")[1]
log(f"Responding to CTCP {CTCP} from {sender}")
CTCP = msg.split("\x01")[1].split(" ", 1)[0]
log(f"Responding to CTCP {CTCP} from {sender}", server)
if CTCP == "VERSION":
notice(
f"\x01VERSION FireBot {__version__} (https://git.amcforum.wiki/Firepup650/fire-ircbot)\x01",
@ -180,18 +153,18 @@ def CTCPHandler(msg: str, sender: str = "", isRaw: bool = False):
)
return True
elif CTCP == "FINGER":
notice(f"\x01FINGER Firepup's bot\x01", sender, True)
notice("\x01FINGER Firepup's bot\x01", sender, True)
return True
elif CTCP == "CLIENTINFO":
notice(f"\x01CLIENTINFO ACTION VERSION USERINFO SOURCE FINGER", sender, True)
notice("\x01CLIENTINFO ACTION VERSION USERINFO SOURCE FINGER", sender, True)
return True
log(f"Unknown CTCP {CTCP}")
log(f"Unknown CTCP {CTCP}", server)
return False
def joinserver():
log(f"Joining {server}...")
global e, nicklen, npbase, np, botnick
log(f"Joining {server}...", server)
global nicklen, npbase, np, botnick
ircsock.connect((address, port))
send(f"USER {botnick} {botnick} {botnick} {botnick}\n")
send(f"NICK {botnick}\n")
@ -205,15 +178,13 @@ def joinserver():
if ircmsg.find("NICKLEN=") != -1:
global nicklen
nicklen = int(ircmsg.split("NICKLEN=")[1].split(" ")[0])
np = re.compile(
npbase.replace("MAX", f"{nicklen}")
)
log(f"NICKLEN set to {nicklen}")
np = re.compile(npbase.replace("MAX", f"{nicklen}"))
log(f"NICKLEN set to {nicklen}", server)
if ircmsg.find("Nickname") != -1:
log(f"My nickname's in use? lemme try that again...", level = "WARN")
log("Nickname in use", server, "WARN")
botnick = f"{botnick}{r.randint(0,1000)}"
send(f"NICK {botnick}\n")
log(f"botnick is now {botnick}")
log(f"botnick is now {botnick}", server)
if ircmsg.startswith("PING "):
# pong = "PONG :" + input("Ping?:") + "\n"
# pong = pong.replace("\\\\", "\\")
@ -221,9 +192,9 @@ def joinserver():
if len(ircmsg.split("\x01")) == 3:
CTCPHandler(ircmsg, isRaw=True)
if ircmsg.find("Closing Link") != -1:
log("I tried.", level = "EXIT")
log("I tried.", server, "EXIT")
exit("Closing Link")
log(f"Joined {server} successfully!")
log(f"Joined {server} successfully!", server)
def mfind(message: str, find: list, usePrefix: bool = True):
@ -234,7 +205,7 @@ def mfind(message: str, find: list, usePrefix: bool = True):
def joinchan(chan: str, origin: str, chanList: dict, lock: bool = True):
log(f"Joining {chan}...")
log(f"Joining {chan}...", server)
chan = chan.replace(" ", "")
if "," in chan:
chans = chan.split(",")
@ -260,17 +231,17 @@ def joinchan(chan: str, origin: str, chanList: dict, lock: bool = True):
if len(ircmsg.split("\x01")) == 3:
CTCPHandler(ircmsg, isRaw=True)
if ircmsg.find("No such channel") != -1:
log(f"Joining {chan} failed (DM)", level = "WARN")
log(f"Joining {chan} failed (DM)", server, "WARN")
if origin != "null":
sendmsg(f"{chan} is an invalid channel", origin)
break
elif ircmsg.find("Cannot join channel (+i)") != -1:
log(f"Joining {chan} failed (Private)", level = "WARN")
log(f"Joining {chan} failed (Private)", server, "WARN")
if origin != "null":
sendmsg(f"Permission denied to channel {chan}", origin)
break
elif ircmsg.find("End of") != -1:
log(f"Joining {chan} succeeded")
log(f"Joining {chan} succeeded", server)
if origin != "null":
sendmsg(f"Joined {chan}", origin)
chanList[chan] = 0
@ -280,14 +251,14 @@ def joinchan(chan: str, origin: str, chanList: dict, lock: bool = True):
def op(name, chan):
if name != "":
log(f"Attempting op of {name}...")
log(f"Attempting op of {name} in {chan}...", server)
send(f"MODE {chan} +o {name}\n")
def main():
try:
global ircmsg, channels, e, gmode, prefix, rebt, gblrebt, lrebt, lgblrebt, blanks
log("Starting connection..")
log("Starting connection..", server)
joinserver()
if "pass" in servers[server]:
sendmsg(f"IDENTIFY FireBot {servers[server]['pass']}", "NickServ")
@ -326,19 +297,22 @@ def main():
if name.endswith("dsc"):
helpErr = True
chan = ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[0].strip()
log(f'Got "{bytes(message).lazy_decode()}" from "{name}" in "{chan}"')
log(
f'Got "{bytes(message).lazy_decode()}" from "{name}" in "{chan}"',
server
)
if "goat" in name.lower() and gmode == True:
log(f"GOAT DETECTED")
log("GOAT DETECTED", server)
sendmsg("Hello Goat", chan)
gmode = False
if len(name) < nicklen and chan in channels:
channels[chan] += 1
elif len(name) > nicklen:
log(f"Name too long ({len(name)} > {nicklen})")
if len(name) > nicklen:
log(f"Name too long ({len(name)} > {nicklen})", server)
continue
elif chan not in channels:
log(f"Channel not in channels ({chan} not in {channels})")
log(f"Channel not in channels ({chan} not in {channels})", server)
continue
else:
channels[chan] += 1
if mfind(
message.lower(),
[f"hi {botnick.lower()}", f"hello {botnick.lower()}"],
@ -382,12 +356,12 @@ def main():
elif name.lower() in adminnames and mfind(
message, ["goat.mode.activate"]
):
log(f"GOAT DETECTION ACTIVATED")
log("GOAT DETECTION ACTIVATED", server)
gmode = True
elif name.lower() in adminnames and mfind(
message, ["goat.mode.deactivate"]
):
log(f"GOAT DETECTION DEACTIVATED")
log("GOAT DETECTION DEACTIVATED", server)
gmode = False
elif mfind(message, ["quote"]):
r.seed()
@ -424,7 +398,9 @@ def main():
sendmsg(f"[DEBUG] NICKLEN={nicklen}", chan)
sendmsg(f"[DEBUG] ADMINS={adminnames}", chan)
sendmsg(f"[DEBUG] CHANNELS={channels}", chan)
elif mfind(message, ["raw ", "cmd "]) and name.lower() in adminnames:
elif (
mfind(message, ["raw ", "cmd "]) and name.lower() in adminnames
):
sendraw(message.split(" ", 1)[1])
elif (
mfind(message, [f"reboot {rebt}", f"reboot {gblrebt}"], False)
@ -445,7 +421,7 @@ def main():
if i != chan.strip():
sendmsg("goodbye... :'(", i)
send("QUIT :Shutting down\n", "UTF-8")
log(f"QUIT")
log("QUIT", server)
exit("goodbye :'(")
# raise EOFError
elif sucheck(message):
@ -454,7 +430,7 @@ def main():
"Error - system failure, contact system operator", chan
)
elif "bot" in name.lower():
log(f"lol, no.")
log("lol, no.", server)
else:
sendmsg("Access Denied", chan)
elif np.search(message) and name in npallowed:
@ -489,9 +465,9 @@ def main():
else:
if ircmsg.startswith("PING "):
ping(ircmsg)
if ircmsg.find("Closing Link") != -1:
elif ircmsg.startswith("ERROR :Closing Link"):
exit("I got killed :'(")
if ircmsg.find("ERROR :Ping timeout: ") != -1:
elif ircmsg.startswith("ERROR :Ping "):
exit("Ping timeout")
except KeyboardInterrupt:
pass

17
logs.py Normal file
View file

@ -0,0 +1,17 @@
#!/usr/bin/python3
from datetime import datetime as dt
from sys import stdout, stderr
def log(
message: str, origin: str = "Unknown", level: str = "LOG", time: dt = dt.now()
) -> None:
if level == "EXIT":
stream = stderr
else:
stream = stdout
if not "\n" in message:
print(f"[{level}][{origin}][{time}] {message}")
else:
for line in message.split("\n"):
print(f"[{level}][{origin}][{time}] {line}")

29
overrides.py Normal file
View file

@ -0,0 +1,29 @@
#!/usr/bin/python3
from builtins import bytes as bbytes
from typing import TypeVar, Type, Any
_T = TypeVar("_T")
class bytes(bbytes):
"""Local override of builtin bytes class to add "lazy_decode" """
def __new__(
cls: Type[_T],
thing: Any = None,
encoding: str = "UTF-8",
errors: str = "strict",
) -> _T:
if type(thing) == str:
cls.value = super().__new__(cls, thing, encoding, errors)
elif thing == None:
cls.value = super().__new__(cls)
elif thing != None:
cls.value = super().__new__(cls, thing)
else:
raise AttributeError("wtf")
return cls.value
@classmethod
def lazy_decode(self):
return str(self.value)[2:-1]