From 7792be247ce0e4b3b9f60b4dab690e68b3d7280f Mon Sep 17 00:00:00 2001 From: jesopo Date: Sun, 10 Feb 2019 14:09:27 +0000 Subject: [PATCH] Track lines-to-be-sent up until they're completely sent, queue up more than 1 line at a time in write buffer if we've got space (src/IRCServer.py) --- src/IRCServer.py | 57 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/src/IRCServer.py b/src/IRCServer.py index 417825dc..2ecba0e0 100644 --- a/src/IRCServer.py +++ b/src/IRCServer.py @@ -1,11 +1,14 @@ -import collections, socket, ssl, sys, time, typing -from src import EventManager, IRCBot, IRCChannel, IRCChannels, IRCObject -from src import IRCUser, utils +import collections, datetime, socket, ssl, sys, time, typing +from src import EventManager, IRCBot, IRCChannel, IRCChannels, IRCLine +from src import IRCObject, IRCUser, utils THROTTLE_LINES = 4 THROTTLE_SECONDS = 1 +UNTHROTTLED_MAX_LINES = 10 + READ_TIMEOUT_SECONDS = 120 PING_INTERVAL_SECONDS = 30 + LINE_CUTOFF = 450 class Server(IRCObject.Object): @@ -37,7 +40,8 @@ class Server(IRCObject.Object): self.cap_started = False self.write_buffer = b"" - self.buffered_lines = [] # type: typing.List[bytes] + self.queued_lines = [] # type: typing.List[IRCLine.Line] + self.buffered_lines = [] # type: typing.List[IRCLine.Line] self._write_throttling = False self.read_buffer = b"" self.recent_sends = [] # type: typing.List[float] @@ -313,16 +317,31 @@ class Server(IRCObject.Object): encoded = line.split("\n")[0].strip("\r").encode("utf8") if len(encoded) > LINE_CUTOFF: encoded = encoded[:LINE_CUTOFF] - self.buffered_lines.append(encoded + b"\r\n") + + encoded = b"%s\r\n" % encoded + line_obj = IRCLine.Line(datetime.datetime.utcnow(), encoded) + self.queued_lines.append(line_obj) + self.bot.log.debug("%s (raw send) | %s", [str(self), line]) def _send(self): if not len(self.write_buffer): - self.write_buffer = self.buffered_lines.pop(0) + throttle_space = self.throttle_space() + to_buffer = self.queued_lines[:throttle_space] + self.queued_lines = self.queued_lines[throttle_space:] + for line in to_buffer: + self.write_buffer += line.data + self.buffered_lines.append(line) - bytes_written = self.socket.send(self.write_buffer) - self.bytes_written += bytes_written - self.write_buffer = self.write_buffer[bytes_written:] + bytes_written_i = self.socket.send(self.write_buffer) + bytes_written = self.write_buffer[:bytes_written_i] + lines_sent = bytes_written.count(b"\r\n") + for i in range(lines_sent): + self.buffered_lines.pop(0).sent() + + self.write_buffer = self.write_buffer[bytes_written_i:] + + self.bytes_written += bytes_written_i if not self.waiting_send(): self.events.on("writebuffer.empty").call(server=self) @@ -331,13 +350,12 @@ class Server(IRCObject.Object): self.recent_sends.append(now) self.last_send = now def waiting_send(self) -> bool: - return bool(len(self.write_buffer)) or bool(len(self.buffered_lines)) + return bool(len(self.write_buffer)) or bool(len(self.queued_lines)) + def throttle_done(self) -> bool: return self.send_throttle_timeout() == 0 - def send_throttle_timeout(self) -> float: - if len(self.write_buffer) or not self._write_throttling: - return 0 + def throttle_prune(self): now = time.monotonic() popped = 0 for i, recent_send in enumerate(self.recent_sends[:]): @@ -346,12 +364,23 @@ class Server(IRCObject.Object): self.recent_sends.pop(i-popped) popped += 1 - if len(self.recent_sends) < THROTTLE_LINES: + def throttle_space(self) -> int: + if not self._write_throttling: + return UNTHROTTLED_MAX_LINES + return max(0, THROTTLE_LINES-len(self.recent_sends)) + + def send_throttle_timeout(self) -> float: + if len(self.write_buffer) or not self._write_throttling: + return 0 + + self.throttle_prune() + if self.throttle_space() > 0: return 0 time_left = self.recent_sends[0]+THROTTLE_SECONDS time_left = time_left-now return time_left + def set_write_throttling(self, is_on: bool): self._write_throttling = is_on