aboutsummaryrefslogtreecommitdiffstats
path: root/onap-client/onap_client/client/request.py
diff options
context:
space:
mode:
Diffstat (limited to 'onap-client/onap_client/client/request.py')
-rw-r--r--onap-client/onap_client/client/request.py203
1 files changed, 203 insertions, 0 deletions
diff --git a/onap-client/onap_client/client/request.py b/onap-client/onap_client/client/request.py
new file mode 100644
index 0000000..0e40591
--- /dev/null
+++ b/onap-client/onap_client/client/request.py
@@ -0,0 +1,203 @@
+# -*- coding: utf8 -*-
+# ============LICENSE_START=======================================================
+# org.onap.vvp/validation-scripts
+# ===================================================================
+# Copyright © 2020 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the "License");
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+
+import jinja2
+import requests
+import json
+import os
+import copy
+
+from onap_client.client.response import ResponseHandler
+from onap_client.config import LOG as logger
+from onap_client.exceptions import FilesRequestFailure
+from jinja2 import exceptions as jinja_exceptions
+
+
+class RequestHandler:
+ """Handles a APICatalogRequestObject to make a request
+ and returns a ResponseHandler object"""
+
+ def __init__(self, request_object):
+ """
+ :request_object: APICatalogRequestObject
+ """
+ self.request_object = request_object
+
+ def make_request(self):
+ r = Request(self.request_object)
+
+ logger.warning("Submitting request: {}".format(self.request_object.description))
+ # TODO
+ # Add verify to config file
+ return ResponseHandler(r.request(verify=False), self.request_object)
+
+
+class Request:
+ """Parses a APICatalogRequestObject to fill out the
+ kwargs to send to the requests library"""
+
+ def __init__(self, request_object):
+ """
+ :request_object: APICatalogRequestObject
+ """
+ self.request_object = request_object
+ self.kwargs = {}
+ self.response = None
+
+ self.build_request()
+
+ def build_request(self):
+ request_object = self.request_object
+
+ if request_object.verb:
+ self.kwargs["method"] = request_object.verb
+
+ if request_object.auth:
+ self.kwargs["auth"] = request_object.auth
+
+ if request_object.uri:
+ self.kwargs["url"] = request_object.uri
+
+ if request_object.headers:
+ self.kwargs["headers"] = request_object.headers
+
+ if request_object.payload:
+ self.kwargs["data"] = request_object.payload
+
+ if request_object.files:
+ self.kwargs["files"] = request_object.files
+
+ debug_request = copy.deepcopy(self.kwargs)
+ if "auth" in debug_request:
+ debug_request["auth"] = "***********"
+
+ try:
+ logger.info(json.dumps(debug_request, indent=4))
+ except TypeError:
+ logger.info(debug_request)
+
+ def request(self, verify=True):
+ return requests.request(**self.kwargs, verify=verify)
+
+
+class APICatalogRequestObject:
+ """Fills a APICatalogResource object with request-specific data"""
+
+ def __init__(self, api_catalog_resource, **kwargs):
+ """
+ :api_catalog_resource: APICatalogResource object
+ :kwargs: key/value to fill in APICatalogResource parameters
+ """
+ self.api_catalog_resource = api_catalog_resource
+ self.payload_parameters = kwargs.get("payload_parameters", {})
+ self.uri_parameters = kwargs.get("uri_parameters", {})
+ self.header_parameters = kwargs.get("header_parameters", {})
+ self.file_parameters = kwargs.get("file_parameters", {})
+ if api_catalog_resource.payload_path:
+ self.payload_path = kwargs.get("payload_path", {}).get(
+ api_catalog_resource.payload_path[0]
+ )
+
+ self.uri = ""
+ self.files = None
+ self.payload = None
+ self.verb = api_catalog_resource.verb
+ self.headers = api_catalog_resource.headers
+ self.success_code = api_catalog_resource.success_code
+ self.return_data = api_catalog_resource.return_data
+ self.auth = api_catalog_resource.auth
+ self.description = api_catalog_resource.description
+
+ if api_catalog_resource.payload or api_catalog_resource.payload_path:
+ self.resolve_payload()
+
+ if api_catalog_resource.file_parameters:
+ self.resolve_files()
+
+ if isinstance(self.headers, dict):
+ for k, v in self.header_parameters.items():
+ self.headers[k] = v
+
+ self.resolve_uri()
+
+ def resolve_files(self):
+ # TODO
+ # is there a better way to figure out waht params are needed?
+ # right now its hardcoded
+ file_type = self.file_parameters.get("file_type", "application/zip")
+ file_path = self.file_parameters.get("file_path")
+ if not file_path:
+ raise FilesRequestFailure("File path was not provided")
+
+ try:
+ with open(file_path, "rb") as f:
+ data = f.read()
+ except IOError:
+ logger.error("file {} was not found".format(file_path))
+ raise
+
+ file_name = os.path.basename(file_path)
+
+ self.files = {"upload": [file_name, data, file_type]}
+
+ def resolve_payload(self):
+ try:
+ if self.api_catalog_resource.payload_path:
+ with open(self.payload_path, "r") as f:
+ self.payload = f.read()
+ else:
+ with open(self.api_catalog_resource.payload, "r") as f:
+ self.payload = jinja2.Template(f.read()).render(
+ **self.payload_parameters
+ )
+ except jinja_exceptions.TemplateNotFound:
+ logger.error(
+ "{} file not found. Check payloads directory.".format(self.payload)
+ )
+ raise
+ except FileNotFoundError:
+ logger.error(
+ "{} file not found. Check payloads directory.".format(self.payload)
+ )
+ raise
+
+ def resolve_uri(self):
+ try:
+ self.uri = self.api_catalog_resource.uri(**self.uri_parameters)
+ except KeyError:
+ logger.error("invalid uri keys {}.".format(self.uri_parameters))
+ raise