154 lines
4.6 KiB
Python
154 lines
4.6 KiB
Python
import os.path, urllib.parse
|
|
import bs4
|
|
from src import IRCBot, utils
|
|
from . import ap_actor
|
|
|
|
LD_TYPE = ("application/ld+json; "
|
|
"profile=\"https://www.w3.org/ns/activitystreams\"")
|
|
JRD_TYPE = "application/jrd+json"
|
|
ACTIVITY_TYPE = "application/activity+json"
|
|
|
|
AP_TYPES = [ACTIVITY_TYPE, "application/ld+json"]
|
|
|
|
def split_username(s):
|
|
if s[0] == "@":
|
|
s = s[1:]
|
|
username, _, instance = s.partition("@")
|
|
if username and instance:
|
|
return username, instance
|
|
return None, None
|
|
|
|
def activity_request(url, data=None, method="GET", type=ACTIVITY_TYPE,
|
|
headers={}):
|
|
content_type = None
|
|
|
|
if method == "POST":
|
|
content_type = type
|
|
else:
|
|
headers = {"Accept": type}
|
|
|
|
request = utils.http.Request(url, headers=headers,
|
|
content_type=content_type, post_data=data, method=method,
|
|
json_body=True, fallback_encoding="utf8")
|
|
return utils.http.request(request)
|
|
|
|
HOSTMETA_TEMPLATE = "https://%s/.well-known/host-meta"
|
|
WEBFINGER_TEMPLATE = "https://%s/.well-known/webfinger?resource={uri}"
|
|
|
|
class FindActorException(Exception):
|
|
pass
|
|
|
|
def find_actor(username, instance):
|
|
hostmeta_url = HOSTMETA_TEMPLATE % instance
|
|
hostmeta_request = utils.http.Request(hostmeta_url)
|
|
try:
|
|
hostmeta = utils.http.request(hostmeta_request)
|
|
except:
|
|
# failed to GET hostmeta; this is an optional step for servers that do
|
|
# not host their webfinger at the usual URL (see WEBFINGER_TEMPLATE)
|
|
hostmeta = None
|
|
|
|
webfinger_url = None
|
|
if hostmeta and hostmeta.code == 200:
|
|
for item in hostmeta.soup().find_all("link"):
|
|
if item["rel"] and item["rel"][0] == "lrdd":
|
|
webfinger_url = item["template"]
|
|
break
|
|
|
|
if not webfinger_url:
|
|
webfinger_url = WEBFINGER_TEMPLATE % instance
|
|
webfinger_url = webfinger_url.replace("{uri}",
|
|
"acct:%s@%s" % (username, instance), 1)
|
|
|
|
try:
|
|
webfinger = activity_request(webfinger_url, type=JRD_TYPE)
|
|
except Exception as e:
|
|
raise FindActorException("Failed to get webfinger for %s: %s" %
|
|
(instance, str(e)))
|
|
|
|
actor_url = None
|
|
if webfinger.code == 200:
|
|
for link in webfinger.json()["links"]:
|
|
if link["type"] == ACTIVITY_TYPE:
|
|
return link["href"]
|
|
else:
|
|
raise FindActorException("Could not find user @%s@%s" %
|
|
(username, instance))
|
|
|
|
KNOWN_TAGS = ["p", "br"]
|
|
|
|
def _line(item):
|
|
if type(item) == bs4.element.Tag:
|
|
if item.name == "p":
|
|
out = ""
|
|
for subitem in item.children:
|
|
out += _line(subitem)
|
|
return "\n%s\n" % out
|
|
elif item.name == "br":
|
|
return "\n"
|
|
else:
|
|
return str(item)
|
|
|
|
def _normalise_note(content):
|
|
soup = bs4.BeautifulSoup(content, "lxml").body
|
|
lines = []
|
|
for element in soup.find_all():
|
|
if not element.name in KNOWN_TAGS:
|
|
if element.text.strip() == "":
|
|
element.decompose()
|
|
else:
|
|
element.unwrap()
|
|
|
|
out = ""
|
|
for element in soup.children:
|
|
out += _line(element)
|
|
|
|
return utils.parse.line_normalise(out)
|
|
|
|
def _content(note):
|
|
content = note.get("content", None)
|
|
attachment = note.get("attachment", [])
|
|
|
|
if note.get("content", None):
|
|
return _normalise_note(content)
|
|
elif attachment:
|
|
type = attachment[0]["mediaType"].split("/", 1)[0]
|
|
filename = os.path.basename(attachment[0]["url"])
|
|
|
|
extension = None
|
|
if "." in filename:
|
|
filename, extension = filename.rsplit(".", 1)
|
|
if len(filename) > 20:
|
|
filename = "%s[...]" % filename[:20]
|
|
|
|
if extension:
|
|
filename = "%s.%s" % (filename, extension)
|
|
else:
|
|
filename = "%s: %s" % (type, filename)
|
|
|
|
return "<%s>" % filename
|
|
|
|
def parse_note(actor, note, type="Create"):
|
|
if type == "Announce":
|
|
retoot_url = note
|
|
retoot_instance = urllib.parse.urlparse(retoot_url).hostname
|
|
retoot = activity_request(retoot_url).json()
|
|
retoot_url = retoot.get("url", retoot["id"])
|
|
|
|
original_tooter = ap_actor.Actor(retoot["attributedTo"])
|
|
original_tooter.load()
|
|
retooted_user = "@%s@%s" % (original_tooter.username, retoot_instance)
|
|
retoot_content = _content(retoot)
|
|
|
|
author = "%s (boost %s)" % (actor.display_name, retooted_user)
|
|
|
|
return (retoot.get("summary", None), author, retoot_content, retoot_url)
|
|
|
|
|
|
elif type == "Create":
|
|
content = _content(note)
|
|
url = note.get("url", note["id"])
|
|
|
|
return note.get("summary", None), actor.display_name, content, url
|
|
|
|
return None
|