#!/usr/bin/python3 from socket import socket, AF_INET, SOCK_STREAM from overrides import bytes, bbytes import logs import re from typing import NoReturn, Union import commands as cmds import config as conf from time import sleep from importlib import reload import threads import random as r import handlers import bare from threading import Thread from markov import MarkovBot def mfind(message: str, find: list, usePrefix: bool = True) -> bool: if usePrefix: return any(message[: len(match) + 1] == conf.prefix + match for match in find) else: return any(message[: len(match)] == match for match in find) class bot(bare.bot): def __init__(self, server: str): self.gmode = False self.server = server self.nicklen = 30 self.address = conf.servers[server]["address"] self.port = ( conf.servers[server]["port"] if "port" in conf.servers[server] else 6667 ) self.channels = conf.servers[server]["channels"] self.adminnames = ( conf.servers[server]["admins"] if "admins" in conf.servers[server] else [] ) self.ignores = ( conf.servers[server]["ignores"] if "ignores" in conf.servers[server] else [] ) self.__version__ = conf.__version__ self.npallowed = conf.npallowed self.interval = ( conf.servers[server]["interval"] if "interval" in conf.servers[server] else 50 ) self.nick = ( conf.servers[server]["nick"] if "nick" in conf.servers[server] else "FireBot" ) self.queue: list[bbytes] = [] # pyright: ignore [reportInvalidTypeForm] self.sock = socket(AF_INET, SOCK_STREAM) self.current = "user" self.threads = ( conf.servers[server]["threads"] if "threads" in conf.servers[server] else [] ) self.onIdntCmds = ( conf.servers[server]["onIdntCmds"] if "onIdntCmds" in conf.servers[server] else [] ) self.onJoinCmds = ( conf.servers[server]["onJoinCmds"] if "onJoinCmds" in conf.servers[server] else [] ) self.onStrtCmds = ( conf.servers[server]["onStrtCmds"] if "onStrtCmds" in conf.servers[server] else [] ) self.autoMethod = ( conf.servers[server]["autoMethod"] if "autoMethod" in conf.servers[server] else "QUOTE" ) self.lastfmLink = conf.lastfmLink with open("mastermessages.txt") as f: TMFeed = [] for line in f.readlines(): TMFeed.extend([line.strip().split()]) self.markov = MarkovBot(TMFeed) self.log(f"Start init for {self.server}") def connect(self) -> None: self.log(f"Joining {self.server}...") self.sock.connect((self.address, self.port)) self.send("\n") # Just for sanity if self.onStrtCmds: for cmd in self.onStrtCmds: self.send(cmd + "\n") if "serverPass" in conf.servers[self.server]: self.send(f"PASS {conf.servers[self.server]['serverPass']}\n") self.send(f"USER {self.nick} {self.nick} {self.nick} {self.nick}\n") self.send(f"NICK {self.nick}\n") ircmsg = "" while True: ircmsg = self.recv().safe_decode() if ircmsg != "": code = 0 try: code = int(ircmsg.split(" ", 2)[1].strip()) except (IndexError, ValueError): pass print(bytes(ircmsg).lazy_decode()) if "NICKLEN" in ircmsg: self.nicklen = int(ircmsg.split("NICKLEN=")[1].split(" ")[0]) self.log(f"NICKLEN set to {self.nicklen}") if code == 433: self.log("Nickname in use", "WARN") self.nick = f"{self.nick}{r.randint(0,1000)}" self.send(f"NICK {self.nick}\n") self.log(f"nick is now {self.nick}") if code in [376, 422]: self.log(f"Success by code: {code}") break if " MODE " in ircmsg or " PRIVMSG " in ircmsg: self.log(f"Success by MSG/MODE") break if ircmsg.startswith("PING "): self.ping(ircmsg) if len(ircmsg.split("\x01")) == 3: handlers.CTCP(self, ircmsg) if "Closing link" in ircmsg: self.exit("Closing Link") else: self.exit("Lost connection to the server") self.log(f"Joined {self.server} successfully!") def join(self, chan: str, origin: str, lock: bool = True) -> None: self.log(f"Joining {chan}...") chan = chan.replace(" ", "") if "," in chan: chans = chan.split(",") for subchan in chans: self.join(subchan, origin) return if chan.startswith("0") or ( chan == "#main" and lock and self.server != "replirc" ): if origin != "null": self.msg(f"Refusing to join channel {chan} (protected)", origin) return if chan in self.channels and lock: if origin != "null": self.msg(f"I'm already in {chan}.", origin) return self.send(f"JOIN {chan}\n") while True: ircmsg = self.recv().safe_decode() if ircmsg != "": code = 0 try: code = int(ircmsg.split(" ", 2)[1].strip()) except (IndexError, ValueError): pass print(bytes(ircmsg).lazy_decode()) if ircmsg.startswith("PING "): self.ping(ircmsg) elif ircmsg.startswith("ERROR "): self.exit("Lost connection to the server while joining a channel") elif len(ircmsg.split("\x01")) == 3: handlers.CTCP(self, ircmsg) elif code == 403: self.log(f"Joining {chan} failed", "WARN") if origin != "null": self.msg(f"{chan} is an invalid channel", origin) break elif code == 473: self.log(f"Joining {chan} failed (+i)", "WARN") if origin != "null": self.msg(f"{chan} is +i, and I'm not invited.", origin) break elif code == 474: self.log(f"Joining {chan} failed (+b)", "WARN") if origin != "null": self.msg(f"I'm banned from {chan}.", origin) break elif code == 480: self.log(f"Joining {chan} failed (+S)", "WARN") if origin != "null": self.msg( f"{chan} is +S, and I'm not connected over SSL.", origin ) break elif code == 519: self.log(f"Joining {chan} failed (+A)", "WARN") if origin != "null": self.msg(f"{chan} is +A, and I'm not an admin.", origin) break elif code == 520: self.log(f"Joining {chan} failed (+O)", "WARN") if origin != "null": self.msg(f"{chan} is +O, and I'm not an operator.", origin) break elif code == 405: self.log(f"Joining {chan} failed (too many channels)", "WARN") if origin != "null": self.msg(f"I'm in too many channels to join {chan}", origin) break elif code == 471: self.log(f"Joining {chan} failed (+l)", "WARN") if origin != "null": self.msg(f"{chan} is +l, and is full", origin) break elif code == 366: self.log(f"Joining {chan} succeeded") if origin != "null": self.msg(f"Joined {chan}", origin) self.channels[chan] = 0 break def ping(self, ircmsg: str) -> int: pong = f"PONG :{ircmsg.split('PING :')[1]}\n" print(pong, end="") return self.send(pong) def send(self, command: str) -> int: return self.sock.send(bytes(command)) def recv(self) -> bytes: if self.queue: return bytes(self.queue.pop(0)) data = bytes(self.sock.recv(2048).strip(b"\r\n")) if b"\r\n" in data: self.queue.extend(data.split(b"\r\n")) return bytes(self.queue.pop(0)) return data def log(self, message: str, level: str = "LOG") -> None: logs.log(message, self.server, level) def exit(self, message: str) -> NoReturn: logs.log(message, self.server, "EXIT") exit(1) def msg(self, msg: str, target: str) -> None: if not (target == "NickServ" and mfind(msg, ["IDENTIFY"], False)): self.log(f"Sending {bytes(msg).lazy_decode()} to {target}") else: self.log("Identifying myself...") self.send(f"PRIVMSG {target} :{msg}\n") def op(self, name: str, chan: str) -> Union[int, None]: if name != "": self.log(f"Attempting op of {name} in {chan}...") return self.send(f"MODE {chan} +o {name}\n") return None def notice(self, msg: str, target: str, silent: bool = False) -> int: if not silent: self.log(f"Sending {bytes(msg).lazy_decode()} to {target} (NOTICE)") return self.send(f"NOTICE {target} :{msg}\n") def sendraw(self, command: str) -> int: self.log(f"RAW sending {command}") command = f"{command}\n" return self.send(command.replace("$BOTNICK", self.nick)) def mainloop(self) -> NoReturn: self.log("Starting connection..") self.connect() if "pass" in conf.servers[self.server]: self.msg( f"IDENTIFY FireBot {conf.servers[self.server]['pass']}", "NickServ" ) sleep(0.5) if self.onIdntCmds: for cmd in self.onIdntCmds: self.send(cmd + "\n") for chan in self.channels: self.join(chan, "null", False) if self.onJoinCmds: for cmd in self.onJoinCmds: self.send(cmd + "\n") tMgr = None if self.threads: tdict = {} for thread in self.threads: tdict[thread] = threads.data[thread] if tdict[thread]["passInstance"]: tdict[thread]["args"] = [self] tMgr = Thread(target=threads.threadManager, args=(tdict,)) tMgr.daemon = True tMgr.start() while 1: raw = self.recv() ircmsg = raw.safe_decode() if ircmsg == "": self.exit("Probably a netsplit") else: print(raw.lazy_decode(), sep="\n") action = "Unknown" try: action = ircmsg.split(" ", 2)[1].strip() except IndexError: pass self.tmpHost = "" if action in handlers.handles: res, chan = handlers.handles[action](self, ircmsg) if res == "reload" and type(chan) == str: reload(conf) self.adminnames = ( conf.servers[self.server]["admins"] if "admins" in conf.servers[self.server] else [] ) self.ignores = ( conf.servers[self.server]["ignores"] if "ignores" in conf.servers[self.server] else [] ) self.__version__ = conf.__version__ self.npallowed = conf.npallowed self.interval = ( conf.servers[self.server]["interval"] if "interval" in conf.servers[self.server] else 50 ) reload(cmds) reload(handlers) self.msg("Reloaded successfully", chan) else: if ircmsg.startswith("PING "): self.ping(ircmsg) elif ircmsg.startswith("ERROR :Closing Link"): self.exit("I got killed :'(") elif ircmsg.startswith("ERROR :Ping "): self.exit("Ping timeout") else: self.log("Unrecognized server request!", "WARN") self.exit("While loop broken")