threading.Lock() around any access to _write_buffer

This commit is contained in:
jesopo 2019-07-04 06:56:05 +01:00
parent 61d354eb94
commit 3afbe5fd82

View file

@ -1,4 +1,4 @@
import datetime, socket, ssl, time, typing import datetime, socket, ssl, time, threading, typing
from src import IRCLine, Logging, IRCObject, utils from src import IRCLine, Logging, IRCObject, utils
THROTTLE_LINES = 4 THROTTLE_LINES = 4
@ -25,6 +25,7 @@ class Socket(IRCObject.Object):
self.connected = False self.connected = False
self._write_buffer = b"" self._write_buffer = b""
self._write_buffer_lock = threading.Lock()
self._queued_lines = [] # type: typing.List[IRCLine.SentLine] self._queued_lines = [] # type: typing.List[IRCLine.SentLine]
self._buffered_lines = [] # type: typing.List[IRCLine.SentLine] self._buffered_lines = [] # type: typing.List[IRCLine.SentLine]
self._read_buffer = b"" self._read_buffer = b""
@ -124,34 +125,37 @@ class Socket(IRCObject.Object):
self._buffered_lines.append(line) self._buffered_lines.append(line)
def send(self, line: IRCLine.SentLine, immediate: bool=False): def send(self, line: IRCLine.SentLine, immediate: bool=False):
if immediate: with self._write_buffer_lock:
self._immediate_buffer(line) if immediate:
else: self._immediate_buffer(line)
self._queued_lines.append(line) else:
self._queued_lines.append(line)
def _fill_throttle(self): def _fill_throttle(self):
if not self._write_buffer and self._throttle_when_empty: with self._write_buffer_lock:
self._throttle_when_empty = False if not self._write_buffer and self._throttle_when_empty:
self._write_throttling = True self._throttle_when_empty = False
self._recent_sends.clear() self._write_throttling = True
self._recent_sends.clear()
throttle_space = self.throttle_space() throttle_space = self.throttle_space()
if throttle_space: if throttle_space:
to_buffer = self._queued_lines[:throttle_space] to_buffer = self._queued_lines[:throttle_space]
self._queued_lines = self._queued_lines[throttle_space:] self._queued_lines = self._queued_lines[throttle_space:]
for line in to_buffer: for line in to_buffer:
self._immediate_buffer(line) self._immediate_buffer(line)
def _send(self) -> typing.List[IRCLine.SentLine]: def _send(self) -> typing.List[IRCLine.SentLine]:
bytes_written_i = self._socket.send(self._write_buffer)
bytes_written = self._write_buffer[:bytes_written_i]
sent_lines_count = bytes_written.count(b"\n")
sent_lines = [] # type: typing.List[IRCLine.SentLine] sent_lines = [] # type: typing.List[IRCLine.SentLine]
for i in range(sent_lines_count): with self._write_buffer_lock:
sent_lines.append(self._buffered_lines.pop(0)) bytes_written_i = self._socket.send(self._write_buffer)
bytes_written = self._write_buffer[:bytes_written_i]
self._write_buffer = self._write_buffer[bytes_written_i:] sent_lines_count = bytes_written.count(b"\n")
for i in range(sent_lines_count):
sent_lines.append(self._buffered_lines.pop(0))
self._write_buffer = self._write_buffer[bytes_written_i:]
self.bytes_written += bytes_written_i self.bytes_written += bytes_written_i