Shift formatting protocol data to it's own file: utils.irc.protocol
This commit is contained in:
parent
d937145f93
commit
186b6b82a2
8 changed files with 135 additions and 389 deletions
|
@ -15,7 +15,7 @@ class Module(ModuleManager.BaseModule):
|
|||
modes = self._get_modes(channel, user)
|
||||
if modes:
|
||||
channel.send_mode("+%s" % "".join(modes),
|
||||
" ".join([user.nickname for mode in modes]))
|
||||
[user.nickname for mode in modes])
|
||||
|
||||
@utils.hook("received.join")
|
||||
def on_join(self, event):
|
||||
|
@ -41,7 +41,7 @@ class Module(ModuleManager.BaseModule):
|
|||
modes = [item[0] for item in chunk]
|
||||
nicknames = [item[1] for item in chunk]
|
||||
channel.send_mode(
|
||||
"+%s" % "".join(modes), " ".join(nicknames))
|
||||
"+%s" % "".join(modes), nicknames)
|
||||
@utils.hook("received.command.syncmodes", channel_only=True)
|
||||
def sync_modes(self, event):
|
||||
"""
|
||||
|
|
|
@ -217,7 +217,7 @@ class Module(ModuleManager.BaseModule):
|
|||
"""
|
||||
target = event["user"].nickname if not event["args_split"] else event[
|
||||
"args_split"][0]
|
||||
event["target"].send_mode("+o", target)
|
||||
event["target"].send_mode("+o", [target])
|
||||
@utils.hook("received.command.deop", channel_only=True)
|
||||
def deop(self, event):
|
||||
"""
|
||||
|
@ -228,7 +228,7 @@ class Module(ModuleManager.BaseModule):
|
|||
"""
|
||||
target = event["user"].nickname if not event["args_split"] else event[
|
||||
"args_split"][0]
|
||||
event["target"].send_mode("-o", target)
|
||||
event["target"].send_mode("-o", [target])
|
||||
|
||||
@utils.hook("received.command.voice", channel_only=True)
|
||||
def voice(self, event):
|
||||
|
@ -240,7 +240,7 @@ class Module(ModuleManager.BaseModule):
|
|||
"""
|
||||
target = event["user"].nickname if not event["args_split"] else event[
|
||||
"args_split"][0]
|
||||
event["target"].send_mode("+v", target)
|
||||
event["target"].send_mode("+v", [target])
|
||||
@utils.hook("received.command.devoice", channel_only=True)
|
||||
def devoice(self, event):
|
||||
"""
|
||||
|
@ -251,7 +251,7 @@ class Module(ModuleManager.BaseModule):
|
|||
"""
|
||||
target = event["user"].nickname if not event["args_split"] else event[
|
||||
"args_split"][0]
|
||||
event["target"].send_mode("-v", target)
|
||||
event["target"].send_mode("-v", [target])
|
||||
|
||||
@utils.hook("received.command.topic", min_args=1, channel_only=True,
|
||||
remove_empty=False)
|
||||
|
|
|
@ -21,7 +21,7 @@ class Module(ModuleManager.BaseModule):
|
|||
event["server"].attempted_join[channel] = key
|
||||
|
||||
event["server"].send_join(
|
||||
",".join(channels_sorted), ",".join(keys_sorted))
|
||||
",".join(channels_sorted), keys_sorted)
|
||||
|
||||
@utils.hook("self.join")
|
||||
def on_join(self, event):
|
||||
|
|
|
@ -127,18 +127,18 @@ class Channel(IRCObject.Object):
|
|||
return self.bot.database.user_channel_settings.find_all_by_setting(
|
||||
self.id, setting, default)
|
||||
|
||||
def send_message(self, text: str, prefix: str=None, tags: dict={}):
|
||||
self.server.send_message(self.name, text, prefix=prefix, tags=tags)
|
||||
def send_notice(self, text: str, prefix: str=None, tags: dict={}):
|
||||
self.server.send_notice(self.name, text, prefix=prefix, tags=tags)
|
||||
def send_mode(self, mode: str=None, target: str=None):
|
||||
def send_message(self, text: str, tags: dict={}):
|
||||
self.server.send_message(self.name, text, tags=tags)
|
||||
def send_notice(self, text: str, tags: dict={}):
|
||||
self.server.send_notice(self.name, text, tags=tags)
|
||||
def send_mode(self, mode: str=None, target: typing.List[str]=None):
|
||||
self.server.send_mode(self.name, mode, target)
|
||||
def send_kick(self, target: str, reason: str=None):
|
||||
self.server.send_kick(self.name, target, reason)
|
||||
def send_ban(self, hostmask: str):
|
||||
self.server.send_mode(self.name, "+b", hostmask)
|
||||
self.server.send_mode(self.name, "+b", [hostmask])
|
||||
def send_unban(self, hostmask: str):
|
||||
self.server.send_mode(self.name, "-b", hostmask)
|
||||
self.server.send_mode(self.name, "-b", [hostmask])
|
||||
def send_topic(self, topic: str):
|
||||
self.server.send_topic(self.name, topic)
|
||||
def send_part(self, reason: str=None):
|
||||
|
|
104
src/IRCServer.py
104
src/IRCServer.py
|
@ -237,7 +237,8 @@ class Server(IRCObject.Object):
|
|||
self.set_setting("last-read", utils.iso8601_format(now))
|
||||
return lines
|
||||
|
||||
def send(self, line: str):
|
||||
def send(self, line_parsed: utils.irc.IRCParsedLine):
|
||||
line = line_parsed.format()
|
||||
results = self.events.on("preprocess.send").call_unsafe(
|
||||
server=self, line=line)
|
||||
for result in results:
|
||||
|
@ -248,6 +249,7 @@ class Server(IRCObject.Object):
|
|||
line_obj = IRCLine.Line(self, datetime.datetime.utcnow(), line_stripped)
|
||||
self.socket.send(line_obj)
|
||||
return line_obj
|
||||
|
||||
def _send(self):
|
||||
lines = self.socket._send()
|
||||
for line in lines:
|
||||
|
@ -255,13 +257,12 @@ class Server(IRCObject.Object):
|
|||
self.events.on("raw.send").call_unsafe(server=self, line=line)
|
||||
|
||||
def send_user(self, username: str, realname: str) -> IRCLine.Line:
|
||||
return self.send("USER %s 0 * %s" %
|
||||
(username, utils.irc.trailing(realname)))
|
||||
return self.send(utils.irc.protocol.user(username, realname))
|
||||
def send_nick(self, nickname: str) -> IRCLine.Line:
|
||||
return self.send("NICK %s" % nickname)
|
||||
return self.send(utils.irc.protocol.nick(nickname))
|
||||
|
||||
def send_capibility_ls(self) -> IRCLine.Line:
|
||||
return self.send("CAP LS 302")
|
||||
return self.send(utils.irc.protocol.capability_ls())
|
||||
def queue_capability(self, capability: str):
|
||||
self._capability_queue.add(capability)
|
||||
def queue_capabilities(self, capabilities: typing.List[str]):
|
||||
|
@ -278,11 +279,11 @@ class Server(IRCObject.Object):
|
|||
def has_capability_queue(self):
|
||||
return bool(len(self._capability_queue))
|
||||
def send_capability_request(self, capability: str) -> IRCLine.Line:
|
||||
return self.send("CAP REQ %s" % utils.irc.trailing(capability))
|
||||
return self.send(utils.irc.protocol.capability_request(capability))
|
||||
def send_capability_end(self) -> IRCLine.Line:
|
||||
return self.send("CAP END")
|
||||
return self.send(utils.irc.protocol.capability_end())
|
||||
def send_authenticate(self, text: str) -> IRCLine.Line:
|
||||
return self.send("AUTHENTICATE %s" % text)
|
||||
return self.send(utils.irc.protocol.authenticate(text))
|
||||
|
||||
def waiting_for_capabilities(self) -> bool:
|
||||
return bool(len(self._capabilities_waiting))
|
||||
|
@ -294,92 +295,59 @@ class Server(IRCObject.Object):
|
|||
self.send_capability_end()
|
||||
|
||||
def send_pass(self, password: str) -> IRCLine.Line:
|
||||
return self.send("PASS %s" % password)
|
||||
return self.send(utils.irc.protocol.password(password))
|
||||
|
||||
def send_ping(self, nonce: str="hello") -> IRCLine.Line:
|
||||
return self.send("PING %s" % utils.irc.trailing(nonce))
|
||||
return self.send(utils.irc.protocol.ping(nonce))
|
||||
def send_pong(self, nonce: str="hello") -> IRCLine.Line:
|
||||
return self.send("PONG %s" % utils.irc.trailing(nonce))
|
||||
return self.send(utils.irc.protocol.pong(nonce))
|
||||
|
||||
def try_rejoin(self, event: EventManager.Event):
|
||||
if event["server_id"] == self.id and event["channel_name"
|
||||
] in self.attempted_join:
|
||||
self.send_join(event["channel_name"], event["key"])
|
||||
def send_join(self, channel_name: str, key: str=None) -> IRCLine.Line:
|
||||
return self.send("JOIN %s%s" % (channel_name,
|
||||
"" if key else " %s" % key))
|
||||
self.send_join(event["channel_name"], [event["key"]])
|
||||
def send_join(self, channel_name: str, keys: typing.List[str]=None
|
||||
) -> IRCLine.Line:
|
||||
return self.send(utils.irc.protocol.join(channel_name, keys))
|
||||
def send_part(self, channel_name: str, reason: str=None) -> IRCLine.Line:
|
||||
return self.send("PART %s%s" % (channel_name,
|
||||
"" if reason == None else " %s" % reason))
|
||||
return self.send(utils.irc.protocol.part(channel_name, reason))
|
||||
def send_quit(self, reason: str="Leaving") -> IRCLine.Line:
|
||||
return self.send("QUIT %s" % utils.irc.trailing(reason))
|
||||
return self.send(utils.irc.protocol.quit(reason))
|
||||
|
||||
def _tag_str(self, tags: dict) -> str:
|
||||
tag_str = ""
|
||||
for tag, value in tags.items():
|
||||
if tag_str:
|
||||
tag_str += ","
|
||||
tag_str += tag
|
||||
if value:
|
||||
tag_str += "=%s" % value
|
||||
if tag_str:
|
||||
tag_str = "@%s" % tag_str
|
||||
return tag_str
|
||||
def send_message(self, target: str, message: str, tags: dict={}
|
||||
) -> IRCLine.Line:
|
||||
return self.send(utils.irc.protocol.message(target, message, tags))
|
||||
|
||||
def send_message(self, target: str, message: str, prefix: str=None,
|
||||
tags: dict={}) -> IRCLine.Line:
|
||||
full_message = message if not prefix else prefix+message
|
||||
tag_str = "" if not tags else "%s " % self._tag_str(tags)
|
||||
return self.send("%sPRIVMSG %s %s" %
|
||||
(tag_str, target, utils.irc.trailing(full_message)))
|
||||
|
||||
def send_notice(self, target: str, message: str, prefix: str=None,
|
||||
tags: dict={}) -> IRCLine.Line:
|
||||
full_message = message if not prefix else prefix+message
|
||||
tag_str = "" if not tags else "%s " % self._tag_str(tags)
|
||||
return self.send("%sNOTICE %s %s" %
|
||||
(tag_str, target, utils.irc.trailing(full_message)))
|
||||
def send_notice(self, target: str, message: str, tags: dict={}
|
||||
) -> IRCLine.Line:
|
||||
return self.send(utils.irc.protocol.notice(target, message, tags))
|
||||
|
||||
def send_tagmsg(self, target, tags: dict):
|
||||
return self.send("%s TAGMSG %s" % (self._tag_str(tags), target))
|
||||
return self.send(utils.irc.protocol.tagmsg(target, tags))
|
||||
|
||||
def send_mode(self, target: str, mode: str=None, args: str=None
|
||||
def send_mode(self, target: str, mode: str=None, args: typing.List[str]=None
|
||||
) -> IRCLine.Line:
|
||||
return self.send("MODE %s%s%s" % (target,
|
||||
"" if mode == None else " %s" % mode,
|
||||
"" if args == None else " %s" % args))
|
||||
return self.send(utils.irc.protocol.mode(target, mode, args))
|
||||
|
||||
def send_topic(self, channel_name: str, topic: str) -> IRCLine.Line:
|
||||
return self.send("TOPIC %s %s" %
|
||||
(channel_name, utils.irc.trailing(topic)))
|
||||
return self.send(utils.irc.protocol.topic(channel_name, topic))
|
||||
def send_kick(self, channel_name: str, target: str, reason: str=None
|
||||
) -> IRCLine.Line:
|
||||
reason = ""
|
||||
if not reason == None:
|
||||
reason = " %s" % utils.irc.trailing(typing.cast(str, reason))
|
||||
return self.send("KICK %s %s%s" % (channel_name, target, reason))
|
||||
return self.send(utils.irc.protocol.kick(channel_name, target, reason))
|
||||
def send_names(self, channel_name: str) -> IRCLine.Line:
|
||||
return self.send("NAMES %s" % channel_name)
|
||||
return self.send(utils.irc.protocol.names(channel_name))
|
||||
def send_list(self, search_for: str=None) -> IRCLine.Line:
|
||||
return self.send(
|
||||
"LIST%s" % "" if search_for == None else " %s" % search_for)
|
||||
return self.send(utils.irc.protocol.list(search_for))
|
||||
def send_invite(self, target: str, channel_name: str) -> IRCLine.Line:
|
||||
return self.send("INVITE %s %s" % (target, channel_name))
|
||||
return self.send(utils.irc.protocol.invite(target, channel_name))
|
||||
|
||||
def send_whois(self, target: str) -> IRCLine.Line:
|
||||
return self.send("WHOIS %s" % target)
|
||||
return self.send(utils.irc.protocol.whois(target))
|
||||
def send_whowas(self, target: str, amount: int=None, server: str=None
|
||||
) -> IRCLine.Line:
|
||||
server = ""
|
||||
if not server == None:
|
||||
server = " %s" % utils.irc.trailing(typing.cast(str, server))
|
||||
|
||||
return self.send("WHOWAS %s%s%s" % (target,
|
||||
"" if amount == None else " %s" % amount,
|
||||
"" if server == None else " %s" % utils.irc.trailing(server)))
|
||||
return self.send(utils.irc.protocol.whowas(target, amount, server))
|
||||
def send_who(self, filter: str=None) -> IRCLine.Line:
|
||||
return self.send("WHO%s" % ("" if filter == None else " %s" % filter))
|
||||
return self.send(utils.irc.protocol.who(filter))
|
||||
def send_whox(self, mask: str, filter: str, fields: str, label: str=None
|
||||
) -> IRCLine.Line:
|
||||
return self.send("WHO %s %s%%%s%s" % (mask, filter, fields,
|
||||
","+label if label else ""))
|
||||
return self.send(utils.irc.protocol.whox(mask, filter, fields, label))
|
||||
|
|
|
@ -65,10 +65,9 @@ class User(IRCObject.Object):
|
|||
return self.bot.database.user_channel_settings.find_by_setting(
|
||||
self.get_id(), setting, default)
|
||||
|
||||
def send_message(self, message: str, prefix: str=None, tags: dict={}):
|
||||
self.server.send_message(self.nickname, message, prefix=prefix,
|
||||
tags=tags)
|
||||
def send_notice(self, text: str, prefix: str=None, tags: dict={}):
|
||||
self.server.send_notice(self.nickname, text, prefix=prefix, tags=tags)
|
||||
def send_message(self, message: str, tags: dict={}):
|
||||
self.server.send_message(self.nickname, message, tags=tags)
|
||||
def send_notice(self, text: str, tags: dict={}):
|
||||
self.server.send_notice(self.nickname, text, tags=tags)
|
||||
def send_ctcp_response(self, command: str, args: str):
|
||||
self.send_notice("\x01%s %s\x01" % (command, args))
|
||||
|
|
302
src/utils/irc.py
302
src/utils/irc.py
|
@ -1,302 +0,0 @@
|
|||
import json, string, re, typing
|
||||
from src import utils
|
||||
|
||||
ASCII_UPPER = string.ascii_uppercase
|
||||
ASCII_LOWER = string.ascii_lowercase
|
||||
STRICT_RFC1459_UPPER = ASCII_UPPER+r'\[]'
|
||||
STRICT_RFC1459_LOWER = ASCII_LOWER+r'|{}'
|
||||
RFC1459_UPPER = STRICT_RFC1459_UPPER+"^"
|
||||
RFC1459_LOWER = STRICT_RFC1459_LOWER+"~"
|
||||
|
||||
# case mapping lowercase/uppcase logic
|
||||
def _multi_replace(s: str,
|
||||
chars1: typing.Iterable[str],
|
||||
chars2: typing.Iterable[str]) -> str:
|
||||
for char1, char2 in zip(chars1, chars2):
|
||||
s = s.replace(char1, char2)
|
||||
return s
|
||||
def lower(case_mapping: str, s: str) -> str:
|
||||
if case_mapping == "ascii":
|
||||
return _multi_replace(s, ASCII_UPPER, ASCII_LOWER)
|
||||
elif case_mapping == "rfc1459":
|
||||
return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER)
|
||||
elif case_mapping == "strict-rfc1459":
|
||||
return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER)
|
||||
else:
|
||||
raise ValueError("unknown casemapping '%s'" % case_mapping)
|
||||
|
||||
# compare a string while respecting case mapping
|
||||
def equals(case_mapping: str, s1: str, s2: str) -> bool:
|
||||
return lower(case_mapping, s1) == lower(case_mapping, s2)
|
||||
|
||||
class IRCHostmask(object):
|
||||
def __init__(self, nickname: str, username: str, hostname: str,
|
||||
hostmask: str):
|
||||
self.nickname = nickname
|
||||
self.username = username
|
||||
self.hostname = hostname
|
||||
self.hostmask = hostmask
|
||||
def __repr__(self):
|
||||
return "IRCHostmask(%s)" % self.__str__()
|
||||
def __str__(self):
|
||||
return self.hostmask
|
||||
|
||||
def seperate_hostmask(hostmask: str) -> IRCHostmask:
|
||||
nickname, _, username = hostmask.partition("!")
|
||||
username, _, hostname = username.partition("@")
|
||||
return IRCHostmask(nickname, username, hostname, hostmask)
|
||||
|
||||
class IRCArgs(object):
|
||||
def __init__(self, args: typing.List[str]):
|
||||
self._args = args
|
||||
|
||||
def get(self, index: int) -> typing.Optional[str]:
|
||||
if len(self._args) > index:
|
||||
return self._args[index]
|
||||
return None
|
||||
|
||||
def __repr__(self):
|
||||
return "IRCArgs(%s)" % self._args
|
||||
def __len__(self) -> int:
|
||||
return len(self._args)
|
||||
def __getitem__(self, index) -> str:
|
||||
return self._args[index]
|
||||
|
||||
|
||||
class IRCParsedLine(object):
|
||||
def __init__(self, command: str, args: IRCArgs, prefix: IRCHostmask = None,
|
||||
tags: dict = None):
|
||||
self.tags = tags
|
||||
self.prefix = prefix
|
||||
self.command = command
|
||||
self.args = args
|
||||
|
||||
MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
|
||||
MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
|
||||
def message_tag_escape(s):
|
||||
return _multi_replace(s, MESSAGE_TAG_UNESCAPED, MESSAGE_TAG_ESCAPED)
|
||||
def message_tag_unescape(s):
|
||||
return _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED)
|
||||
|
||||
def parse_line(line: str) -> IRCParsedLine:
|
||||
tags = {} # type: typing.Dict[str, typing.Any]
|
||||
prefix = None # type: typing.Optional[IRCHostmask]
|
||||
command = None
|
||||
|
||||
if line[0] == "@":
|
||||
tags_prefix, line = line[1:].split(" ", 1)
|
||||
|
||||
if tags_prefix[0] == "{":
|
||||
tags_prefix = message_tag_unescape(tags_prefix)
|
||||
tags = json.loads(tags_prefix)
|
||||
else:
|
||||
for tag in filter(None, tags_prefix.split(";")):
|
||||
tag, sep, value = tag.partition("=")
|
||||
if sep:
|
||||
tags[tag] = message_tag_unescape(value)
|
||||
else:
|
||||
tags[tag] = None
|
||||
|
||||
line, trailing_separator, trailing_split = line.partition(" :")
|
||||
|
||||
trailing = None # type: typing.Optional[str]
|
||||
if trailing_separator:
|
||||
trailing = trailing_split
|
||||
|
||||
if line[0] == ":":
|
||||
prefix_str, line = line[1:].split(" ", 1)
|
||||
prefix = seperate_hostmask(prefix_str)
|
||||
|
||||
command, sep, line = line.partition(" ")
|
||||
args = [] # type: typing.List[str]
|
||||
if line:
|
||||
# this is so that `args` is empty if `line` is empty
|
||||
args = line.split(" ")
|
||||
|
||||
if not trailing == None:
|
||||
args.append(typing.cast(str, trailing))
|
||||
|
||||
return IRCParsedLine(command, IRCArgs(args), prefix, tags)
|
||||
|
||||
|
||||
REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR)
|
||||
|
||||
def color(s: str, foreground: utils.consts.IRCColor,
|
||||
background: utils.consts.IRCColor=None) -> str:
|
||||
foreground_s = str(foreground.irc).zfill(2)
|
||||
background_s = ""
|
||||
if background:
|
||||
background_s = ",%s" % str(background.irc).zfill(2)
|
||||
|
||||
return "%s%s%s%s%s" % (utils.consts.COLOR, foreground_s, background_s, s,
|
||||
utils.consts.COLOR)
|
||||
|
||||
def bold(s: str) -> str:
|
||||
return "%s%s%s" % (utils.consts.BOLD, s, utils.consts.BOLD)
|
||||
|
||||
def underline(s: str) -> str:
|
||||
return "%s%s%s" % (utils.consts.UNDERLINE, s, utils.consts.UNDERLINE)
|
||||
|
||||
def strip_font(s: str) -> str:
|
||||
s = s.replace(utils.consts.BOLD, "")
|
||||
s = s.replace(utils.consts.ITALIC, "")
|
||||
s = REGEX_COLOR.sub("", s)
|
||||
s = s.replace(utils.consts.COLOR, "")
|
||||
return s
|
||||
|
||||
FORMAT_TOKENS = [
|
||||
utils.consts.BOLD,
|
||||
utils.consts.RESET,
|
||||
utils.consts.UNDERLINE
|
||||
]
|
||||
FORMAT_STRIP = [
|
||||
"\x08" # backspace
|
||||
]
|
||||
def _format_tokens(s: str) -> typing.List[str]:
|
||||
is_color = False
|
||||
foreground = ""
|
||||
background = ""
|
||||
is_background = False
|
||||
matches = [] # type: typing.List[str]
|
||||
|
||||
for i, char in enumerate(s):
|
||||
last_char = i == len(s)-1
|
||||
if is_color:
|
||||
can_add = False
|
||||
current_color = background if is_background else foreground
|
||||
color_finished = False
|
||||
if char.isdigit() and len(current_color) < 2:
|
||||
if is_background:
|
||||
background += char
|
||||
else:
|
||||
foreground += char
|
||||
color_finished = (len(current_color)+1) == 2
|
||||
|
||||
if char == "," and not is_background:
|
||||
is_background = True
|
||||
elif not char.isdigit() or (color_finished and last_char):
|
||||
color = foreground
|
||||
if background:
|
||||
color += ","+background
|
||||
|
||||
matches.append("\x03%s" % color)
|
||||
is_color = False
|
||||
foreground = ""
|
||||
background = ""
|
||||
is_background = False
|
||||
|
||||
if char == utils.consts.COLOR:
|
||||
if is_color:
|
||||
matches.append(char)
|
||||
else:
|
||||
is_color = True
|
||||
elif char in FORMAT_TOKENS:
|
||||
matches.append(char)
|
||||
elif char in FORMAT_STRIP:
|
||||
matches.append(char)
|
||||
return matches
|
||||
|
||||
def _color_match(code: typing.Optional[str], foreground: bool) -> str:
|
||||
if not code:
|
||||
return ""
|
||||
color = utils.consts.COLOR_CODES[int(code)]
|
||||
return color.to_ansi(not foreground)
|
||||
|
||||
def parse_format(s: str) -> str:
|
||||
has_foreground = False
|
||||
has_background = False
|
||||
bold = False
|
||||
underline = False
|
||||
|
||||
for token in _format_tokens(s):
|
||||
replace = ""
|
||||
type = token[0]
|
||||
|
||||
if type == utils.consts.COLOR:
|
||||
match = REGEX_COLOR.match(token)
|
||||
|
||||
if match and (match.group(1) or match.group(2)):
|
||||
foreground = _color_match(match.group(1), True)
|
||||
background = _color_match(match.group(2), False)
|
||||
|
||||
if foreground:
|
||||
replace += foreground
|
||||
has_foreground = True
|
||||
if background:
|
||||
replace += background
|
||||
has_background = True
|
||||
else:
|
||||
if has_foreground:
|
||||
has_foreground = False
|
||||
replace += utils.consts.ANSI_FOREGROUND_RESET
|
||||
if has_background:
|
||||
has_background = False
|
||||
replace += utils.consts.ANSI_BACKGROUND_RESET
|
||||
elif type == utils.consts.BOLD:
|
||||
if bold:
|
||||
replace += utils.consts.ANSI_BOLD_RESET
|
||||
else:
|
||||
replace += utils.consts.ANSI_BOLD
|
||||
bold = not bold
|
||||
elif type == utils.consts.RESET:
|
||||
replace += utils.consts.ANSI_RESET
|
||||
elif type == utils.consts.UNDERLINE:
|
||||
if underline:
|
||||
replace += utils.consts.ANSI_UNDERLINE_RESET
|
||||
else:
|
||||
replace += utils.consts.ANSI_UNDERLINE
|
||||
underline = not underline
|
||||
elif type in FORMAT_STRIP:
|
||||
replace = ""
|
||||
|
||||
s = s.replace(token, replace, 1)
|
||||
|
||||
return s + utils.consts.ANSI_RESET
|
||||
|
||||
OPT_STR = typing.Optional[str]
|
||||
class IRCConnectionParameters(object):
|
||||
def __init__(self, id: int, alias: OPT_STR, hostname: str, port: int,
|
||||
password: OPT_STR, tls: bool, ipv4: bool, bindhost: OPT_STR,
|
||||
nickname: str, username: OPT_STR, realname: OPT_STR,
|
||||
args: typing.Dict[str, str]={}):
|
||||
self.id = id
|
||||
self.alias = alias
|
||||
self.hostname = hostname
|
||||
self.port = port
|
||||
self.tls = tls
|
||||
self.ipv4 = ipv4
|
||||
self.bindhost = bindhost
|
||||
self.password = password
|
||||
self.nickname = nickname
|
||||
self.username = username
|
||||
self.realname = realname
|
||||
self.args = args
|
||||
|
||||
class CTCPMessage(object):
|
||||
def __init__(self, command: str, message: str):
|
||||
self.command = command
|
||||
self.message = message
|
||||
def parse_ctcp(s: str) -> typing.Optional[CTCPMessage]:
|
||||
ctcp = s.startswith("\x01")
|
||||
if s.startswith("\x01"):
|
||||
ctcp_command, sep, ctcp_message = s[1:].partition(" ")
|
||||
if ctcp_command.endswith("\x01"):
|
||||
ctcp_command = ctcp_command[:-1]
|
||||
if ctcp_message.endswith("\x01"):
|
||||
ctcp_message = ctcp_message[:-1]
|
||||
return CTCPMessage(ctcp_command, ctcp_message)
|
||||
|
||||
return None
|
||||
|
||||
class IRCBatch(object):
|
||||
def __init__(self, identifier: str, batch_type: str, tags: dict):
|
||||
self.id = identifier
|
||||
self.type = batch_type
|
||||
self.tags = tags
|
||||
self.lines = [] # type: typing.List[IRCParsedLine]
|
||||
|
||||
def trailing(s: str) -> str:
|
||||
if s[0] == ":" or " " in s:
|
||||
return ":%s" % s
|
||||
else:
|
||||
return s
|
81
src/utils/irc/protocol.py
Normal file
81
src/utils/irc/protocol.py
Normal file
|
@ -0,0 +1,81 @@
|
|||
import typing
|
||||
from src import utils
|
||||
|
||||
def user(username: str, realname: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("USER", [username, "0", "*", realname])
|
||||
def nick(nickname: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("NICK", [nickname])
|
||||
|
||||
def capability_ls() -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("CAP", ["LS", "302"])
|
||||
def capability_request(capability: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("CAP", ["REQ", capability])
|
||||
def capability_end() -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("CAP", ["END"])
|
||||
def authenticate(text: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("AUTHENTICATE", [text])
|
||||
|
||||
def password(password: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("PASS", [password])
|
||||
|
||||
def ping(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("PING", [nonce])
|
||||
def pong(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("PONG", [nonce])
|
||||
|
||||
def join(channel_name: str, keys: typing.List[str]
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("JOIN", [channel_name]+keys)
|
||||
def part(channel_name: str, reason: str=None) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("PART", [channel_name]+(
|
||||
[reason] if reason else []))
|
||||
def quit(reason: str=None) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("QUIT", [reason] if reason else [])
|
||||
|
||||
def message(target: str, message: str, tags: dict={}
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("PRIVMSG", [target, message], tags=tags)
|
||||
def notice(target: str, message: str, tags: dict={}
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("NOTICE", [target, message], tags=tags)
|
||||
def tagmsg(target, tags: dict) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("TAGMSG", [target], tags=tags)
|
||||
|
||||
def mode(target: str, mode: str=None, args: typing.List[str]=None
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
command_args = [target]
|
||||
if mode:
|
||||
command_args.append(mode)
|
||||
if args:
|
||||
command_args = command_args+args
|
||||
return utils.irc.IRCParsedLine("MODE", command_args)
|
||||
|
||||
def topic(channel_name: str, topic: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("TOPIC", [channel_name, topic])
|
||||
def kick(channel_name: str, target: str, reason: str=None
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("KICK", [channel_name, target]+(
|
||||
[reason] if reason else []))
|
||||
def names(channel_name: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("NAMES", [channel_name])
|
||||
def list(search_for: str=None) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("LIST", [search_for] if search_for else [])
|
||||
def invite(target: str, channel_name: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("INVITE", [target, channel_name])
|
||||
|
||||
def whois(target: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("WHOIS", [target])
|
||||
def whowas(target: str, amount: int=None, server: str=None
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
command_args = [target]
|
||||
if amount:
|
||||
command_args.append(str(amount))
|
||||
if server:
|
||||
command_args.append(server)
|
||||
return utils.irc.IRCParsedLine("WHOWAS", command_args)
|
||||
def who(filter: str=None) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("WHO", [filter] if filter else [])
|
||||
def whox(mask: str, filter: str, fields: str, label: str=None
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
flags = "%s%%%s%s" % (filter, fields, ","+label if label else "")
|
||||
return utils.irc.IRCParsedLine("WHO", [mask, flags])
|
Loading…
Reference in a new issue