refactor utils.http.requests to support a Request object

This commit is contained in:
jesopo 2019-09-11 17:44:07 +01:00
parent 8f8cf92ae2
commit 4a97c9eb0d
6 changed files with 83 additions and 34 deletions

View file

@ -9,7 +9,7 @@ class Module(ModuleManager.BaseModule):
@utils.kwarg("usage", "<acronym>")
def acronym(self, event):
query = event["args_split"][0].upper()
response = utils.http.request(API % query, soup=True)
response = utils.http.request(API % query, parse=True)
if response.data:
acronyms = []
for element in response.data.find_all("acro"):

View file

@ -12,8 +12,7 @@ class Module(ModuleManager.BaseModule):
try:
page = utils.http.request(EVAL_URL,
post_data={"input": event["args"]},
method="POST",
soup=True)
method="POST", parse=True)
except socket.timeout:
raise utils.EventError("%s: eval timed out" %
event["user"].nickname)

View file

@ -32,8 +32,8 @@ class Module(ModuleManager.BaseModule):
args = API_ARGS.copy()
args["code"] = FN_TEMPLATE % event["args"]
try:
page = utils.http.request(EVAL_URL, json_data=args,
method="POST", json=True)
page = utils.http.request(EVAL_URL, post_data=args,
method="POST", json=True, content_type="application/json")
except socket.timeout:
raise utils.EventError("%s: eval timed out" %
event["user"].nickname)

View file

@ -56,7 +56,7 @@ class Module(ModuleManager.BaseModule):
raise utils.EventError("Please provide @<user>@<instance>")
hostmeta = utils.http.request(HOSTMETA % instance,
soup=True, check_content_type=False)
parse=True, check_content_type=False)
webfinger_url = None
for item in hostmeta.data.find_all("link"):
if item["rel"] and item["rel"][0] == "lrdd":

View file

@ -26,7 +26,7 @@ class Module(ModuleManager.BaseModule):
return None
try:
page = utils.http.request(url, soup=True)
page = utils.http.request(url, parse=True)
except utils.http.HTTPWrongContentTypeException:
return None
except Exception as e:

View file

@ -52,6 +52,62 @@ class HTTPWrongContentTypeException(HTTPException):
def throw_timeout():
raise HTTPTimeoutException()
class Request(object):
def __init__(self, url: str, method: str="GET",
get_params: typing.Dict[str, str]={}, post_data: typing.Any=None,
headers: typing.Dict[str, str]={},
json: bool=False, allow_redirects: bool=True,
check_content_type: bool=True, parse: bool=False,
detect_encoding: bool=True,
parser: str="lxml", fallback_encoding="iso-8859-1",
content_type: str=None,
**kwargs):
self.set_url(url)
self.method = method.upper()
self.get_params = get_params
self.post_data = post_data
self.headers = headers
self.json = json
self.allow_redirects = allow_redirects
self.check_content_type = check_content_type
self.parse = parse
self.detect_encoding = detect_encoding
self.parser = parser
self.fallback_encoding = fallback_encoding
self.content_type = content_type
if kwargs:
if method == "POST":
self.post_data = kwargs
else:
self.get_params.update(kwargs)
def set_url(self, url: str):
if not urllib.parse.urlparse(url).scheme:
url = "http://%s" % url
self.url = url
def get_headers(self) -> typing.Dict[str, str]:
headers = self.headers.copy()
if not "Accept-Language" in headers:
headers["Accept-Language"] = "en-GB"
if not "User-Agent" in headers:
headers["User-Agent"] = USER_AGENT
if not "Content-Type" in headers and self.content_type:
headers["Content-Type"] = self.content_type
return headers
def get_body(self) -> typing.Any:
if self.content_type == "application/json":
return _json.dumps(self.post_data)
else:
return self.post_data
class Response(object):
def __init__(self, code: int, data: typing.Any,
headers: typing.Dict[str, str]):
@ -84,31 +140,23 @@ def _find_encoding(soup: bs4.BeautifulSoup) -> typing.Optional[str]:
return None
def request(url: str, method: str="GET", get_params: dict={},
post_data: typing.Any=None, headers: dict={},
json_data: typing.Any=None, code: bool=False, json: bool=False,
soup: bool=False, parser: str="lxml", detect_encoding: bool=True,
fallback_encoding: str="utf8", allow_redirects: bool=True,
check_content_type: bool=True) -> Response:
def request(request_obj: typing.Union[str, Request], **kwargs) -> Response:
if type(request_obj) == str:
request_obj = Request(request_obj, **kwargs)
return _request(request_obj)
if not urllib.parse.urlparse(url).scheme:
url = "http://%s" % url
if not "Accept-Language" in headers:
headers["Accept-Language"] = "en-GB"
if not "User-Agent" in headers:
headers["User-Agent"] = USER_AGENT
def _request(request_obj: Request) -> Response:
headers = request_obj.get_headers()
with utils.deadline(seconds=5):
try:
response = requests.request(
method.upper(),
url,
request_obj.method,
request_obj.url,
headers=headers,
params=get_params,
data=post_data,
json=json_data,
allow_redirects=allow_redirects,
params=request_obj.get_params,
data=request_obj.get_body(),
allow_redirects=request_obj.allow_redirects,
stream=True
)
response_content = response.raw.read(RESPONSE_MAX,
@ -122,23 +170,25 @@ def request(url: str, method: str="GET", get_params: dict={},
response_headers = utils.CaseInsensitiveDict(dict(response.headers))
content_type = response.headers.get("Content-Type", "").split(";", 1)[0]
encoding = response.encoding or "iso-8859-1"
if detect_encoding and content_type and content_type in SOUP_CONTENT_TYPES:
souped = bs4.BeautifulSoup(response_content, parser)
encoding = response.encoding or request_obj.fallback_encoding
if (request_obj.detect_encoding and
content_type and content_type in SOUP_CONTENT_TYPES):
souped = bs4.BeautifulSoup(response_content, request_obj.parser)
encoding = _find_encoding(souped) or encoding
def _decode_data():
return response_content.decode(encoding)
if soup:
if not check_content_type or content_type in SOUP_CONTENT_TYPES:
soup = bs4.BeautifulSoup(_decode_data(), parser)
return Response(response.status_code, soup, response_headers)
if request_obj.parse:
if (not request_obj.check_content_type or
content_type in SOUP_CONTENT_TYPES):
souped = bs4.BeautifulSoup(_decode_data(), request_obj.parser)
return Response(response.status_code, souped, response_headers)
else:
raise HTTPWrongContentTypeException(
"Tried to soup non-html/non-xml data (%s)" % content_type)
if json and response_content:
if request_obj.json and response_content:
data = _decode_data()
try:
return Response(response.status_code, _json.loads(data),