move IRCLine related code from utils.irc to IRCLine.py

This commit is contained in:
jesopo 2019-10-27 10:19:00 +00:00
parent c4c076137d
commit 8f4b5a0e70
9 changed files with 98 additions and 97 deletions

View file

@ -1,7 +1,7 @@
#--depends-on commands #--depends-on commands
#--depends-on permissions #--depends-on permissions
from src import ModuleManager, utils from src import IRCLine, ModuleManager, utils
class Module(ModuleManager.BaseModule): class Module(ModuleManager.BaseModule):
@utils.hook("received.command.nick", min_args=1) @utils.hook("received.command.nick", min_args=1)
@ -148,7 +148,7 @@ class Module(ModuleManager.BaseModule):
raise utils.EventError("Please provide <hostname>:[+]<port>") raise utils.EventError("Please provide <hostname>:[+]<port>")
port = int(port) port = int(port)
hostmask = utils.irc.parse_hostmask(event["args_split"][2]) hostmask = IRCLine.parse_hostmask(event["args_split"][2])
nickname = hostmask.nickname nickname = hostmask.nickname
username = hostmask.username or nickname username = hostmask.username or nickname
realname = nickname realname = nickname

View file

@ -1,5 +1,5 @@
import re import re
from src import utils from src import IRCLine, utils
STR_MORE = " (more...)" STR_MORE = " (more...)"
STR_MORE_LEN = len(STR_MORE.encode("utf8")) STR_MORE_LEN = len(STR_MORE.encode("utf8"))
@ -57,7 +57,7 @@ class Out(object):
if truncated: if truncated:
valid, truncated = self._adjust_to_word_boundaries(valid, truncated) valid, truncated = self._adjust_to_word_boundaries(valid, truncated)
line = utils.irc.parse_line(valid+STR_MORE) line = IRCLine.parse_line(valid+STR_MORE)
self._text = "%s%s" % (STR_CONTINUED, truncated) self._text = "%s%s" % (STR_CONTINUED, truncated)
else: else:
self._text = "" self._text = ""

View file

@ -4,7 +4,7 @@ class Module(ModuleManager.BaseModule):
@utils.hook("raw.send.privmsg", priority=EventManager.PRIORITY_MONITOR) @utils.hook("raw.send.privmsg", priority=EventManager.PRIORITY_MONITOR)
@utils.hook("raw.send.notice", priority=EventManager.PRIORITY_MONITOR) @utils.hook("raw.send.notice", priority=EventManager.PRIORITY_MONITOR)
def send_message(self, event): def send_message(self, event):
our_hostmask = utils.irc.parse_hostmask(event["server"].hostmask()) our_hostmask = IRCLine.parse_hostmask(event["server"].hostmask())
echo = IRCLine.ParsedLine(event["line"].command, event["line"].args, echo = IRCLine.ParsedLine(event["line"].command, event["line"].args,
source=our_hostmask, tags=event["line"].tags) source=our_hostmask, tags=event["line"].tags)

View file

@ -15,7 +15,7 @@ class Module(ModuleManager.BaseModule):
target = event["line"].args[0] target = event["line"].args[0]
lines = event["line"].args[1].split("\n") lines = event["line"].args[1].split("\n")
batch = utils.irc.IRCSendBatch("draft/multiline", batch = IRCLine.IRCSendBatch("draft/multiline",
[target]) [target])
for line in lines: for line in lines:
batch.add_line(utils.irc.protocol.privmsg(target, line)) batch.add_line(utils.irc.protocol.privmsg(target, line))

View file

@ -1,5 +1,5 @@
import enum import enum
from src import EventManager, ModuleManager, utils from src import EventManager, IRCLine, ModuleManager, utils
from . import channel, core, ircv3, message, user from . import channel, core, ircv3, message, user
class Module(ModuleManager.BaseModule): class Module(ModuleManager.BaseModule):
@ -178,7 +178,7 @@ class Module(ModuleManager.BaseModule):
batch_type = event["line"].args[1] batch_type = event["line"].args[1]
args = event["line"].args[2:] args = event["line"].args[2:]
batch = utils.irc.IRCBatch(identifier, batch_type, args, batch = IRCLine.IRCBatch(identifier, batch_type, args,
event["line"].tags, source=event["line"].source) event["line"].tags, source=event["line"].source)
event["server"].batches[identifier] = batch event["server"].batches[identifier] = batch

View file

@ -1,4 +1,4 @@
from src import utils from src import IRCLine, utils
def handle_332(events, event): def handle_332(events, event):
channel = event["server"].channels.get(event["line"].args[1]) channel = event["server"].channels.get(event["line"].args[1])
@ -18,7 +18,7 @@ def topic(events, event):
def handle_333(events, event): def handle_333(events, event):
channel = event["server"].channels.get(event["line"].args[1]) channel = event["server"].channels.get(event["line"].args[1])
topic_setter = utils.irc.parse_hostmask(event["line"].args[2]) topic_setter = IRCLine.parse_hostmask(event["line"].args[2])
topic_time = int(event["line"].args[3]) topic_time = int(event["line"].args[3])
channel.set_topic_setter(topic_setter) channel.set_topic_setter(topic_setter)
@ -42,7 +42,7 @@ def handle_353(event):
nickname = nickname[1:] nickname = nickname[1:]
if event["server"].has_capability_str("userhost-in-names"): if event["server"].has_capability_str("userhost-in-names"):
hostmask = utils.irc.parse_hostmask(nickname) hostmask = IRCLine.parse_hostmask(nickname)
nickname = hostmask.nickname nickname = hostmask.nickname
user = event["server"].get_user(hostmask.nickname, user = event["server"].get_user(hostmask.nickname,
username=hostmask.username, hostname=hostmask.hostname) username=hostmask.username, hostname=hostmask.hostname)

View file

@ -37,6 +37,21 @@ class Hostmask(object):
def __str__(self): def __str__(self):
return self.hostmask return self.hostmask
def parse_hostmask(hostmask: str) -> Hostmask:
nickname, _, username = hostmask.partition("!")
username, _, hostname = username.partition("@")
return Hostmask(nickname, username, hostname, hostmask)
MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
def message_tag_escape(s):
return utils.irc.multi_replace(s, MESSAGE_TAG_UNESCAPED,
MESSAGE_TAG_ESCAPED)
def message_tag_unescape(s):
unescaped = utils.irc.multi_replace(s, MESSAGE_TAG_ESCAPED,
MESSAGE_TAG_UNESCAPED)
return unescaped.replace("\\", "")
class ParsedLine(object): class ParsedLine(object):
def __init__(self, command: str, args: typing.List[str], def __init__(self, command: str, args: typing.List[str],
source: Hostmask=None, source: Hostmask=None,
@ -76,7 +91,7 @@ class ParsedLine(object):
tag_pieces = [] tag_pieces = []
for tag, value in tags.items(): for tag, value in tags.items():
if value: if value:
value_escaped = utils.irc.message_tag_escape(value) value_escaped = message_tag_escape(value)
tag_pieces.append("%s=%s" % (tag, value_escaped)) tag_pieces.append("%s=%s" % (tag, value_escaped))
else: else:
tag_pieces.append(tag) tag_pieces.append(tag)
@ -144,6 +159,41 @@ class ParsedLine(object):
return valid, overflow return valid, overflow
def parse_line(line: str) -> ParsedLine:
tags = {} # type: typing.Dict[str, typing.Any]
source = None # type: typing.Optional[Hostmask]
command = None
if line[0] == "@":
tags_prefix, line = line[1:].split(" ", 1)
for tag in filter(None, tags_prefix.split(";")):
tag, sep, value = tag.partition("=")
if value:
tags[tag] = message_tag_unescape(value)
else:
tags[tag] = None
line, trailing_separator, trailing_split = line.partition(" :")
trailing = None # type: typing.Optional[str]
if trailing_separator:
trailing = trailing_split
if line[0] == ":":
source_str, line = line[1:].split(" ", 1)
source = parse_hostmask(source_str)
command, sep, line = line.partition(" ")
args = [] # type: typing.List[str]
if line:
# this is so that `args` is empty if `line` is empty
args = line.split(" ")
if not trailing == None:
args.append(typing.cast(str, trailing))
return ParsedLine(command, args, source, tags)
class SentLine(IRCObject.Object): class SentLine(IRCObject.Object):
def __init__(self, events: "EventManager.Events", def __init__(self, events: "EventManager.Events",
send_time: datetime.datetime, hostmask: str, line: ParsedLine): send_time: datetime.datetime, hostmask: str, line: ParsedLine):
@ -161,3 +211,32 @@ class SentLine(IRCObject.Object):
return self.parsed_line.truncate(self._hostmask)[0] return self.parsed_line.truncate(self._hostmask)[0]
def for_wire(self) -> bytes: def for_wire(self) -> bytes:
return b"%s\r\n" % self._for_wire().encode("utf8") return b"%s\r\n" % self._for_wire().encode("utf8")
class IRCBatch(object):
def __init__(self, identifier: str, batch_type: str, args: typing.List[str],
tags: typing.Dict[str, str]=None, source: Hostmask=None):
self.identifier = identifier
self.type = batch_type
self.args = args
self.tags = tags or {}
self.source = source
self._lines = [] # type: typing.List[ParsedLine]
def add_line(self, line: ParsedLine):
self._lines.append(line)
def get_lines(self) -> typing.List[ParsedLine]:
return self._lines
class IRCSendBatch(IRCBatch):
def __init__(self, batch_type: str, args: typing.List[str],
tags: typing.Dict[str, str]=None):
IRCBatch.__init__(self, str(uuid.uuid4()), batch_type, args, tags)
def get_lines(self) -> typing.List[ParsedLine]:
lines = []
for line in self._lines:
line.add_tag("batch", self.identifier)
lines.append(line)
lines.insert(0, ParsedLine("BATCH",
["+%s" % self.identifier, self.type]))
lines.append(ParsedLine("BATCH", ["-%s" % self.identifier]))
return lines

View file

@ -229,7 +229,7 @@ class Server(IRCObject.Object):
for line in lines: for line in lines:
self.bot.log.debug("%s (raw recv) | %s", [str(self), line]) self.bot.log.debug("%s (raw recv) | %s", [str(self), line])
self.events.on("raw.received").call_unsafe(server=self, self.events.on("raw.received").call_unsafe(server=self,
line=utils.irc.parse_line(line)) line=IRCLine.parse_line(line))
self.check_users() self.check_users()
def check_users(self): def check_users(self):
for user in self.new_users: for user in self.new_users:
@ -294,7 +294,7 @@ class Server(IRCObject.Object):
return line_obj return line_obj
return None return None
def send_raw(self, line: str): def send_raw(self, line: str):
return self.send(utils.irc.parse_line(line)) return self.send(IRCLine.parse_line(line))
def send_user(self, username: str, realname: str def send_user(self, username: str, realname: str
) -> typing.Optional[IRCLine.SentLine]: ) -> typing.Optional[IRCLine.SentLine]:

View file

@ -1,5 +1,5 @@
import json, string, re, typing, uuid import json, string, re, typing, uuid
from src import IRCLine, utils from src import utils
from . import protocol from . import protocol
ASCII_UPPER = string.ascii_uppercase ASCII_UPPER = string.ascii_uppercase
@ -10,7 +10,7 @@ RFC1459_UPPER = STRICT_RFC1459_UPPER+"^"
RFC1459_LOWER = STRICT_RFC1459_LOWER+"~" RFC1459_LOWER = STRICT_RFC1459_LOWER+"~"
# case mapping lowercase/uppcase logic # case mapping lowercase/uppcase logic
def _multi_replace(s: str, def multi_replace(s: str,
chars1: typing.Iterable[str], chars1: typing.Iterable[str],
chars2: typing.Iterable[str]) -> str: chars2: typing.Iterable[str]) -> str:
for char1, char2 in zip(chars1, chars2): for char1, char2 in zip(chars1, chars2):
@ -18,11 +18,11 @@ def _multi_replace(s: str,
return s return s
def lower(case_mapping: str, s: str) -> str: def lower(case_mapping: str, s: str) -> str:
if case_mapping == "ascii": if case_mapping == "ascii":
return _multi_replace(s, ASCII_UPPER, ASCII_LOWER) return multi_replace(s, ASCII_UPPER, ASCII_LOWER)
elif case_mapping == "rfc1459": elif case_mapping == "rfc1459":
return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER) return multi_replace(s, RFC1459_UPPER, RFC1459_LOWER)
elif case_mapping == "strict-rfc1459": elif case_mapping == "strict-rfc1459":
return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER) return multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER)
else: else:
raise ValueError("unknown casemapping '%s'" % case_mapping) raise ValueError("unknown casemapping '%s'" % case_mapping)
@ -30,55 +30,6 @@ def lower(case_mapping: str, s: str) -> str:
def equals(case_mapping: str, s1: str, s2: str) -> bool: def equals(case_mapping: str, s1: str, s2: str) -> bool:
return lower(case_mapping, s1) == lower(case_mapping, s2) return lower(case_mapping, s1) == lower(case_mapping, s2)
def parse_hostmask(hostmask: str) -> IRCLine.Hostmask:
nickname, _, username = hostmask.partition("!")
username, _, hostname = username.partition("@")
return IRCLine.Hostmask(nickname, username, hostname, hostmask)
MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
def message_tag_escape(s):
return _multi_replace(s, MESSAGE_TAG_UNESCAPED, MESSAGE_TAG_ESCAPED)
def message_tag_unescape(s):
unescaped = _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED)
return unescaped.replace("\\", "")
def parse_line(line: str) -> IRCLine.ParsedLine:
tags = {} # type: typing.Dict[str, typing.Any]
source = None # type: typing.Optional[IRCLine.Hostmask]
command = None
if line[0] == "@":
tags_prefix, line = line[1:].split(" ", 1)
for tag in filter(None, tags_prefix.split(";")):
tag, sep, value = tag.partition("=")
if value:
tags[tag] = message_tag_unescape(value)
else:
tags[tag] = None
line, trailing_separator, trailing_split = line.partition(" :")
trailing = None # type: typing.Optional[str]
if trailing_separator:
trailing = trailing_split
if line[0] == ":":
source_str, line = line[1:].split(" ", 1)
source = parse_hostmask(source_str)
command, sep, line = line.partition(" ")
args = [] # type: typing.List[str]
if line:
# this is so that `args` is empty if `line` is empty
args = line.split(" ")
if not trailing == None:
args.append(typing.cast(str, trailing))
return IRCLine.ParsedLine(command, args, source, tags)
REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR) REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR)
def color(s: str, foreground: utils.consts.IRCColor, def color(s: str, foreground: utils.consts.IRCColor,
@ -256,35 +207,6 @@ def parse_ctcp(s: str) -> typing.Optional[CTCPMessage]:
return None return None
class IRCBatch(object):
def __init__(self, identifier: str, batch_type: str, args: typing.List[str],
tags: typing.Dict[str, str]=None, source: IRCLine.Hostmask=None):
self.identifier = identifier
self.type = batch_type
self.args = args
self.tags = tags or {}
self.source = source
self._lines = [] # type: typing.List[IRCLine.ParsedLine]
def add_line(self, line: IRCLine.ParsedLine):
self._lines.append(line)
def get_lines(self) -> typing.List[IRCLine.ParsedLine]:
return self._lines
class IRCSendBatch(IRCBatch):
def __init__(self, batch_type: str, args: typing.List[str],
tags: typing.Dict[str, str]=None):
IRCBatch.__init__(self, str(uuid.uuid4()), batch_type, args, tags)
def get_lines(self) -> typing.List[IRCLine.ParsedLine]:
lines = []
for line in self._lines:
line.add_tag("batch", self.identifier)
lines.append(line)
lines.insert(0, IRCLine.ParsedLine("BATCH",
["+%s" % self.identifier, self.type]))
lines.append(IRCLine.ParsedLine("BATCH", ["-%s" % self.identifier]))
return lines
class Capability(object): class Capability(object):
def __init__(self, ratified_name: typing.Optional[str], def __init__(self, ratified_name: typing.Optional[str],
draft_name: str=None, alias: str=None, draft_name: str=None, alias: str=None,