'utils.http.get_url' -> 'utils.http.request', return a Response object from
utils.http.request
This commit is contained in:
parent
a41cf7b471
commit
793d234a0b
29 changed files with 169 additions and 165 deletions
|
@ -10,16 +10,16 @@ class Module(ModuleManager.BaseModule):
|
|||
:usage: [currency]
|
||||
"""
|
||||
currency = (event["args"] or "USD").upper()
|
||||
page = utils.http.get_url("https://blockchain.info/ticker",
|
||||
page = utils.http.request("https://blockchain.info/ticker",
|
||||
json=True)
|
||||
if page:
|
||||
if currency in page:
|
||||
conversion = page[currency]
|
||||
if currency in page.data:
|
||||
conversion = page.data[currency]
|
||||
buy, sell = conversion["buy"], conversion["sell"]
|
||||
event["stdout"].write("1 BTC = %.2f %s (buy) %.2f %s "
|
||||
"(sell)" % (buy, currency, sell, currency))
|
||||
else:
|
||||
event["stderr"].write("Unknown currency, available "
|
||||
"currencies: %s" % ", ".join(page.keys()))
|
||||
"currencies: %s" % ", ".join(page.data.keys()))
|
||||
else:
|
||||
raise utils.EventsResultsError()
|
||||
|
|
|
@ -9,11 +9,11 @@ class Module(ModuleManager.BaseModule):
|
|||
_name = "ISBN"
|
||||
|
||||
def get_book(self, query, event):
|
||||
page = utils.http.get_url(URL_GOOGLEBOOKS, get_params={
|
||||
page = utils.http.request(URL_GOOGLEBOOKS, get_params={
|
||||
"q": query, "country": "us"}, json=True)
|
||||
if page:
|
||||
if page["totalItems"] > 0:
|
||||
book = page["items"][0]["volumeInfo"]
|
||||
if page.data["totalItems"] > 0:
|
||||
book = page.data["items"][0]["volumeInfo"]
|
||||
title = book["title"]
|
||||
sub_title = (", %s" % book.get("subtitle")
|
||||
) if book.get("subtitle") else ""
|
||||
|
|
|
@ -22,11 +22,11 @@ class Module(ModuleManager.BaseModule):
|
|||
if match and event["channel"].get_setting("check-urls",
|
||||
event["server"].get_setting("check-urls", False)):
|
||||
url = match.group(0)
|
||||
page = utils.http.get_url(URL_VIRUSTOTAL, get_params={
|
||||
page = utils.http.request(URL_VIRUSTOTAL, get_params={
|
||||
"apikey": self.bot.config["virustotal-api-key"],
|
||||
"resource": url}, json=True)
|
||||
|
||||
if page and page.get("positives", 0) > 1:
|
||||
if page and page.data.get("positives", 0) > 1:
|
||||
if event["channel"].get_setting("check-urls-kick", False):
|
||||
event["channel"].send_kick(event["user"].nickname,
|
||||
"Don't send malicious URLs!")
|
||||
|
|
|
@ -12,7 +12,7 @@ class Module(ModuleManager.BaseModule):
|
|||
_last_called = 0
|
||||
|
||||
def _get_definition(self, word):
|
||||
page = utils.http.get_url(URL_WORDNIK % word, get_params={
|
||||
page = utils.http.request(URL_WORDNIK % word, get_params={
|
||||
"useCanonical": "true", "limit": 1,
|
||||
"sourceDictionaries": "wiktionary", "api_key": self.bot.config[
|
||||
"wordnik-api-key"]}, json=True)
|
||||
|
@ -33,9 +33,9 @@ class Module(ModuleManager.BaseModule):
|
|||
|
||||
page = self._get_definition(word)
|
||||
if page:
|
||||
if len(page):
|
||||
event["stdout"].write("%s: %s" % (page[0]["word"],
|
||||
page[0]["text"]))
|
||||
if len(page.data):
|
||||
event["stdout"].write("%s: %s" % (page.data[0]["word"],
|
||||
page.data[0]["text"]))
|
||||
else:
|
||||
event["stderr"].write("No definitions found")
|
||||
else:
|
||||
|
@ -50,13 +50,13 @@ class Module(ModuleManager.BaseModule):
|
|||
RANDOM_DELAY_SECONDS):
|
||||
self._last_called = time.time()
|
||||
|
||||
page = utils.http.get_url(URL_WORDNIK_RANDOM, get_params={
|
||||
page = utils.http.request(URL_WORDNIK_RANDOM, get_params={
|
||||
"api_key":self.bot.config["wordnik-api-key"],
|
||||
"min_dictionary_count":1},json=True)
|
||||
if page and len(page):
|
||||
definition = self._get_definition(page["word"])
|
||||
if len(definition):
|
||||
definition = definition[0]
|
||||
if page and len(page.data):
|
||||
definition = self._get_definition(page.data["word"])
|
||||
if definition and len(definition.data):
|
||||
definition = definition.data[0]
|
||||
else:
|
||||
raise utils.EventError("Try again in a couple of seconds")
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ class Module(ModuleManager.BaseModule):
|
|||
@utils.hook("received.command.lua", min_args=1)
|
||||
def eval(self, event):
|
||||
try:
|
||||
page = utils.http.get_url(EVAL_URL,
|
||||
page = utils.http.request(EVAL_URL,
|
||||
post_data={"input": event["args"]},
|
||||
method="POST",
|
||||
soup=True)
|
||||
|
@ -17,7 +17,7 @@ class Module(ModuleManager.BaseModule):
|
|||
event["user"].nickname)
|
||||
|
||||
if page:
|
||||
textareas = page.find_all("textarea")
|
||||
textareas = page.data.find_all("textarea")
|
||||
if len(textareas) > 1:
|
||||
out = textareas[1].text.strip("\n")
|
||||
event["stdout"].write("%s: %s" % (event["user"].nickname, out))
|
||||
|
|
|
@ -35,7 +35,7 @@ class Module(ModuleManager.BaseModule):
|
|||
def _eval(self, lang, event):
|
||||
page = None
|
||||
try:
|
||||
page = utils.http.get_url(EVAL_URL,
|
||||
page = utils.http.request(EVAL_URL,
|
||||
post_data={
|
||||
"lang": lang,
|
||||
"code": EVAL_TEMPLATE,
|
||||
|
@ -47,8 +47,8 @@ class Module(ModuleManager.BaseModule):
|
|||
except:
|
||||
pass
|
||||
|
||||
if page:
|
||||
out = page.split("</b></span><br>", 1)[1]
|
||||
if page.data:
|
||||
out = page.data.split("</b></span><br>", 1)[1]
|
||||
out = html.unescape(out)
|
||||
out = json.loads(out)
|
||||
|
||||
|
|
|
@ -30,12 +30,12 @@ class Module(ModuleManager.BaseModule):
|
|||
args = API_ARGS.copy()
|
||||
args["code"] = FN_TEMPLATE % event["args"]
|
||||
try:
|
||||
page = utils.http.get_url(EVAL_URL, json_data=args,
|
||||
page = utils.http.request(EVAL_URL, json_data=args,
|
||||
method="POST", json=True)
|
||||
except socket.timeout:
|
||||
raise utils.EventError("%s: eval timed out" %
|
||||
event["user"].nickname)
|
||||
|
||||
err_or_out = "stdout" if page["success"] else "stderr"
|
||||
err_or_out = "stdout" if page.data["success"] else "stderr"
|
||||
event[err_or_out].write("%s: %s" % (event["user"].nickname,
|
||||
page[err_or_out].strip("\n")))
|
||||
page.data[err_or_out].strip("\n")))
|
||||
|
|
|
@ -17,15 +17,15 @@ class Module(ModuleManager.BaseModule):
|
|||
"""
|
||||
phrase = event["args"] or event["target"].buffer.get()
|
||||
if phrase:
|
||||
page = utils.http.get_url(URL_GOOGLESEARCH, get_params={
|
||||
page = utils.http.request(URL_GOOGLESEARCH, get_params={
|
||||
"q": phrase, "key": self.bot.config[
|
||||
"google-api-key"], "cx": self.bot.config[
|
||||
"google-search-id"], "prettyPrint": "true",
|
||||
"num": 1, "gl": "gb"}, json=True)
|
||||
if page:
|
||||
if "items" in page and len(page["items"]):
|
||||
if "items" in page.data and len(page.data["items"]):
|
||||
event["stdout"].write(
|
||||
"(%s) %s" % (phrase, page["items"][0]["link"]))
|
||||
"(%s) %s" % (phrase, page.data["items"][0]["link"]))
|
||||
else:
|
||||
event["stderr"].write("No results found")
|
||||
else:
|
||||
|
@ -41,11 +41,11 @@ class Module(ModuleManager.BaseModule):
|
|||
"""
|
||||
phrase = event["args"] or event["target"].buffer.get()
|
||||
if phrase:
|
||||
page = utils.http.get_url(URL_GOOGLESUGGEST, get_params={
|
||||
page = utils.http.request(URL_GOOGLESUGGEST, get_params={
|
||||
"output": "json", "client": "hp", "gl": "gb", "q": phrase})
|
||||
if page:
|
||||
# google gives us jsonp, so we need to unwrap it.
|
||||
page = page.split("(", 1)[1][:-1]
|
||||
page = page.data.split("(", 1)[1][:-1]
|
||||
page = json.loads(page)
|
||||
suggestions = page[1]
|
||||
suggestions = [utils.http.strip_html(s[0]) for s in suggestions]
|
||||
|
|
|
@ -11,11 +11,10 @@ class Module(ModuleManager.BaseModule):
|
|||
hacked databases
|
||||
:usage: <username/email>
|
||||
"""
|
||||
page = utils.http.get_url(URL_HAVEIBEENPWNEDAPI % event["args"],
|
||||
page = utils.http.request(URL_HAVEIBEENPWNEDAPI % event["args"],
|
||||
json=True, code=True)
|
||||
if page:
|
||||
code, page = page
|
||||
if code == 200:
|
||||
if page.code == 200:
|
||||
event["stdout"].write(
|
||||
"It seems '%s' has been pwned. check on %s." % (event["args"],
|
||||
URL_HAVEIBEENPWNED))
|
||||
|
|
|
@ -15,16 +15,16 @@ class Module(ModuleManager.BaseModule):
|
|||
:help: Search for a given title on IMDb
|
||||
:usage: <movie/tv title>
|
||||
"""
|
||||
page = utils.http.get_url(URL_OMDB, get_params={
|
||||
page = utils.http.request(URL_OMDB, get_params={
|
||||
"t": event["args"],
|
||||
"apikey": self.bot.config["omdbapi-api-key"]},
|
||||
json=True)
|
||||
if page:
|
||||
if "Title" in page:
|
||||
if "Title" in page.data:
|
||||
event["stdout"].write("%s, %s (%s) %s (%s/10.0) %s" % (
|
||||
page["Title"], page["Year"], page["Runtime"],
|
||||
page["Plot"], page["imdbRating"],
|
||||
URL_IMDBTITLE % page["imdbID"]))
|
||||
page.data["Title"], page.data["Year"], page.data["Runtime"],
|
||||
page.data["Plot"], page.data["imdbRating"],
|
||||
URL_IMDBTITLE % page.data["imdbID"]))
|
||||
else:
|
||||
event["stderr"].write("Title not found")
|
||||
else:
|
||||
|
|
|
@ -20,12 +20,12 @@ class Module(ModuleManager.BaseModule):
|
|||
|
||||
def _image_info(self, hash):
|
||||
api_key = self.bot.config["imgur-api-key"]
|
||||
result = utils.http.get_url(URL_IMAGE % hash,
|
||||
result = utils.http.request(URL_IMAGE % hash,
|
||||
headers={"Authorization": "Client-ID %s" % api_key},
|
||||
json=True)
|
||||
|
||||
if result and result["success"]:
|
||||
data = result["data"]
|
||||
if result and result.data["success"]:
|
||||
data = result.data["data"]
|
||||
text = self._prefix(data)
|
||||
|
||||
text += "(%s %dx%d, %d views)" % (data["type"], data["width"],
|
||||
|
@ -38,12 +38,12 @@ class Module(ModuleManager.BaseModule):
|
|||
|
||||
def _gallery_info(self, hash):
|
||||
api_key = self.bot.config["imgur-api-key"]
|
||||
result = utils.http.get_url(URL_GALLERY % hash,
|
||||
result = utils.http.request(URL_GALLERY % hash,
|
||||
headers={"Authorization": "Client-ID %s" % api_key},
|
||||
json=True)
|
||||
|
||||
if result and result["success"]:
|
||||
data = result["data"]
|
||||
if result and result.data["success"]:
|
||||
data = result.data["data"]
|
||||
text = self._prefix(data)
|
||||
text += "(%d views, %d▲▼%d)" % (data["views"],
|
||||
data["ups"], data["downs"])
|
||||
|
|
|
@ -34,19 +34,19 @@ class Module(ModuleManager.BaseModule):
|
|||
:usage: <IP>
|
||||
:prefix: GeoIP
|
||||
"""
|
||||
page = utils.http.get_url(URL_GEOIP % event["args_split"][0],
|
||||
page = utils.http.request(URL_GEOIP % event["args_split"][0],
|
||||
json=True)
|
||||
if page:
|
||||
if page["status"] == "success":
|
||||
data = page["query"]
|
||||
data += " | Organisation: %s" % page["org"]
|
||||
data += " | City: %s" % page["city"]
|
||||
data += " | Region: %s (%s)" % (page["regionName"],
|
||||
page["countryCode"])
|
||||
data += " | ISP: %s" % page["isp"]
|
||||
data += " | Lon/Lat: %s/%s" % (page["lon"],
|
||||
page["lat"])
|
||||
data += " | Timezone: %s" % page["timezone"]
|
||||
if page.data["status"] == "success":
|
||||
data = page.data["query"]
|
||||
data += " | Organisation: %s" % page.data["org"]
|
||||
data += " | City: %s" % page.data["city"]
|
||||
data += " | Region: %s (%s)" % (page.data["regionName"],
|
||||
page.data["countryCode"])
|
||||
data += " | ISP: %s" % page.data["isp"]
|
||||
data += " | Lon/Lat: %s/%s" % (page.data["lon"],
|
||||
page.data["lat"])
|
||||
data += " | Timezone: %s" % page.data["timezone"]
|
||||
event["stdout"].write(data)
|
||||
else:
|
||||
event["stderr"].write("No geoip data found")
|
||||
|
|
|
@ -11,11 +11,11 @@ class Module(ModuleManager.BaseModule):
|
|||
if not re.match(REGEX_URL, url):
|
||||
url = "http://%s" % url
|
||||
|
||||
data = utils.http.get_url(ISGD_API_URL, get_params=
|
||||
page = utils.http.request(ISGD_API_URL, get_params=
|
||||
{"format": "json", "url": url}, json=True)
|
||||
|
||||
if data and data["shorturl"]:
|
||||
return data["shorturl"]
|
||||
if page and page.data["shorturl"]:
|
||||
return page.data["shorturl"]
|
||||
|
||||
@utils.hook("received.command.shorten", min_args=1)
|
||||
def shorten(self, event):
|
||||
|
|
|
@ -24,14 +24,14 @@ class Module(ModuleManager.BaseModule):
|
|||
lastfm_username = event["user"].get_setting("lastfm",
|
||||
event["user"].nickname)
|
||||
shown_username = event["user"].nickname
|
||||
page = utils.http.get_url(URL_SCROBBLER, get_params={
|
||||
page = utils.http.request(URL_SCROBBLER, get_params={
|
||||
"method": "user.getrecenttracks", "user": lastfm_username,
|
||||
"api_key": self.bot.config["lastfm-api-key"],
|
||||
"format": "json", "limit": "1"}, json=True)
|
||||
if page:
|
||||
if "recenttracks" in page and len(page["recenttracks"
|
||||
if "recenttracks" in page.data and len(page.data["recenttracks"
|
||||
]["track"]):
|
||||
now_playing = page["recenttracks"]["track"][0]
|
||||
now_playing = page.data["recenttracks"]["track"][0]
|
||||
track_name = now_playing["name"]
|
||||
artist = now_playing["artist"]["#text"]
|
||||
|
||||
|
@ -53,14 +53,14 @@ class Module(ModuleManager.BaseModule):
|
|||
|
||||
short_url = " -- " + short_url if short_url else ""
|
||||
|
||||
info_page = utils.http.get_url(URL_SCROBBLER, get_params={
|
||||
info_page = utils.http.request(URL_SCROBBLER, get_params={
|
||||
"method": "track.getInfo", "artist": artist,
|
||||
"track": track_name, "autocorrect": "1",
|
||||
"api_key": self.bot.config["lastfm-api-key"],
|
||||
"user": lastfm_username, "format": "json"}, json=True)
|
||||
tags = []
|
||||
if "toptags" in info_page.get("track", []):
|
||||
for tag in info_page["track"]["toptags"]["tag"]:
|
||||
if "toptags" in info_page.data.get("track", []):
|
||||
for tag in info_page.data["track"]["toptags"]["tag"]:
|
||||
tags.append(tag["name"])
|
||||
if tags:
|
||||
tags = " (%s)" % ", ".join(tags)
|
||||
|
@ -68,8 +68,8 @@ class Module(ModuleManager.BaseModule):
|
|||
tags = ""
|
||||
|
||||
play_count = ""
|
||||
if "userplaycount" in info_page.get("track", []):
|
||||
play_count = int(info_page["track"]["userplaycount"])
|
||||
if "userplaycount" in info_page.data.get("track", []):
|
||||
play_count = int(info_page.data["track"]["userplaycount"])
|
||||
play_count = " (%d play%s)" % (play_count,
|
||||
"s" if play_count > 1 else "")
|
||||
|
||||
|
|
|
@ -235,10 +235,10 @@ class Module(ModuleManager.BaseModule):
|
|||
trains.append(parsed)
|
||||
|
||||
if eagle_url:
|
||||
summary_query = utils.http.get_url("%s/json/summaries/%s?uids=%s" % (eagle_url, now.date().isoformat(), "%20".join([a["uid"] for a in trains])), json=True, headers={"x-eagle-key": self.bot.config["eagle-api-key"]})
|
||||
summary_query = utils.http.request("%s/json/summaries/%s?uids=%s" % (eagle_url, now.date().isoformat(), "%20".join([a["uid"] for a in trains])), json=True, headers={"x-eagle-key": self.bot.config["eagle-api-key"]})
|
||||
if summary_query:
|
||||
for t in trains:
|
||||
summary = summary_query[t["uid"]]
|
||||
summary = summary_query.data[t["uid"]]
|
||||
t.update(summary)
|
||||
summary_plat = summary.get("platforms", {}).get(query["crs"])
|
||||
if summary_plat and t["platform"]=="?":
|
||||
|
@ -333,9 +333,9 @@ class Module(ModuleManager.BaseModule):
|
|||
query = client.service.QueryServices(service_id, datetime.utcnow().date().isoformat(),
|
||||
datetime.utcnow().time().strftime("%H:%M:%S+0000"))
|
||||
if eagle_url:
|
||||
schedule_query = utils.http.get_url("%s/json/schedule/%s/%s" % (eagle_url, service_id, datetime.now().date().isoformat()), json=True, headers={"x-eagle-key": eagle_key})
|
||||
schedule_query = utils.http.request("%s/json/schedule/%s/%s" % (eagle_url, service_id, datetime.now().date().isoformat()), json=True, headers={"x-eagle-key": eagle_key})
|
||||
if schedule_query:
|
||||
schedule = schedule_query["current"]
|
||||
schedule = schedule_query.data["current"]
|
||||
if not query and not schedule:
|
||||
return event["stdout"].write("No service information is available for this identifier.")
|
||||
|
||||
|
|
|
@ -43,12 +43,12 @@ class Module(ModuleManager.BaseModule):
|
|||
else:
|
||||
get_params["url"] = url
|
||||
|
||||
page = utils.http.get_url(
|
||||
page = utils.http.request(
|
||||
URL_SOUNDCLOUD_TRACK if has_query else URL_SOUNDCLOUD_RESOLVE,
|
||||
get_params=get_params, json=True)
|
||||
|
||||
if page:
|
||||
page = page[0] if has_query else page
|
||||
page = page.data[0] if has_query else page
|
||||
title = page["title"]
|
||||
user = page["user"]["username"]
|
||||
duration = time.strftime("%H:%M:%S", time.gmtime(page[
|
||||
|
|
|
@ -10,12 +10,12 @@ class Module(ModuleManager.BaseModule):
|
|||
:help: Search for a track on spotify
|
||||
:usage: <term>
|
||||
"""
|
||||
page = utils.http.get_url(URL_SPOTIFY, get_params=
|
||||
page = utils.http.request(URL_SPOTIFY, get_params=
|
||||
{"type": "track", "limit": 1, "q": event["args"]},
|
||||
json=True)
|
||||
if page:
|
||||
if len(page["tracks"]["items"]):
|
||||
item = page["tracks"]["items"][0]
|
||||
if len(page.data["tracks"]["items"]):
|
||||
item = page.data["tracks"]["items"][0]
|
||||
title = item["name"]
|
||||
artist_name = item["artists"][0]["name"]
|
||||
url = item["external_urls"]["spotify"]
|
||||
|
|
|
@ -63,26 +63,26 @@ class Module(ModuleManager.BaseModule):
|
|||
real_stop_id = ""
|
||||
stop_name = ""
|
||||
if stop_id.isdigit():
|
||||
bus_search = utils.http.get_url(URL_BUS_SEARCH % stop_id,
|
||||
bus_search = utils.http.request(URL_BUS_SEARCH % stop_id,
|
||||
get_params={"app_id": app_id, "app_key": app_key},
|
||||
json=True)
|
||||
bus_stop = bus_search["matches"][0]
|
||||
real_stop_id = bus_stop["id"]
|
||||
stop_name = bus_stop["name"]
|
||||
bus_stop = bus_search.data["matches"][0]
|
||||
real_stop_id = bus_stop.data["id"]
|
||||
stop_name = bus_stop.data["name"]
|
||||
else:
|
||||
bus_stop = utils.http.get_url(URL_STOP % stop_id,
|
||||
bus_stop = utils.http.request(URL_STOP % stop_id,
|
||||
get_params={"app_id": app_id, "app_key": app_key},
|
||||
json=True)
|
||||
if bus_stop:
|
||||
real_stop_id = stop_id
|
||||
stop_name = bus_stop["commonName"]
|
||||
stop_name = bus_stop.data["commonName"]
|
||||
|
||||
if real_stop_id:
|
||||
bus_stop = utils.http.get_url(URL_BUS % real_stop_id,
|
||||
bus_stop = utils.http.request(URL_BUS % real_stop_id,
|
||||
get_params={"app_id": app_id, "app_key": app_key},
|
||||
json=True)
|
||||
busses = []
|
||||
for bus in bus_stop:
|
||||
for bus in bus_stop.data:
|
||||
bus_number = bus["lineName"]
|
||||
human_time = self.vehicle_span(bus["expectedArrival"])
|
||||
time_until = self.vehicle_span(bus["expectedArrival"], human=False)
|
||||
|
@ -135,10 +135,10 @@ class Module(ModuleManager.BaseModule):
|
|||
app_id = self.bot.config["tfl-api-id"]
|
||||
app_key = self.bot.config["tfl-api-key"]
|
||||
|
||||
lines = utils.http.get_url(URL_LINE, get_params={
|
||||
lines = utils.http.request(URL_LINE, get_params={
|
||||
"app_id": app_id, "app_key": app_key}, json=True)
|
||||
statuses = []
|
||||
for line in lines:
|
||||
for line in lines.data:
|
||||
for status in line["lineStatuses"]:
|
||||
entry = {
|
||||
"id": line["id"],
|
||||
|
@ -182,13 +182,13 @@ class Module(ModuleManager.BaseModule):
|
|||
#As awful as this is, it also makes it ~work~.
|
||||
stop_name = event["args"].replace(" ", "%20")
|
||||
|
||||
stop_search = utils.http.get_url(URL_STOP_SEARCH % stop_name, get_params={
|
||||
stop_search = utils.http.request(URL_STOP_SEARCH % stop_name, get_params={
|
||||
"app_id": app_id, "app_key": app_key, "maxResults": "6", "faresOnly": "False"}, json=True)
|
||||
if stop_search:
|
||||
for stop in stop_search["matches"]:
|
||||
for stop in stop_search.data["matches"]:
|
||||
pass
|
||||
results = ["%s (%s): %s" % (stop["name"], ", ".join(stop["modes"]), stop["id"]) for stop in stop_search["matches"]]
|
||||
event["stdout"].write("[%s results] %s" % (stop_search["total"], "; ".join(results)))
|
||||
results = ["%s (%s): %s" % (stop["name"], ", ".join(stop["modes"]), stop["id"]) for stop in stop_search.data["matches"]]
|
||||
event["stdout"].write("[%s results] %s" % (stop_search.data["total"], "; ".join(results)))
|
||||
else:
|
||||
event["stderr"].write("No results")
|
||||
|
||||
|
@ -203,15 +203,15 @@ class Module(ModuleManager.BaseModule):
|
|||
|
||||
vehicle_id = event["args_split"][0]
|
||||
|
||||
vehicle = utils.http.get_url(URL_VEHICLE % vehicle_id, get_params={
|
||||
vehicle = utils.http.request(URL_VEHICLE % vehicle_id, get_params={
|
||||
"app_id": app_id, "app_key": app_key}, json=True)[0]
|
||||
|
||||
arrival_time = self.vehicle_span(vehicle["expectedArrival"], human=False)
|
||||
platform = self.platform(vehicle["platformName"])
|
||||
arrival_time = self.vehicle_span(vehicle.data["expectedArrival"], human=False)
|
||||
platform = self.platform(vehicle.data["platformName"])
|
||||
|
||||
event["stdout"].write("%s (%s) to %s. %s. Arrival at %s (%s) in %s minutes on %s" % (
|
||||
vehicle["vehicleId"], vehicle["lineName"], vehicle["destinationName"], vehicle["currentLocation"],
|
||||
vehicle["stationName"], vehicle["naptanId"], arrival_time, platform))
|
||||
vehicle.data["vehicleId"], vehicle.data["lineName"], vehicle.data["destinationName"], vehicle.data["currentLocation"],
|
||||
vehicle.data["stationName"], vehicle.data["naptanId"], arrival_time, platform))
|
||||
|
||||
@utils.hook("received.command.tflservice", min_args=1)
|
||||
def service(self, event):
|
||||
|
@ -233,10 +233,10 @@ class Module(ModuleManager.BaseModule):
|
|||
event["stdout"].write("%s is too high. Remember that the first arrival is 0" % service_id)
|
||||
return
|
||||
service = results[int(service_id)]
|
||||
arrivals = utils.http.get_url(URL_LINE_ARRIVALS % service["route"],
|
||||
arrivals = utils.http.request(URL_LINE_ARRIVALS % service["route"],
|
||||
get_params={"app_id": app_id, "app_key": app_key}, json=True)
|
||||
|
||||
arrivals = [a for a in arrivals if a["vehicleId"] == service["id"]]
|
||||
arrivals = [a for a in arrivals.data if a["vehicleId"] == service["id"]]
|
||||
arrivals = sorted(arrivals, key=lambda b: b["timeToStation"])
|
||||
|
||||
event["stdout"].write(
|
||||
|
@ -257,7 +257,7 @@ class Module(ModuleManager.BaseModule):
|
|||
|
||||
stop_id = event["args_split"][0]
|
||||
|
||||
stop = utils.http.get_url(URL_STOP % stop_id, get_params={
|
||||
stop = utils.http.requesst(URL_STOP % stop_id, get_params={
|
||||
"app_id": app_id, "app_key": app_key}, json=True)
|
||||
|
||||
def route(self, event):
|
||||
|
@ -266,7 +266,7 @@ class Module(ModuleManager.BaseModule):
|
|||
|
||||
route_id = event["args_split"][0]
|
||||
|
||||
route = utils.http.get_url(URL_ROUTE % route_id, get_params={
|
||||
route = utils.http.request(URL_ROUTE % route_id, get_params={
|
||||
"app_id": app_id, "app_key": app_key}, json=True)
|
||||
|
||||
event["stdout"].write("")
|
||||
|
|
|
@ -12,14 +12,14 @@ class Module(ModuleManager.BaseModule):
|
|||
:usage: <word> [type]
|
||||
"""
|
||||
phrase = event["args_split"][0]
|
||||
page = utils.http.get_url(URL_THESAURUS % (self.bot.config[
|
||||
page = utils.http.request(URL_THESAURUS % (self.bot.config[
|
||||
"bighugethesaurus-api-key"], phrase), json=True)
|
||||
syn_ant = event["command"][:3]
|
||||
if page:
|
||||
if not len(event["args_split"]) > 1:
|
||||
word_types = []
|
||||
for word_type in page.keys():
|
||||
if syn_ant in page[word_type]:
|
||||
for word_type in page.data.keys():
|
||||
if syn_ant in page.data[word_type]:
|
||||
word_types.append(word_type)
|
||||
if word_types:
|
||||
word_types = sorted(word_types)
|
||||
|
@ -30,11 +30,11 @@ class Module(ModuleManager.BaseModule):
|
|||
event["stderr"].write("No categories available")
|
||||
else:
|
||||
category = event["args_split"][1].lower()
|
||||
if category in page:
|
||||
if syn_ant in page[category]:
|
||||
if category in page.data:
|
||||
if syn_ant in page.data[category]:
|
||||
event["stdout"].write("%ss for %s: %s" % (
|
||||
event["command"].title(), phrase, ", ".join(
|
||||
page[category][syn_ant])))
|
||||
page.data[category][syn_ant])))
|
||||
else:
|
||||
event["stderr"].write("No %ss for %s" % (
|
||||
event["command"], phrase))
|
||||
|
|
|
@ -21,16 +21,15 @@ class Module(ModuleManager.BaseModule):
|
|||
if not url:
|
||||
raise utils.EventError("No URL provided/found.")
|
||||
|
||||
soup = None
|
||||
try:
|
||||
soup = utils.http.get_url(url, soup=True)
|
||||
page = utils.http.request(url, soup=True)
|
||||
except:
|
||||
pass
|
||||
|
||||
if not soup:
|
||||
if not page:
|
||||
raise utils.EventError("Failed to get URL.")
|
||||
|
||||
title = soup.title
|
||||
title = page.data.title
|
||||
if title:
|
||||
title = title.text.replace("\n", " ").replace("\r", ""
|
||||
).replace(" ", " ").strip()
|
||||
|
|
|
@ -19,30 +19,29 @@ class Module(ModuleManager.BaseModule):
|
|||
else:
|
||||
username = event["user"].get_setting("trakt",
|
||||
event["user"].nickname)
|
||||
page = utils.http.get_url(URL_TRAKT % username, headers={
|
||||
page = utils.http.request(URL_TRAKT % username, headers={
|
||||
"Content-Type": "application/json",
|
||||
"trakt-api-version": "2", "trakt-api-key":
|
||||
self.bot.config["trakt-api-key"]}, json=True,
|
||||
code=True)
|
||||
if page[0]:
|
||||
code, page = page
|
||||
if code == 200:
|
||||
type = page["type"]
|
||||
if page
|
||||
if page.code == 200:
|
||||
type = page.data["type"]
|
||||
if type == "movie":
|
||||
title = page["movie"]["title"]
|
||||
year = page["movie"]["year"]
|
||||
slug = page["movie"]["ids"]["slug"]
|
||||
title = page.data["movie"]["title"]
|
||||
year = page.data["movie"]["year"]
|
||||
slug = page.data["movie"]["ids"]["slug"]
|
||||
event["stdout"].write(
|
||||
"%s is now watching %s (%s) %s" % (
|
||||
username, title, year,
|
||||
URL_TRAKTSLUG % ("movie", slug)))
|
||||
elif type == "episode":
|
||||
season = page["episode"]["season"]
|
||||
episode_number = page["episode"]["number"]
|
||||
episode_title = page["episode"]["title"]
|
||||
show_title = page["show"]["title"]
|
||||
show_year = page["show"]["year"]
|
||||
slug = page["show"]["ids"]["slug"]
|
||||
season = page.data["episode"]["season"]
|
||||
episode_number = page.data["episode"]["number"]
|
||||
episode_title = page.data["episode"]["title"]
|
||||
show_title = page.data["show"]["title"]
|
||||
show_year = page.data["show"]["year"]
|
||||
slug = page.data["show"]["ids"]["slug"]
|
||||
event["stdout"].write(
|
||||
"%s is now watching %s s%se%s - %s %s" % (
|
||||
username, show_title, str(season).zfill(2),
|
||||
|
|
|
@ -32,11 +32,12 @@ class Module(ModuleManager.BaseModule):
|
|||
target_language = language_match.group(2)
|
||||
phrase = phrase.split(" ", 1)[1]
|
||||
|
||||
data = utils.http.get_url(URL_TRANSLATE, get_params={
|
||||
page = utils.http.request(URL_TRANSLATE, get_params={
|
||||
"client": "gtx", "sl": source_language,
|
||||
"tl": target_language, "dt": "t", "q": phrase})
|
||||
|
||||
if data and not data == "[null,null,\"\"]":
|
||||
if page and not page.data == "[null,null,\"\"]":
|
||||
data = page.data
|
||||
while ",," in data:
|
||||
data = data.replace(",,", ",null,")
|
||||
data = data.replace("[,", "[null,")
|
||||
|
|
|
@ -15,13 +15,13 @@ class Module(ModuleManager.BaseModule):
|
|||
if not arg_len == 12 and not arg_len == 13:
|
||||
raise utils.EventError("Invalid UPC/EAN/GTIN provided")
|
||||
|
||||
page = utils.http.get_url(UPCITEMDB_URL,
|
||||
page = utils.http.request(UPCITEMDB_URL,
|
||||
get_params={"upc": event["args_split"][0]},
|
||||
json=True)
|
||||
if page:
|
||||
if not len(page["items"]):
|
||||
if not len(page.data["items"]):
|
||||
raise utils.EventError("UPC/EAN not found")
|
||||
item = page["items"][0]
|
||||
item = page.data["items"][0]
|
||||
|
||||
brand = item.get("brand", None)
|
||||
brand = "" if not brand else "%s - " % brand
|
||||
|
|
|
@ -18,12 +18,12 @@ class Module(ModuleManager.BaseModule):
|
|||
if match:
|
||||
number = int(match.group(1))
|
||||
term = term.split(" ", 1)[1]
|
||||
page = utils.http.get_url(URL_URBANDICTIONARY,
|
||||
page = utils.http.request(URL_URBANDICTIONARY,
|
||||
get_params={"term": term}, json=True)
|
||||
if page:
|
||||
if len(page["list"]):
|
||||
if number > 0 and len(page["list"]) > number-1:
|
||||
definition = page["list"][number-1]
|
||||
if len(page.data["list"]):
|
||||
if number > 0 and len(page.data["list"]) > number-1:
|
||||
definition = page.data["list"][number-1]
|
||||
event["stdout"].write("%s: %s" % (definition["word"],
|
||||
definition["definition"].replace("\n", " ").replace(
|
||||
"\r", "").replace(" ", " ")))
|
||||
|
|
|
@ -12,19 +12,19 @@ class Module(ModuleManager.BaseModule):
|
|||
:usage: <location>
|
||||
"""
|
||||
api_key = self.bot.config["openweathermap-api-key"]
|
||||
page = utils.http.get_url(URL_WEATHER, get_params={
|
||||
page = utils.http.request(URL_WEATHER, get_params={
|
||||
"q": event["args"], "units": "metric",
|
||||
"APPID": api_key},
|
||||
json=True)
|
||||
if page:
|
||||
if "weather" in page:
|
||||
location = "%s, %s" % (page["name"], page["sys"][
|
||||
if "weather" in page.data:
|
||||
location = "%s, %s" % (page.data["name"], page.data["sys"][
|
||||
"country"])
|
||||
celsius = "%dC" % page["main"]["temp"]
|
||||
fahrenheit = "%dF" % ((page["main"]["temp"]*(9/5))+32)
|
||||
description = page["weather"][0]["description"].title()
|
||||
humidity = "%s%%" % page["main"]["humidity"]
|
||||
wind_speed = "%sKM/h" % page["wind"]["speed"]
|
||||
celsius = "%dC" % page.data["main"]["temp"]
|
||||
fahrenheit = "%dF" % ((page.data["main"]["temp"]*(9/5))+32)
|
||||
description = page.data["weather"][0]["description"].title()
|
||||
humidity = "%s%%" % page.data["main"]["humidity"]
|
||||
wind_speed = "%sKM/h" % page.data["wind"]["speed"]
|
||||
|
||||
event["stdout"].write(
|
||||
"(%s) %s/%s | %s | Humidity: %s | Wind: %s" % (
|
||||
|
|
|
@ -10,13 +10,13 @@ class Module(ModuleManager.BaseModule):
|
|||
:help: Get information from wikipedia
|
||||
:usage: <term>
|
||||
"""
|
||||
page = utils.http.get_url(URL_WIKIPEDIA, get_params={
|
||||
page = utils.http.request(URL_WIKIPEDIA, get_params={
|
||||
"action": "query", "prop": "extracts",
|
||||
"titles": event["args"], "exintro": "",
|
||||
"explaintext": "", "exchars": "500",
|
||||
"redirects": "", "format": "json"}, json=True)
|
||||
if page:
|
||||
pages = page["query"]["pages"]
|
||||
pages = page.data["query"]["pages"]
|
||||
article = list(pages.items())[0][1]
|
||||
if not "missing" in article:
|
||||
title, info = article["title"], article["extract"]
|
||||
|
|
|
@ -14,14 +14,14 @@ class Module(ModuleManager.BaseModule):
|
|||
:help: Evauate a given string on Wolfram|Alpha
|
||||
:usage: <query>
|
||||
"""
|
||||
code, result = utils.http.get_url(URL_WA,
|
||||
page = utils.http.requestl(URL_WA,
|
||||
get_params={"i": event["args"],
|
||||
"appid": self.bot.config["wolframalpha-api-key"],
|
||||
"reinterpret": "true", "units": "metric"}, code=True)
|
||||
|
||||
if not result == None:
|
||||
if code == 200:
|
||||
event["stdout"].write("%s: %s" % (event["args"], result))
|
||||
if page:
|
||||
if page.code == 200:
|
||||
event["stdout"].write("%s: %s" % (event["args"], page.data))
|
||||
else:
|
||||
event["stdout"].write("No results")
|
||||
else:
|
||||
|
|
|
@ -21,13 +21,13 @@ ARROW_DOWN = "↓"
|
|||
"validate": utils.bool_or_none})
|
||||
class Module(ModuleManager.BaseModule):
|
||||
def get_video_page(self, video_id, part):
|
||||
return utils.http.get_url(URL_YOUTUBEVIDEO, get_params={"part": part,
|
||||
return utils.http.request(URL_YOUTUBEVIDEO, get_params={"part": part,
|
||||
"id": video_id, "key": self.bot.config["google-api-key"]},
|
||||
json=True)
|
||||
def video_details(self, video_id):
|
||||
snippet = self.get_video_page(video_id, "snippet")
|
||||
if snippet["items"]:
|
||||
snippet = snippet["items"][0]["snippet"]
|
||||
if snippet.data["items"]:
|
||||
snippet = snippet.data["items"][0]["snippet"]
|
||||
statistics = self.get_video_page(video_id, "statistics")[
|
||||
"items"][0]["statistics"]
|
||||
content = self.get_video_page(video_id, "contentDetails")[
|
||||
|
@ -60,15 +60,15 @@ class Module(ModuleManager.BaseModule):
|
|||
search = event["query"]
|
||||
video_id = ""
|
||||
|
||||
search_page = utils.http.get_url(URL_YOUTUBESEARCH,
|
||||
search_page = utils.http.request(URL_YOUTUBESEARCH,
|
||||
get_params={"q": search, "part": "snippet",
|
||||
"maxResults": "1", "type": "video",
|
||||
"key": self.bot.config["google-api-key"]},
|
||||
json=True)
|
||||
|
||||
if search_page:
|
||||
if search_page["pageInfo"]["totalResults"] > 0:
|
||||
video_id = search_page["items"][0]["id"]["videoId"]
|
||||
if search_page.data["pageInfo"]["totalResults"] > 0:
|
||||
video_id = search_page.data["items"][0]["id"]["videoId"]
|
||||
return "https://youtu.be/%s" % video_id
|
||||
|
||||
@utils.hook("received.command.yt", alias_of="youtube")
|
||||
|
@ -93,14 +93,14 @@ class Module(ModuleManager.BaseModule):
|
|||
|
||||
if search or video_id:
|
||||
if not video_id:
|
||||
search_page = utils.http.get_url(URL_YOUTUBESEARCH,
|
||||
search_page = utils.http.request(URL_YOUTUBESEARCH,
|
||||
get_params={"q": search, "part": "snippet",
|
||||
"maxResults": "1", "type": "video",
|
||||
"key": self.bot.config["google-api-key"]},
|
||||
json=True)
|
||||
if search_page:
|
||||
if search_page["pageInfo"]["totalResults"] > 0:
|
||||
video_id = search_page["items"][0]["id"]["videoId"]
|
||||
if search_page.data["pageInfo"]["totalResults"] > 0:
|
||||
video_id = search_page.data["items"][0]["id"]["videoId"]
|
||||
else:
|
||||
event["stderr"].write("No videos found")
|
||||
else:
|
||||
|
|
|
@ -18,10 +18,18 @@ class HTTPParsingException(HTTPException):
|
|||
def throw_timeout():
|
||||
raise HTTPTimeoutException()
|
||||
|
||||
def get_url(url: str, method: str="GET", get_params: dict={},
|
||||
class Response(object):
|
||||
def __init__(self, code: int, data: typing.Any,
|
||||
headers: typing.Dict[str, str]):
|
||||
self.code = code
|
||||
self.data = data
|
||||
self.headers = headers
|
||||
|
||||
def request(url: str, method: str="GET", get_params: dict={},
|
||||
post_data: typing.Any=None, headers: dict={},
|
||||
json_data: typing.Any=None, code: bool=False, json: bool=False,
|
||||
soup: bool=False, parser: str="lxml", fallback_encoding: str="utf8"):
|
||||
soup: bool=False, parser: str="lxml", fallback_encoding: str="utf8",
|
||||
) -> Response:
|
||||
|
||||
if not urllib.parse.urlparse(url).scheme:
|
||||
url = "http://%s" % url
|
||||
|
@ -49,23 +57,21 @@ def get_url(url: str, method: str="GET", get_params: dict={},
|
|||
finally:
|
||||
signal.signal(signal.SIGALRM, signal.SIG_IGN)
|
||||
|
||||
response_headers = utils.CaseInsensitiveDict(response.headers)
|
||||
|
||||
if soup:
|
||||
soup = bs4.BeautifulSoup(response_content, parser)
|
||||
if code:
|
||||
return response.status_code, soup
|
||||
return soup
|
||||
return Response(response.status_code, soup, response_heders)
|
||||
|
||||
data = response_content.decode(response.encoding or fallback_encoding)
|
||||
if json and data:
|
||||
try:
|
||||
data = _json.loads(data)
|
||||
return Response(response.status_code, _json.loads(data),
|
||||
response_headers)
|
||||
except _json.decoder.JSONDecodeError as e:
|
||||
raise HTTPParsingException(str(e))
|
||||
|
||||
if code:
|
||||
return response.status_code, data
|
||||
else:
|
||||
return data
|
||||
return Response(response.status_code, data, response_headers)
|
||||
|
||||
def strip_html(s: str) -> str:
|
||||
return bs4.BeautifulSoup(s, "lxml").get_text()
|
||||
|
|
Loading…
Reference in a new issue