FireBot/ircbot.py

481 lines
19 KiB
Python
Raw Normal View History

2023-10-23 22:10:07 +00:00
#!/usr/bin/python3
from time import sleep
2023-11-05 01:37:35 +00:00
from overrides import bytes, bbytes
2023-10-23 22:10:07 +00:00
import re, random as r, codecs
2023-11-05 01:37:35 +00:00
from sys import argv as args, exit as xit, stdout, stderr
2023-10-23 22:10:07 +00:00
from socket import socket, AF_INET, SOCK_STREAM
2023-10-23 22:43:10 +00:00
from dotenv import load_dotenv
2023-11-02 01:28:02 +00:00
from pythonlangutil.overload import Overload, signature
from datetime import datetime as dt
2023-11-05 01:37:35 +00:00
from logs import log
from subprocess import run, PIPE
from config import npbase, servers, __version__
2023-10-23 22:43:10 +00:00
2023-10-23 22:10:07 +00:00
ircsock = socket(AF_INET, SOCK_STREAM)
botnick = "FireBot"
2023-11-05 01:37:35 +00:00
server = args[1] if args else "UNSTABLE BOT MODE"
2023-11-02 01:28:02 +00:00
2023-11-05 04:17:08 +00:00
2023-11-02 01:28:02 +00:00
def exit(message: object) -> None:
2023-11-05 01:37:35 +00:00
log(message, server, "EXIT")
2023-11-02 01:28:02 +00:00
xit(1)
2023-11-05 01:37:35 +00:00
2023-11-02 01:28:02 +00:00
if __name__ == "__main__":
gmode = False
nicklen = 30
address = servers[server]["address"]
port = servers[server]["port"] if "port" in servers[server] else 6667
channels = servers[server]["channels"]
interval = servers[server]["interval"] if "interval" in servers[server] else 50
prefix = "."
rebt = "fire"
gblrebt = "all"
adminnames = servers[server]["admins"]
exitcode = f"bye {botnick.lower()}"
2023-11-05 01:37:35 +00:00
np = re.compile(npbase.replace("MAX", f"{nicklen}"))
queue = []
2023-11-05 01:37:35 +00:00
log(f"Start init for {server}", server)
npallowed = ["FireBitBot"]
2023-10-23 22:10:07 +00:00
ESCAPE_SEQUENCE_RE = re.compile(
r"""
( \\U........ # 8-digit hex escapes
| \\u.... # 4-digit hex escapes
| \\x.. # 2-digit hex escapes
| \\[0-7]{1,3} # Octal escapes
| \\N\{[^}]+\} # Unicode characters by name
| \\[\\'"abfnrtv] # Single-character escapes
)""",
re.UNICODE | re.VERBOSE,
)
2023-11-05 01:37:35 +00:00
2023-11-02 01:28:02 +00:00
def decode_escapes(s: str) -> str:
2023-10-23 22:10:07 +00:00
def decode_match(match):
return codecs.decode(match.group(0), "unicode-escape")
return ESCAPE_SEQUENCE_RE.sub(decode_match, s)
2023-11-02 01:28:02 +00:00
def sucheck(message: str):
2023-10-28 04:20:18 +00:00
return re.search("^(su|sudo|(su .*|sudo .*))$", message)
2023-10-23 22:10:07 +00:00
2023-11-05 01:37:35 +00:00
def send(command: str) -> int:
2023-11-02 01:28:02 +00:00
return ircsock.send(bytes(command))
def recv() -> bytes:
global queue
if queue:
return bytes(queue.pop(0))
data = bytes(ircsock.recv(2048).strip(b"\r\n"))
2023-11-05 01:37:35 +00:00
if b"\r\n" in data:
queue.extend(data.split(b"\r\n"))
return bytes(queue.pop(0))
return data
2023-11-02 01:28:02 +00:00
def ping(ircmsg: str) -> int:
pong = f"PONG :{ircmsg.split('PING :')[1]}\n"
print(pong, end="")
2023-11-02 01:28:02 +00:00
return send(pong)
def sendraw(command: str) -> int:
2023-11-05 01:37:35 +00:00
log(f"RAW sending {command}", server)
2023-11-02 01:28:02 +00:00
command = f"{command}\n"
return send(command.replace("$BOTNICK", botnick))
2023-11-02 01:28:02 +00:00
def sendmsg(msg: str, target: str) -> None:
if target != "NickServ" and not mfind(msg, ["IDENTIFY"], False):
2023-11-05 01:37:35 +00:00
log(f"Sending {bytes(msg).lazy_decode()} to {target}", server)
else:
2023-11-05 01:37:35 +00:00
log("Identifying myself...", server)
2023-11-02 01:28:02 +00:00
send(f"PRIVMSG {target} :{msg}\n")
def notice(msg, target, silent: bool = False):
if not silent:
2023-11-05 01:37:35 +00:00
log(f"Sending {bytes(msg).lazy_decode()} to {target} (NOTICE)", server)
2023-11-02 01:28:02 +00:00
send(f"NOTICE {target} :{msg}\n")
2023-11-05 04:17:08 +00:00
2023-11-05 01:37:35 +00:00
"{fg"
2023-11-05 04:17:08 +00:00
def CTCPHandler(msg: str, sender: str = "", isRaw: bool = False):
if isRaw:
sender = msg.split("!", 1)[0][1:]
message = msg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
2023-11-05 01:37:35 +00:00
CTCP = msg.split("\x01")[1].split(" ", 1)[0]
log(f"Responding to CTCP {CTCP} from {sender}", server)
if CTCP == "VERSION":
notice(
f"\x01VERSION FireBot {__version__} (https://git.amcforum.wiki/Firepup650/fire-ircbot)\x01",
sender,
True,
)
2023-10-24 01:35:50 +00:00
return True
elif CTCP == "USERINFO":
notice("\x01USERINFO FireBot (Firepup's bot)\x01", sender, True)
2023-10-24 01:35:50 +00:00
return True
elif CTCP == "SOURCE":
notice(
2023-10-28 04:20:18 +00:00
"\x01SOURCE https://git.amcforum.wiki/Firepup650/fire-ircbot\x01",
sender,
True,
)
2023-10-24 01:35:50 +00:00
return True
elif CTCP == "FINGER":
2023-11-05 01:37:35 +00:00
notice("\x01FINGER Firepup's bot\x01", sender, True)
2023-10-24 01:35:50 +00:00
return True
elif CTCP == "CLIENTINFO":
2023-11-05 01:44:45 +00:00
notice("\x01CLIENTINFO ACTION VERSION USERINFO SOURCE FINGER\x01", sender, True)
2023-10-24 01:35:50 +00:00
return True
2023-11-05 01:37:35 +00:00
log(f"Unknown CTCP {CTCP}", server)
2023-10-24 01:35:50 +00:00
return False
2023-10-23 22:10:07 +00:00
def joinserver():
2023-11-05 01:37:35 +00:00
log(f"Joining {server}...", server)
global nicklen, npbase, np, botnick
2023-10-23 22:10:07 +00:00
ircsock.connect((address, port))
2023-11-02 01:28:02 +00:00
send(f"USER {botnick} {botnick} {botnick} {botnick}\n")
send(f"NICK {botnick}\n")
2023-10-23 22:10:07 +00:00
ircmsg = ""
while (
ircmsg.find("MODE " + botnick) == -1 and ircmsg.find("PRIVMSG " + botnick) == -1
):
2023-11-02 01:28:02 +00:00
ircmsg = recv().decode()
2023-10-23 22:10:07 +00:00
if ircmsg != "":
2023-11-02 01:28:02 +00:00
print(bytes(ircmsg).lazy_decode())
2023-10-23 22:10:07 +00:00
if ircmsg.find("NICKLEN=") != -1:
global nicklen
nicklen = int(ircmsg.split("NICKLEN=")[1].split(" ")[0])
2023-11-05 01:37:35 +00:00
np = re.compile(npbase.replace("MAX", f"{nicklen}"))
log(f"NICKLEN set to {nicklen}", server)
2023-11-02 01:28:02 +00:00
if ircmsg.find("Nickname") != -1:
2023-11-05 01:37:35 +00:00
log("Nickname in use", server, "WARN")
botnick = f"{botnick}{r.randint(0,1000)}"
2023-11-02 01:28:02 +00:00
send(f"NICK {botnick}\n")
2023-11-05 01:37:35 +00:00
log(f"botnick is now {botnick}", server)
if ircmsg.startswith("PING "):
2023-10-23 22:10:07 +00:00
# pong = "PONG :" + input("Ping?:") + "\n"
# pong = pong.replace("\\\\", "\\")
ping(ircmsg)
if len(ircmsg.split("\x01")) == 3:
CTCPHandler(ircmsg, isRaw=True)
2023-10-23 22:10:07 +00:00
if ircmsg.find("Closing Link") != -1:
2023-11-05 01:37:35 +00:00
log("I tried.", server, "EXIT")
2023-11-02 01:28:02 +00:00
exit("Closing Link")
2023-11-05 01:37:35 +00:00
log(f"Joined {server} successfully!", server)
2023-10-23 22:10:07 +00:00
def mfind(message: str, find: list, usePrefix: bool = True):
if usePrefix:
return any(message[: len(match) + 1] == prefix + match for match in find)
else:
return any(message[: len(match)] == match for match in find)
def joinchan(chan: str, origin: str, chanList: dict, lock: bool = True):
2023-11-05 01:37:35 +00:00
log(f"Joining {chan}...", server)
2023-10-23 22:10:07 +00:00
chan = chan.replace(" ", "")
if "," in chan:
chans = chan.split(",")
for subchan in chans:
chanList = joinchan(subchan, origin, chanList)
return chanList
2023-11-02 01:37:37 +00:00
if chan.startswith("0") or (chan == "#main" and lock):
2023-10-23 22:10:07 +00:00
if origin != "null":
sendmsg("Refusing to join channel 0", origin)
return chanList
if chan in channels and lock:
if origin != "null":
sendmsg(f"I'm already in {chan}.", origin)
return chanList
2023-11-02 01:28:02 +00:00
send(f"JOIN {chan}\n")
2023-10-23 22:10:07 +00:00
ircmsg = ""
while True:
2023-11-02 01:28:02 +00:00
ircmsg = recv().decode()
2023-10-23 22:10:07 +00:00
if ircmsg != "":
2023-11-02 01:28:02 +00:00
print(bytes(ircmsg).lazy_decode())
if ircmsg.startswith("PING "):
ping(ircmsg)
if len(ircmsg.split("\x01")) == 3:
CTCPHandler(ircmsg, isRaw=True)
2023-10-23 22:10:07 +00:00
if ircmsg.find("No such channel") != -1:
2023-11-05 01:37:35 +00:00
log(f"Joining {chan} failed (DM)", server, "WARN")
2023-10-23 22:10:07 +00:00
if origin != "null":
sendmsg(f"{chan} is an invalid channel", origin)
break
elif ircmsg.find("Cannot join channel (+i)") != -1:
2023-11-05 01:37:35 +00:00
log(f"Joining {chan} failed (Private)", server, "WARN")
2023-10-23 22:10:07 +00:00
if origin != "null":
sendmsg(f"Permission denied to channel {chan}", origin)
break
elif ircmsg.find("End of") != -1:
2023-11-05 01:37:35 +00:00
log(f"Joining {chan} succeeded", server)
2023-10-23 22:10:07 +00:00
if origin != "null":
sendmsg(f"Joined {chan}", origin)
chanList[chan] = 0
break
return chanList
def op(name, chan):
if name != "":
2023-11-05 01:37:35 +00:00
log(f"Attempting op of {name} in {chan}...", server)
2023-11-02 01:28:02 +00:00
send(f"MODE {chan} +o {name}\n")
2023-10-23 22:10:07 +00:00
def main():
try:
2023-11-06 03:19:26 +00:00
global channels, e, gmode, prefix, rebt, gblrebt
2023-11-05 01:37:35 +00:00
log("Starting connection..", server)
2023-10-23 22:10:07 +00:00
joinserver()
if "pass" in servers[server]:
sendmsg(f"IDENTIFY FireBot {servers[server]['pass']}", "NickServ")
2023-10-23 22:10:07 +00:00
sleep(0.5)
for chan in channels:
joinchan(chan, "null", channels, False)
while 1:
2023-11-05 02:45:21 +00:00
global gmode
2023-11-02 01:28:02 +00:00
raw = recv()
2023-10-23 22:10:07 +00:00
ircmsg = raw.decode()
2023-10-28 04:20:18 +00:00
if ircmsg == "":
2023-11-02 01:28:02 +00:00
exit("Probably a netsplit")
2023-10-28 04:20:18 +00:00
else:
2023-10-23 22:10:07 +00:00
print(raw.lazy_decode(), sep="\n")
2023-11-05 02:45:21 +00:00
action = "Unknown"
try:
action = ircmsg.split(" ", 2)[1].strip()
except IndexError:
pass
if action == "PRIVMSG":
# Format of ":[Nick]![ident]@[host|vhost] PRIVMSG [channel] :[message]”
2023-10-28 04:20:18 +00:00
name = ircmsg.split("!", 1)[0][1:]
helpErr = False
if (name.startswith("saxjax") and server == "efnet") or (
name == "ReplIRC" and server == "replirc"
):
if ircmsg.find("<") != -1 and ircmsg.find(">") != -1:
Nname = ircmsg.split("<", 1)[1].split(">", 1)[0].strip()
if name == "ReplIRC":
name = Nname[4:]
else:
name = Nname
message = ircmsg.split(">", 1)[1].strip()
helpErr = True
2023-10-23 22:10:07 +00:00
else:
2023-10-28 04:20:18 +00:00
message = (
ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
)
2023-10-23 22:10:07 +00:00
else:
message = ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
2023-10-28 04:20:18 +00:00
if name.endswith("dsc"):
helpErr = True
chan = ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[0].strip()
2023-11-05 01:37:35 +00:00
log(
f'Got "{bytes(message).lazy_decode()}" from "{name}" in "{chan}"',
2023-11-05 04:17:08 +00:00
server,
2023-11-05 01:37:35 +00:00
)
2023-10-28 04:20:18 +00:00
if "goat" in name.lower() and gmode == True:
2023-11-05 01:37:35 +00:00
log("GOAT DETECTED", server)
2023-10-28 04:20:18 +00:00
sendmsg("Hello Goat", chan)
gmode = False
2023-11-05 01:37:35 +00:00
if len(name) > nicklen:
log(f"Name too long ({len(name)} > {nicklen})", server)
2023-10-28 04:20:18 +00:00
continue
2023-11-05 02:20:18 +00:00
elif chan == botnick:
2023-11-05 04:17:08 +00:00
pass # TODO: Somehow combine into other checks
2023-10-28 04:20:18 +00:00
elif chan not in channels:
2023-11-05 04:17:08 +00:00
log(
f"Channel not in channels ({chan} not in {channels})",
server,
)
2023-10-28 04:20:18 +00:00
continue
2023-11-05 01:37:35 +00:00
else:
channels[chan] += 1
2023-10-28 04:20:18 +00:00
if mfind(
message.lower(),
["!botlist"],
False,
):
2023-11-05 04:17:08 +00:00
sendmsg(
f"Hi! I'm FireBot (https://git.amcforum.wiki/Firepup650/fire-ircbot)! My admins on this server are {adminnames}.",
chan,
)
if mfind(
message.lower(),
["bugs bugs bugs"],
False,
):
2023-11-05 04:17:08 +00:00
sendmsg(
f"\x01ACTION realizes {name} looks like a bug, and squashes {name}\x01",
chan,
)
if mfind(
message.lower(),
2023-10-28 04:20:18 +00:00
[f"hi {botnick.lower()}", f"hello {botnick.lower()}"],
False,
):
sendmsg(f"Hello {name}!", chan)
elif (
mfind(message, ["op me"], False) and name.lower() in adminnames
):
op(name, chan)
elif mfind(message, ["ping"]):
sendmsg(
2023-11-05 23:58:28 +00:00
f"{name}: pong",
chan,
)
elif mfind(message, ["uptime"]):
2023-11-06 00:51:50 +00:00
uptime = (
run(["uptime", "-p"], stdout=PIPE).stdout.decode().strip()
)
sendmsg(
f"Uptime: {uptime}",
chan,
)
2023-10-28 04:20:18 +00:00
elif mfind(message, ["amIAdmin"]):
2023-10-23 22:10:07 +00:00
sendmsg(
2023-10-28 04:20:18 +00:00
f"{name.lower()} in {adminnames} == {name.lower() in adminnames}",
chan,
2023-10-23 22:10:07 +00:00
)
2023-10-28 04:20:18 +00:00
elif mfind(message, ["help"]):
if not helpErr:
2023-11-05 01:55:04 +00:00
sendmsg("Command list needs rework", name)
continue
2023-10-28 04:20:18 +00:00
sendmsg("List of commands:", name)
sendmsg(f'Current prefix is "{prefix}"', name)
sendmsg(f"{prefix}help - Sends this help list", name)
sendmsg(
f"{prefix}quote - Sends a random firepup quote", name
)
2023-10-23 22:10:07 +00:00
sendmsg(
2023-10-28 04:20:18 +00:00
f"{prefix}(eightball,8ball,8b) [question]? - Asks the magic eightball a question",
2023-10-23 22:10:07 +00:00
name,
)
2023-10-28 04:20:18 +00:00
sendmsg(
f"(hi,hello) {botnick} - The bot says hi to you", name
)
if name.lower() in adminnames:
sendmsg(f"reboot {rebt} - Restarts the bot", name)
sendmsg(exitcode + " - Shuts down the bot", name)
sendmsg("op me - Makes the bot try to op you", name)
sendmsg(
f"{prefix}join [channel(s)] - Joins the bot to the specified channel(s)",
name,
)
else:
sendmsg("Sorry, I can't send help to bridged users.", chan)
elif name.lower() in adminnames and mfind(
message, ["goat.mode.activate"]
):
2023-11-05 01:37:35 +00:00
log("GOAT DETECTION ACTIVATED", server)
2023-10-28 04:20:18 +00:00
gmode = True
elif name.lower() in adminnames and mfind(
message, ["goat.mode.deactivate"]
):
2023-11-05 01:37:35 +00:00
log("GOAT DETECTION DEACTIVATED", server)
2023-10-28 04:20:18 +00:00
gmode = False
elif mfind(message, ["quote"]):
r.seed()
2023-11-02 01:28:02 +00:00
mm = open("mastermessages.txt", "r")
q = mm.readlines()
2023-10-28 04:20:18 +00:00
sel = decode_escapes(
2023-10-23 22:10:07 +00:00
str(r.sample(q, 1))
.strip("[]'")
.replace("\\n", "")
.strip('"')
)
2023-10-28 04:20:18 +00:00
sendmsg(sel, chan)
2023-11-02 01:28:02 +00:00
mm.close()
2023-10-28 04:20:18 +00:00
elif mfind(message, ["join "]) and name.lower() in adminnames:
newchan = message.split(" ")[1].strip()
channels = joinchan(newchan, chan, channels)
elif mfind(message, ["eightball", "8ball", "8b"]):
if message.endswith("?"):
2023-11-02 01:28:02 +00:00
eb = open("eightball.txt", "r")
q = eb.readlines()
2023-10-28 04:20:18 +00:00
sel = (
str(r.sample(q, 1))
.strip("[]'")
.replace("\\n", "")
.strip('"')
)
sendmsg(f"The magic eightball says: {sel}", chan)
2023-11-02 01:28:02 +00:00
eb.close()
2023-10-28 04:20:18 +00:00
else:
sendmsg("Please pose a Yes or No question.", chan)
elif (
mfind(message, ["debug", "dbg"]) and name.lower() in adminnames
):
sendmsg(f"[DEBUG] NICKLEN={nicklen}", chan)
sendmsg(f"[DEBUG] ADMINS={adminnames}", chan)
sendmsg(f"[DEBUG] CHANNELS={channels}", chan)
2023-11-05 01:37:35 +00:00
elif (
mfind(message, ["raw ", "cmd "]) and name.lower() in adminnames
):
2023-11-02 01:28:02 +00:00
sendraw(message.split(" ", 1)[1])
2023-10-28 04:20:18 +00:00
elif (
2023-11-06 03:13:59 +00:00
mfind(message, [f"reboot {rebt}", f"reboot {gblrebt}"], False)
or mfind(message, ["restart", "reboot"])
) and name.lower() in adminnames:
2023-11-02 01:28:02 +00:00
send("QUIT :Rebooting\n")
2023-11-05 02:22:42 +00:00
exit("Reboot")
2023-10-28 04:20:18 +00:00
elif sucheck(message):
if name.lower() in adminnames:
sendmsg(
"Error - system failure, contact system operator", chan
)
elif "bot" in name.lower():
2023-11-05 01:37:35 +00:00
log("lol, no.", server)
2023-10-28 04:20:18 +00:00
else:
sendmsg("Access Denied", chan)
elif np.search(message) and name in npallowed:
2023-10-28 04:20:18 +00:00
x02 = "\x02"
sendmsg(
f"f.sp {message.split(':')[1].split('(')[0].strip(f' {x02}')}",
chan,
)
elif len(message.split("\x01")) == 3:
if not CTCPHandler(message, name):
CTCP = message.split("\x01")[1]
if CTCP == "ACTION ducks":
sendmsg("\x01ACTION gets hit by a duck\x01", chan)
elif CTCP.startswith("ACTION ducks"):
sendmsg(
f"\x01ACTION gets hit by {CTCP.split(' ', 2)[2]}\x01",
chan,
)
if chan in channels and channels[chan] >= interval:
r.seed()
channels[chan] = 0
2023-11-02 01:28:02 +00:00
mm = open("mastermessages.txt", "r")
q = mm.readlines()
2023-10-28 04:20:18 +00:00
sel = decode_escapes(
str(r.sample(q, 1))
.strip("[]'")
.replace("\\n", "")
.strip('"')
)
sendmsg(f"[QUOTE] {sel}", chan)
2023-11-02 01:28:02 +00:00
mm.close()
2023-10-28 04:20:18 +00:00
else:
if ircmsg.startswith("PING "):
2023-10-28 04:20:18 +00:00
ping(ircmsg)
2023-11-05 01:37:35 +00:00
elif ircmsg.startswith("ERROR :Closing Link"):
2023-11-02 01:28:02 +00:00
exit("I got killed :'(")
2023-11-05 01:37:35 +00:00
elif ircmsg.startswith("ERROR :Ping "):
exit("Ping timeout")
2023-10-23 22:10:07 +00:00
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()