Switch to using threading.Condition to trigger write thread
This commit is contained in:
parent
1f1b63ea6d
commit
f3b6d6df0a
1 changed files with 15 additions and 19 deletions
|
@ -36,14 +36,9 @@ class Bot(object):
|
||||||
self._rtrigger_server, self._rtrigger_client = socket.socketpair()
|
self._rtrigger_server, self._rtrigger_client = socket.socketpair()
|
||||||
self._read_poll.register(self._rtrigger_server.fileno(), select.EPOLLIN)
|
self._read_poll.register(self._rtrigger_server.fileno(), select.EPOLLIN)
|
||||||
|
|
||||||
self._wtrigger_server, self._wtrigger_client = socket.socketpair()
|
|
||||||
self._write_poll.register(self._wtrigger_server.fileno(),
|
|
||||||
select.EPOLLIN)
|
|
||||||
|
|
||||||
self._rtrigger_lock = threading.Lock()
|
self._rtrigger_lock = threading.Lock()
|
||||||
self._rtriggered = False
|
self._rtriggered = False
|
||||||
self._wtrigger_lock = threading.Lock()
|
self._write_condition = threading.Condition()
|
||||||
self._wtriggered = False
|
|
||||||
|
|
||||||
self._read_thread = None
|
self._read_thread = None
|
||||||
self._write_thread = None
|
self._write_thread = None
|
||||||
|
@ -59,10 +54,8 @@ class Bot(object):
|
||||||
self._rtriggered = True
|
self._rtriggered = True
|
||||||
self._rtrigger_client.send(b"TRIGGER")
|
self._rtrigger_client.send(b"TRIGGER")
|
||||||
def trigger_write(self):
|
def trigger_write(self):
|
||||||
with self._wtrigger_lock:
|
with self._write_condition:
|
||||||
if not self._wtriggered:
|
self._write_condition.notify()
|
||||||
self._wtriggered = True
|
|
||||||
self._wtrigger_client.send(b"TRIGGER")
|
|
||||||
|
|
||||||
def trigger(self,
|
def trigger(self,
|
||||||
func: typing.Optional[typing.Callable[[], typing.Any]]=None,
|
func: typing.Optional[typing.Callable[[], typing.Any]]=None,
|
||||||
|
@ -252,18 +245,21 @@ class Bot(object):
|
||||||
if not self.servers:
|
if not self.servers:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
with self._write_condition:
|
||||||
|
writeable = False
|
||||||
for fd, server in self.servers.items():
|
for fd, server in self.servers.items():
|
||||||
if server.socket.waiting_immediate_send():
|
if server.socket.waiting_immediate_send():
|
||||||
self._write_poll.register(fd, select.EPOLLOUT)
|
self._write_poll.register(fd, select.EPOLLOUT)
|
||||||
|
writeable = True
|
||||||
|
|
||||||
|
if not writeable:
|
||||||
|
self._write_condition.wait()
|
||||||
|
continue
|
||||||
|
|
||||||
events = self._write_poll.poll()
|
events = self._write_poll.poll()
|
||||||
|
|
||||||
for fd, event in events:
|
for fd, event in events:
|
||||||
if fd == self._wtrigger_server.fileno():
|
if event & select.EPOLLOUT:
|
||||||
# throw away data from trigger socket
|
|
||||||
self._wtrigger_server.recv(1024)
|
|
||||||
with self._wtrigger_lock:
|
|
||||||
self._wtriggered = False
|
|
||||||
elif event & select.EPOLLOUT:
|
|
||||||
self._write_poll.unregister(fd)
|
self._write_poll.unregister(fd)
|
||||||
if fd in self.servers:
|
if fd in self.servers:
|
||||||
server = self.servers[fd]
|
server = self.servers[fd]
|
||||||
|
|
Loading…
Reference in a new issue