Source code for steps.api.utils.request

import json
import requests

[docs] AUTHORIZATION_KEY = "Authorization"
[docs] CONTENT_TYPE_KEY = "Content-Type"
[docs] JSON_HEADER_KEY = "application/json"
[docs] FORM_HEADER_KEY = "application/x-www-form-urlencoded"
[docs] class Request(requests.Request): def __init__( self, api=None, timeout: int = 3): super().__init__() from .api import Api
[docs] self.api: Api = api
[docs] self.method = ""
[docs] self.headers = {'Accept': JSON_HEADER_KEY}
[docs] self.params = {}
[docs] self.body = {}
[docs] self.original_path_url = ""
[docs] self.path_parameters = {}
[docs] self.path_url = ""
[docs] self.url = ""
[docs] self.log = True
[docs] self.timeout = timeout
[docs] self.expected_status_code = "200"
[docs] self.response = None
[docs] def set_header(self, key, value): self.headers[key] = value
[docs] def set_body(self, key, value): self.body[key] = value
[docs] def set_param(self, key, value): self.params[key] = value
[docs] def set_authorization_header(self, value): self.set_header(AUTHORIZATION_KEY, f"Bearer {value}")
[docs] def set_json_content_type(self): self.set_header(CONTENT_TYPE_KEY, JSON_HEADER_KEY)
[docs] def set_form_content_type(self): self.set_header(CONTENT_TYPE_KEY, FORM_HEADER_KEY)
[docs] def set_url(self, path_url: str, path_parameters: dict = {}): self.original_path_url = path_url self.path_parameters = path_parameters self.path_url = path_url.format(**path_parameters) self.url = self.api.url + self.path_url
[docs] def set_method(self, method: str): self.method = method
[docs] def send(self): prepare_request = self.prepare() prepare_request.prepare_url(self.url, self.params) if self.body and CONTENT_TYPE_KEY in self.headers: content_type = self.headers[CONTENT_TYPE_KEY] if JSON_HEADER_KEY in content_type: prepare_request.prepare_body(None, None, json=self.body) elif FORM_HEADER_KEY in content_type: prepare_request.prepare_body(self.body, None) else: raise NotImplementedError(f'Unknown request type {content_type}') self.response = requests.Session().send( prepare_request, timeout=self.timeout)
def __str__(self): return self.get_pretty_request() @staticmethod def _hide_secret(my_obj): if isinstance(my_obj, dict): for key, value in my_obj.items(): if key == "token" or "password" in key: my_obj[key] = "xxxxxxxxxx" else: my_obj[key] = Request._hide_secret(value) return my_obj
[docs] def get_pretty_request(self) -> str: headers = self.headers.copy() if 'Authorization' in headers: headers['Authorization'] = "xxxxxxxxxx" request_str = f"{self.method} " if self.path_parameters: request_str += f"{self.original_path_url} with {self.path_parameters}\n" else: request_str += f" {self.original_path_url}\n" request_str += f"headers: {json.dumps(headers, indent=2)}" if self.body: request_str += f"\nbody: {json.dumps(self.body, indent=2)}" return request_str
[docs] def get_pretty_response(self) -> str: try: response_json = Request._hide_secret(self.response.json()) except json.JSONDecodeError: assert False, "Cannot decode response as a json, status: " +\ f"{self.response.status_code}\n text: {self.response.text}" return f"status: {self.response.status_code}\nresponse:\n{json.dumps(response_json, indent=2)}"
[docs] def copy(self): req = Request(self.api) req.method = self.method req.headers = self.headers.copy() req.params = self.params.copy() req.body = self.body.copy() req.original_path_url = self.original_path_url req.path_parameters = self.path_parameters.copy() self.path_url = self.path_parameters req.url = self.url req.log = self.log req.timeout = self.timeout req.expected_status_code = self.expected_status_code return req