import json
import requests
[docs]
AUTHORIZATION_KEY = "Authorization"
[docs]
CONTENT_TYPE_KEY = "Content-Type"
[docs]
class Request(requests.Request):
def __init__(
self, api=None, timeout: int = 3):
super().__init__()
from .api import Api
[docs]
self.original_path_url = ""
[docs]
self.path_parameters = {}
[docs]
self.expected_status_code = "200"
[docs]
def set_body(self, key, value):
self.body[key] = value
[docs]
def set_param(self, key, value):
self.params[key] = value
[docs]
def set_json_content_type(self):
self.set_header(CONTENT_TYPE_KEY, JSON_HEADER_KEY)
[docs]
def set_form_content_type(self):
self.set_header(CONTENT_TYPE_KEY, FORM_HEADER_KEY)
[docs]
def set_url(self, path_url: str, path_parameters: dict = {}):
self.original_path_url = path_url
self.path_parameters = path_parameters
self.path_url = path_url.format(**path_parameters)
self.url = self.api.url + self.path_url
[docs]
def set_method(self, method: str):
self.method = method
[docs]
def send(self):
prepare_request = self.prepare()
prepare_request.prepare_url(self.url, self.params)
if self.body and CONTENT_TYPE_KEY in self.headers:
content_type = self.headers[CONTENT_TYPE_KEY]
if JSON_HEADER_KEY in content_type:
prepare_request.prepare_body(None, None, json=self.body)
elif FORM_HEADER_KEY in content_type:
prepare_request.prepare_body(self.body, None)
else:
raise NotImplementedError(f'Unknown request type {content_type}')
self.response = requests.Session().send(
prepare_request,
timeout=self.timeout)
def __str__(self):
return self.get_pretty_request()
@staticmethod
def _hide_secret(my_obj):
if isinstance(my_obj, dict):
for key, value in my_obj.items():
if key == "token" or "password" in key:
my_obj[key] = "xxxxxxxxxx"
else:
my_obj[key] = Request._hide_secret(value)
return my_obj
[docs]
def get_pretty_request(self) -> str:
headers = self.headers.copy()
if 'Authorization' in headers:
headers['Authorization'] = "xxxxxxxxxx"
request_str = f"{self.method} "
if self.path_parameters:
request_str += f"{self.original_path_url} with {self.path_parameters}\n"
else:
request_str += f" {self.original_path_url}\n"
request_str += f"headers: {json.dumps(headers, indent=2)}"
if self.body:
request_str += f"\nbody: {json.dumps(self.body, indent=2)}"
return request_str
[docs]
def get_pretty_response(self) -> str:
try:
response_json = Request._hide_secret(self.response.json())
except json.JSONDecodeError:
assert False, "Cannot decode response as a json, status: " +\
f"{self.response.status_code}\n text: {self.response.text}"
return f"status: {self.response.status_code}\nresponse:\n{json.dumps(response_json, indent=2)}"
[docs]
def copy(self):
req = Request(self.api)
req.method = self.method
req.headers = self.headers.copy()
req.params = self.params.copy()
req.body = self.body.copy()
req.original_path_url = self.original_path_url
req.path_parameters = self.path_parameters.copy()
self.path_url = self.path_parameters
req.url = self.url
req.log = self.log
req.timeout = self.timeout
req.expected_status_code = self.expected_status_code
return req