#!/usr/bin/python3 from time import sleep from overrides import bytes, bbytes import re, random as r, codecs from sys import argv as args, exit as xit, stdout, stderr from socket import socket, AF_INET, SOCK_STREAM from os import environ as env from dotenv import load_dotenv from pythonlangutil.overload import Overload, signature from datetime import datetime as dt from logs import log load_dotenv() __version__ = "v1.0.5" ircsock = socket(AF_INET, SOCK_STREAM) botnick = "FireBot" servers = { "ircnow": { "address": "localhost", "port": 6601, "interval": 200, "pass": env["ircnow_pass"], "channels": {"#random": 0, "#dice": 0, "#offtopic": 0}, "admins": ["firepup", "h|thelounge", "firepup|lounge"], }, "efnet": { "address": "irc.mzima.net", "channels": {"#random": 0, "#dice": 0}, "admins": ["firepup", "h|tl"], }, "replirc": { "address": "localhost", "pass": env["replirc_pass"], "channels": {"#random": 0, "#dice": 0, "#main": 0, "#bots": 0}, "admins": ["firepup", "firepup|lounge", "h|tl"], }, } server = args[1] if args else "UNSTABLE BOT MODE" def exit(message: object) -> None: log(message, server, "EXIT") xit(1) if __name__ == "__main__": gmode = False nicklen = 30 address = servers[server]["address"] port = servers[server]["port"] if "port" in servers[server] else 6667 channels = servers[server]["channels"] interval = servers[server]["interval"] if "interval" in servers[server] else 50 prefix = "." rebt = "fire" gblrebt = "all" lrebt = 7 + len(rebt) lgblrebt = 7 + len(gblrebt) adminnames = servers[server]["admins"] exitcode = f"bye {botnick.lower()}" ircmsg = "" blanks = 0 npbase = "\[\x0303last\.fm\x03\] [A-Za-z0-9_[\]{}\\|^]{1,MAX} (is listening|last listened) to: \x02.+ - .*\x02 \([0-9]+ plays\)( \[.*\])?" np = re.compile(npbase.replace("MAX", f"{nicklen}")) queue = [] log(f"Start init for {server}", server) npallowed = ["FireBitBot"] ESCAPE_SEQUENCE_RE = re.compile( r""" ( \\U........ # 8-digit hex escapes | \\u.... # 4-digit hex escapes | \\x.. # 2-digit hex escapes | \\[0-7]{1,3} # Octal escapes | \\N\{[^}]+\} # Unicode characters by name | \\[\\'"abfnrtv] # Single-character escapes )""", re.UNICODE | re.VERBOSE, ) def decode_escapes(s: str) -> str: def decode_match(match): return codecs.decode(match.group(0), "unicode-escape") return ESCAPE_SEQUENCE_RE.sub(decode_match, s) def sucheck(message: str): return re.search("^(su|sudo|(su .*|sudo .*))$", message) def send(command: str) -> int: return ircsock.send(bytes(command)) def recv() -> bytes: global queue if queue: return bytes(queue.pop(0)) data = bytes(ircsock.recv(2048).strip(b"\r\n")) if b"\r\n" in data: queue.extend(data.split(b"\r\n")) return bytes(queue.pop(0)) return data def ping(ircmsg: str) -> int: pong = f"PONG :{ircmsg.split('PING :')[1]}\n" print(pong, end="") return send(pong) def sendraw(command: str) -> int: log(f"RAW sending {command}", server) command = f"{command}\n" return send(command.replace("$BOTNICK", botnick)) def sendmsg(msg: str, target: str) -> None: if target != "NickServ" and not mfind(msg, ["IDENTIFY"], False): log(f"Sending {bytes(msg).lazy_decode()} to {target}", server) else: log("Identifying myself...", server) send(f"PRIVMSG {target} :{msg}\n") def notice(msg, target, silent: bool = False): if not silent: log(f"Sending {bytes(msg).lazy_decode()} to {target} (NOTICE)", server) send(f"NOTICE {target} :{msg}\n") "{fg" def CTCPHandler(msg: str, sender: str = "", isRaw: bool = False): if isRaw: sender = msg.split("!", 1)[0][1:] message = msg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip() CTCP = msg.split("\x01")[1].split(" ", 1)[0] log(f"Responding to CTCP {CTCP} from {sender}", server) if CTCP == "VERSION": notice( f"\x01VERSION FireBot {__version__} (https://git.amcforum.wiki/Firepup650/fire-ircbot)\x01", sender, True, ) return True elif CTCP == "USERINFO": notice("\x01USERINFO FireBot (Firepup's bot)\x01", sender, True) return True elif CTCP == "SOURCE": notice( "\x01SOURCE https://git.amcforum.wiki/Firepup650/fire-ircbot\x01", sender, True, ) return True elif CTCP == "FINGER": notice("\x01FINGER Firepup's bot\x01", sender, True) return True elif CTCP == "CLIENTINFO": notice("\x01CLIENTINFO ACTION VERSION USERINFO SOURCE FINGER\x01", sender, True) return True log(f"Unknown CTCP {CTCP}", server) return False def joinserver(): log(f"Joining {server}...", server) global nicklen, npbase, np, botnick ircsock.connect((address, port)) send(f"USER {botnick} {botnick} {botnick} {botnick}\n") send(f"NICK {botnick}\n") ircmsg = "" while ( ircmsg.find("MODE " + botnick) == -1 and ircmsg.find("PRIVMSG " + botnick) == -1 ): ircmsg = recv().decode() if ircmsg != "": print(bytes(ircmsg).lazy_decode()) if ircmsg.find("NICKLEN=") != -1: global nicklen nicklen = int(ircmsg.split("NICKLEN=")[1].split(" ")[0]) np = re.compile(npbase.replace("MAX", f"{nicklen}")) log(f"NICKLEN set to {nicklen}", server) if ircmsg.find("Nickname") != -1: log("Nickname in use", server, "WARN") botnick = f"{botnick}{r.randint(0,1000)}" send(f"NICK {botnick}\n") log(f"botnick is now {botnick}", server) if ircmsg.startswith("PING "): # pong = "PONG :" + input("Ping?:") + "\n" # pong = pong.replace("\\\\", "\\") ping(ircmsg) if len(ircmsg.split("\x01")) == 3: CTCPHandler(ircmsg, isRaw=True) if ircmsg.find("Closing Link") != -1: log("I tried.", server, "EXIT") exit("Closing Link") log(f"Joined {server} successfully!", server) def mfind(message: str, find: list, usePrefix: bool = True): if usePrefix: return any(message[: len(match) + 1] == prefix + match for match in find) else: return any(message[: len(match)] == match for match in find) def joinchan(chan: str, origin: str, chanList: dict, lock: bool = True): log(f"Joining {chan}...", server) chan = chan.replace(" ", "") if "," in chan: chans = chan.split(",") for subchan in chans: chanList = joinchan(subchan, origin, chanList) return chanList if chan.startswith("0") or (chan == "#main" and lock): if origin != "null": sendmsg("Refusing to join channel 0", origin) return chanList if chan in channels and lock: if origin != "null": sendmsg(f"I'm already in {chan}.", origin) return chanList send(f"JOIN {chan}\n") ircmsg = "" while True: ircmsg = recv().decode() if ircmsg != "": print(bytes(ircmsg).lazy_decode()) if ircmsg.startswith("PING "): ping(ircmsg) if len(ircmsg.split("\x01")) == 3: CTCPHandler(ircmsg, isRaw=True) if ircmsg.find("No such channel") != -1: log(f"Joining {chan} failed (DM)", server, "WARN") if origin != "null": sendmsg(f"{chan} is an invalid channel", origin) break elif ircmsg.find("Cannot join channel (+i)") != -1: log(f"Joining {chan} failed (Private)", server, "WARN") if origin != "null": sendmsg(f"Permission denied to channel {chan}", origin) break elif ircmsg.find("End of") != -1: log(f"Joining {chan} succeeded", server) if origin != "null": sendmsg(f"Joined {chan}", origin) chanList[chan] = 0 break return chanList def op(name, chan): if name != "": log(f"Attempting op of {name} in {chan}...", server) send(f"MODE {chan} +o {name}\n") def main(): try: global channels, e, gmode, prefix, rebt, gblrebt, lrebt, lgblrebt, blanks log("Starting connection..", server) joinserver() if "pass" in servers[server]: sendmsg(f"IDENTIFY FireBot {servers[server]['pass']}", "NickServ") sleep(0.5) for chan in channels: joinchan(chan, "null", channels, False) while 1: global gmode raw = recv() ircmsg = raw.decode() if ircmsg == "": exit("Probably a netsplit") else: print(raw.lazy_decode(), sep="\n") action = "Unknown" try: action = ircmsg.split(" ", 2)[1].strip() except IndexError: pass if action == "PRIVMSG": # Format of ":[Nick]![ident]@[host|vhost] PRIVMSG [channel] :[message]” name = ircmsg.split("!", 1)[0][1:] helpErr = False if (name.startswith("saxjax") and server == "efnet") or ( name == "ReplIRC" and server == "replirc" ): if ircmsg.find("<") != -1 and ircmsg.find(">") != -1: Nname = ircmsg.split("<", 1)[1].split(">", 1)[0].strip() if name == "ReplIRC": name = Nname[4:] else: name = Nname message = ircmsg.split(">", 1)[1].strip() helpErr = True else: message = ( ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip() ) else: message = ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[1].strip() if name.endswith("dsc"): helpErr = True chan = ircmsg.split("PRIVMSG", 1)[1].split(":", 1)[0].strip() log( f'Got "{bytes(message).lazy_decode()}" from "{name}" in "{chan}"', server, ) if "goat" in name.lower() and gmode == True: log("GOAT DETECTED", server) sendmsg("Hello Goat", chan) gmode = False if len(name) > nicklen: log(f"Name too long ({len(name)} > {nicklen})", server) continue elif chan == botnick: pass # TODO: Somehow combine into other checks elif chan not in channels: log( f"Channel not in channels ({chan} not in {channels})", server, ) continue else: channels[chan] += 1 if mfind( message.lower(), ["!botlist"], False, ): sendmsg( f"Hi! I'm FireBot (https://git.amcforum.wiki/Firepup650/fire-ircbot)! My admins on this server are {adminnames}.", chan, ) if mfind( message.lower(), ["bugs bugs bugs"], False, ): sendmsg( f"\x01ACTION realizes {name} looks like a bug, and squashes {name}\x01", chan, ) if mfind( message.lower(), [f"hi {botnick.lower()}", f"hello {botnick.lower()}"], False, ): sendmsg(f"Hello {name}!", chan) elif ( mfind(message, ["op me"], False) and name.lower() in adminnames ): op(name, chan) elif mfind(message, ["amIAdmin"]): sendmsg( f"{name.lower()} in {adminnames} == {name.lower() in adminnames}", chan, ) elif mfind(message, ["help"]): if not helpErr: sendmsg("Command list needs rework", name) continue sendmsg("List of commands:", name) sendmsg(f'Current prefix is "{prefix}"', name) sendmsg(f"{prefix}help - Sends this help list", name) sendmsg( f"{prefix}quote - Sends a random firepup quote", name ) sendmsg( f"{prefix}(eightball,8ball,8b) [question]? - Asks the magic eightball a question", name, ) sendmsg( f"(hi,hello) {botnick} - The bot says hi to you", name ) if name.lower() in adminnames: sendmsg(f"reboot {rebt} - Restarts the bot", name) sendmsg(exitcode + " - Shuts down the bot", name) sendmsg("op me - Makes the bot try to op you", name) sendmsg( f"{prefix}join [channel(s)] - Joins the bot to the specified channel(s)", name, ) else: sendmsg("Sorry, I can't send help to bridged users.", chan) elif name.lower() in adminnames and mfind( message, ["goat.mode.activate"] ): log("GOAT DETECTION ACTIVATED", server) gmode = True elif name.lower() in adminnames and mfind( message, ["goat.mode.deactivate"] ): log("GOAT DETECTION DEACTIVATED", server) gmode = False elif mfind(message, ["quote"]): r.seed() mm = open("mastermessages.txt", "r") q = mm.readlines() sel = decode_escapes( str(r.sample(q, 1)) .strip("[]'") .replace("\\n", "") .strip('"') ) sendmsg(sel, chan) mm.close() elif mfind(message, ["join "]) and name.lower() in adminnames: newchan = message.split(" ")[1].strip() channels = joinchan(newchan, chan, channels) elif mfind(message, ["eightball", "8ball", "8b"]): if message.endswith("?"): eb = open("eightball.txt", "r") q = eb.readlines() sel = ( str(r.sample(q, 1)) .strip("[]'") .replace("\\n", "") .strip('"') ) sendmsg(f"The magic eightball says: {sel}", chan) eb.close() else: sendmsg("Please pose a Yes or No question.", chan) elif ( mfind(message, ["debug", "dbg"]) and name.lower() in adminnames ): sendmsg(f"[DEBUG] NICKLEN={nicklen}", chan) sendmsg(f"[DEBUG] ADMINS={adminnames}", chan) sendmsg(f"[DEBUG] CHANNELS={channels}", chan) elif ( mfind(message, ["raw ", "cmd "]) and name.lower() in adminnames ): sendraw(message.split(" ", 1)[1]) elif ( mfind(message, [f"reboot {rebt}", f"reboot {gblrebt}"], False) and name.lower() in adminnames ): send("QUIT :Rebooting\n") exit("Reboot") elif ( name.lower() in adminnames and message.rstrip().lower() == exitcode ): sendmsg("oh...okay. :'(", chan) for i in channels: # print(f'[LOG][{server}] i="{i}" vs chan="{chan}"') if i != chan.strip(): sendmsg("goodbye... :'(", i) send("QUIT :Shutting down\n", "UTF-8") log("QUIT", server) exit("goodbye :'(") # raise EOFError elif sucheck(message): if name.lower() in adminnames: sendmsg( "Error - system failure, contact system operator", chan ) elif "bot" in name.lower(): log("lol, no.", server) else: sendmsg("Access Denied", chan) elif np.search(message) and name in npallowed: x02 = "\x02" sendmsg( f"f.sp {message.split(':')[1].split('(')[0].strip(f' {x02}')}", chan, ) elif len(message.split("\x01")) == 3: if not CTCPHandler(message, name): CTCP = message.split("\x01")[1] if CTCP == "ACTION ducks": sendmsg("\x01ACTION gets hit by a duck\x01", chan) elif CTCP.startswith("ACTION ducks"): sendmsg( f"\x01ACTION gets hit by {CTCP.split(' ', 2)[2]}\x01", chan, ) if chan in channels and channels[chan] >= interval: r.seed() channels[chan] = 0 mm = open("mastermessages.txt", "r") q = mm.readlines() sel = decode_escapes( str(r.sample(q, 1)) .strip("[]'") .replace("\\n", "") .strip('"') ) sendmsg(f"[QUOTE] {sel}", chan) mm.close() else: if ircmsg.startswith("PING "): ping(ircmsg) elif ircmsg.startswith("ERROR :Closing Link"): exit("I got killed :'(") elif ircmsg.startswith("ERROR :Ping "): exit("Ping timeout") except KeyboardInterrupt: pass if __name__ == "__main__": main()