166 lines
6.5 KiB
Python
166 lines
6.5 KiB
Python
#!/usr/bin/python3
|
|
from socket import socket, AF_INET, SOCK_STREAM
|
|
from overrides import bytes, bbytes
|
|
from logs import log
|
|
import re
|
|
from typing import NoReturn
|
|
from config import npbase, servers, __version__
|
|
|
|
|
|
class bot:
|
|
def __init__(self, server: str):
|
|
self.gmode = False
|
|
self.server = server
|
|
self.nicklen = 30
|
|
self.address = servers[server]["address"]
|
|
self.port = servers[server]["port"] if "port" in servers[server] else 6667
|
|
self.channels = servers[server]["channels"]
|
|
self.interval = (
|
|
servers[server]["interval"] if "interval" in servers[server] else 50
|
|
)
|
|
self.nick = "FireBot"
|
|
self.prefix = "."
|
|
self.rebt = "fire"
|
|
self.gblrebt = "all"
|
|
self.adminnames = servers[server]["admins"]
|
|
self.exitcode = f"bye {self.nick.lower()}"
|
|
self.np = re.compile(npbase.replace("MAX", f"{nicklen}"))
|
|
self.queue = []
|
|
self.sock = socket(AF_INET, SOCK_STREAM)
|
|
self.log(f"Start init for {server}")
|
|
|
|
def connect(self) -> None:
|
|
self.log(f"Joining {server}...")
|
|
self.sock.connect((self.address, self.port))
|
|
self.send(f"USER {self.nick} {self.nick} {self.nick} {self.nick}\n")
|
|
self.send(f"NICK {self.nick}\n")
|
|
ircmsg = ""
|
|
while (
|
|
ircmsg.find(f"MODE {self.nick}") == -1
|
|
and ircmsg.find(f"PRIVMSG {self.nick}") == -1
|
|
):
|
|
ircmsg = self.recv().decode()
|
|
if ircmsg != "":
|
|
print(bytes(ircmsg).lazy_decode())
|
|
if ircmsg.find("NICKLEN=") != -1:
|
|
self.nicklen = int(ircmsg.split("NICKLEN=")[1].split(" ")[0])
|
|
self.np = re.compile(npbase.replace("MAX", f"{self.nicklen}"))
|
|
self.log(f"NICKLEN set to {nicklen}")
|
|
elif ircmsg.find("Nickname") != -1:
|
|
self.log("Nickname in use", "WARN")
|
|
self.nick = f"{self.nick}{r.randint(0,1000)}"
|
|
self.send(f"NICK {self.nick}\n")
|
|
self.log(f"nick is now {self.nick}")
|
|
elif ircmsg.startswith("PING "):
|
|
self.ping(ircmsg)
|
|
elif len(ircmsg.split("\x01")) == 3:
|
|
self.CTCPHandler(ircmsg, isRaw=True)
|
|
elif ircmsg.find("Closing Link") != -1:
|
|
self.exit("Closing Link")
|
|
else:
|
|
self.exit("Lost connection to the server")
|
|
self.log(f"Joined {server} successfully!")
|
|
|
|
def join(self, chan: str, origin: str) -> None:
|
|
self.log(f"Joining {chan}...")
|
|
chan = chan.replace(" ", "")
|
|
if "," in chan:
|
|
chans = chan.split(",")
|
|
for subchan in chans:
|
|
self.join(subchan, origin)
|
|
return
|
|
if chan.startswith("0") or (chan == "#main" and lock):
|
|
if origin != "null":
|
|
self.sendmsg(f"Refusing to join channel {chan} (protected)", origin)
|
|
return
|
|
if chan in channels and lock:
|
|
if origin != "null":
|
|
self.sendmsg(f"I'm already in {chan}.", origin)
|
|
return
|
|
self.send(f"JOIN {chan}\n")
|
|
while True:
|
|
ircmsg = self.recv().decode()
|
|
if ircmsg != "":
|
|
code = 0
|
|
try:
|
|
code = int(ircmsg.split(" ", 2)[1].strip())
|
|
except (IndexError, ValueError):
|
|
pass
|
|
print(bytes(ircmsg).lazy_decode())
|
|
if ircmsg.startswith("PING "):
|
|
self.ping(ircmsg)
|
|
elif len(ircmsg.split("\x01")) == 3:
|
|
self.CTCP(ircmsg, isRaw=True)
|
|
elif code == 403:
|
|
self.log(f"Joining {chan} failed", "WARN")
|
|
if origin != "null":
|
|
sendmsg(f"{chan} is an invalid channel", origin)
|
|
break
|
|
elif code == 473:
|
|
self.log(f"Joining {chan} failed (+i)", "WARN")
|
|
if origin != "null":
|
|
self.sendmsg(f"{chan} is +i, and I'm not invited.", origin)
|
|
break
|
|
elif code == 520:
|
|
self.log(f"Joining {chan} failed (+O)", "WARN")
|
|
if origin != "null":
|
|
self.sendmsg(f"{chan} is +O, and I'm not an operator.", origin)
|
|
elif code == 366:
|
|
log(f"Joining {chan} succeeded", server)
|
|
if origin != "null":
|
|
self.sendmsg(f"Joined {chan}", origin)
|
|
self.channels[chan] = 0
|
|
break
|
|
|
|
def send(self, command: str) -> int:
|
|
return self.sock.send(bytes(command))
|
|
|
|
def recv(self) -> bytes:
|
|
if self.queue:
|
|
return bytes(self.queue.pop(0))
|
|
data = bytes(self.sock.recv(2048).strip(b"\r\n"))
|
|
if b"\r\n" in data:
|
|
self.queue.extend(data.split(b"\r\n"))
|
|
return bytes(self.queue.pop(0))
|
|
return data
|
|
|
|
def log(self, message: object, level: str = "LOG") -> None:
|
|
log(message, self.server)
|
|
|
|
def exit(self, message: object) -> NoReturn:
|
|
log(message, self.server, "EXIT")
|
|
exit(1)
|
|
|
|
def CTCP(self, msg: str, sender: str = "", isRaw: bool = False) -> bool:
|
|
if isRaw:
|
|
sender = msg.split("!", 1)[0][1:]
|
|
message = msg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip()
|
|
kind = msg.split("\x01")[1].split(" ", 1)[0]
|
|
self.log(f"Responding to CTCP {kind} from {sender}")
|
|
if kind == "VERSION":
|
|
self.notice(
|
|
f"\x01VERSION FireBot {__version__} (https://git.amcforum.wiki/Firepup650/fire-ircbot)\x01",
|
|
sender,
|
|
True,
|
|
)
|
|
return True
|
|
elif kind == "USERINFO":
|
|
self.notice("\x01USERINFO FireBot (Firepup's bot)\x01", sender, True)
|
|
return True
|
|
elif kind == "SOURCE":
|
|
self.notice(
|
|
"\x01SOURCE https://git.amcforum.wiki/Firepup650/fire-ircbot\x01",
|
|
sender,
|
|
True,
|
|
)
|
|
return True
|
|
elif kind == "FINGER":
|
|
self.notice("\x01FINGER Firepup's bot\x01", sender, True)
|
|
return True
|
|
elif kind == "CLIENTINFO":
|
|
self.notice(
|
|
"\x01CLIENTINFO ACTION VERSION USERINFO SOURCE FINGER\x01", sender, True
|
|
)
|
|
return True
|
|
self.log(f"Unknown CTCP {kind}")
|
|
return False
|