import queue, os, select, socket, threading, time, traceback, typing, uuid from src import EventManager, Exports, IRCServer, Logging, ModuleManager from src import Socket, utils VERSION = "v1.2.1" TRIGGER_RETURN = 1 TRIGGER_EXCEPTION = 2 class Bot(object): def __init__(self, directory, args, cache, config, database, events, exports, log, modules, timers): self.directory = directory self.args = args self.cache = cache self.config = config self.database = database self._events = events self._exports = exports self.log = log self.modules = modules self._timers = timers self.start_time = time.time() self.lock = threading.Lock() self.running = True self.poll = select.epoll() self.servers = {} self.other_sockets = {} self._trigger_server, self._trigger_client = socket.socketpair() self.add_socket(Socket.Socket(self._trigger_server, lambda _, s: None)) self._trigger_functions = [] self._events.on("timer.reconnect").hook(self._timed_reconnect) def trigger(self, func: typing.Optional[typing.Callable[[], typing.Any]]=None ) -> typing.Any: func = func or (lambda: None) if utils.is_main_thread(): returned = func() self._trigger_client.send(b"TRIGGER") return returned self.lock.acquire() func_queue = queue.Queue(1) # type: queue.Queue[str] self._trigger_functions.append([func, func_queue]) self.lock.release() self._trigger_client.send(b"TRIGGER") type, returned = func_queue.get(block=True) if type == TRIGGER_EXCEPTION: raise returned elif type == TRIGGER_RETURN: return returned def add_server(self, server_id: int, connect: bool = True, connection_params: typing.Optional[ utils.irc.IRCConnectionParameters]=None) -> IRCServer.Server: if not connection_params: connection_params = utils.irc.IRCConnectionParameters( *self.database.servers.get(server_id)) new_server = IRCServer.Server(self, self._events, connection_params.id, connection_params.alias, connection_params) self._events.on("new.server").call(server=new_server) if not connect: return new_server self.connect(new_server) return new_server def add_socket(self, sock: socket.socket): self.other_sockets[sock.fileno()] = sock self.poll.register(sock.fileno(), select.EPOLLIN) def remove_socket(self, sock: socket.socket): del self.other_sockets[sock.fileno()] self.poll.unregister(sock.fileno()) def get_server(self, id: int) -> typing.Optional[IRCServer.Server]: for server in self.servers.values(): if server.id == id: return server return None def connect(self, server: IRCServer.Server) -> bool: try: server.connect() except Exception as e: self.log.warn("Failed to connect to %s: %s", [str(server), str(e)]) return False self.servers[server.fileno()] = server self.poll.register(server.fileno(), select.EPOLLOUT) return True def next_send(self) -> typing.Optional[float]: next = None for server in self.servers.values(): timeout = server.socket.send_throttle_timeout() if (server.socket.waiting_send() and (next == None or timeout < next)): next = timeout return next def next_ping(self) -> typing.Optional[float]: timeouts = [] for server in self.servers.values(): timeout = server.until_next_ping() if not timeout == None: timeouts.append(timeout) if not timeouts: return None return min(timeouts) def next_read_timeout(self) -> typing.Optional[float]: timeouts = [] for server in self.servers.values(): timeouts.append(server.until_read_timeout()) if not timeouts: return None return min(timeouts) def get_poll_timeout(self) -> float: timeouts = [] timeouts.append(self._timers.next()) timeouts.append(self.next_send()) timeouts.append(self.next_ping()) timeouts.append(self.next_read_timeout()) timeouts.append(self.cache.next_expiration()) return min([timeout for timeout in timeouts if not timeout == None]) def register_read(self, server: IRCServer.Server): self.poll.modify(server.fileno(), select.EPOLLIN) def register_write(self, server: IRCServer.Server): self.poll.modify(server.fileno(), select.EPOLLOUT) def register_both(self, server: IRCServer.Server): self.poll.modify(server.fileno(), select.EPOLLIN|select.EPOLLOUT) def disconnect(self, server: IRCServer.Server): try: self.poll.unregister(server.fileno()) except FileNotFoundError: pass del self.servers[server.fileno()] def _timed_reconnect(self, event: EventManager.Event): if not self.reconnect(event["server_id"], event.get("connection_params", None)): event["timer"].redo() def reconnect(self, server_id: int, connection_params: typing.Optional[ utils.irc.IRCConnectionParameters]=None) -> bool: server = self.add_server(server_id, False, connection_params) if self.connect(server): self.servers[server.fileno()] = server return True return False def set_setting(self, setting: str, value: typing.Any): self.database.bot_settings.set(setting, value) def get_setting(self, setting: str, default: typing.Any=None) -> typing.Any: return self.database.bot_settings.get(setting, default) def find_settings(self, pattern: str, default: typing.Any=[] ) -> typing.List[typing.Any]: return self.database.bot_settings.find(pattern, default) def find_settings_prefix(self, prefix: str, default: typing.Any=[] ) -> typing.List[typing.Any]: return self.database.bot_settings.find_prefix( prefix, default) def del_setting(self, setting: str): self.database.bot_settings.delete(setting) def run(self): while self.running: if not self.servers: break events = self.poll.poll(self.get_poll_timeout()) self.lock.acquire() self._timers.call() self.cache.expire() for func, func_queue in self._trigger_functions: try: returned = func() type = TRIGGER_RETURN except Exception as e: returned = e type = TRIGGER_EXCEPTION func_queue.put([type, returned]) self._trigger_functions.clear() for fd, event in events: sock = None irc = False if fd in self.servers: sock = self.servers[fd] irc = True elif fd in self.other_sockets: sock = self.other_sockets[fd] if sock: if event & select.EPOLLIN: data = sock.read() if data == None: sock.disconnect() continue for piece in data: sock.parse_data(piece) elif event & select.EPOLLOUT: sock._send() if sock.fileno() in self.servers: self.register_read(sock) elif event & select.EPULLHUP: self.log.warn("Recieved EPOLLHUP for %s", [str(sock)]) sock.disconnect() for server in list(self.servers.values()): if server.read_timed_out(): self.log.warn("Pinged out from %s", [str(server)]) server.disconnect() elif server.ping_due() and not server.ping_sent: server.send_ping() server.ping_sent = True if not server.socket.connected: self._events.on("server.disconnect").call(server=server) self.disconnect(server) if not self.get_server(server.id): reconnect_delay = self.config.get("reconnect-delay", 10) self._timers.add("reconnect", reconnect_delay, server_id=server.id) self.log.warn( "Disconnected from %s, reconnecting in %d seconds", [str(server), reconnect_delay]) elif (server.socket.waiting_send() and server.socket.throttle_done()): self.register_both(server) for sock in list(self.other_sockets.values()): if not sock.connected: self.remove_socket(sock) elif sock.waiting_send(): self.register_both(sock) self.lock.release()