# bitbot-3.11-fork/src/utils/http.py

import asyncio, ipaddress, re, signal, socket, traceback, typing
import urllib.error, urllib.parse
import json as _json
import bs4, netifaces, requests
import tornado.httpclient
from src import utils

REGEX_URL = re.compile(r"https?://\S+", re.I)

PAIRED_CHARACTERS = ["<>", "()"]

# best-effort tidying up of URLs
def url_sanitise(url: str):
    if not urllib.parse.urlparse(url).scheme:
        url = "http://%s" % url

    for pair_start, pair_end in PAIRED_CHARACTERS:
        # trim ")" from the end only if there's not a "(" to match it
        # google.com/) -> google.com/
        # google.com/() -> google.com/()
        # google.com/()) -> google.com/()
        if url.endswith(pair_end):
            if pair_start in url:
                open_index = url.rfind(pair_start)
                other_index = url.rfind(pair_end, 0, len(url)-1)
                if not other_index == -1 and other_index > open_index:
                    url = url[:-1]
            else:
                url = url[:-1]
    return url

USER_AGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36")
RESPONSE_MAX = (1024*1024)*100
SOUP_CONTENT_TYPES = ["text/html", "text/xml", "application/xml"]

class HTTPException(Exception):
    pass
class HTTPTimeoutException(HTTPException):
    def __init__(self):
        Exception.__init__(self, "HTTP request timed out")
class HTTPParsingException(HTTPException):
    def __init__(self, message: str, data: str=None):
        Exception.__init__(self, message or "HTTP parsing failed")
class HTTPWrongContentTypeException(HTTPException):
    def __init__(self, message: str=None):
        Exception.__init__(self,
            message or "HTTP request gave wrong content type")

def throw_timeout():
    raise HTTPTimeoutException()

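# simple container for an HTTP response: status code, decoded (or parsed)
# body, and a case-insensitive view of the headers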
class Response(object):
    def __init__(self, code: int, data: typing.Any,
            headers: typing.Dict[str, str]):
        self.code = code
        self.data = data
        self.headers = headers

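# make a blocking HTTP request. json=True parses the body as JSON,
# soup=True parses it with BeautifulSoup; otherwise the decoded body is
# returned as-is in Response.data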
def request(url: str, method: str="GET", get_params: dict={},
        post_data: typing.Any=None, headers: dict={},
        json_data: typing.Any=None, code: bool=False, json: bool=False,
        soup: bool=False, parser: str="lxml", fallback_encoding: str="utf8",
        allow_redirects: bool=True, check_content_type: bool=True
        ) -> Response:
    if not urllib.parse.urlparse(url).scheme:
        url = "http://%s" % url

if not "Accept-Language" in headers:
headers["Accept-Language"] = "en-GB"
if not "User-Agent" in headers:
headers["User-Agent"] = USER_AGENT
    with utils.deadline(seconds=5):
        try:
            response = requests.request(
                method.upper(),
                url,
                headers=headers,
                params=get_params,
                data=post_data,
                json=json_data,
                allow_redirects=allow_redirects,
                stream=True
            )
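            # stream=True means nothing has been read yet; this capped read
            # is what actually bounds memory usage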
            response_content = response.raw.read(RESPONSE_MAX,
                decode_content=True)
        except utils.DeadlineExceededException:
            raise HTTPTimeoutException()

    response_headers = utils.CaseInsensitiveDict(dict(response.headers))
    content_type = response.headers.get("Content-Type", "").split(";", 1)[0]

    def _decode_data():
        return response_content.decode(response.encoding or fallback_encoding)

    if soup:
        if not check_content_type or content_type in SOUP_CONTENT_TYPES:
            soup = bs4.BeautifulSoup(_decode_data(), parser)
            return Response(response.status_code, soup, response_headers)
        else:
            raise HTTPWrongContentTypeException(
                "Tried to soup non-html/non-xml data (%s)" % content_type)

    data = _decode_data()

    if json and data:
        try:
            return Response(response.status_code, _json.loads(data),
                response_headers)
        except _json.decoder.JSONDecodeError as e:
            raise HTTPParsingException(str(e), data)

    return Response(response.status_code, data, response_headers)

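# example usage (illustrative only; these URLs/params are not from this file):
#   page = request("https://example.com", soup=True)
#   api = request("https://api.example.com/items", get_params={"q": "x"},
#       json=True)
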
def request_many(urls: typing.List[str]) -> typing.Dict[str, Response]:
    responses = {}

    async def _request(url):
        client = tornado.httpclient.AsyncHTTPClient()
        request = tornado.httpclient.HTTPRequest(url, method="GET",
            connect_timeout=2, request_timeout=2)
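        # 2s to connect plus 2s for the response, per URL; asyncio.wait
        # below adds an overall 5s cap across all of the requests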

        response = await client.fetch(request)

        headers = utils.CaseInsensitiveDict(dict(response.headers))
        data = response.body.decode("utf8")
        responses[url] = Response(response.code, data, headers)

    loop = asyncio.new_event_loop()
    awaits = []
    for url in urls:
        awaits.append(_request(url))
    task = asyncio.wait(awaits, loop=loop, timeout=5)
    loop.run_until_complete(task)
    loop.close()

    return responses

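# example usage (illustrative only):
#   responses = request_many(["https://example.com", "https://example.org"])
# URLs that failed or timed out are simply absent from the result
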
def strip_html(s: str) -> str:
    return bs4.BeautifulSoup(s, "lxml").get_text()

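# resolve a hostname to all of its A/AAAA addresses, as strings; returns an
# empty list if resolution fails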
def resolve_hostname(hostname: str) -> typing.List[str]:
    try:
        addresses = socket.getaddrinfo(hostname, None, 0, socket.SOCK_STREAM)
    except OSError:
        return []
    return [address[-1][0] for address in addresses]

def is_ip(addr: str) -> bool:
    try:
        ipaddress.ip_address(addr)
    except ValueError:
        return False
    return True

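# check whether a hostname/IP refers to this machine: true if it resolves to
# an address assigned to any local network interface, not just 127.0.0.1/::1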
def is_localhost(hostname: str) -> bool:
    if is_ip(hostname):
        ips = [ipaddress.ip_address(hostname)]
    else:
        ips = [ipaddress.ip_address(ip) for ip in resolve_hostname(hostname)]

    for interface in netifaces.interfaces():
        links = netifaces.ifaddresses(interface)
        for link in links.get(netifaces.AF_INET,
                [])+links.get(netifaces.AF_INET6, []):
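            # strip an IPv6 zone/scope id (e.g. "fe80::1%eth0" -> "fe80::1")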
            address = ipaddress.ip_address(link["addr"].split("%", 1)[0])
            if address in ips:
                return True
    return False