add cookies
and .json()
to utils.http.Response objects
This commit is contained in:
parent
8626a29a69
commit
6a6e789ec9
8 changed files with 74 additions and 75 deletions
|
@ -15,12 +15,12 @@ class Module(ModuleManager.BaseModule):
|
||||||
def _get_definition(self, word):
|
def _get_definition(self, word):
|
||||||
page = utils.http.request(URL_WORDNIK % word, get_params={
|
page = utils.http.request(URL_WORDNIK % word, get_params={
|
||||||
"useCanonical": "true", "limit": 1,
|
"useCanonical": "true", "limit": 1,
|
||||||
"sourceDictionaries": "wiktionary", "api_key": self.bot.config[
|
"sourceDictionaries": "wiktionary",
|
||||||
"wordnik-api-key"]}, json=True)
|
"api_key": self.bot.config["wordnik-api-key"]})
|
||||||
|
|
||||||
if page:
|
if page:
|
||||||
if page.code == 200:
|
if page.code == 200:
|
||||||
return True, page.data[0]
|
return True, page.json()[0]
|
||||||
else:
|
else:
|
||||||
return True, None
|
return True, None
|
||||||
else:
|
else:
|
||||||
|
@ -59,15 +59,15 @@ class Module(ModuleManager.BaseModule):
|
||||||
|
|
||||||
page = utils.http.request(URL_WORDNIK_RANDOM, get_params={
|
page = utils.http.request(URL_WORDNIK_RANDOM, get_params={
|
||||||
"api_key": self.bot.config["wordnik-api-key"],
|
"api_key": self.bot.config["wordnik-api-key"],
|
||||||
"min_dictionary_count":1},json=True)
|
"min_dictionary_count": 1}).json()
|
||||||
if page and len(page.data):
|
if page:
|
||||||
success, definition = self._get_definition(page.data["word"])
|
success, definition = self._get_definition(page["word"])
|
||||||
if not success:
|
if not success:
|
||||||
raise utils.EventError("Try again in a couple of seconds")
|
raise utils.EventError("Try again in a couple of seconds")
|
||||||
|
|
||||||
text = utils.http.strip_html(definition["text"])
|
text = utils.http.strip_html(definition["text"])
|
||||||
event["stdout"].write("Random Word: %s - Definition: %s" % (
|
event["stdout"].write("Random Word: %s - Definition: %s" % (
|
||||||
page.data["word"], text))
|
page["word"], text))
|
||||||
else:
|
else:
|
||||||
raise utils.EventResultsError()
|
raise utils.EventResultsError()
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -14,10 +14,11 @@ class Actor(object):
|
||||||
def load(self):
|
def load(self):
|
||||||
response = ap_utils.activity_request(self.url)
|
response = ap_utils.activity_request(self.url)
|
||||||
if response.code == 200:
|
if response.code == 200:
|
||||||
self.username = response.data["preferredUsername"]
|
response = response.json()
|
||||||
self.inbox = Inbox(response.data["inbox"])
|
self.username = response["preferredUsername"]
|
||||||
self.outbox = Outbox(response.data["outbox"])
|
self.inbox = Inbox(response["inbox"])
|
||||||
self.followers = response.data["followers"]
|
self.outbox = Outbox(response["outbox"])
|
||||||
|
self.followers = response["followers"]
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -26,19 +27,19 @@ class Outbox(object):
|
||||||
self._url = url
|
self._url = url
|
||||||
|
|
||||||
def load(self):
|
def load(self):
|
||||||
outbox = ap_utils.activity_request(self._url)
|
outbox = ap_utils.activity_request(self._url).json()
|
||||||
|
|
||||||
items = None
|
items = None
|
||||||
if "first" in outbox.data:
|
if "first" in outbox:
|
||||||
if type(outbox.data["first"]) == dict:
|
if type(outbox["first"]) == dict:
|
||||||
# pleroma
|
# pleroma
|
||||||
items = outbox.data["first"]["orderedItems"]
|
items = outbox["first"]["orderedItems"]
|
||||||
else:
|
else:
|
||||||
# mastodon
|
# mastodon
|
||||||
first = ap_utils.activity_request(outbox.data["first"])
|
first = ap_utils.activity_request(outbox["first"]).json()
|
||||||
items = first.data["orderedItems"]
|
items = first["orderedItems"]
|
||||||
else:
|
else:
|
||||||
items = outbox.data["orderedItems"]
|
items = outbox["orderedItems"]
|
||||||
return items
|
return items
|
||||||
|
|
||||||
class Inbox(object):
|
class Inbox(object):
|
||||||
|
@ -58,5 +59,5 @@ class Inbox(object):
|
||||||
headers.append(["signature", signature])
|
headers.append(["signature", signature])
|
||||||
|
|
||||||
return ap_utils.activity_request(self._url, activity.format(sender),
|
return ap_utils.activity_request(self._url, activity.format(sender),
|
||||||
method="POST", headers=dict(headers)).data
|
method="POST", headers=dict(headers)).json()
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ def activity_request(url, data=None, method="GET", type=ACTIVITY_TYPE,
|
||||||
headers = {"Accept": type}
|
headers = {"Accept": type}
|
||||||
|
|
||||||
request = utils.http.Request(url, headers=headers,
|
request = utils.http.Request(url, headers=headers,
|
||||||
content_type=content_type, post_data=data, method=method, json=True,
|
content_type=content_type, post_data=data, method=method,
|
||||||
json_body=True, fallback_encoding="utf8")
|
json_body=True, fallback_encoding="utf8")
|
||||||
return utils.http.request(request)
|
return utils.http.request(request)
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ def find_actor(username, instance):
|
||||||
|
|
||||||
actor_url = None
|
actor_url = None
|
||||||
if webfinger.code == 200:
|
if webfinger.code == 200:
|
||||||
for link in webfinger.data["links"]:
|
for link in webfinger.json()["links"]:
|
||||||
if link["type"] == ACTIVITY_TYPE:
|
if link["type"] == ACTIVITY_TYPE:
|
||||||
return link["href"]
|
return link["href"]
|
||||||
else:
|
else:
|
||||||
|
@ -128,15 +128,15 @@ def format_note(actor, note, type="Create"):
|
||||||
if type == "Announce":
|
if type == "Announce":
|
||||||
retoot_url = note
|
retoot_url = note
|
||||||
retoot_instance = urllib.parse.urlparse(retoot_url).hostname
|
retoot_instance = urllib.parse.urlparse(retoot_url).hostname
|
||||||
retoot = activity_request(retoot_url)
|
retoot = activity_request(retoot_url).json()
|
||||||
retoot_url = retoot.data.get("url", retoot.data["id"])
|
retoot_url = retoot.get("url", retoot["id"])
|
||||||
|
|
||||||
original_tooter = ap_actor.Actor(retoot.data["attributedTo"])
|
original_tooter = ap_actor.Actor(retoot["attributedTo"])
|
||||||
original_tooter.load()
|
original_tooter.load()
|
||||||
retooted_user = "@%s@%s" % (original_tooter.username, retoot_instance)
|
retooted_user = "@%s@%s" % (original_tooter.username, retoot_instance)
|
||||||
retoot_content = _content(retoot.data)
|
retoot_content = _content(retoot)
|
||||||
|
|
||||||
return (retoot.data.get("summary", None), "%s (boost %s): %s" % (
|
return (retoot.get("summary", None), "%s (boost %s): %s" % (
|
||||||
actor.username, retooted_user, retoot_content), retoot_url)
|
actor.username, retooted_user, retoot_content), retoot_url)
|
||||||
|
|
||||||
elif type == "Create":
|
elif type == "Create":
|
||||||
|
|
|
@ -48,11 +48,10 @@ class Module(ModuleManager.BaseModule):
|
||||||
def _parse_gallery(self, hash):
|
def _parse_gallery(self, hash):
|
||||||
api_key = self.bot.config["imgur-api-key"]
|
api_key = self.bot.config["imgur-api-key"]
|
||||||
result = utils.http.request(URL_GALLERY % hash,
|
result = utils.http.request(URL_GALLERY % hash,
|
||||||
headers={"Authorization": "Client-ID %s" % api_key},
|
headers={"Authorization": "Client-ID %s" % api_key}).json()
|
||||||
json=True)
|
|
||||||
|
|
||||||
if result and result.data["success"]:
|
if result and result["success"]:
|
||||||
data = result.data["data"]
|
data = result["data"]
|
||||||
text = ""
|
text = ""
|
||||||
|
|
||||||
nsfw = utils.irc.bold(NSFW_TEXT) + " " if data["nsfw"] else ""
|
nsfw = utils.irc.bold(NSFW_TEXT) + " " if data["nsfw"] else ""
|
||||||
|
@ -75,11 +74,10 @@ class Module(ModuleManager.BaseModule):
|
||||||
def _parse_image(self, hash):
|
def _parse_image(self, hash):
|
||||||
api_key = self.bot.config["imgur-api-key"]
|
api_key = self.bot.config["imgur-api-key"]
|
||||||
result = utils.http.request(URL_IMAGE % hash,
|
result = utils.http.request(URL_IMAGE % hash,
|
||||||
headers={"Authorization": "Client-ID %s" % api_key},
|
headers={"Authorization": "Client-ID %s" % api_key}).json()
|
||||||
json=True)
|
|
||||||
|
|
||||||
if result and result.data["success"]:
|
if result and result["success"]:
|
||||||
data = result.data["data"]
|
data = result["data"]
|
||||||
text = ""
|
text = ""
|
||||||
|
|
||||||
nsfw = utils.irc.bold(NSFW_TEXT) + " " if data["nsfw"] else ""
|
nsfw = utils.irc.bold(NSFW_TEXT) + " " if data["nsfw"] else ""
|
||||||
|
@ -102,11 +100,10 @@ class Module(ModuleManager.BaseModule):
|
||||||
def _image_info(self, hash):
|
def _image_info(self, hash):
|
||||||
api_key = self.bot.config["imgur-api-key"]
|
api_key = self.bot.config["imgur-api-key"]
|
||||||
result = utils.http.request(URL_IMAGE % hash,
|
result = utils.http.request(URL_IMAGE % hash,
|
||||||
headers={"Authorization": "Client-ID %s" % api_key},
|
headers={"Authorization": "Client-ID %s" % api_key}).json()
|
||||||
json=True)
|
|
||||||
|
|
||||||
if result and result.data["success"]:
|
if result and result["success"]:
|
||||||
data = result.data["data"]
|
data = result["data"]
|
||||||
text = self._prefix(data)
|
text = self._prefix(data)
|
||||||
|
|
||||||
text += "(%s %dx%d, %d views)" % (data["type"], data["width"],
|
text += "(%s %dx%d, %d views)" % (data["type"], data["width"],
|
||||||
|
@ -120,11 +117,10 @@ class Module(ModuleManager.BaseModule):
|
||||||
def _gallery_info(self, hash):
|
def _gallery_info(self, hash):
|
||||||
api_key = self.bot.config["imgur-api-key"]
|
api_key = self.bot.config["imgur-api-key"]
|
||||||
result = utils.http.request(URL_GALLERY % hash,
|
result = utils.http.request(URL_GALLERY % hash,
|
||||||
headers={"Authorization": "Client-ID %s" % api_key},
|
headers={"Authorization": "Client-ID %s" % api_key}).json()
|
||||||
json=True)
|
|
||||||
|
|
||||||
if result and result.data["success"]:
|
if result and result["success"]:
|
||||||
data = result.data["data"]
|
data = result["data"]
|
||||||
text = self._prefix(data)
|
text = self._prefix(data)
|
||||||
text += "(%d views, %d▲▼%d)" % (data["views"],
|
text += "(%d views, %d▲▼%d)" % (data["views"],
|
||||||
data["ups"], data["downs"])
|
data["ups"], data["downs"])
|
||||||
|
|
|
@ -40,11 +40,10 @@ class Module(ModuleManager.BaseModule):
|
||||||
page = utils.http.request(URL_SCROBBLER, get_params={
|
page = utils.http.request(URL_SCROBBLER, get_params={
|
||||||
"method": "user.getrecenttracks", "user": lastfm_username,
|
"method": "user.getrecenttracks", "user": lastfm_username,
|
||||||
"api_key": self.bot.config["lastfm-api-key"],
|
"api_key": self.bot.config["lastfm-api-key"],
|
||||||
"format": "json", "limit": "1"}, json=True)
|
"format": "json", "limit": "1"}).json()
|
||||||
if page:
|
if page:
|
||||||
if ("recenttracks" in page.data and
|
if "recenttracks" in page and len(page["recenttracks"]["track"]):
|
||||||
len(page.data["recenttracks"]["track"])):
|
now_playing = page["recenttracks"]["track"]
|
||||||
now_playing = page.data["recenttracks"]["track"]
|
|
||||||
if type(now_playing) == list:
|
if type(now_playing) == list:
|
||||||
now_playing = now_playing[0]
|
now_playing = now_playing[0]
|
||||||
|
|
||||||
|
@ -70,9 +69,9 @@ class Module(ModuleManager.BaseModule):
|
||||||
"method": "track.getInfo", "artist": artist,
|
"method": "track.getInfo", "artist": artist,
|
||||||
"track": track_name, "autocorrect": "1",
|
"track": track_name, "autocorrect": "1",
|
||||||
"api_key": self.bot.config["lastfm-api-key"],
|
"api_key": self.bot.config["lastfm-api-key"],
|
||||||
"user": lastfm_username, "format": "json"}, json=True)
|
"user": lastfm_username, "format": "json"}).json()
|
||||||
|
|
||||||
track = info_page.data.get("track", {})
|
track = info_page.get("track", {})
|
||||||
|
|
||||||
tags_str = ""
|
tags_str = ""
|
||||||
if "toptags" in track and track["toptags"]["tag"]:
|
if "toptags" in track and track["toptags"]["tag"]:
|
||||||
|
|
|
@ -35,14 +35,14 @@ class Module(ModuleManager.BaseModule):
|
||||||
args["code"] = FN_TEMPLATE % event["args"]
|
args["code"] = FN_TEMPLATE % event["args"]
|
||||||
try:
|
try:
|
||||||
page = utils.http.request(EVAL_URL, post_data=args,
|
page = utils.http.request(EVAL_URL, post_data=args,
|
||||||
method="POST", json=True, content_type="application/json")
|
method="POST", content_type="application/json").json()
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
raise utils.EventError("%s: eval timed out" %
|
raise utils.EventError("%s: eval timed out" %
|
||||||
event["user"].nickname)
|
event["user"].nickname)
|
||||||
|
|
||||||
err_or_out = "stdout" if page.data["success"] else "stderr"
|
err_or_out = "stdout" if page["success"] else "stderr"
|
||||||
event[err_or_out].write("%s: %s" % (event["user"].nickname,
|
event[err_or_out].write("%s: %s" % (event["user"].nickname,
|
||||||
page.data[err_or_out].strip("\n")))
|
page[err_or_out].strip("\n")))
|
||||||
|
|
||||||
@utils.hook("received.command.crate")
|
@utils.hook("received.command.crate")
|
||||||
@utils.kwarg("min_args", 1)
|
@utils.kwarg("min_args", 1)
|
||||||
|
@ -50,10 +50,10 @@ class Module(ModuleManager.BaseModule):
|
||||||
@utils.kwarg("usage", "<crate-name>")
|
@utils.kwarg("usage", "<crate-name>")
|
||||||
def crate(self, event):
|
def crate(self, event):
|
||||||
query = event["args_split"][0]
|
query = event["args_split"][0]
|
||||||
request = utils.http.Request(API_CRATE % query, json=True)
|
request = utils.http.Request(API_CRATE % query)
|
||||||
response = utils.http.request(request)
|
response = utils.http.request(request)
|
||||||
if response.code == 200:
|
if response.code == 200:
|
||||||
crate = response.data["crate"]
|
crate = response.json()["crate"]
|
||||||
name = crate["id"]
|
name = crate["id"]
|
||||||
url = URL_CRATE % name
|
url = URL_CRATE % name
|
||||||
event["stdout"].write("%s %s: %s - %s" % (
|
event["stdout"].write("%s %s: %s - %s" % (
|
||||||
|
|
|
@ -30,8 +30,7 @@ class Module(ModuleManager.BaseModule):
|
||||||
def get_video_page(self, video_id):
|
def get_video_page(self, video_id):
|
||||||
return utils.http.request(URL_YOUTUBEVIDEO, get_params={
|
return utils.http.request(URL_YOUTUBEVIDEO, get_params={
|
||||||
"part": "contentDetails,snippet,statistics",
|
"part": "contentDetails,snippet,statistics",
|
||||||
"id": video_id, "key": self.bot.config["google-api-key"]},
|
"id": video_id, "key": self.bot.config["google-api-key"]}).json()
|
||||||
json=True)
|
|
||||||
|
|
||||||
def _number(self, n):
|
def _number(self, n):
|
||||||
if n:
|
if n:
|
||||||
|
@ -39,8 +38,8 @@ class Module(ModuleManager.BaseModule):
|
||||||
|
|
||||||
def video_details(self, video_id):
|
def video_details(self, video_id):
|
||||||
page = self.get_video_page(video_id)
|
page = self.get_video_page(video_id)
|
||||||
if page.data["items"]:
|
if page["items"]:
|
||||||
item = page.data["items"][0]
|
item = page["items"][0]
|
||||||
snippet = item["snippet"]
|
snippet = item["snippet"]
|
||||||
statistics = item["statistics"]
|
statistics = item["statistics"]
|
||||||
content = item["contentDetails"]
|
content = item["contentDetails"]
|
||||||
|
@ -91,8 +90,8 @@ class Module(ModuleManager.BaseModule):
|
||||||
"key": self.bot.config["google-api-key"]}, json=True)
|
"key": self.bot.config["google-api-key"]}, json=True)
|
||||||
def playlist_details(self, playlist_id):
|
def playlist_details(self, playlist_id):
|
||||||
page = self.get_playlist_page(playlist_id)
|
page = self.get_playlist_page(playlist_id)
|
||||||
if page.data["items"]:
|
if page["items"]:
|
||||||
item = page.data["items"][0]
|
item = page["items"][0]
|
||||||
snippet = item["snippet"]
|
snippet = item["snippet"]
|
||||||
content = item["contentDetails"]
|
content = item["contentDetails"]
|
||||||
|
|
||||||
|
@ -121,12 +120,11 @@ class Module(ModuleManager.BaseModule):
|
||||||
search_page = utils.http.request(URL_YOUTUBESEARCH,
|
search_page = utils.http.request(URL_YOUTUBESEARCH,
|
||||||
get_params={"q": query, "part": "snippet",
|
get_params={"q": query, "part": "snippet",
|
||||||
"maxResults": "1", "type": "video",
|
"maxResults": "1", "type": "video",
|
||||||
"key": self.bot.config["google-api-key"]},
|
"key": self.bot.config["google-api-key"]}).json()
|
||||||
json=True)
|
|
||||||
|
|
||||||
if search_page:
|
if search_page:
|
||||||
if search_page.data["pageInfo"]["totalResults"] > 0:
|
if search_page["pageInfo"]["totalResults"] > 0:
|
||||||
video_id = search_page.data["items"][0]["id"]["videoId"]
|
video_id = search_page["items"][0]["id"]["videoId"]
|
||||||
return "https://youtu.be/%s" % video_id
|
return "https://youtu.be/%s" % video_id
|
||||||
|
|
||||||
@utils.hook("received.command.yt", alias_of="youtube")
|
@utils.hook("received.command.yt", alias_of="youtube")
|
||||||
|
@ -157,11 +155,10 @@ class Module(ModuleManager.BaseModule):
|
||||||
search_page = utils.http.request(URL_YOUTUBESEARCH,
|
search_page = utils.http.request(URL_YOUTUBESEARCH,
|
||||||
get_params={"q": search, "part": "snippet", "maxResults": "1",
|
get_params={"q": search, "part": "snippet", "maxResults": "1",
|
||||||
"type": "video", "key": self.bot.config["google-api-key"],
|
"type": "video", "key": self.bot.config["google-api-key"],
|
||||||
"safeSearch": safe}, json=True)
|
"safeSearch": safe}).json()
|
||||||
if search_page:
|
if search_page:
|
||||||
if search_page.data["pageInfo"]["totalResults"] > 0:
|
if search_page["pageInfo"]["totalResults"] > 0:
|
||||||
url = URL_VIDEO % search_page.data[
|
url = URL_VIDEO % search_page["items"][0]["id"]["videoId"]
|
||||||
"items"][0]["id"]["videoId"]
|
|
||||||
else:
|
else:
|
||||||
raise utils.EventError("No videos found")
|
raise utils.EventError("No videos found")
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -68,7 +68,7 @@ class Request(object):
|
||||||
default_factory=dict)
|
default_factory=dict)
|
||||||
|
|
||||||
json: bool = False
|
json: bool = False
|
||||||
json_body: typing.Any = None
|
json_body: bool = False
|
||||||
|
|
||||||
allow_redirects: bool = True
|
allow_redirects: bool = True
|
||||||
check_content_type: bool = True
|
check_content_type: bool = True
|
||||||
|
@ -118,13 +118,16 @@ class Request(object):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
class Response(object):
|
class Response(object):
|
||||||
def __init__(self, code: int, data: typing.Any,
|
def __init__(self, code: int, data: typing.Any, encoding: str,
|
||||||
headers: typing.Dict[str, str], encoding: str):
|
headers: typing.Dict[str, str], cookies: typing.Dict[str, str]):
|
||||||
self.code = code
|
self.code = code
|
||||||
self.data = data
|
self.data = data
|
||||||
self.headers = headers
|
|
||||||
self.content_type = headers.get("Content-Type", "").split(";", 1)[0]
|
self.content_type = headers.get("Content-Type", "").split(";", 1)[0]
|
||||||
self.encoding = encoding
|
self.encoding = encoding
|
||||||
|
self.headers = headers
|
||||||
|
self.cookies = cookies
|
||||||
|
def json(self):
|
||||||
|
return _json.loads(self.data)
|
||||||
|
|
||||||
def _meta_content(s: str) -> typing.Dict[str, str]:
|
def _meta_content(s: str) -> typing.Dict[str, str]:
|
||||||
out = {}
|
out = {}
|
||||||
|
@ -167,7 +170,8 @@ def _request(request_obj: Request) -> Response:
|
||||||
params=request_obj.get_params,
|
params=request_obj.get_params,
|
||||||
data=request_obj.get_body(),
|
data=request_obj.get_body(),
|
||||||
allow_redirects=request_obj.allow_redirects,
|
allow_redirects=request_obj.allow_redirects,
|
||||||
stream=True
|
stream=True,
|
||||||
|
cookies=request_obj.cookies
|
||||||
)
|
)
|
||||||
response_content = response.raw.read(RESPONSE_MAX,
|
response_content = response.raw.read(RESPONSE_MAX,
|
||||||
decode_content=True)
|
decode_content=True)
|
||||||
|
@ -176,7 +180,8 @@ def _request(request_obj: Request) -> Response:
|
||||||
|
|
||||||
headers = utils.CaseInsensitiveDict(dict(response.headers))
|
headers = utils.CaseInsensitiveDict(dict(response.headers))
|
||||||
our_response = Response(response.status_code, response_content,
|
our_response = Response(response.status_code, response_content,
|
||||||
headers=headers, encoding=response.encoding)
|
encoding=response.encoding, headers=headers,
|
||||||
|
cookies=response.cookies.get_dict())
|
||||||
return our_response
|
return our_response
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -255,7 +260,8 @@ def request_many(requests: typing.List[Request]) -> typing.Dict[str, Response]:
|
||||||
|
|
||||||
headers = utils.CaseInsensitiveDict(dict(response.headers))
|
headers = utils.CaseInsensitiveDict(dict(response.headers))
|
||||||
data = response.body.decode("utf8")
|
data = response.body.decode("utf8")
|
||||||
responses[request.id] = Response(response.code, data, headers, "utf8")
|
responses[request.id] = Response(response.code, data, "utf8", headers,
|
||||||
|
{})
|
||||||
|
|
||||||
loop = asyncio.new_event_loop()
|
loop = asyncio.new_event_loop()
|
||||||
awaits = []
|
awaits = []
|
||||||
|
|
Loading…
Reference in a new issue