Track lines-to-be-sent up until they're completely sent, queue up more than 1
line at a time in write buffer if we've got space (src/IRCServer.py)
This commit is contained in:
parent
7f7941f564
commit
7792be247c
1 changed files with 43 additions and 14 deletions
|
@ -1,11 +1,14 @@
|
|||
import collections, socket, ssl, sys, time, typing
|
||||
from src import EventManager, IRCBot, IRCChannel, IRCChannels, IRCObject
|
||||
from src import IRCUser, utils
|
||||
import collections, datetime, socket, ssl, sys, time, typing
|
||||
from src import EventManager, IRCBot, IRCChannel, IRCChannels, IRCLine
|
||||
from src import IRCObject, IRCUser, utils
|
||||
|
||||
THROTTLE_LINES = 4
|
||||
THROTTLE_SECONDS = 1
|
||||
UNTHROTTLED_MAX_LINES = 10
|
||||
|
||||
READ_TIMEOUT_SECONDS = 120
|
||||
PING_INTERVAL_SECONDS = 30
|
||||
|
||||
LINE_CUTOFF = 450
|
||||
|
||||
class Server(IRCObject.Object):
|
||||
|
@ -37,7 +40,8 @@ class Server(IRCObject.Object):
|
|||
self.cap_started = False
|
||||
|
||||
self.write_buffer = b""
|
||||
self.buffered_lines = [] # type: typing.List[bytes]
|
||||
self.queued_lines = [] # type: typing.List[IRCLine.Line]
|
||||
self.buffered_lines = [] # type: typing.List[IRCLine.Line]
|
||||
self._write_throttling = False
|
||||
self.read_buffer = b""
|
||||
self.recent_sends = [] # type: typing.List[float]
|
||||
|
@ -313,16 +317,31 @@ class Server(IRCObject.Object):
|
|||
encoded = line.split("\n")[0].strip("\r").encode("utf8")
|
||||
if len(encoded) > LINE_CUTOFF:
|
||||
encoded = encoded[:LINE_CUTOFF]
|
||||
self.buffered_lines.append(encoded + b"\r\n")
|
||||
|
||||
encoded = b"%s\r\n" % encoded
|
||||
line_obj = IRCLine.Line(datetime.datetime.utcnow(), encoded)
|
||||
self.queued_lines.append(line_obj)
|
||||
|
||||
self.bot.log.debug("%s (raw send) | %s", [str(self), line])
|
||||
|
||||
def _send(self):
|
||||
if not len(self.write_buffer):
|
||||
self.write_buffer = self.buffered_lines.pop(0)
|
||||
throttle_space = self.throttle_space()
|
||||
to_buffer = self.queued_lines[:throttle_space]
|
||||
self.queued_lines = self.queued_lines[throttle_space:]
|
||||
for line in to_buffer:
|
||||
self.write_buffer += line.data
|
||||
self.buffered_lines.append(line)
|
||||
|
||||
bytes_written = self.socket.send(self.write_buffer)
|
||||
self.bytes_written += bytes_written
|
||||
self.write_buffer = self.write_buffer[bytes_written:]
|
||||
bytes_written_i = self.socket.send(self.write_buffer)
|
||||
bytes_written = self.write_buffer[:bytes_written_i]
|
||||
lines_sent = bytes_written.count(b"\r\n")
|
||||
for i in range(lines_sent):
|
||||
self.buffered_lines.pop(0).sent()
|
||||
|
||||
self.write_buffer = self.write_buffer[bytes_written_i:]
|
||||
|
||||
self.bytes_written += bytes_written_i
|
||||
|
||||
if not self.waiting_send():
|
||||
self.events.on("writebuffer.empty").call(server=self)
|
||||
|
@ -331,13 +350,12 @@ class Server(IRCObject.Object):
|
|||
self.recent_sends.append(now)
|
||||
self.last_send = now
|
||||
def waiting_send(self) -> bool:
|
||||
return bool(len(self.write_buffer)) or bool(len(self.buffered_lines))
|
||||
return bool(len(self.write_buffer)) or bool(len(self.queued_lines))
|
||||
|
||||
def throttle_done(self) -> bool:
|
||||
return self.send_throttle_timeout() == 0
|
||||
def send_throttle_timeout(self) -> float:
|
||||
if len(self.write_buffer) or not self._write_throttling:
|
||||
return 0
|
||||
|
||||
def throttle_prune(self):
|
||||
now = time.monotonic()
|
||||
popped = 0
|
||||
for i, recent_send in enumerate(self.recent_sends[:]):
|
||||
|
@ -346,12 +364,23 @@ class Server(IRCObject.Object):
|
|||
self.recent_sends.pop(i-popped)
|
||||
popped += 1
|
||||
|
||||
if len(self.recent_sends) < THROTTLE_LINES:
|
||||
def throttle_space(self) -> int:
|
||||
if not self._write_throttling:
|
||||
return UNTHROTTLED_MAX_LINES
|
||||
return max(0, THROTTLE_LINES-len(self.recent_sends))
|
||||
|
||||
def send_throttle_timeout(self) -> float:
|
||||
if len(self.write_buffer) or not self._write_throttling:
|
||||
return 0
|
||||
|
||||
self.throttle_prune()
|
||||
if self.throttle_space() > 0:
|
||||
return 0
|
||||
|
||||
time_left = self.recent_sends[0]+THROTTLE_SECONDS
|
||||
time_left = time_left-now
|
||||
return time_left
|
||||
|
||||
def set_write_throttling(self, is_on: bool):
|
||||
self._write_throttling = is_on
|
||||
|
||||
|
|
Loading…
Reference in a new issue