Massive re-works.

This commit is contained in:
Firepup Sixfifty 2023-11-01 20:28:02 -05:00
parent f43da11467
commit fcbce622bf
3 changed files with 134 additions and 99 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
.env .env
__pycache__/**

View file

@ -13,7 +13,7 @@ threads = {}
servers = [ servers = [
"ircnow", "ircnow",
"replirc", "replirc",
# "efnet", "efnet",
] ]

188
ircbot.py
View file

@ -2,20 +2,36 @@
from time import sleep from time import sleep
from builtins import bytes as bbytes from builtins import bytes as bbytes
import re, random as r, codecs import re, random as r, codecs
from sys import argv as args from sys import argv as args, exit as xit
from socket import socket, AF_INET, SOCK_STREAM from socket import socket, AF_INET, SOCK_STREAM
from os import environ as env from os import environ as env
from dotenv import load_dotenv from dotenv import load_dotenv
from pythonlangutil.overload import Overload, signature
from typing import Iterable, Type, TypeVar, Any
from datetime import datetime as dt
_T = TypeVar("_T")
load_dotenv() load_dotenv()
class bytes(bbytes): class bytes(bbytes):
"""Local override of builtin bytes class to add "lazy_decode\" """ """Local override of builtin bytes class to add "lazy_decode\" """
self = b''
def lazy_decode(e): def __new__(cls: Type[_T], thing: Any = None, encoding: str = "UTF-8", errors: str="strict") -> _T:
return str(e)[2:-1] if type(thing) == str:
cls.self = super().__new__(cls, thing, encoding, errors)
return cls.self
elif thing == None:
cls.self = super().__new__(cls)
return cls.self
else:
cls.self = super().__new__(cls, thing)
return cls.self
@classmethod
def lazy_decode(cls):
return str(cls.self)[2:-1]
__version__ = "v1.0.5" __version__ = "v1.0.5"
ircsock = socket(AF_INET, SOCK_STREAM) ircsock = socket(AF_INET, SOCK_STREAM)
@ -30,7 +46,7 @@ servers = {
"admins": ["firepup", "h|thelounge"], "admins": ["firepup", "h|thelounge"],
}, },
"efnet": { "efnet": {
"address": "irc.servercentral.net", "address": "irc.mzima.net",
"channels": {"#random": 0, "#dice": 0, botnick: 0}, "channels": {"#random": 0, "#dice": 0, botnick: 0},
"admins": ["firepup", "h|tl"], "admins": ["firepup", "h|tl"],
}, },
@ -41,6 +57,19 @@ servers = {
"admins": ["firepup", "firepup|lounge", "h|tl"], "admins": ["firepup", "firepup|lounge", "h|tl"],
}, },
} }
server = ""
def log(message: str, origin: str = "serv", time: dt = dt.now(), level: str = "LOG") -> None:
if origin == "serv":
global server
origin = server
print(f"[{level}][{origin}][{time}] {message}")
def exit(message: object) -> None:
log(message, level = "EXIT")
xit(1)
if __name__ == "__main__":
gmode = False gmode = False
server = args[1] server = args[1]
nicklen = 30 nicklen = 30
@ -48,13 +77,11 @@ address = servers[server]["address"]
port = servers[server]["port"] if "port" in servers[server] else 6667 port = servers[server]["port"] if "port" in servers[server] else 6667
channels = servers[server]["channels"] channels = servers[server]["channels"]
interval = servers[server]["interval"] if "interval" in servers[server] else 50 interval = servers[server]["interval"] if "interval" in servers[server] else 50
encoding = "UTF-8"
prefix = "." prefix = "."
rebt = "fire" rebt = "fire"
gblrebt = "all" gblrebt = "all"
lrebt = 7 + len(rebt) lrebt = 7 + len(rebt)
lgblrebt = 7 + len(gblrebt) lgblrebt = 7 + len(gblrebt)
e = encoding
adminnames = servers[server]["admins"] adminnames = servers[server]["admins"]
exitcode = f"bye {botnick.lower()}" exitcode = f"bye {botnick.lower()}"
ircmsg = "" ircmsg = ""
@ -63,6 +90,7 @@ npbase = "\[\x0303last\.fm\x03\] [A-Za-z0-9_[\]{}\\|^]{1,MAX} (is listening|last
np = re.compile( np = re.compile(
npbase.replace("MAX", f"{nicklen}") npbase.replace("MAX", f"{nicklen}")
) )
log(f"Start init for {server}")
npallowed = ["FireBitBot"] npallowed = ["FireBitBot"]
ESCAPE_SEQUENCE_RE = re.compile( ESCAPE_SEQUENCE_RE = re.compile(
r""" r"""
@ -76,40 +104,49 @@ ESCAPE_SEQUENCE_RE = re.compile(
re.UNICODE | re.VERBOSE, re.UNICODE | re.VERBOSE,
) )
def decode_escapes(s: str) -> str:
def decode_escapes(s):
def decode_match(match): def decode_match(match):
return codecs.decode(match.group(0), "unicode-escape") return codecs.decode(match.group(0), "unicode-escape")
return ESCAPE_SEQUENCE_RE.sub(decode_match, s) return ESCAPE_SEQUENCE_RE.sub(decode_match, s)
def sucheck(message): def sucheck(message: str):
return re.search("^(su|sudo|(su .*|sudo .*))$", message) return re.search("^(su|sudo|(su .*|sudo .*))$", message)
def ping(ircmsg): def send(command: str, encoding: str = "UTF-8") -> int:
return ircsock.send(bytes(command))
def recv() -> bytes:
return bytes(ircsock.recv(2048).strip(b"\r\n"))
def ping(ircmsg: str) -> int:
pong = f"PONG :{ircmsg.split('PING :')[1]}\n" pong = f"PONG :{ircmsg.split('PING :')[1]}\n"
ircsock.send(bytes(pong, e))
print(pong, end="") print(pong, end="")
return send(pong)
def sendmsg(msg, target): def sendraw(command: str) -> int:
log(f"RAW sending {command}")
command = f"{command}\n"
return send(command.replace("$BOTNICK", botnick))
def sendmsg(msg: str, target: str) -> None:
if target != "NickServ" and not mfind(msg, ["IDENTIFY"], False): if target != "NickServ" and not mfind(msg, ["IDENTIFY"], False):
print( log(f"Sending {bytes(msg).lazy_decode()} to {target}")
f"[LOG][{server}] Sending {bytes(msg.encode()).lazy_decode()} to {target}"
)
else: else:
print(f"[LOG][{server}] Identifying myself...") log("Identifying myself...")
ircsock.send(bytes(f"PRIVMSG {target} :{msg}\n", e)) send(f"PRIVMSG {target} :{msg}\n")
def notice(msg, target, silent: bool = False): def notice(msg, target, silent: bool = False):
if not silent: if not silent:
print( log(f"Sending {bytes(msg).lazy_decode()} to {target} (NOTICE)")
f"[LOG][{server}] Sending {bytes(msg.encode()).lazy_decode()} to {target} (NOTICE)" send(f"NOTICE {target} :{msg}\n")
)
ircsock.send(bytes(f"NOTICE {target} :{msg}\n", e))
def CTCPHandler(msg: str, sender: str = "", isRaw: bool = False): def CTCPHandler(msg: str, sender: str = "", isRaw: bool = False):
@ -117,7 +154,7 @@ def CTCPHandler(msg: str, sender: str = "", isRaw: bool = False):
sender = msg.split("!", 1)[0][1:] sender = msg.split("!", 1)[0][1:]
message = msg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip() message = msg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
CTCP = msg.split("\x01")[1] CTCP = msg.split("\x01")[1]
print(f"[LOG][{server}] Responding to CTCP {CTCP} from {sender}") log(f"Responding to CTCP {CTCP} from {sender}")
if CTCP == "VERSION": if CTCP == "VERSION":
notice( notice(
f"\x01VERSION FireBot {__version__} (https://git.amcforum.wiki/Firepup650/fire-ircbot)\x01", f"\x01VERSION FireBot {__version__} (https://git.amcforum.wiki/Firepup650/fire-ircbot)\x01",
@ -141,34 +178,35 @@ def CTCPHandler(msg: str, sender: str = "", isRaw: bool = False):
elif CTCP == "CLIENTINFO": elif CTCP == "CLIENTINFO":
notice(f"\x01CLIENTINFO ACTION VERSION USERINFO SOURCE FINGER", sender, True) notice(f"\x01CLIENTINFO ACTION VERSION USERINFO SOURCE FINGER", sender, True)
return True return True
log(f"Unknown CTCP {CTCP}")
return False return False
def joinserver(): def joinserver():
print(f"[LOG][{server}] Joining {server}...") log(f"Joining {server}...")
global e, nicklen, npbase, np, botnick global e, nicklen, npbase, np, botnick
ircsock.connect((address, port)) ircsock.connect((address, port))
ircsock.send(bytes(f"USER {botnick} {botnick} {botnick} {botnick}\n", e)) send(f"USER {botnick} {botnick} {botnick} {botnick}\n")
ircsock.send(bytes(f"NICK {botnick}\n", e)) send(f"NICK {botnick}\n")
ircmsg = "" ircmsg = ""
while ( while (
ircmsg.find("MODE " + botnick) == -1 and ircmsg.find("PRIVMSG " + botnick) == -1 ircmsg.find("MODE " + botnick) == -1 and ircmsg.find("PRIVMSG " + botnick) == -1
): ):
ircmsg = ircsock.recv(2048).strip(b"\r\n").decode() ircmsg = recv().decode()
if ircmsg != "": if ircmsg != "":
print(bytes(ircmsg.encode()).lazy_decode()) print(bytes(ircmsg).lazy_decode())
if ircmsg.find("NICKLEN=") != -1: if ircmsg.find("NICKLEN=") != -1:
global nicklen global nicklen
nicklen = int(ircmsg.split("NICKLEN=")[1].split(" ")[0]) nicklen = int(ircmsg.split("NICKLEN=")[1].split(" ")[0])
np = re.compile( np = re.compile(
npbase.replace("MAX", f"{nicklen}") npbase.replace("MAX", f"{nicklen}")
) )
print(f"[LOG][{server}] NICKLEN set to {nicklen}") log(f"NICKLEN set to {nicklen}")
if ircmsg.find("Nickname already in use") != -1: if ircmsg.find("Nickname") != -1:
print(f"[LOG][{server}] My nickname's in use? lemme try that again...") log(f"My nickname's in use? lemme try that again...", level = "WARN")
botnick = f"{botnick}{r.randint(0,1000)}" botnick = f"{botnick}{r.randint(0,1000)}"
ircsock.send(bytes(f"NICK {botnick}\n", e)) send(f"NICK {botnick}\n")
print(f"[LOG][{server}] botnick is now {botnick}") log(f"botnick is now {botnick}")
if ircmsg.find("PING :") != -1: if ircmsg.find("PING :") != -1:
# pong = "PONG :" + input("Ping?:") + "\n" # pong = "PONG :" + input("Ping?:") + "\n"
# pong = pong.replace("\\\\", "\\") # pong = pong.replace("\\\\", "\\")
@ -176,9 +214,9 @@ def joinserver():
if len(ircmsg.split("\x01")) == 3: if len(ircmsg.split("\x01")) == 3:
CTCPHandler(ircmsg, isRaw=True) CTCPHandler(ircmsg, isRaw=True)
if ircmsg.find("Closing Link") != -1: if ircmsg.find("Closing Link") != -1:
print(f"[LOG][{server}] I tried.") log("I tried.", level = "EXIT")
exit(f"[EXIT][{server}] Closing Link") exit("Closing Link")
print(f"[LOG][{server}] Joined {server} successfully!") log(f"Joined {server} successfully!")
def mfind(message: str, find: list, usePrefix: bool = True): def mfind(message: str, find: list, usePrefix: bool = True):
@ -189,14 +227,14 @@ def mfind(message: str, find: list, usePrefix: bool = True):
def joinchan(chan: str, origin: str, chanList: dict, lock: bool = True): def joinchan(chan: str, origin: str, chanList: dict, lock: bool = True):
print(f"[LOG][{server}] Joining {chan}...") log(f"Joining {chan}...")
chan = chan.replace(" ", "") chan = chan.replace(" ", "")
if "," in chan: if "," in chan:
chans = chan.split(",") chans = chan.split(",")
for subchan in chans: for subchan in chans:
chanList = joinchan(subchan, origin, chanList) chanList = joinchan(subchan, origin, chanList)
return chanList return chanList
if chan.startswith("0"): if chan.startswith("0") or chan == "#main":
if origin != "null": if origin != "null":
sendmsg("Refusing to join channel 0", origin) sendmsg("Refusing to join channel 0", origin)
return chanList return chanList
@ -204,28 +242,28 @@ def joinchan(chan: str, origin: str, chanList: dict, lock: bool = True):
if origin != "null": if origin != "null":
sendmsg(f"I'm already in {chan}.", origin) sendmsg(f"I'm already in {chan}.", origin)
return chanList return chanList
ircsock.send(bytes(f"JOIN {chan}\n", e)) send(f"JOIN {chan}\n")
ircmsg = "" ircmsg = ""
while True: while True:
ircmsg = ircsock.recv(2048).strip(b"\r\n").decode() ircmsg = recv().decode()
if ircmsg != "": if ircmsg != "":
print(bytes(ircmsg.encode()).lazy_decode()) print(bytes(ircmsg).lazy_decode())
if ircmsg.find("PING :") != -1: if ircmsg.find("PING :") != -1:
ping() ping()
if len(ircmsg.split("\x01")) == 3: if len(ircmsg.split("\x01")) == 3:
CTCPHandler(ircmsg, isRaw=True) CTCPHandler(ircmsg, isRaw=True)
if ircmsg.find("No such channel") != -1: if ircmsg.find("No such channel") != -1:
print(f"[LOG][{server}] Joining {chan} failed (DM)") log(f"Joining {chan} failed (DM)", level = "WARN")
if origin != "null": if origin != "null":
sendmsg(f"{chan} is an invalid channel", origin) sendmsg(f"{chan} is an invalid channel", origin)
break break
elif ircmsg.find("Cannot join channel (+i)") != -1: elif ircmsg.find("Cannot join channel (+i)") != -1:
print(f"[LOG][{server}] Joining {chan} failed (Private)") log(f"Joining {chan} failed (Private)", level = "WARN")
if origin != "null": if origin != "null":
sendmsg(f"Permission denied to channel {chan}", origin) sendmsg(f"Permission denied to channel {chan}", origin)
break break
elif ircmsg.find("End of") != -1: elif ircmsg.find("End of") != -1:
print(f"[LOG][{server}] Joining {chan} succeeded") log(f"Joining {chan} succeeded")
if origin != "null": if origin != "null":
sendmsg(f"Joined {chan}", origin) sendmsg(f"Joined {chan}", origin)
chanList[chan] = 0 chanList[chan] = 0
@ -235,14 +273,14 @@ def joinchan(chan: str, origin: str, chanList: dict, lock: bool = True):
def op(name, chan): def op(name, chan):
if name != "": if name != "":
print(f"[LOG][{server}] Attempting op of {name}...") log(f"Attempting op of {name}...")
ircsock.send(bytes(f"MODE {chan} +o {name}\n", e)) send(f"MODE {chan} +o {name}\n")
def main(): def main():
try: try:
global ircmsg, channels, e, gmode, prefix, rebt, gblrebt, lrebt, lgblrebt, blanks global ircmsg, channels, e, gmode, prefix, rebt, gblrebt, lrebt, lgblrebt, blanks
print(f"[LOG][{server}] Starting connection..") log("Starting connection..")
joinserver() joinserver()
if "pass" in servers[server]: if "pass" in servers[server]:
sendmsg(f"IDENTIFY FireBot {servers[server]['pass']}", "NickServ") sendmsg(f"IDENTIFY FireBot {servers[server]['pass']}", "NickServ")
@ -251,10 +289,10 @@ def main():
joinchan(chan, "null", channels, False) joinchan(chan, "null", channels, False)
while 1: while 1:
global ircmsg, gmode global ircmsg, gmode
raw = bytes(ircsock.recv(2048).strip(b"\r\n")) raw = recv()
ircmsg = raw.decode() ircmsg = raw.decode()
if ircmsg == "": if ircmsg == "":
exit(f"[EXIT][{server}] Probably a netsplit") exit("Probably a netsplit")
else: else:
print(raw.lazy_decode(), sep="\n") print(raw.lazy_decode(), sep="\n")
if ircmsg.find("PRIVMSG") != -1: if ircmsg.find("PRIVMSG") != -1:
@ -281,24 +319,18 @@ def main():
if name.endswith("dsc"): if name.endswith("dsc"):
helpErr = True helpErr = True
chan = ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[0].strip() chan = ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[0].strip()
print( log(f'Got "{bytes(message).lazy_decode()}" from "{name}" in "{chan}"')
f'[LOG][{server}] Got "{bytes(message.encode()).lazy_decode()}" from "{name}" in "{chan}"'
)
if "goat" in name.lower() and gmode == True: if "goat" in name.lower() and gmode == True:
print(f"[LOG][{server}] GOAT DETECTED") log(f"GOAT DETECTED")
sendmsg("Hello Goat", chan) sendmsg("Hello Goat", chan)
gmode = False gmode = False
if len(name) < nicklen and chan in channels: if len(name) < nicklen and chan in channels:
channels[chan] += 1 channels[chan] += 1
elif len(name) > nicklen: elif len(name) > nicklen:
print( log(f"Name too long ({len(name)} > {nicklen})")
f"[LOG][{server}] Name too long ({len(name)} > {nicklen})"
)
continue continue
elif chan not in channels: elif chan not in channels:
print( log(f"Channel not in channels ({chan} not in {channels})")
f"[LOG][{server}] Channel not in channels ({chan} not in {channels})"
)
continue continue
if mfind( if mfind(
message.lower(), message.lower(),
@ -343,17 +375,17 @@ def main():
elif name.lower() in adminnames and mfind( elif name.lower() in adminnames and mfind(
message, ["goat.mode.activate"] message, ["goat.mode.activate"]
): ):
print(f"[LOG][{server}] GOAT DETECTION ACTIVATED") log(f"GOAT DETECTION ACTIVATED")
gmode = True gmode = True
elif name.lower() in adminnames and mfind( elif name.lower() in adminnames and mfind(
message, ["goat.mode.deactivate"] message, ["goat.mode.deactivate"]
): ):
print(f"[LOG][{server}] GOAT DETECTION DEACTIVATED") log(f"GOAT DETECTION DEACTIVATED")
gmode = False gmode = False
elif mfind(message, ["quote"]): elif mfind(message, ["quote"]):
r.seed() r.seed()
log = open("mastermessages.txt", "r") mm = open("mastermessages.txt", "r")
q = log.readlines() q = mm.readlines()
sel = decode_escapes( sel = decode_escapes(
str(r.sample(q, 1)) str(r.sample(q, 1))
.strip("[]'") .strip("[]'")
@ -361,14 +393,14 @@ def main():
.strip('"') .strip('"')
) )
sendmsg(sel, chan) sendmsg(sel, chan)
log.close() mm.close()
elif mfind(message, ["join "]) and name.lower() in adminnames: elif mfind(message, ["join "]) and name.lower() in adminnames:
newchan = message.split(" ")[1].strip() newchan = message.split(" ")[1].strip()
channels = joinchan(newchan, chan, channels) channels = joinchan(newchan, chan, channels)
elif mfind(message, ["eightball", "8ball", "8b"]): elif mfind(message, ["eightball", "8ball", "8b"]):
if message.endswith("?"): if message.endswith("?"):
log = open("eightball.txt", "r") eb = open("eightball.txt", "r")
q = log.readlines() q = eb.readlines()
sel = ( sel = (
str(r.sample(q, 1)) str(r.sample(q, 1))
.strip("[]'") .strip("[]'")
@ -376,7 +408,7 @@ def main():
.strip('"') .strip('"')
) )
sendmsg(f"The magic eightball says: {sel}", chan) sendmsg(f"The magic eightball says: {sel}", chan)
log.close() eb.close()
else: else:
sendmsg("Please pose a Yes or No question.", chan) sendmsg("Please pose a Yes or No question.", chan)
elif ( elif (
@ -385,15 +417,17 @@ def main():
sendmsg(f"[DEBUG] NICKLEN={nicklen}", chan) sendmsg(f"[DEBUG] NICKLEN={nicklen}", chan)
sendmsg(f"[DEBUG] ADMINS={adminnames}", chan) sendmsg(f"[DEBUG] ADMINS={adminnames}", chan)
sendmsg(f"[DEBUG] CHANNELS={channels}", chan) sendmsg(f"[DEBUG] CHANNELS={channels}", chan)
elif mfind(message, ["raw ", "cmd "]) and name.lower() in adminnames:
sendraw(message.split(" ", 1)[1])
elif ( elif (
mfind(message, [f"reboot {rebt}", f"reboot {gblrebt}"], False) mfind(message, [f"reboot {rebt}", f"reboot {gblrebt}"], False)
and name.lower() in adminnames and name.lower() in adminnames
): ):
for i in channels: for i in channels:
sendmsg("Rebooting...", i) sendmsg("Rebooting...", i)
ircsock.send(bytes("QUIT :Rebooting\n", e)) send("QUIT :Rebooting\n")
__import__("os").system(f"python3 -u ircbot.py {server}") __import__("os").system(f"python3 -u ircbot.py {server}")
exit(f"[EXIT][{server}] Inner layer exited or crashed") exit("Inner layer exited or crashed")
elif ( elif (
name.lower() in adminnames name.lower() in adminnames
and message.rstrip().lower() == exitcode and message.rstrip().lower() == exitcode
@ -403,9 +437,9 @@ def main():
# print(f'[LOG][{server}] i="{i}" vs chan="{chan}"') # print(f'[LOG][{server}] i="{i}" vs chan="{chan}"')
if i != chan.strip(): if i != chan.strip():
sendmsg("goodbye... :'(", i) sendmsg("goodbye... :'(", i)
ircsock.send(bytes("QUIT :Shutting down\n", "UTF-8")) send("QUIT :Shutting down\n", "UTF-8")
print(f"[LOG][{server}] QUIT") log(f"QUIT")
exit(f"[EXIT][{server}] goodbye :'(") exit("goodbye :'(")
# raise EOFError # raise EOFError
elif sucheck(message): elif sucheck(message):
if name.lower() in adminnames: if name.lower() in adminnames:
@ -413,7 +447,7 @@ def main():
"Error - system failure, contact system operator", chan "Error - system failure, contact system operator", chan
) )
elif "bot" in name.lower(): elif "bot" in name.lower():
print(f"[LOG][{server}] lol, no.") log(f"lol, no.")
else: else:
sendmsg("Access Denied", chan) sendmsg("Access Denied", chan)
elif np.search(message) and name in npallowed: elif np.search(message) and name in npallowed:
@ -435,8 +469,8 @@ def main():
if chan in channels and channels[chan] >= interval: if chan in channels and channels[chan] >= interval:
r.seed() r.seed()
channels[chan] = 0 channels[chan] = 0
log = open("mastermessages.txt", "r") mm = open("mastermessages.txt", "r")
q = log.readlines() q = mm.readlines()
sel = decode_escapes( sel = decode_escapes(
str(r.sample(q, 1)) str(r.sample(q, 1))
.strip("[]'") .strip("[]'")
@ -444,14 +478,14 @@ def main():
.strip('"') .strip('"')
) )
sendmsg(f"[QUOTE] {sel}", chan) sendmsg(f"[QUOTE] {sel}", chan)
log.close() mm.close()
else: else:
if ircmsg.find("PING") != -1: if ircmsg.find("PING") != -1:
ping(ircmsg) ping(ircmsg)
if ircmsg.find("Closing Link") != -1: if ircmsg.find("Closing Link") != -1:
exit(f"[EXIT][{server}] I got killed :'(") exit("I got killed :'(")
if ircmsg.find("ERROR :Ping timeout: ") != -1: if ircmsg.find("ERROR :Ping timeout: ") != -1:
exit(f"[EXIT][{server} Ping timeout]") exit("Ping timeout]")
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass