summaryrefslogtreecommitdiffstats
path: root/vnftest/onap/onap_api_call.py
diff options
context:
space:
mode:
Diffstat (limited to 'vnftest/onap/onap_api_call.py')
-rw-r--r--vnftest/onap/onap_api_call.py188
1 files changed, 188 insertions, 0 deletions
diff --git a/vnftest/onap/onap_api_call.py b/vnftest/onap/onap_api_call.py
new file mode 100644
index 0000000..7cc68c3
--- /dev/null
+++ b/vnftest/onap/onap_api_call.py
@@ -0,0 +1,188 @@
+##############################################################################
+# Copyright 2018 EuropeanSoftwareMarketingLtd.
+# ===================================================================
+# Licensed under the ApacheLicense, Version2.0 (the"License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# software distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under
+# the License
+##############################################################################
+from __future__ import absolute_import
+
+import copy
+import logging
+import time
+
+import os
+import yaml
+
+from vnftest.common import constants as consts
+from vnftest.common import rest_client
+from vnftest.common.exceptions import MandatoryKeyException, InputParameterMissing
+from vnftest.crawlers.base import Crawler
+from vnftest.onap.common.vnf_type_crawler import VnfTypeCrawler
+from vnftest.steps import base
+
+LOG = logging.getLogger(__name__)
+
+
+class OnapApiCall(base.Step):
+
+ __step_type__ = "OnapApiCall"
+
+ def __init__(self, step_cfg, context_cfg, input_params):
+ self.step_cfg = step_cfg
+ self.context_cfg = context_cfg
+ self.input_params = input_params
+ self.input_cfg = None
+ self.output_cfg = None
+ self.rest_def_file = None
+ self.setup_done = False
+ self.curr_path = os.path.dirname(os.path.abspath(__file__))
+
+ def setup(self):
+ options = self.step_cfg['options']
+ self.rest_def_file = options.get("file")
+ self.input_cfg = options.get("input", {})
+ self.output_cfg = options.get("output", {})
+ self.sla_cfg = self.step_cfg.get('sla', {'retries': 0})
+ self.setup_done = True
+
+ def eval_input(self, params):
+ for input_parameter in self.input_cfg:
+ param_name = input_parameter['parameter_name']
+ value = None
+ if 'value' in input_parameter:
+ value_def = input_parameter['value']
+ value = self.format_string(value_def, self.input_params)
+ if value is None or value == "":
+ raise InputParameterMissing(param_name=param_name, source="task configuration")
+ params[param_name] = value
+
+ def run(self, result, attempt=0):
+ output = self.run_impl(result)
+ try:
+ self.handle_sla(output)
+ except AssertionError as e:
+ LOG.info(str(e))
+ if attempt < self.sla_cfg['retries']:
+ time.sleep(self.sla_cfg['interval'])
+ LOG.info("retry operation")
+ attempt = attempt + 1
+ return self.run(result, attempt)
+ else:
+ raise e
+ return output
+
+ def run_impl(self, result):
+ if not self.setup_done:
+ self.setup()
+ output = {}
+ params = copy.deepcopy(consts.component_constants)
+ self.eval_input(params)
+ execution_result = self.execute_operation(params)
+ result_body = execution_result['body']
+ for output_parameter in self.output_cfg:
+ param_name = output_parameter['parameter_name']
+ param_value = output_parameter['value']
+ if param_value.find("[") > -1:
+ crawler_type = output_parameter.get('type', 'default')
+ crawler_class = Crawler.get_cls(crawler_type)
+ crawler = crawler_class()
+ param_value = crawler.crawl(result_body, param_value)
+ if param_value is None:
+ raise MandatoryKeyException(key_name='param_path', class_name=str(result_body))
+ result[param_name] = param_value
+ output[param_name] = param_value
+ self.handle_sla(output)
+ return output
+
+ def execute_operation(self, params, attempt=0):
+ try:
+ return self.execute_operation_impl(params)
+ except Exception as e:
+ LOG.info(str(e))
+ if attempt < 2:
+ time.sleep(15)
+ LOG.info("############# retry operation ##########")
+ attempt = attempt + 1
+ return self.execute_operation(params, attempt)
+ else:
+ raise e
+
+ def execute_operation_impl(self, params):
+ input_yaml = self.rest_def_file
+ LOG.info("########## processing " + input_yaml + "##########")
+ yaml_path = os.path.join(self.curr_path, input_yaml)
+ with open(yaml_path) as info:
+ operation = yaml.load(info)
+ operation = self.format(operation, params)
+ url = operation['url']
+ headers = operation['headers']
+ body = {}
+ if 'body' in operation:
+ body = operation['body']
+ LOG.info(url)
+ LOG.info(headers)
+ LOG.info(body)
+ if 'file' in operation:
+ file_path = operation['file']
+ LOG.info(file_path)
+ files = {'upload': open(file_path)}
+ result = rest_client.upload_file(url, headers, files, LOG)
+ else:
+ result = rest_client.call(url,
+ operation['method'],
+ headers,
+ body,
+ LOG)
+ if result['return_code'] >= 300:
+ raise RuntimeError(
+ "Operation failed. return_code:{}, message:{}".format(result['return_code'], result['body']))
+ LOG.info("Results: " + str(result))
+ return result
+
+ def format(self, d, params):
+ ret = None
+ if isinstance(d, dict):
+ ret = {}
+ for k, v in d.iteritems():
+ if isinstance(v, basestring):
+ v = self.format_string(v, params)
+ else:
+ v = self.format(v, params)
+ ret[k] = v
+ if isinstance(d, list):
+ ret = []
+ for v in d:
+ if isinstance(v, basestring):
+ v = self.format_string(v, params)
+ else:
+ v = self.format(v, params)
+ ret.append(v)
+ if isinstance(d, basestring):
+ ret = self.format_string(d, params)
+ return ret
+
+ @staticmethod
+ def format_string(st, params):
+ try:
+ return st.format(**params)
+ except Exception as e:
+ s = str(e)
+ s = s.replace("'", "")
+ LOG.info(s)
+ params[s] = ""
+ LOG.info("param" + params[s])
+ return st.format(**params)
+
+ def handle_sla(self, output):
+ if 'assert' in self.sla_cfg and 'equals' in self.sla_cfg:
+ value_def = self.sla_cfg['value']
+ value = self.format_string(value_def, output)
+ expected_value = self.sla_cfg['equals']
+ assert value == expected_value