Simplify SentLine by shifting truncation to ParsedLine (and commands.outs)
This commit is contained in:
parent
88e796e334
commit
621830c360
2 changed files with 40 additions and 46 deletions
|
@ -2,6 +2,7 @@ import re
|
|||
from src import utils
|
||||
|
||||
STR_MORE = " (more...)"
|
||||
STR_MORE_LEN = len(STR_MORE.encode("utf8"))
|
||||
STR_CONTINUED = "(...continued) "
|
||||
|
||||
class Out(object):
|
||||
|
@ -42,14 +43,21 @@ class Out(object):
|
|||
line = line_factory(self._target_str, full_text, tags=self._tags)
|
||||
if self._assured:
|
||||
line.assure()
|
||||
sent_line = self.server.send(line)
|
||||
|
||||
if sent_line:
|
||||
sent_line.truncate_marker = STR_MORE
|
||||
if sent_line.truncated():
|
||||
self._text = "%s%s" % (STR_CONTINUED, sent_line.truncated())
|
||||
else:
|
||||
self._text = ""
|
||||
valid, truncated = line.truncate(self.server.hostmask())
|
||||
|
||||
if truncated:
|
||||
truncated = valid[-STR_MORE_LEN:]+truncated
|
||||
print(valid)
|
||||
new_line = valid[:-STR_MORE_LEN]+STR_MORE
|
||||
print(len(new_line))
|
||||
line = utils.irc.parse_line(new_line)
|
||||
|
||||
self._text = "%s%s" % (STR_CONTINUED, truncated)
|
||||
else:
|
||||
self._text = ""
|
||||
|
||||
sent_line = self.server.send(line)
|
||||
|
||||
def set_prefix(self, prefix):
|
||||
self.module_name = prefix
|
||||
|
|
|
@ -2,7 +2,7 @@ import datetime, typing
|
|||
from src import IRCObject, utils
|
||||
|
||||
# this should be 510 (RFC1459, 512 with \r\n) but a server BitBot uses is broken
|
||||
LINE_CUTOFF = 470
|
||||
LINE_MAX = 470
|
||||
|
||||
class IRCArgs(object):
|
||||
def __init__(self, args: typing.List[str]):
|
||||
|
@ -100,6 +100,26 @@ class ParsedLine(object):
|
|||
|
||||
return " ".join(pieces).split("\n")[0].strip("\r")
|
||||
|
||||
def _line_max(self, hostmask: str) -> int:
|
||||
return LINE_MAX-len((":%s " % hostmask).encode("utf8"))
|
||||
def truncate(self, hostmask: str) -> typing.Tuple[str, str]:
|
||||
valid_bytes = b""
|
||||
valid_index = -1
|
||||
|
||||
line_max = self._line_max(hostmask)
|
||||
|
||||
formatted = self.format()
|
||||
for i, char in enumerate(formatted):
|
||||
encoded_char = char.encode("utf8")
|
||||
if len(valid_bytes)+len(encoded_char) > line_max:
|
||||
break
|
||||
else:
|
||||
valid_bytes += encoded_char
|
||||
valid_index = i
|
||||
valid_index += 1
|
||||
|
||||
return formatted[:valid_index], formatted[valid_index:]
|
||||
|
||||
class SentLine(IRCObject.Object):
|
||||
def __init__(self, events: "EventManager.EventHook",
|
||||
send_time: datetime.datetime, hostmask: str, line: ParsedLine):
|
||||
|
@ -108,46 +128,12 @@ class SentLine(IRCObject.Object):
|
|||
self._hostmask = hostmask
|
||||
self.parsed_line = line
|
||||
|
||||
self.truncate_marker: typing.Optional[str] = None
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return "IRCLine.SentLine(%s)" % self.__str__()
|
||||
def __str__(self) -> str:
|
||||
return self.decoded_data()
|
||||
return self._for_wire()
|
||||
|
||||
def _char_limit(self) -> int:
|
||||
return LINE_CUTOFF-len(":%s " % self._hostmask)
|
||||
|
||||
def _encode_truncate(self) -> typing.Tuple[bytes, str]:
|
||||
line = self.parsed_line.format()
|
||||
byte_max = self._char_limit()
|
||||
encoded = b""
|
||||
truncated = ""
|
||||
truncate_marker = b""
|
||||
if not self.truncate_marker == None:
|
||||
truncate_marker = typing.cast(str, self.truncate_marker
|
||||
).encode("utf8")
|
||||
|
||||
for i, character in enumerate(line):
|
||||
encoded_character = character.encode("utf8")
|
||||
new_len = len(encoded + encoded_character)
|
||||
if truncate_marker and (byte_max-new_len) < len(truncate_marker):
|
||||
encoded += truncate_marker
|
||||
truncated = line[i:]
|
||||
break
|
||||
elif new_len > byte_max:
|
||||
truncated = line[i:]
|
||||
break
|
||||
else:
|
||||
encoded += encoded_character
|
||||
return (encoded, truncated)
|
||||
|
||||
def _for_wire(self) -> bytes:
|
||||
return self._encode_truncate()[0]
|
||||
def _for_wire(self) -> str:
|
||||
return self.parsed_line.truncate(self._hostmask)[0]
|
||||
def for_wire(self) -> bytes:
|
||||
return b"%s\r\n" % self._for_wire()
|
||||
|
||||
def decoded_data(self) -> str:
|
||||
return self._for_wire().decode("utf8")
|
||||
def truncated(self) -> str:
|
||||
return self._encode_truncate()[1]
|
||||
return b"%s\r\n" % self._for_wire().encode("utf8")
|
||||
|
|
Loading…
Reference in a new issue