FireBot/bot.py

306 lines
13 KiB
Python
Raw Normal View History

2023-11-05 01:37:35 +00:00
#!/usr/bin/python3
import random as r
import re
from socket import socket, AF_INET, SOCK_STREAM
from time import sleep
from typing import NoReturn, Union

import commands as cmds
from config import npbase, servers, __version__, prefix
from config import decode_escapes, servers
from logs import log
from overrides import bytes, bbytes
2023-11-06 00:50:04 +00:00
2023-11-07 13:25:53 +00:00
def mfind(message: str, find: list, usePrefix: bool = True) -> bool:
    """Return True if *message* begins with any of the candidate strings.

    Args:
        message: The text to test.
        find: Candidate strings (command names / aliases).
        usePrefix: When true, each candidate must be preceded by the
            module-level ``prefix``; when false the bare candidate is matched.

    Returns:
        True if at least one candidate matches the start of *message*.
    """
    # str.startswith works for a prefix of any length; the previous slice
    # comparison silently assumed the prefix was exactly one character.
    lead = prefix if usePrefix else ""
    return any(message.startswith(lead + match) for match in find)
2023-11-05 01:37:35 +00:00
class bot:
    """One FireBot connection to a single configured IRC server.

    Connection settings are read from ``servers[server]`` (address, optional
    port and interval, channels, admins, optional NickServ password).
    ``mainloop`` connects, joins the configured channels and then dispatches
    incoming PRIVMSGs to the handlers in the ``commands`` module.
    """

    # Numeric replies that mean a JOIN was rejected:
    # code -> (suffix for the log line, message sent back to the requester).
    # 403-475 are RFC 2812 numerics; 520 is kept from the original code.
    _JOIN_FAILURES = {
        403: ("", "{chan} is an invalid channel"),
        405: (" (too many channels)", "I'm in too many channels to join {chan}."),
        471: (" (+l)", "{chan} is +l, and it's full."),
        473: (" (+i)", "{chan} is +i, and I'm not invited."),
        474: (" (+b)", "{chan} is +b, and I'm banned."),
        475: (" (+k)", "{chan} is +k, and I don't have the key."),
        520: (" (+O)", "{chan} is +O, and I'm not an operator."),
    }

    # CTCP queries answered with a fixed payload. VERSION is built at call
    # time because it embeds ``__version__``.
    _CTCP_REPLIES = {
        "USERINFO": "USERINFO FireBot (Firepup's bot)",
        "SOURCE": "SOURCE https://git.amcforum.wiki/Firepup650/fire-ircbot",
        "FINGER": "FINGER Firepup's bot",
        "CLIENTINFO": "CLIENTINFO ACTION VERSION USERINFO SOURCE FINGER",
    }

    def __init__(self, server: str):
        """Load the settings for *server* and create an unconnected socket.

        Args:
            server: Key into the ``servers`` configuration mapping.
        """
        self.gmode = False
        self.server = server
        self.nicklen = 30  # replaced by the server's NICKLEN= value in connect()
        self.address = servers[server]["address"]
        self.port = servers[server]["port"] if "port" in servers[server] else 6667
        # Maps channel name -> messages seen since the last quote was sent.
        self.channels = servers[server]["channels"]
        self.interval = (
            servers[server]["interval"] if "interval" in servers[server] else 50
        )
        self.nick = "FireBot"
        self.rebt = "fire"
        self.gblrebt = "all"
        self.adminnames = servers[server]["admins"]
        self.exitcode = f"bye {self.nick.lower()}"
        self.queue = []  # complete lines received but not yet handed out
        self._partial = b""  # trailing bytes of a line that is not complete yet
        self.sock = socket(AF_INET, SOCK_STREAM)
        self.log(f"Start init for {self.server}")

    def connect(self) -> None:
        """Open the connection and register with the server.

        Reads lines until one contains ``MODE <nick>`` or ``PRIVMSG <nick>``.
        While waiting it records NICKLEN, picks a new nick on a collision and
        answers PING and CTCP. Exits the process if the link is closed.
        """
        self.log(f"Joining {self.server}...")
        self.sock.connect((self.address, self.port))
        self.send(f"USER {self.nick} {self.nick} {self.nick} {self.nick}\n")
        self.send(f"NICK {self.nick}\n")
        ircmsg = ""
        while (
            f"MODE {self.nick}" not in ircmsg
            and f"PRIVMSG {self.nick}" not in ircmsg
        ):
            ircmsg = self.recv().decode()
            if ircmsg == "":
                self.exit("Lost connection to the server")
            print(bytes(ircmsg).lazy_decode())
            if "NICKLEN=" in ircmsg:
                self.nicklen = int(ircmsg.split("NICKLEN=")[1].split(" ")[0])
                self.log(f"NICKLEN set to {self.nicklen}")
            elif "Nickname" in ircmsg:
                self.log("Nickname in use", "WARN")
                self.nick = f"{self.nick}{r.randint(0,1000)}"
                self.send(f"NICK {self.nick}\n")
                self.log(f"nick is now {self.nick}")
            elif ircmsg.startswith("PING "):
                self.ping(ircmsg)
            elif len(ircmsg.split("\x01")) == 3:
                self.CTCP(ircmsg, isRaw=True)
            elif "Closing Link" in ircmsg:
                self.exit("Closing Link")
        self.log(f"Joined {self.server} successfully!")

    def join(self, chan: str, origin: str, lock: bool = True) -> None:
        """Join one channel, or each channel of a comma-separated list.

        Args:
            chan: Channel name(s); spaces are removed before use.
            origin: Where to report the outcome, or ``"null"`` to stay silent.
            lock: When true, refuse ``#main`` and channels already joined.
        """
        self.log(f"Joining {chan}...")
        chan = chan.replace(" ", "")
        if "," in chan:
            for subchan in chan.split(","):
                # Pass lock through so an unlocked join stays unlocked.
                self.join(subchan, origin, lock)
            return
        if chan.startswith("0") or (chan == "#main" and lock):
            if origin != "null":
                self.msg(f"Refusing to join channel {chan} (protected)", origin)
            return
        if chan in self.channels and lock:
            if origin != "null":
                self.msg(f"I'm already in {chan}.", origin)
            return
        self.send(f"JOIN {chan}\n")
        while True:
            ircmsg = self.recv().decode()
            if ircmsg == "":
                # An empty read means the socket closed; do not spin forever.
                self.exit("Lost connection to the server")
            code = 0
            try:
                code = int(ircmsg.split(" ", 2)[1].strip())
            except (IndexError, ValueError):
                pass
            print(bytes(ircmsg).lazy_decode())
            if ircmsg.startswith("PING "):
                self.ping(ircmsg)
            elif len(ircmsg.split("\x01")) == 3:
                self.CTCP(ircmsg, isRaw=True)
            elif code in self._JOIN_FAILURES:
                suffix, reason = self._JOIN_FAILURES[code]
                self.log(f"Joining {chan} failed{suffix}", "WARN")
                if origin != "null":
                    self.msg(reason.format(chan=chan), origin)
                break
            elif code == 366:
                self.log(f"Joining {chan} succeeded")
                if origin != "null":
                    self.msg(f"Joined {chan}", origin)
                self.channels[chan] = 0
                break

    def ping(self, ircmsg: str) -> int:
        """Answer a server PING, echoing its token, and return ``send``'s result."""
        # Accept both "PING :token" and "PING token".
        token = ircmsg.split("PING", 1)[1].strip().lstrip(":")
        pong = f"PONG :{token}\n"
        print(pong, end="")
        return self.send(pong)

    def send(self, command: str) -> int:
        """Write *command* to the socket and return the socket's send result."""
        return self.sock.send(bytes(command))

    def recv(self) -> bytes:
        """Return the next complete line from the server, without its line ending.

        Lines are reassembled across socket reads, so a line split between two
        reads is returned whole. Returns an empty value once the peer has
        closed the connection and no buffered line remains.
        """
        while not self.queue:
            chunk = self.sock.recv(2048)
            if not chunk:
                # Connection closed: hand back whatever partial line is left.
                leftover, self._partial = self._partial, b""
                return bytes(leftover.strip(b"\r\n"))
            # Everything after the last newline is an unfinished line.
            *lines, self._partial = (self._partial + chunk).split(b"\n")
            for line in lines:
                line = line.strip(b"\r")
                if line:
                    self.queue.append(line)
        return bytes(self.queue.pop(0))

    def log(self, message: object, level: str = "LOG") -> None:
        """Log *message* for this server at the given *level*."""
        log(message, self.server, level)

    def exit(self, message: object) -> NoReturn:
        """Log *message* at EXIT level and terminate the process with status 1."""
        log(message, self.server, "EXIT")
        raise SystemExit(1)

    def CTCP(self, msg: str, sender: str = "", isRaw: bool = False) -> bool:
        """Reply to a CTCP query with a NOTICE.

        Args:
            msg: The CTCP text, or a full raw IRC line when *isRaw* is true.
            sender: Nick to reply to; taken from *msg* when *isRaw* is true.
            isRaw: Whether *msg* is a full raw line.

        Returns:
            True if a reply was sent, False otherwise.
        """
        if isRaw:
            if "PRIVMSG" not in msg:
                # CTCP replies arrive as NOTICE and must not be answered.
                return False
            sender = msg.split("!", 1)[0][1:]
        kind = msg.split("\x01")[1].split(" ", 1)[0]
        self.log(f"Responding to CTCP \"{kind}\" from {sender}")
        if kind == "VERSION":
            reply = f"VERSION FireBot {__version__} (https://git.amcforum.wiki/Firepup650/fire-ircbot)"
        else:
            reply = self._CTCP_REPLIES.get(kind)
        if reply is None:
            self.log(f"Unknown CTCP \"{kind}\"", "WARN")
            return False
        self.notice(f"\x01{reply}\x01", sender, True)
        return True

    def msg(self, msg: str, target: str) -> None:
        """Send a PRIVMSG to *target*, logging it.

        A NickServ IDENTIFY is logged without its text so the password does
        not reach the log.
        """
        if target == "NickServ" and mfind(msg, ["IDENTIFY"], False):
            self.log("Identifying myself...")
        else:
            self.log(f"Sending {bytes(msg).lazy_decode()} to {target}")
        self.send(f"PRIVMSG {target} :{msg}\n")

    def op(self, name: str, chan: str) -> Union[int, None]:
        """Give *name* operator status in *chan*; do nothing for an empty name."""
        if name == "":
            return None
        self.log(f"Attempting op of {name} in {chan}...")
        return self.send(f"MODE {chan} +o {name}\n")

    def notice(self, msg: str, target: str, silent: bool = False) -> int:
        """Send a NOTICE to *target*; skip the log line when *silent* is true."""
        if not silent:
            self.log(f"Sending {bytes(msg).lazy_decode()} to {target} (NOTICE)")
        return self.send(f"NOTICE {target} :{msg}\n")

    def sendraw(self, command: str) -> int:
        """Send a raw IRC command, replacing ``$BOTNICK`` with the current nick."""
        self.log(f"RAW sending {command}")
        command = f"{command}\n"
        return self.send(command.replace("$BOTNICK", self.nick))

    def mainloop(self) -> NoReturn:
        """Connect, identify, join the configured channels and serve forever.

        Exits the process (via ``exit``) when an empty message is received.
        """
        self.log("Starting connection..")
        self.connect()
        if "pass" in servers[self.server]:
            self.msg(f"IDENTIFY FireBot {servers[self.server]['pass']}", "NickServ")
        sleep(0.5)
        for chan in self.channels:
            self.join(chan, "null", False)
        while 1:
            raw = self.recv()
            ircmsg = raw.decode()
            if ircmsg == "":
                self.exit("Probably a netsplit")
            print(raw.lazy_decode(), sep="\n")
            if ircmsg.startswith("PING "):
                # Keep answering PINGs after startup, as connect()/join() do.
                self.ping(ircmsg)
                continue
            action = "Unknown"
            try:
                action = ircmsg.split(" ", 2)[1].strip()
            except IndexError:
                pass
            if action == "PRIVMSG":
                self._handle_privmsg(ircmsg)

    def _handle_privmsg(self, ircmsg: str) -> None:
        """Parse one raw PRIVMSG line and run the matching command or check."""
        # Format of ":[Nick]![ident]@[host|vhost] PRIVMSG [channel] :[message]"
        name = ircmsg.split("!", 1)[0][1:]
        target, _, text = ircmsg.split("PRIVMSG", 1)[1].partition(":")
        chan = target.strip()
        message = text.strip()
        # NOTE(review): saxjax*/ReplIRC look like relay bots that wrap the
        # real speaker as "<name> text" - confirm.
        relayed = (name.startswith("saxjax") and self.server == "efnet") or (
            name == "ReplIRC" and self.server == "replirc"
        )
        if relayed and "<" in ircmsg and ">" in ircmsg:
            inner = ircmsg.split("<", 1)[1].split(">", 1)[0].strip()
            # ReplIRC names carry a 4-character lead-in that is dropped here.
            name = inner[4:] if name == "ReplIRC" else inner
            message = ircmsg.split(">", 1)[1].strip()
        self.log(
            f'Got "{bytes(message).lazy_decode()}" from "{name}" in "{chan}"',
        )
        if len(name) > self.nicklen:
            self.log(f"Name too long ({len(name)} > {self.nicklen})")
            return
        if chan not in self.channels:
            self.log(
                f"Channel not in channels ({chan} not in {self.channels})",
                "WARN",
            )
        else:
            self.channels[chan] += 1
        if "goat" in name.lower() and self.gmode:
            cmds.goat(self, chan)
        handled = False
        for cmd in cmds.data:
            triggers = [cmd, *cmds.data[cmd]["aliases"]]
            triggers = [call.replace("$BOTNICK", self.nick) for call in triggers]
            if mfind(message, triggers, cmds.data[cmd]["prefix"]):
                cmds.call[cmd](self, chan, name, message)
                handled = True
                break
        if not handled:
            for check in cmds.checks:
                # Escape the nick: IRC nicks may contain regex metacharacters.
                pattern = check.replace("$MAX", str(self.nicklen)).replace(
                    "$BOTNICK", re.escape(self.nick)
                )
                if re.search(pattern, message):
                    cmds.call[check](self, chan, name, message)
                    handled = True
                    break
        if not handled and len(message.split("\x01")) == 3:
            if not self.CTCP(message, name):
                CTCP = message.split("\x01")[1]
                if CTCP == "ACTION ducks":
                    self.msg("\x01ACTION gets hit by a duck\x01", chan)
                elif CTCP.startswith("ACTION ducks"):
                    parts = CTCP.split(" ", 2)
                    # "ACTION ducksfoo" has no third field; ignore it.
                    if len(parts) == 3:
                        self.msg(
                            f"\x01ACTION gets hit by {parts[2]}\x01",
                            chan,
                        )
        if chan in self.channels and self.channels[chan] >= self.interval:
            self._send_quote(chan)

    def _send_quote(self, chan: str) -> None:
        """Reset *chan*'s message counter and send it a random stored quote."""
        r.seed()
        self.channels[chan] = 0
        with open("mastermessages.txt", "r") as mm:
            q = mm.readlines()
        if not q:
            self.log("mastermessages.txt is empty, no quote sent", "WARN")
            return
        sel = decode_escapes(
            str(r.sample(q, 1)).strip("[]'").replace("\\n", "").strip('"')
        )
        self.msg(f"[QUOTE] {sel}", chan)