Move utils.irc.IRCParsedLine to IRCLine.ParsedLine, improve truncation
mechanism, don't convert sent line from ParsedLine to text to ParsedLine for line_handler handling
This commit is contained in:
parent
f4a403836b
commit
8c94bcf6ca
8 changed files with 188 additions and 190 deletions
|
@ -40,8 +40,8 @@ class Out(object):
|
|||
else:
|
||||
raise ValueError("Unknown command methd '%s'" % method)
|
||||
|
||||
line.truncate_marker = STR_MORE
|
||||
if line.truncated():
|
||||
line.end_replace(STR_MORE)
|
||||
self._text = "%s%s" % (STR_CONTINUED, line.truncated())
|
||||
else:
|
||||
self._text = ""
|
||||
|
|
|
@ -33,10 +33,9 @@ class Module(ModuleManager.BaseModule):
|
|||
|
||||
@utils.hook("raw.send")
|
||||
def handle_send(self, event):
|
||||
line = utils.irc.parse_line(event["line"])
|
||||
self.events.on("raw.send").on(line.command).call_unsafe(
|
||||
args=line.args, tags=line.tags, server=event["server"],
|
||||
direction=utils.Direction.SEND)
|
||||
self.events.on("raw.send").on(event["line"].command).call_unsafe(
|
||||
args=event["line"].args, tags=event["line"].tags,
|
||||
server=event["server"], direction=utils.Direction.SEND)
|
||||
|
||||
# ping from the server
|
||||
@utils.hook("raw.received.ping")
|
||||
|
|
133
src/IRCLine.py
133
src/IRCLine.py
|
@ -1,23 +1,90 @@
|
|||
import datetime, typing
|
||||
from src import IRCObject, IRCServer, utils
|
||||
from src import IRCObject
|
||||
|
||||
# this should be 510 (RFC1459, 512 with \r\n) but a server BitBot uses is broken
|
||||
LINE_CUTOFF = 470
|
||||
|
||||
class IRCArgs(object):
|
||||
def __init__(self, args: typing.List[str]):
|
||||
self._args = args
|
||||
|
||||
def get(self, index: int) -> typing.Optional[str]:
|
||||
if len(self._args) > index:
|
||||
return self._args[index]
|
||||
return None
|
||||
|
||||
def __repr__(self):
|
||||
return "IRCArgs(%s)" % self._args
|
||||
def __len__(self) -> int:
|
||||
return len(self._args)
|
||||
def __getitem__(self, index) -> str:
|
||||
return self._args[index]
|
||||
|
||||
class Hostmask(object):
|
||||
def __init__(self, nickname: str, username: str, hostname: str,
|
||||
hostmask: str):
|
||||
self.nickname = nickname
|
||||
self.username = username
|
||||
self.hostname = hostname
|
||||
self.hostmask = hostmask
|
||||
def __repr__(self):
|
||||
return "Hostmask(%s)" % self.__str__()
|
||||
def __str__(self):
|
||||
return self.hostmask
|
||||
|
||||
class ParsedLine(object):
|
||||
def __init__(self, command: str, args: typing.List[str],
|
||||
prefix: Hostmask=None,
|
||||
tags: typing.Dict[str, str]={}):
|
||||
self.command = command
|
||||
self._args = args
|
||||
self.args = IRCArgs(args)
|
||||
self.prefix = prefix
|
||||
self.tags = {} if tags == None else tags
|
||||
|
||||
def _tag_str(self, tags: typing.Dict[str, str]) -> str:
|
||||
tag_str = ""
|
||||
for tag, value in tags.items():
|
||||
if tag_str:
|
||||
tag_str += ","
|
||||
tag_str += tag
|
||||
if value:
|
||||
tag_str += "=%s" % value
|
||||
if tag_str:
|
||||
tag_str = "@%s" % tag_str
|
||||
return tag_str
|
||||
|
||||
def format(self) -> str:
|
||||
s = ""
|
||||
if self.tags:
|
||||
s += "%s " % self._tag_str(self.tags)
|
||||
|
||||
if self.prefix:
|
||||
s += "%s " % self.prefix
|
||||
|
||||
s += self.command.upper()
|
||||
|
||||
if self.args:
|
||||
if len(self._args) > 1:
|
||||
s += " %s" % " ".join(self._args[:-1])
|
||||
|
||||
s += " "
|
||||
if " " in self._args[-1] or self._args[-1][0] == ":":
|
||||
s += ":%s" % self._args[-1]
|
||||
else:
|
||||
s += self._args[-1]
|
||||
|
||||
return s
|
||||
|
||||
class Line(IRCObject.Object):
|
||||
def __init__(self, server: "IRCServer.Server", send_time: datetime.datetime,
|
||||
line: str):
|
||||
self.server = server
|
||||
self._line = line
|
||||
def __init__(self, send_time: datetime.datetime, hostmask: str,
|
||||
line: ParsedLine):
|
||||
self.send_time = send_time
|
||||
self._hostmask = hostmask
|
||||
self.parsed_line = line
|
||||
|
||||
data, truncated = utils.encode_truncate(line, "utf8",
|
||||
self._char_limit())
|
||||
|
||||
self._data = data
|
||||
self._truncated = truncated
|
||||
|
||||
self._on_send = [] # type: typing.List[typing.Callable[[], None]]
|
||||
self._on_send: typing.List[typing.Callable[[], None]] = []
|
||||
self.truncate_marker: typing.Optional[str] = None
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return "IRCLine.Line(%s)" % self.__str__()
|
||||
|
@ -25,25 +92,43 @@ class Line(IRCObject.Object):
|
|||
return self.decoded_data()
|
||||
|
||||
def _char_limit(self) -> int:
|
||||
return LINE_CUTOFF-len(":%s " % self.server.hostmask())
|
||||
return LINE_CUTOFF-len(":%s " % self._hostmask)
|
||||
|
||||
def _encode_truncate(self) -> typing.Tuple[bytes, str]:
|
||||
line = self.parsed_line.format()
|
||||
byte_max = self._char_limit()
|
||||
encoded = b""
|
||||
truncated = ""
|
||||
truncate_marker = b""
|
||||
if not self.truncate_marker == None:
|
||||
truncate_marker = typing.cast(str, self.truncate_marker
|
||||
).encode("utf8")
|
||||
|
||||
for i, character in enumerate(line):
|
||||
encoded_character = character.encode("utf8")
|
||||
new_len = len(encoded + encoded_character)
|
||||
if truncate_marker and (byte_max-new_len) < len(truncate_marker):
|
||||
encoded += truncate_marker
|
||||
truncated = line[i:]
|
||||
break
|
||||
elif new_len > byte_max:
|
||||
truncated = line[i:]
|
||||
break
|
||||
else:
|
||||
encoded += encoded_character
|
||||
return (encoded, truncated)
|
||||
|
||||
def _data(self) -> bytes:
|
||||
return self._encode_truncate()[0]
|
||||
def data(self) -> bytes:
|
||||
return b"%s\r\n" % self._data
|
||||
return b"%s\r\n" % self._data()
|
||||
def decoded_data(self) -> str:
|
||||
return self._data.decode("utf8")
|
||||
return self._data().decode("utf8")
|
||||
def truncated(self) -> str:
|
||||
return self._truncated
|
||||
return self._encode_truncate()[1]
|
||||
|
||||
def on_send(self, func: typing.Callable[[], None]):
|
||||
self._on_send.append(func)
|
||||
def sent(self):
|
||||
for func in self._on_send[:]:
|
||||
func()
|
||||
|
||||
def end_replace(self, s: str):
|
||||
s_encoded = s.encode("utf8")
|
||||
s_len = len(s_encoded)
|
||||
|
||||
removed = self._data[-s_len:].decode("utf8")
|
||||
self._truncated = removed+self._truncated
|
||||
self._data = self._data[:-s_len]+s_encoded
|
||||
|
|
|
@ -29,7 +29,7 @@ class Server(IRCObject.Object):
|
|||
self.agreed_capabilities = set([]) # type: typing.Set[str]
|
||||
self.requested_capabilities = [] # type: typing.List[str]
|
||||
self.server_capabilities = {} # type: typing.Dict[str, str]
|
||||
self.batches = {} # type: typing.Dict[str, utils.irc.IRCParsedLine]
|
||||
self.batches = {} # type: typing.Dict[str, IRCLine.ParsedLine]
|
||||
self.cap_started = False
|
||||
|
||||
self.users = {} # type: typing.Dict[str, IRCUser.User]
|
||||
|
@ -237,23 +237,24 @@ class Server(IRCObject.Object):
|
|||
self.set_setting("last-read", utils.iso8601_format(now))
|
||||
return lines
|
||||
|
||||
def send(self, line_parsed: utils.irc.IRCParsedLine):
|
||||
def send(self, line_parsed: IRCLine.ParsedLine):
|
||||
line = line_parsed.format()
|
||||
results = self.events.on("preprocess.send").call_unsafe(
|
||||
server=self, line=line)
|
||||
for result in results:
|
||||
if result:
|
||||
line = result
|
||||
break
|
||||
results = list(filter(None, results))
|
||||
if results:
|
||||
line = results[0]
|
||||
|
||||
line_stripped = line.split("\n", 1)[0].strip("\r")
|
||||
line_obj = IRCLine.Line(self, datetime.datetime.utcnow(), line_stripped)
|
||||
line_obj = IRCLine.Line(datetime.datetime.utcnow(), self.hostmask(),
|
||||
line_parsed)
|
||||
self.socket.send(line_obj)
|
||||
return line_obj
|
||||
|
||||
def _send(self):
|
||||
lines = self.socket._send()
|
||||
for line in lines:
|
||||
self.bot.log.debug("%s (raw send) | %s", [str(self), line])
|
||||
self.bot.log.debug("%s (raw send) | %s", [str(self), line.format()])
|
||||
self.events.on("raw.send").call_unsafe(server=self, line=line)
|
||||
|
||||
def send_user(self, username: str, realname: str) -> IRCLine.Line:
|
||||
|
|
|
@ -120,15 +120,14 @@ class Socket(IRCObject.Object):
|
|||
def send(self, line: IRCLine.Line):
|
||||
self._queued_lines.append(line)
|
||||
|
||||
def _send(self) -> typing.List[str]:
|
||||
decoded_sent = []
|
||||
def _send(self) -> typing.List[IRCLine.ParsedLine]:
|
||||
sent_lines = []
|
||||
throttle_space = self.throttle_space()
|
||||
if throttle_space:
|
||||
to_buffer = self._queued_lines[:throttle_space]
|
||||
self._queued_lines = self._queued_lines[throttle_space:]
|
||||
for line in to_buffer:
|
||||
decoded_data = line.decoded_data()
|
||||
decoded_sent.append(decoded_data)
|
||||
sent_lines.append(line.parsed_line)
|
||||
|
||||
self._write_buffer += line.data()
|
||||
self._buffered_lines.append(line)
|
||||
|
@ -147,7 +146,7 @@ class Socket(IRCObject.Object):
|
|||
self._recent_sends.append(now)
|
||||
self.last_send = now
|
||||
|
||||
return decoded_sent
|
||||
return sent_lines
|
||||
|
||||
def waiting_send(self) -> bool:
|
||||
return bool(len(self._write_buffer)) or bool(len(self._queued_lines))
|
||||
|
|
|
@ -199,16 +199,3 @@ def is_ip(s: str) -> bool:
|
|||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
def encode_truncate(s: str, encoding: str, byte_max: int
|
||||
) -> typing.Tuple[bytes, str]:
|
||||
encoded = b""
|
||||
truncated = ""
|
||||
for i, character in enumerate(s):
|
||||
encoded_character = character.encode(encoding)
|
||||
if len(encoded + encoded_character) > byte_max:
|
||||
truncated = s[i:]
|
||||
break
|
||||
else:
|
||||
encoded += encoded_character
|
||||
return encoded, truncated
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import json, string, re, typing
|
||||
from src import utils
|
||||
from src import IRCLine, utils
|
||||
from . import protocol
|
||||
|
||||
ASCII_UPPER = string.ascii_uppercase
|
||||
|
@ -30,77 +30,10 @@ def lower(case_mapping: str, s: str) -> str:
|
|||
def equals(case_mapping: str, s1: str, s2: str) -> bool:
|
||||
return lower(case_mapping, s1) == lower(case_mapping, s2)
|
||||
|
||||
class IRCHostmask(object):
|
||||
def __init__(self, nickname: str, username: str, hostname: str,
|
||||
hostmask: str):
|
||||
self.nickname = nickname
|
||||
self.username = username
|
||||
self.hostname = hostname
|
||||
self.hostmask = hostmask
|
||||
def __repr__(self):
|
||||
return "IRCHostmask(%s)" % self.__str__()
|
||||
def __str__(self):
|
||||
return self.hostmask
|
||||
|
||||
def seperate_hostmask(hostmask: str) -> IRCHostmask:
|
||||
def seperate_hostmask(hostmask: str) -> IRCLine.Hostmask:
|
||||
nickname, _, username = hostmask.partition("!")
|
||||
username, _, hostname = username.partition("@")
|
||||
return IRCHostmask(nickname, username, hostname, hostmask)
|
||||
|
||||
class IRCArgs(object):
|
||||
def __init__(self, args: typing.List[str]):
|
||||
self._args = args
|
||||
|
||||
def get(self, index: int) -> typing.Optional[str]:
|
||||
if len(self._args) > index:
|
||||
return self._args[index]
|
||||
return None
|
||||
|
||||
def __repr__(self):
|
||||
return "IRCArgs(%s)" % self._args
|
||||
def __len__(self) -> int:
|
||||
return len(self._args)
|
||||
def __getitem__(self, index) -> str:
|
||||
return self._args[index]
|
||||
|
||||
def _tag_str(tags: typing.Dict[str, str]) -> str:
|
||||
tag_str = ""
|
||||
for tag, value in tags.items():
|
||||
if tag_str:
|
||||
tag_str += ","
|
||||
tag_str += tag
|
||||
if value:
|
||||
tag_str += "=%s" % value
|
||||
if tag_str:
|
||||
tag_str = "@%s" % tag_str
|
||||
return tag_str
|
||||
|
||||
class IRCParsedLine(object):
|
||||
def __init__(self, command: str, args: typing.List[str],
|
||||
prefix: IRCHostmask=None,
|
||||
tags: typing.Dict[str, str]={}):
|
||||
self.command = command
|
||||
self._args = args
|
||||
self.args = IRCArgs(args)
|
||||
self.prefix = prefix
|
||||
self.tags = {} if tags == None else tags
|
||||
|
||||
def format(self) -> str:
|
||||
s = ""
|
||||
if self.tags:
|
||||
s += "%s " % _tag_str(self.tags)
|
||||
|
||||
if self.prefix:
|
||||
s += "%s " % self.prefix
|
||||
|
||||
s += self.command.upper()
|
||||
|
||||
if self.args:
|
||||
if len(self._args) > 1:
|
||||
s += " %s" % " ".join(self._args[:-1])
|
||||
s += " %s" % trailing(self._args[-1])
|
||||
|
||||
return s
|
||||
return IRCLine.Hostmask(nickname, username, hostname, hostmask)
|
||||
|
||||
MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
|
||||
MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
|
||||
|
@ -110,9 +43,9 @@ def message_tag_unescape(s):
|
|||
unescaped = _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED)
|
||||
return unescaped.replace("\\", "")
|
||||
|
||||
def parse_line(line: str) -> IRCParsedLine:
|
||||
def parse_line(line: str) -> IRCLine.ParsedLine:
|
||||
tags = {} # type: typing.Dict[str, typing.Any]
|
||||
prefix = None # type: typing.Optional[IRCHostmask]
|
||||
prefix = None # type: typing.Optional[IRCLine.Hostmask]
|
||||
command = None
|
||||
|
||||
if line[0] == "@":
|
||||
|
@ -144,7 +77,7 @@ def parse_line(line: str) -> IRCParsedLine:
|
|||
if not trailing == None:
|
||||
args.append(typing.cast(str, trailing))
|
||||
|
||||
return IRCParsedLine(command, args, prefix, tags)
|
||||
return IRCLine.ParsedLine(command, args, prefix, tags)
|
||||
|
||||
|
||||
REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR)
|
||||
|
@ -322,20 +255,14 @@ class IRCBatch(object):
|
|||
self.id = identifier
|
||||
self.type = batch_type
|
||||
self.tags = tags
|
||||
self.lines = [] # type: typing.List[IRCParsedLine]
|
||||
self.lines = [] # type: typing.List[IRCLine.ParsedLine]
|
||||
class IRCRecvBatch(IRCBatch):
|
||||
pass
|
||||
class IRCSendBatch(IRCBatch):
|
||||
def _add_line(self, line: IRCParsedLine):
|
||||
def _add_line(self, line: IRCLine.ParsedLine):
|
||||
line.tags["batch"] = self.id
|
||||
self.lines.append(line)
|
||||
def message(self, target: str, message: str, tags: dict={}):
|
||||
self._add_line(utils.irc.protocol.message(target, message, tags))
|
||||
def notice(self, target: str, message: str, tags: dict={}):
|
||||
self._add_line(utils.irc.protocol.notice(target, message, tags))
|
||||
|
||||
def trailing(s: str) -> str:
|
||||
if s[0] == ":" or " " in s:
|
||||
return ":%s" % s
|
||||
else:
|
||||
return s
|
||||
|
|
|
@ -1,90 +1,90 @@
|
|||
import typing
|
||||
from src import utils
|
||||
from src import IRCLine, utils
|
||||
|
||||
def user(username: str, realname: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("USER", [username, "0", "*", realname])
|
||||
def nick(nickname: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("NICK", [nickname])
|
||||
def user(username: str, realname: str) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("USER", [username, "0", "*", realname])
|
||||
def nick(nickname: str) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("NICK", [nickname])
|
||||
|
||||
def capability_ls() -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("CAP", ["LS", "302"])
|
||||
def capability_request(capability: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("CAP", ["REQ", capability])
|
||||
def capability_end() -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("CAP", ["END"])
|
||||
def authenticate(text: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("AUTHENTICATE", [text])
|
||||
def capability_ls() -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("CAP", ["LS", "302"])
|
||||
def capability_request(capability: str) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("CAP", ["REQ", capability])
|
||||
def capability_end() -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("CAP", ["END"])
|
||||
def authenticate(text: str) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("AUTHENTICATE", [text])
|
||||
|
||||
def password(password: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("PASS", [password])
|
||||
def password(password: str) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("PASS", [password])
|
||||
|
||||
def ping(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("PING", [nonce])
|
||||
def pong(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("PONG", [nonce])
|
||||
def ping(nonce: str="hello") -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("PING", [nonce])
|
||||
def pong(nonce: str="hello") -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("PONG", [nonce])
|
||||
|
||||
def join(channel_name: str, keys: typing.List[str]=None
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("JOIN", [channel_name]+(
|
||||
) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("JOIN", [channel_name]+(
|
||||
keys if keys else []))
|
||||
def part(channel_name: str, reason: str=None) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("PART", [channel_name]+(
|
||||
def part(channel_name: str, reason: str=None) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("PART", [channel_name]+(
|
||||
[reason] if reason else []))
|
||||
def quit(reason: str=None) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("QUIT", [reason] if reason else [])
|
||||
def quit(reason: str=None) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("QUIT", [reason] if reason else [])
|
||||
|
||||
def message(target: str, message: str, tags: typing.Dict[str, str]={}
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("PRIVMSG", [target, message], tags=tags)
|
||||
) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("PRIVMSG", [target, message], tags=tags)
|
||||
def notice(target: str, message: str, tags: typing.Dict[str, str]={}
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("NOTICE", [target, message], tags=tags)
|
||||
def tagmsg(target, tags: dict) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("TAGMSG", [target], tags=tags)
|
||||
) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("NOTICE", [target, message], tags=tags)
|
||||
def tagmsg(target, tags: dict) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("TAGMSG", [target], tags=tags)
|
||||
|
||||
def mode(target: str, mode: str=None, args: typing.List[str]=None
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
) -> IRCLine.ParsedLine:
|
||||
command_args = [target]
|
||||
if mode:
|
||||
command_args.append(mode)
|
||||
if args:
|
||||
command_args = command_args+args
|
||||
return utils.irc.IRCParsedLine("MODE", command_args)
|
||||
return IRCLine.ParsedLine("MODE", command_args)
|
||||
|
||||
def topic(channel_name: str, topic: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("TOPIC", [channel_name, topic])
|
||||
def topic(channel_name: str, topic: str) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("TOPIC", [channel_name, topic])
|
||||
def kick(channel_name: str, target: str, reason: str=None
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("KICK", [channel_name, target]+(
|
||||
) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("KICK", [channel_name, target]+(
|
||||
[reason] if reason else []))
|
||||
def names(channel_name: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("NAMES", [channel_name])
|
||||
def list(search_for: str=None) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("LIST", [search_for] if search_for else [])
|
||||
def invite(target: str, channel_name: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("INVITE", [target, channel_name])
|
||||
def names(channel_name: str) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("NAMES", [channel_name])
|
||||
def list(search_for: str=None) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("LIST", [search_for] if search_for else [])
|
||||
def invite(target: str, channel_name: str) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("INVITE", [target, channel_name])
|
||||
|
||||
def whois(target: str) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("WHOIS", [target])
|
||||
def whois(target: str) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("WHOIS", [target])
|
||||
def whowas(target: str, amount: int=None, server: str=None
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
) -> IRCLine.ParsedLine:
|
||||
command_args = [target]
|
||||
if amount:
|
||||
command_args.append(str(amount))
|
||||
if server:
|
||||
command_args.append(server)
|
||||
return utils.irc.IRCParsedLine("WHOWAS", command_args)
|
||||
def who(filter: str=None) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("WHO", [filter] if filter else [])
|
||||
return IRCLine.ParsedLine("WHOWAS", command_args)
|
||||
def who(filter: str=None) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("WHO", [filter] if filter else [])
|
||||
def whox(mask: str, filter: str, fields: str, label: str=None
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
) -> IRCLine.ParsedLine:
|
||||
flags = "%s%%%s%s" % (filter, fields, ","+label if label else "")
|
||||
return utils.irc.IRCParsedLine("WHO", [mask, flags])
|
||||
return IRCLine.ParsedLine("WHO", [mask, flags])
|
||||
|
||||
def batch_start(identifier: str, batch_type: str, tags: typing.Dict[str, str]={}
|
||||
) -> 'utils.irc.IRCParsedLine':
|
||||
return utils.irc.IRCParsedLine("BATCH", ["+%s" % identifier, batch_type],
|
||||
) -> IRCLine.ParsedLine:
|
||||
return IRCLine.ParsedLine("BATCH", ["+%s" % identifier, batch_type],
|
||||
tags=tags)
|
||||
|
||||
def batch_end(identifier: str, tags: typing.Dict[str, str]={}):
|
||||
return utils.irc.IRCParsedLine("BATCH", ["-%s" % identifier], tags=tags)
|
||||
return IRCLine.ParsedLine("BATCH", ["-%s" % identifier], tags=tags)
|
||||
|
|
Loading…
Reference in a new issue