forked from Firepup650/FireBot
368 lines
15 KiB
Python
368 lines
15 KiB
Python
#!/usr/bin/python3
|
|
from socket import socket, AF_INET, SOCK_STREAM
|
|
from overrides import bytes, bbytes
|
|
import logs
|
|
import re
|
|
from typing import NoReturn, Union
|
|
import commands as cmds
|
|
import config as conf
|
|
from time import sleep
|
|
from importlib import reload
|
|
import random as r
|
|
|
|
|
|
def mfind(message: str, find: list, usePrefix: bool = True) -> bool:
|
|
if usePrefix:
|
|
return any(message[: len(match) + 1] == conf.prefix + match for match in find)
|
|
else:
|
|
return any(message[: len(match)] == match for match in find)
|
|
|
|
|
|
class bot:
|
|
def __init__(self, server: str):
|
|
self.gmode = False
|
|
self.server = server
|
|
self.nicklen = 30
|
|
self.address = conf.servers[server]["address"]
|
|
self.port = (
|
|
conf.servers[server]["port"] if "port" in conf.servers[server] else 6667
|
|
)
|
|
self.channels = conf.servers[server]["channels"]
|
|
self.interval = (
|
|
conf.servers[server]["interval"]
|
|
if "interval" in conf.servers[server]
|
|
else 50
|
|
)
|
|
self.__version__ = conf.__version__
|
|
self.nick = "FireBot"
|
|
self.adminnames = conf.servers[server]["admins"]
|
|
self.queue: list[bbytes] = []
|
|
self.sock = socket(AF_INET, SOCK_STREAM)
|
|
self.npallowed = ["FireBitBot"]
|
|
self.log(f"Start init for {self.server}")
|
|
|
|
def connect(self) -> None:
|
|
self.log(f"Joining {self.server}...")
|
|
self.sock.connect((self.address, self.port))
|
|
self.send(f"USER {self.nick} {self.nick} {self.nick} {self.nick}\n")
|
|
self.send(f"NICK {self.nick}\n")
|
|
ircmsg = ""
|
|
while (
|
|
ircmsg.find(f"MODE {self.nick}") == -1
|
|
and ircmsg.find(f"PRIVMSG {self.nick}") == -1
|
|
):
|
|
ircmsg = self.recv().decode()
|
|
if ircmsg != "":
|
|
code = 0
|
|
try:
|
|
code = int(ircmsg.split(" ", 2)[1].strip())
|
|
except (IndexError, ValueError):
|
|
pass
|
|
print(bytes(ircmsg).lazy_decode())
|
|
if ircmsg.find("NICKLEN=") != -1:
|
|
self.nicklen = int(ircmsg.split("NICKLEN=")[1].split(" ")[0])
|
|
self.log(f"NICKLEN set to {self.nicklen}")
|
|
elif code == 433:
|
|
self.log("Nickname in use", "WARN")
|
|
self.nick = f"{self.nick}{r.randint(0,1000)}"
|
|
self.send(f"NICK {self.nick}\n")
|
|
self.log(f"nick is now {self.nick}")
|
|
elif ircmsg.startswith("PING "):
|
|
self.ping(ircmsg)
|
|
elif len(ircmsg.split("\x01")) == 3:
|
|
self.CTCP(ircmsg, isRaw=True)
|
|
elif ircmsg.find("Closing Link") != -1:
|
|
self.exit("Closing Link")
|
|
else:
|
|
self.exit("Lost connection to the server")
|
|
self.log(f"Joined {self.server} successfully!")
|
|
|
|
def join(self, chan: str, origin: str, lock: bool = True) -> None:
|
|
self.log(f"Joining {chan}...")
|
|
chan = chan.replace(" ", "")
|
|
if "," in chan:
|
|
chans = chan.split(",")
|
|
for subchan in chans:
|
|
self.join(subchan, origin)
|
|
return
|
|
if chan.startswith("0") or (chan == "#main" and lock):
|
|
if origin != "null":
|
|
self.msg(f"Refusing to join channel {chan} (protected)", origin)
|
|
return
|
|
if chan in self.channels and lock:
|
|
if origin != "null":
|
|
self.msg(f"I'm already in {chan}.", origin)
|
|
return
|
|
self.send(f"JOIN {chan}\n")
|
|
while True:
|
|
ircmsg = self.recv().decode()
|
|
if ircmsg != "":
|
|
code = 0
|
|
try:
|
|
code = int(ircmsg.split(" ", 2)[1].strip())
|
|
except (IndexError, ValueError):
|
|
pass
|
|
print(bytes(ircmsg).lazy_decode())
|
|
if ircmsg.startswith("PING "):
|
|
self.ping(ircmsg)
|
|
elif len(ircmsg.split("\x01")) == 3:
|
|
self.CTCP(ircmsg, isRaw=True)
|
|
elif code == 403:
|
|
self.log(f"Joining {chan} failed", "WARN")
|
|
if origin != "null":
|
|
self.msg(f"{chan} is an invalid channel", origin)
|
|
break
|
|
elif code == 473:
|
|
self.log(f"Joining {chan} failed (+i)", "WARN")
|
|
if origin != "null":
|
|
self.msg(f"{chan} is +i, and I'm not invited.", origin)
|
|
break
|
|
elif code == 520:
|
|
self.log(f"Joining {chan} failed (+O)", "WARN")
|
|
if origin != "null":
|
|
self.msg(f"{chan} is +O, and I'm not an operator.", origin)
|
|
break
|
|
elif code == 405:
|
|
self.log(f"Joining {chan} failed (too many channels)", "WARN")
|
|
if origin != "null":
|
|
self.msg(f"I'm in too many channels to join {chan}", origin)
|
|
break
|
|
elif code == 471:
|
|
self.log(f"Joining {chan} failed (+l)", "WARN")
|
|
if origin != "null":
|
|
self.msg(f"{chan} is +l, and is full", origin)
|
|
break
|
|
elif code == 366:
|
|
self.log(f"Joining {chan} succeeded")
|
|
if origin != "null":
|
|
self.msg(f"Joined {chan}", origin)
|
|
self.channels[chan] = 0
|
|
break
|
|
|
|
def ping(self, ircmsg: str) -> int:
|
|
pong = f"PONG :{ircmsg.split('PING :')[1]}\n"
|
|
print(pong, end="")
|
|
return self.send(pong)
|
|
|
|
def send(self, command: str) -> int:
|
|
return self.sock.send(bytes(command))
|
|
|
|
def recv(self) -> bytes:
|
|
if self.queue:
|
|
return bytes(self.queue.pop(0))
|
|
data = bytes(self.sock.recv(2048).strip(b"\r\n"))
|
|
if b"\r\n" in data:
|
|
self.queue.extend(data.split(b"\r\n"))
|
|
return bytes(self.queue.pop(0))
|
|
return data
|
|
|
|
def log(self, message: str, level: str = "LOG") -> None:
|
|
logs.log(message, self.server)
|
|
|
|
def exit(self, message: str) -> NoReturn:
|
|
logs.log(message, self.server, "EXIT")
|
|
exit(1)
|
|
|
|
def CTCP(self, msg: str, sender: str = "", isRaw: bool = False) -> bool:
|
|
if isRaw:
|
|
sender = msg.split("!", 1)[0][1:]
|
|
message = msg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
|
|
kind = msg.split("\x01")[1].split(" ", 1)[0]
|
|
self.log(f'Responding to CTCP "{kind}" from {sender}')
|
|
if kind == "VERSION":
|
|
self.notice(
|
|
f"\x01VERSION FireBot {conf.__version__} (https://git.amcforum.wiki/Firepup650/fire-ircbot)\x01",
|
|
sender,
|
|
True,
|
|
)
|
|
return True
|
|
elif kind == "USERINFO":
|
|
self.notice("\x01USERINFO FireBot (Firepup's bot)\x01", sender, True)
|
|
return True
|
|
elif kind == "SOURCE":
|
|
self.notice(
|
|
"\x01SOURCE https://git.amcforum.wiki/Firepup650/fire-ircbot\x01",
|
|
sender,
|
|
True,
|
|
)
|
|
return True
|
|
elif kind == "FINGER":
|
|
self.notice("\x01FINGER Firepup's bot\x01", sender, True)
|
|
return True
|
|
elif kind == "CLIENTINFO":
|
|
self.notice(
|
|
"\x01CLIENTINFO ACTION VERSION USERINFO SOURCE FINGER\x01", sender, True
|
|
)
|
|
return True
|
|
self.log(f'Unknown CTCP "{kind}"', "WARN")
|
|
return False
|
|
|
|
def msg(self, msg: str, target: str) -> None:
|
|
if not (target == "NickServ" and mfind(msg, ["IDENTIFY"], False)):
|
|
self.log(f"Sending {bytes(msg).lazy_decode()} to {target}")
|
|
else:
|
|
self.log("Identifying myself...")
|
|
self.send(f"PRIVMSG {target} :{msg}\n")
|
|
|
|
def op(self, name: str, chan: str) -> Union[int, None]:
|
|
if name != "":
|
|
self.log(f"Attempting op of {name} in {chan}...")
|
|
return self.send(f"MODE {chan} +o {name}\n")
|
|
return None
|
|
|
|
def notice(self, msg: str, target: str, silent: bool = False) -> int:
|
|
if not silent:
|
|
self.log(f"Sending {bytes(msg).lazy_decode()} to {target} (NOTICE)")
|
|
return self.send(f"NOTICE {target} :{msg}\n")
|
|
|
|
def sendraw(self, command: str) -> int:
|
|
self.log(f"RAW sending {command}")
|
|
command = f"{command}\n"
|
|
return self.send(command.replace("$BOTNICK", self.nick))
|
|
|
|
def mainloop(self) -> NoReturn:
|
|
self.log("Starting connection..")
|
|
self.connect()
|
|
if "pass" in conf.servers[self.server]:
|
|
self.msg(
|
|
f"IDENTIFY FireBot {conf.servers[self.server]['pass']}", "NickServ"
|
|
)
|
|
sleep(0.5)
|
|
for chan in self.channels:
|
|
self.join(chan, "null", False)
|
|
while 1:
|
|
raw = self.recv()
|
|
ircmsg = raw.decode()
|
|
if ircmsg == "":
|
|
self.exit("Probably a netsplit")
|
|
else:
|
|
print(raw.lazy_decode(), sep="\n")
|
|
action = "Unknown"
|
|
try:
|
|
action = ircmsg.split(" ", 2)[1].strip()
|
|
except IndexError:
|
|
pass
|
|
if action == "PRIVMSG":
|
|
# Format of ":[Nick]![ident]@[host|vhost] PRIVMSG [channel] :[message]”
|
|
name = ircmsg.split("!", 1)[0][1:]
|
|
helpErr = False
|
|
if (name.startswith("saxjax") and self.server == "efnet") or (
|
|
name == "ReplIRC" and self.server == "replirc"
|
|
):
|
|
if ircmsg.find("<") != -1 and ircmsg.find(">") != -1:
|
|
Nname = ircmsg.split("<", 1)[1].split(">", 1)[0].strip()
|
|
if name == "ReplIRC":
|
|
name = Nname[4:]
|
|
else:
|
|
name = Nname
|
|
message = ircmsg.split(">", 1)[1].strip()
|
|
helpErr = True
|
|
else:
|
|
message = (
|
|
ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
|
|
)
|
|
elif name == self.nick:
|
|
continue
|
|
else:
|
|
message = ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
|
|
if name.endswith("dsc"):
|
|
helpErr = True
|
|
chan = ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[0].strip()
|
|
self.log(
|
|
f'Got "{bytes(message).lazy_decode()}" from "{name}" in "{chan}"',
|
|
)
|
|
if len(name) > self.nicklen:
|
|
self.log(f"Name too long ({len(name)} > {self.nicklen})")
|
|
continue
|
|
elif chan not in self.channels:
|
|
self.log(
|
|
f"Channel not in channels ({chan} not in {self.channels})",
|
|
"WARN",
|
|
)
|
|
if not chan.startswith(("#", "+", "&")):
|
|
chan = name
|
|
else:
|
|
self.channels[chan] += 1
|
|
if "goat" in name.lower() and self.gmode == True:
|
|
cmds.goat(self, chan, name, message)
|
|
handled = False
|
|
for cmd in cmds.data:
|
|
triggers = [cmd]
|
|
triggers.extend(cmds.data[cmd]["aliases"])
|
|
triggers = list(
|
|
call.replace("$BOTNICK", self.nick.lower())
|
|
for call in triggers
|
|
)
|
|
if mfind(
|
|
message.lower(),
|
|
triggers,
|
|
cmds.data[cmd]["prefix"],
|
|
):
|
|
if (
|
|
"admin" in cmds.data[cmd] and cmds.data[cmd]["admin"]
|
|
) and name not in self.adminnames:
|
|
self.msg(
|
|
f"Sorry {name}, you don't have permission to use {cmd.strip()}.",
|
|
chan,
|
|
)
|
|
else:
|
|
cmds.call[cmd](self, chan, name, message)
|
|
handled = True
|
|
break
|
|
if not handled:
|
|
for check in cmds.checks:
|
|
if re.search(
|
|
check.replace("$MAX", str(self.nicklen)).replace(
|
|
"$BOTNICK", self.nick
|
|
),
|
|
message,
|
|
):
|
|
cmds.call[check](self, chan, name, message)
|
|
handled = True
|
|
break
|
|
if not handled and mfind(message, ["reload"]):
|
|
if name in self.adminnames:
|
|
reload(conf)
|
|
reload(cmds)
|
|
self.__version__ = conf.__version__
|
|
self.msg("Reloaded config and commands", chan)
|
|
else:
|
|
self.msg(
|
|
f"Sorry {name}, you don't have permission to use reload.",
|
|
chan,
|
|
)
|
|
handled = True
|
|
if not handled and len(message.split("\x01")) == 3:
|
|
if not self.CTCP(message, name):
|
|
CTCP = message.split("\x01")[1]
|
|
if CTCP == "ACTION ducks":
|
|
self.msg("\x01ACTION gets hit by a duck\x01", chan)
|
|
elif CTCP.startswith("ACTION ducks"):
|
|
self.msg(
|
|
f"\x01ACTION gets hit by {CTCP.split(' ', 2)[2]}\x01",
|
|
chan,
|
|
)
|
|
if chan in self.channels and self.channels[chan] >= self.interval:
|
|
r.seed()
|
|
self.channels[chan] = 0
|
|
mm = open("mastermessages.txt", "r")
|
|
q = mm.readlines()
|
|
sel = conf.decode_escapes(
|
|
str(r.sample(q, 1))
|
|
.strip("[]'")
|
|
.replace("\\n", "")
|
|
.strip('"')
|
|
)
|
|
self.msg(f"[QUOTE] {sel}", chan)
|
|
mm.close()
|
|
elif action == "NICK":
|
|
name = ircmsg.split("!", 1)[0][1:]
|
|
if name == self.nick:
|
|
self.nick = ircmsg.split("NICK", 1)[1].split(":", 1)[1].strip()
|
|
else:
|
|
if ircmsg.startswith("PING "):
|
|
self.ping(ircmsg)
|
|
elif ircmsg.startswith("ERROR :Closing Link"):
|
|
self.exit("I got killed :'(")
|
|
elif ircmsg.startswith("ERROR :Ping "):
|
|
self.exit("Ping timeout")
|