aboutsummaryrefslogtreecommitdiffstats
path: root/test/vcpe/vcpecommon.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/vcpe/vcpecommon.py')
-rwxr-xr-xtest/vcpe/vcpecommon.py102
1 files changed, 100 insertions, 2 deletions
diff --git a/test/vcpe/vcpecommon.py b/test/vcpe/vcpecommon.py
index bb83c2dcb..78085fc1f 100755
--- a/test/vcpe/vcpecommon.py
+++ b/test/vcpe/vcpecommon.py
@@ -56,9 +56,10 @@ class VcpeCommon:
}
#############################################################################
- # set name of sdnc controller pod, prefix is taken from helm environment name
+ # Set name of Onap's k8s namespace and sdnc controller pod
# CHANGEME part
- sdnc_controller_pod = 'dev-sdnc-sdnc-0'
+ onap_namespace = 'dev'
+ sdnc_controller_pod = '-'.join([onap_namespace,'sdnc-sdnc-0'])
template_variable_symbol = '${'
cpe_vm_prefix = 'zdcpe'
@@ -196,6 +197,17 @@ class VcpeCommon:
self.vpp_api_userpass = ('admin', 'admin')
self.vpp_ves_url= 'http://{0}:8183/restconf/config/vesagent:vesagent'
+ #############################################################################################
+ # POLICY urls
+ self.policy_userpass = ('healthcheck', 'zb!XztG34')
+ self.policy_headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}
+ self.policy_api_url = 'https://{0}:6969/policy/api/v1/policytypes/onap.policies.controlloop.Operational/versions/1.0.0/policies'
+ self.policy_pap_get_url = 'https://{0}:6969/policy/pap/v1/pdps'
+ self.policy_pap_json = {'policies': [{'policy-id': 'operational.vcpe'}]}
+ self.policy_pap_post_url = self.policy_pap_get_url + '/policies'
+ self.policy_api_service_name = 'policy-api'
+ self.policy_pap_service_name = 'policy-pap'
+
def heatbridge(self, openstack_stack_name, svc_instance_uuid):
"""
Add vserver information to AAI
@@ -323,6 +335,74 @@ class VcpeCommon:
self.logger.error("Can't get subnet info from network name: " + network_name)
return False
+ def set_closed_loop_policy(self, policy_template_file):
+ # Gather policy services cluster ips
+ p_api_cluster_ip = self.get_k8s_service_cluster_ip(self.policy_api_service_name)
+ p_pap_cluster_ip = self.get_k8s_service_cluster_ip(self.policy_pap_service_name)
+
+ # Read policy json from file
+ with open(policy_template_file) as f:
+ try:
+ policy_json = json.load(f)
+ except ValueError:
+ self.logger.error(policy_template_file + " doesn't seem to contain valid JSON data")
+ sys.exit()
+
+ # Check policy already applied
+ requests.packages.urllib3.disable_warnings()
+ policy_exists_req = requests.get(self.policy_pap_get_url.format(
+ p_pap_cluster_ip), auth=self.policy_userpass,
+ verify=False, headers=self.policy_headers)
+ if policy_exists_req.status_code != 200:
+ self.logger.error('Failure in checking CL policy existence. '
+ 'Policy-pap responded with HTTP code {0}'.format(
+ policy_exists_req.status_code))
+ sys.exit()
+
+ try:
+ policy_exists_json = policy_exists_req.json()
+ except ValueError as e:
+ self.logger.error('Policy-pap request failed: ' + e.message)
+ sys.exit()
+
+ try:
+ assert policy_exists_json['groups'][0]['pdpSubgroups'] \
+ [1]['policies'][0]['name'] != 'operational.vcpe'
+ except AssertionError:
+ self.logger.info('vCPE closed loop policy already exists, not applying')
+ return
+ except IndexError:
+ pass # policy doesn't exist
+
+ # Create policy
+ policy_create_req = requests.post(self.policy_api_url.format(
+ p_api_cluster_ip), auth=self.policy_userpass,
+ json=policy_json, verify=False,
+ headers=self.policy_headers)
+ # Get the policy id from policy-api response
+ if policy_create_req.status_code != 200:
+ self.logger.error('Failed creating policy. Policy-api responded'
+ ' with HTTP code {0}'.format(policy_create_req.status_code))
+ sys.exit()
+
+ try:
+ policy_version = json.loads(policy_create_req.text)['policy-version']
+ except (KeyError, ValueError):
+ self.logger.error('Policy API response not understood:')
+ self.logger.debug('\n' + str(policy_create_req.text))
+
+ # Inject the policy into Policy PAP
+ self.policy_pap_json['policies'].append({'policy-version': policy_version})
+ policy_insert_req = requests.post(self.policy_pap_post_url.format(
+ p_pap_cluster_ip), auth=self.policy_userpass,
+ json=self.policy_pap_json, verify=False,
+ headers=self.policy_headers)
+ if policy_insert_req.status_code != 200:
+ self.logger.error('Policy PAP request failed with HTTP code'
+ '{0}'.format(policy_insert_req.status_code))
+ sys.exit()
+ self.logger.info('Successully pushed closed loop Policy')
+
def is_node_in_aai(self, node_type, node_uuid):
key = None
search_node_type = None
@@ -463,6 +543,24 @@ class VcpeCommon:
vm_ip[vm] = self.oom_so_sdnc_aai_ip
return vm_ip
+ def get_k8s_service_cluster_ip(self, service):
+ """
+ Returns cluster IP for a given service
+ :param service: name of the service
+ :return: cluster ip
+ """
+ config.load_kube_config()
+ api = client.CoreV1Api()
+ kslogger = logging.getLogger('kubernetes')
+ kslogger.setLevel(logging.INFO)
+ try:
+ resp = api.read_namespaced_service(service, self.onap_namespace)
+ except client.rest.ApiException as e:
+ self.logger.error('Error while making k8s API request: ' + e.body)
+ sys.exit()
+
+ return resp.spec.cluster_ip
+
def extract_vm_ip_as_dict(self, novalist_results, net_addr, net_addr_len):
vm_ip_dict = {}
for line in novalist_results.split('\n'):