From f8907f0c4fc0ba4bb97a1d636a50c5b40c2642f2 Mon Sep 17 00:00:00 2001 From: Edan Binshtok Date: Wed, 4 Oct 2017 09:33:23 +0300 Subject: Initial seed Initial upload of django test framework Change-Id: I643a7f4efc52cfafe4cc6d92e3178f36a0c1837c Issue-Id: VVP-1 Signed-off-by: Edan Binshtok --- services/database/__init__.py | 38 ++ .../database/__pycache__/__init__.cpython-36.pyc | Bin 0 -> 145 bytes .../database/__pycache__/db_bridge.cpython-36.pyc | Bin 0 -> 1251 bytes .../__pycache__/db_checklist.cpython-36.pyc | Bin 0 -> 12128 bytes .../database/__pycache__/db_cms.cpython-36.pyc | Bin 0 -> 8742 bytes .../database/__pycache__/db_general.cpython-36.pyc | Bin 0 -> 10828 bytes .../database/__pycache__/db_user.cpython-36.pyc | Bin 0 -> 11452 bytes .../__pycache__/db_virtual_function.cpython-36.pyc | Bin 0 -> 8197 bytes services/database/db_bridge.py | 60 +++ services/database/db_checklist.py | 386 +++++++++++++++++++ services/database/db_cms.py | 265 +++++++++++++ services/database/db_general.py | 416 ++++++++++++++++++++ services/database/db_user.py | 417 +++++++++++++++++++++ services/database/db_virtual_function.py | 227 +++++++++++ 14 files changed, 1809 insertions(+) create mode 100644 services/database/__init__.py create mode 100644 services/database/__pycache__/__init__.cpython-36.pyc create mode 100644 services/database/__pycache__/db_bridge.cpython-36.pyc create mode 100644 services/database/__pycache__/db_checklist.cpython-36.pyc create mode 100644 services/database/__pycache__/db_cms.cpython-36.pyc create mode 100644 services/database/__pycache__/db_general.cpython-36.pyc create mode 100644 services/database/__pycache__/db_user.cpython-36.pyc create mode 100644 services/database/__pycache__/db_virtual_function.cpython-36.pyc create mode 100644 services/database/db_bridge.py create mode 100644 services/database/db_checklist.py create mode 100644 services/database/db_cms.py create mode 100755 services/database/db_general.py create mode 100644 services/database/db_user.py create mode 100644 services/database/db_virtual_function.py (limited to 'services/database') diff --git a/services/database/__init__.py b/services/database/__init__.py new file mode 100644 index 0000000..30d7152 --- /dev/null +++ b/services/database/__init__.py @@ -0,0 +1,38 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. diff --git a/services/database/__pycache__/__init__.cpython-36.pyc b/services/database/__pycache__/__init__.cpython-36.pyc new file mode 100644 index 0000000..12ed104 Binary files /dev/null and b/services/database/__pycache__/__init__.cpython-36.pyc differ diff --git a/services/database/__pycache__/db_bridge.cpython-36.pyc b/services/database/__pycache__/db_bridge.cpython-36.pyc new file mode 100644 index 0000000..06fb01e Binary files /dev/null and b/services/database/__pycache__/db_bridge.cpython-36.pyc differ diff --git a/services/database/__pycache__/db_checklist.cpython-36.pyc b/services/database/__pycache__/db_checklist.cpython-36.pyc new file mode 100644 index 0000000..81e29bd Binary files /dev/null and b/services/database/__pycache__/db_checklist.cpython-36.pyc differ diff --git a/services/database/__pycache__/db_cms.cpython-36.pyc b/services/database/__pycache__/db_cms.cpython-36.pyc new file mode 100644 index 0000000..783d2f8 Binary files /dev/null and b/services/database/__pycache__/db_cms.cpython-36.pyc differ diff --git a/services/database/__pycache__/db_general.cpython-36.pyc b/services/database/__pycache__/db_general.cpython-36.pyc new file mode 100644 index 0000000..c39ce12 Binary files /dev/null and b/services/database/__pycache__/db_general.cpython-36.pyc differ diff --git a/services/database/__pycache__/db_user.cpython-36.pyc b/services/database/__pycache__/db_user.cpython-36.pyc new file mode 100644 index 0000000..46d136e Binary files /dev/null and b/services/database/__pycache__/db_user.cpython-36.pyc differ diff --git a/services/database/__pycache__/db_virtual_function.cpython-36.pyc b/services/database/__pycache__/db_virtual_function.cpython-36.pyc new file mode 100644 index 0000000..8a46fb8 Binary files /dev/null and b/services/database/__pycache__/db_virtual_function.cpython-36.pyc differ diff --git a/services/database/db_bridge.py b/services/database/db_bridge.py new file mode 100644 index 0000000..fc765c7 --- /dev/null +++ b/services/database/db_bridge.py @@ -0,0 +1,60 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +class DBBridge: + + """ + This class helps to use functions inside classes with circular import (dependencies). + Use this class only when there is circular import in one of the DB services. + """ + + @staticmethod + def select_personal_next_step(user_email): + """select_personal_next_step: Originally can be found under DBUser class.""" + from services.database.db_user import DBUser + return DBUser.select_personal_next_step(user_email) + + @staticmethod + def helper_rand_string(type, num=""): + from services.helper import Helper + return Helper.rand_string(type, num) + + @staticmethod + def helper_internal_assert(arg1, arg2): + from services.helper import Helper + return Helper.internal_assert(arg1, arg2) diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py new file mode 100644 index 0000000..15851cf --- /dev/null +++ b/services/database/db_checklist.py @@ -0,0 +1,386 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import time +from uuid import uuid4 + +from django.utils import timezone +import psycopg2 + +from services.constants import Constants +from services.database.db_general import DBGeneral +from services.logging_service import LoggingServiceFactory +from services.session import session + + +logger = LoggingServiceFactory.get_logger() + +class DBChecklist: + + @staticmethod + def select_where_approval_state(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "select %s from %s Where %s = '%s' and state = 'approval';" % ( + queryColumnName, queryTableName, whereParametrType, whereParametrValue) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + if (fetchNum == 0): + result = str(cur.fetchall()) + elif (fetchNum == 1): + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + logger.debug("Query result: " + str(result)) + if result == None: + errorMsg = "select_where_approval_state FAILED " + logger.error(errorMsg) + raise + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_where_approval_state FAILED " + raise Exception(errorMsg, "select_where_approval_state FAILED") + + @staticmethod + def select_where_pr_state(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "select %s from %s Where %s = '%s' and state = 'peer_review';" % ( + queryColumnName, queryTableName, whereParametrType, whereParametrValue) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + if (fetchNum == 0): + result = str(cur.fetchall()) + elif (fetchNum == 1): + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + if result == None: + errorMsg = "select_where_pr_state FAILED " + logger.error(errorMsg) + raise + logger.debug("Query result: " + str(result)) + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_where FAILED " + raise Exception(errorMsg, "select_where") + + @staticmethod + def select_where_cl_not_archive(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "select %s from %s Where %s = '%s' and state != 'archive';" % ( + queryColumnName, queryTableName, whereParametrType, whereParametrValue) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + if (fetchNum == 0): + result = str(cur.fetchall()) + elif (fetchNum == 1): + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + logger.debug("Query result: " + str(result)) + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_where FAILED " + raise Exception(errorMsg, "select_where") + + @staticmethod + def select_native_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "select %s from %s Where %s = '%s';" % ( + queryColumnName, queryTableName, whereParametrType, whereParametrValue) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + if (fetchNum == 0): + result = str(cur.fetchall()) + elif (fetchNum == 1): + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + logger.debug("Query result: " + str(result)) + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_where FAILED " + raise Exception(errorMsg, "select_where") + + @staticmethod + def update_checklist_to_review_state(queryTableName): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "UPDATE ice_checklist SET state='review' Where name= '%s' and state= 'pending';" % ( + queryTableName) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + dbConn.commit() + dbConn.close() + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "Could not Update User" + raise Exception(errorMsg, "Update") + + @staticmethod + def update_all_decisions_to_approve(whereParametrValue): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "UPDATE ice_checklist_decision SET review_value='approved' , peer_review_value='approved' Where checklist_id = '%s';" % ( + whereParametrValue) + logger.debug(queryStr) + cur.execute(queryStr) + dbConn.commit() + logger.debug("Query : " + queryStr) + # If failed - count the failure and add the error to list of errors. + except Exception as e: + errorMsg = "Could not Update User" + logger.debug(e) + raise Exception(errorMsg, "Update") + finally: + dbConn.close() + + @staticmethod + def is_archive(checklistName): + try: + result = False + # Fetch all AT&T user ID. + checklist_ids = DBGeneral.select_where( + "uuid", "ice_checklist", "name", checklistName, 0) +# checklist_ids = DBGeneral.list_format(checklist_ids) + for checklist_id in checklist_ids: # Second Example + if isinstance(checklist_id, tuple): + checklist_id = checklist_id[0] + state = DBGeneral.select_where( + "state", "ice_checklist", "uuid", checklist_id, 1) + if state == "archive": + result = True + break + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "is_archive FAILED " + raise Exception(errorMsg, "is_archive") + + @staticmethod + def get_pr_email(checklistUuid): + try: + # Fetch one AT&T user ID. + owner_id = DBChecklist.select_where_pr_state( + "owner_id", "ice_checklist", "uuid", checklistUuid, 1) + engLeadEmail = DBGeneral.select_where( + "email", "ice_user_profile", "id", owner_id, 1) + logger.debug("get_pr_email = " + engLeadEmail) + return engLeadEmail + # If failed - count the failure and add the error to list of errors. + except Exception as e: + errorMsg = "get_pr_email FAILED " + str(e) + raise Exception(errorMsg, "get_pr_email") + + @staticmethod + def get_admin_email(checklistUuid): + try: + owner_id = DBChecklist.select_where_approval_state( + "owner_id", "ice_checklist", "uuid", checklistUuid, 1) # Fetch one AT&T user ID. + engLeadEmail = DBGeneral.select_where( + "email", "ice_user_profile", "id", owner_id, 1) + logger.debug("get_admin_email = " + engLeadEmail) + return engLeadEmail + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "get_admin_email FAILED " + raise Exception(errorMsg, "get_admin_email") + + @staticmethod + def get_owner_email(checklistUuid): + try: + # Fetch one AT&T user ID. + owner_id = DBChecklist.select_native_where( + "owner_id", "ice_checklist", "uuid", checklistUuid, 1) + engLeadEmail = DBGeneral.select_where( + "email", "ice_user_profile", "id", owner_id, 1) + logger.debug("getPreeReviewerEngLeadEmail = " + engLeadEmail) + return engLeadEmail + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "get_admin_email FAILED " + raise Exception(errorMsg, "get_owner_email") + + @staticmethod + def update_decisions(checklistUuid, checklistName): + checklistTempid = DBGeneral.select_where( + "template_id", "ice_checklist", "name", checklistName, 1) + checklistLineItems = DBGeneral.select_where_and( + "uuid", "ice_checklist_line_item", "line_type", "auto", "template_id", checklistTempid, 0) + for lineItem in checklistLineItems: + setParametrType2 = "peer_review_value" + setParametrValue2 = "approved" + whereParametrType2 = "lineitem_id" + whereParametrValue2 = lineItem + DBGeneral.update_where_and("ice_checklist_decision", "review_value", checklistUuid, "approved", + "checklist_id", setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2) + + @staticmethod + def checkChecklistIsUpdated(): + query = "select uuid from ice_checklist_section where template_id in (select template_id from ice_checklist_template where name='{template_name}') and name='{section_name}'".format( + template_name=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, section_name=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT) + return DBGeneral.select_query(query) + + @staticmethod + def fetchEngByVfName(vfName): + # Fetch one AT&T user ID. + return DBGeneral.select_where("engagement_id", "ice_vf", "name", vfName, 1) + + @staticmethod + def fetchEngManIdByEngUuid(engagement_id): + return DBGeneral.select_where("engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + + @staticmethod + def fetchChecklistByName(checklistName): + query = "select uuid from ice_checklist where name='{cl_name}'".format( + cl_name=checklistName) + return DBGeneral.select_query(query) + + @staticmethod + def create_default_heat_teampleate(): + template_query = "INSERT INTO public.ice_checklist_template(uuid, name, category, version, create_time)"\ + "VALUES ('%s', '%s', '%s', '%s', '%s');" % ( + str(uuid4()), 'Editing Heat', 'first category', '1', timezone.now()) + DBGeneral.insert_query(template_query) + template_id = DBGeneral.select_query( + "SELECT uuid FROM public.ice_checklist_template where name = 'Editing Heat'") + # SECTIONS + section1_query = "INSERT INTO public.ice_checklist_section(uuid, name, weight, description, validation_instructions, create_time, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (str(uuid4()), 'External References', + '1', 'section descripyion', 'valid instructions', timezone.now(), template_id) + DBGeneral.insert_query(section1_query) + section1_id = DBGeneral.select_query( + ("""SELECT uuid FROM public.ice_checklist_section where name = 'External References' and template_id = '{s}'""").format(s=template_id)) + section2_query = "INSERT INTO public.ice_checklist_section(uuid, name, weight, description, validation_instructions, create_time, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (str(uuid4()), 'Parameter Specification', + '2', 'section descripyion', 'valid instructions', timezone.now(), template_id) + DBGeneral.insert_query(section2_query) + section2_id = DBGeneral.select_query( + ("""SELECT uuid FROM public.ice_checklist_section where name = 'Parameter Specification' and template_id = '{s}'""").format(s=template_id)) + # Line items + line_item1 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Normal references', '1', 'Numeric parameters should include range and/or allowed values.', 'manual', + 'Here are some useful tips for how to validate this item in the most awesome way:

', timezone.now(), section1_id, template_id) + DBGeneral.insert_query(line_item1) + line_item2 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time, section_id, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'String parameters', '2', 'Numeric parameters should include range and/or allowed values.', 'auto', + 'Here are some useful tips for how to validate this item in the most awesome way:

', timezone.now(), section2_id, template_id) + DBGeneral.insert_query(line_item2) + line_item3 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Numeric parameters', '3', 'Numeric parameters should include range and/or allowed values.', 'manual', + 'Here are some useful tips for how to validate this item in the most awesome way:

', timezone.now(), section2_id, template_id) + DBGeneral.insert_query(line_item3) + line_item4 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time, section_id, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'VF image', '2', 'Numeric parameters should include range and/or allowed values.', 'auto', + 'Here are some useful tips for how to validate this item in the most awesome way:

', timezone.now(), section1_id, template_id) + DBGeneral.insert_query(line_item4) + line_item5 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Parameters', '1', 'Numeric parameters should include range and/or allowed values.', 'auto', + 'Here are some useful tips for how to validate this item in the most awesome way:

', timezone.now(), section2_id, template_id) + DBGeneral.insert_query(line_item5) + + @staticmethod + def create_editing_cl_template_if_not_exist(): + template_id = DBGeneral.select_query(("""SELECT uuid FROM public.ice_checklist_template where name = '{s}'""").format( + s=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT)) + if template_id == 'None': + DBChecklist.create_default_heat_teampleate() + session.createTemplatecount = True + + @staticmethod + def state_changed(identify_field, field_value, expected_state): + get_state = str(DBGeneral.select_where( + "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1)) + counter = 0 + while get_state != expected_state and counter <= Constants.DBConstants.RETRIES_NUMBER: + time.sleep(session.wait_until_time_pause_long) + logger.debug("Checklist state not changed yet , expecting state: %s, current result: %s (attempt %s of %s)" % ( + expected_state, get_state, counter, Constants.DBConstants.RETRIES_NUMBER)) + counter += 1 + get_state = str(DBGeneral.select_where( + "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1)) + + if get_state == expected_state: + logger.debug("Checklist state was successfully changed into: " + + expected_state + ", and was verified over the DB") + return expected_state + raise Exception( + "Expected checklist state never arrived " + expected_state, get_state) + + @staticmethod + def get_recent_checklist_uuid(name): + required_uuid = DBGeneral.select_where_not_and_order_by_desc( + 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time') + return required_uuid diff --git a/services/database/db_cms.py b/services/database/db_cms.py new file mode 100644 index 0000000..3c2b2c6 --- /dev/null +++ b/services/database/db_cms.py @@ -0,0 +1,265 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import psycopg2 +from wheel.signatures import assertTrue + +from services.constants import Constants +from services.database.db_general import DBGeneral +from services.frontend.base_actions.click import Click +from services.frontend.base_actions.enter import Enter +from services.frontend.base_actions.wait import Wait +from services.frontend.fe_dashboard import FEDashboard +from services.frontend.fe_general import FEGeneral +from services.frontend.fe_user import FEUser +from services.helper import Helper +from services.logging_service import LoggingServiceFactory +from services.session import session + + +logger = LoggingServiceFactory.get_logger() + + +class DBCMS: + + @staticmethod + def insert_query(queryStr): + try: + nativeIceDb = psycopg2.connect( + DBGeneral.return_db_native_connection('cms_db')) + dbConn = nativeIceDb + cur = dbConn.cursor() + logger.debug("Query: " + queryStr) + cur.execute(queryStr) + dbConn.commit() + dbConn.close() + logger.debug("Insert query success!") + # If failed - count the failure and add the error to list of errors. + except: + raise Exception("Couldn't fetch answer using the given query.") + + @staticmethod + def update_query(queryStr): + try: + nativeIceDb = psycopg2.connect( + DBGeneral.return_db_native_connection('cms_db')) + dbConn = nativeIceDb + cur = dbConn.cursor() + logger.debug("Query: " + queryStr) + cur.execute(queryStr) + dbConn.commit() + dbConn.close() + logger.debug("Update query success!") + # If failed - count the failure and add the error to list of errors. + except: + raise Exception("Couldn't fetch answer using the given query.") + + @staticmethod + def select_query(queryStr): + try: + nativeIceDb = psycopg2.connect( + DBGeneral.return_db_native_connection('cms_db')) + dbConn = nativeIceDb + cur = dbConn.cursor() + logger.debug("Query: " + queryStr) + cur.execute(queryStr) + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + logger.debug("Query result: " + str(result)) + return result + # If failed - count the failure and add the error to list of errors. + except: + raise Exception("Couldn't fetch answer using the given query.") + + @staticmethod + def get_cms_category_id(categoryName): + logger.debug("Get DBCMS category id for name: " + categoryName) + queryStr = "SELECT id FROM public.blog_blogcategory WHERE title = '%s' LIMIT 1;" % ( + categoryName) + logger.debug("Query : " + queryStr) + result = DBCMS.select_query(queryStr) + return result + + @staticmethod + def insert_cms_new_post(title, description, categoryName): + logger.debug("Insert new post : " + title) + queryStr = "INSERT INTO public.blog_blogpost" \ + "(comments_count, keywords_string, rating_count, rating_sum, rating_average, title, slug, _meta_title, description, gen_description, created, updated, status, publish_date, expiry_date, short_url, in_sitemap, content, allow_comments, featured_image, site_id, user_id) "\ + "VALUES (0, '', 0, 0, 0, '%s', '%s-slug', '', '%s', true, current_timestamp - interval '1 day', current_timestamp - interval '2 day', 2, current_timestamp - interval '1 day', NULL, '', true, '

%s

', true, '', 1, 1);" % ( + title, title, description, description) + logger.debug("Query : " + queryStr) + DBCMS.insert_query(queryStr) + post_id = DBCMS.get_last_added_post_id() + categoryId = DBCMS.get_cms_category_id(categoryName) + DBCMS.add_category_to_post(post_id, categoryId) + return post_id + + @staticmethod + def get_last_added_post_id(): + logger.debug("Get the id of the post inserted") + queryStr = "select MAX(id) FROM public.blog_blogpost;" + logger.debug("Query : " + queryStr) + result = DBCMS.select_query(queryStr) + return result + + @staticmethod + def update_days(xdays, title): + logger.debug("Get the id of the post inserted") +# queryStr = "select MAX(id) FROM public.blog_blogpost;" + queryStr = "UPDATE public.blog_blogpost SET created=current_timestamp - interval '%s day' WHERE title='%s';" % ( + xdays, title) + logger.debug("Query : " + queryStr) + result = DBCMS.update_query(queryStr) + return result + + @staticmethod + def add_category_to_post(postId, categoryId): + logger.debug("bind category into inserted post: " + postId) + queryStr = "INSERT INTO public.blog_blogpost_categories(blogpost_id, blogcategory_id) VALUES (%s, %s);" % ( + postId, categoryId) + logger.debug("Query : " + queryStr) + DBCMS.insert_query(queryStr) + + @staticmethod + def get_documentation_page_id(): + logger.debug("Retrive id of documentation page: ") + queryStr = "SELECT id FROM public.pages_page WHERE title = 'Documentation' LIMIT 1;" + logger.debug("Query : " + queryStr) + result = DBCMS.select_query(queryStr) + return result + + @staticmethod + def get_last_inserted_page_id(): + logger.debug("Retrive id of last page inserted: ") + queryStr = "select MAX(id) FROM public.pages_page;" + logger.debug("Query : " + queryStr) + result = DBCMS.select_query(queryStr) + return result + + @staticmethod + def delete_old_tips_of_the_day(): + logger.debug("Delete all posts ") + queryStr = "DELETE FROM public.blog_blogpost_categories WHERE id>0;" + logger.debug("Query : " + queryStr) + DBCMS.insert_query(queryStr) + queryStr = "DELETE FROM public.blog_blogpost WHERE id>0;;" + logger.debug("Query : " + queryStr) + DBCMS.insert_query(queryStr) + + @staticmethod + def insert_page(title, content, parent_id=None): + logger.debug("Retrive id of documentation page: ") + if parent_id is None: + parent_id = DBCMS.get_documentation_page_id() + queryStr = "INSERT INTO public.pages_page(" \ + "keywords_string, title, slug, _meta_title, description, gen_description, created, updated, status, publish_date, expiry_date, short_url, in_sitemap, _order, in_menus, titles, content_model, login_required, parent_id, site_id)" \ + "VALUES ('', '%s', '%s-slug', '', '%s', true, current_timestamp - interval '1 day', current_timestamp - interval '1 day', 2, current_timestamp - interval '1 day', NULL, '', true, 0, '1,2,3', '%s', 'richtextpage', true, %s, 1);" % ( + title, title, content, title, parent_id) + logger.debug("Query : " + queryStr) + DBCMS.insert_query(queryStr) + + createdPageId = DBCMS.get_last_inserted_page_id() + logger.debug( + "Bind the page with the rich text content related to this page") + queryStr = "INSERT INTO public.pages_richtextpage(page_ptr_id, content) VALUES (%s, '

%s

');" % ( + createdPageId, content) + logger.debug("Query : " + queryStr) + DBCMS.insert_query(queryStr) + return createdPageId + + @staticmethod + def create_faq(): + title = "title_FAQ" + Helper.rand_string("randomString") + description = "description_FAQ_" + Helper.rand_string("randomString") + DBCMS.delete_old_tips_of_the_day() + postId = DBCMS.insert_cms_new_post(title, description, "FAQ") + assertTrue(len(postId) > 0 and not None) + return title, description + + @staticmethod + def create_news(): + title = "title_News" + Helper.rand_string("randomString") + description = "description_News" + Helper.rand_string("randomString") + postId = DBCMS.insert_cms_new_post(title, description, "News") + assertTrue(len(postId) > 0 and not None) + return title, description + + @staticmethod + def create_announcement(): + title = "title_Announcement_" + Helper.rand_string("randomString") + description = "description_Announcement_" + \ + Helper.rand_string("randomString") + postId = DBCMS.insert_cms_new_post(title, description, "Announcement") + assertTrue(len(postId) > 0 and not None) + return title, description + + @staticmethod + def create_page(parent_id=None): + title = "title_Of_Page_" + Helper.rand_string("randomString") + description = "description_Of_Page_" + \ + Helper.rand_string("randomString") + createdPageId = DBCMS.insert_page(title, description) + assertTrue(len(createdPageId) > 0 and not None) + return title, description + + @staticmethod + def update_X_days_back_post(title, xdays): + logger.debug("Get the id of the post inserted") + queryStr = "UPDATE blog_blogpost SET created = current_timestamp - interval '%s day', publish_date=current_timestamp - interval '%s day' WHERE title= '%s' ;" % ( + xdays, xdays, title) + logger.debug("Query : " + queryStr) + DBCMS.update_query(queryStr) + + @staticmethod + def create_announcements(x): + listOfTitleAnDescriptions = [] + for _ in range(x): + # print x ->str + title = "title_Announcement_" + Helper.rand_string("randomString") + description = "description_Announcement_" + \ + Helper.rand_string("randomString") + postId = DBCMS.insert_cms_new_post( + title, description, "Announcement") + assertTrue(len(postId) > 0 and not None) + xList = [title, description] + listOfTitleAnDescriptions.append(xList) + return listOfTitleAnDescriptions diff --git a/services/database/db_general.py b/services/database/db_general.py new file mode 100755 index 0000000..c850d3a --- /dev/null +++ b/services/database/db_general.py @@ -0,0 +1,416 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from datetime import datetime +import sqlite3 + +from django.conf import settings +from django.db import transaction +import psycopg2 + +from services.logging_service import LoggingServiceFactory + + +logger = LoggingServiceFactory.get_logger() + +class DBGeneral: + + @staticmethod + # desigredDB: Use 'default' for CI General and 'em_db' for EM General + # (according to settings.DATABASES). + def return_db_native_connection(desigredDB): + dbConnectionStr = "dbname='" + str(settings.SINGLETONE_DB[desigredDB]['NAME']) + \ + "' user='" + str(settings.SINGLETONE_DB[desigredDB]['USER']) + \ + "' host='" + str(settings.SINGLETONE_DB[desigredDB]['HOST']) + \ + "' password='" + str(settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \ + "' port='" + \ + str(settings.SINGLETONE_DB[desigredDB]['PORT']) + "'" + return dbConnectionStr + + @staticmethod + def insert_results(testType, testFeature, testResult, testName, testDuration, notes=" "): + try: + if settings.DATABASE_TYPE == 'sqlite': + dbfile = str(settings.DATABASES['default']['TEST_NAME']) + dbConn = sqlite3.connect(dbfile) + cur = dbConn.cursor() + else: + # Connect to General 'default'. + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection("default")) + dbConn = dbConn + cur = dbConn.cursor() + except Exception as e: + errorMsg = "Failed to create connection to General." + str(e) + raise Exception(errorMsg) + try: # Create INSERT query. + if settings.DATABASE_TYPE == 'sqlite': + query_str = 'INSERT INTO ice_test_results (testType, testFeature, testResult, testName, notes,'\ + 'create_time, build_id, duration) VALUES (?, ?, ?, ?, ?, ?, ?, ?);' + else: + query_str = 'INSERT INTO ice_test_results ("testType", "testFeature", "testResult", "testName", notes,'\ + 'create_time, build_id, duration) VALUES (%s, %s, %s, %s, %s, %s, %s, %s);' + cur.execute(query_str, (testType, testFeature, testResult, testName, notes, str(datetime.now()), + settings.ICE_BUILD_REPORT_NUM, testDuration)) + dbConn.commit() + logger.debug("Test result in DB - " + testResult) + except Exception as e: + logger.error(e) + errorMsg = "Failed to insert results to DB." + str(e) + raise Exception(errorMsg) + dbConn.close() + + @staticmethod + def select_query(queryStr, return_type="str", fetch_num=1): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + logger.debug("Query: " + queryStr) + cur.execute(queryStr) + if return_type == "str": + if fetch_num == 1: + result = str(cur.fetchone()) + else: + result = str(cur.fetchall()) + if result != 'None': + # formatting strings e.g uuid + if(result.find("',)") != -1): + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + if return_type == "list": + if fetch_num == 1: + result = list(cur.fetchone()) + else: + result = [item[0] for item in cur.fetchall()] + dbConn.close() + logger.debug("Query result: " + str(result)) + return result + except: + raise Exception("Couldn't fetch answer using the given query.") + + @staticmethod + def insert_query(queryStr): + try: + nativeIceDb = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = nativeIceDb + cur = dbConn.cursor() + logger.debug("Query: " + queryStr) + cur.execute(queryStr) + dbConn.commit() + dbConn.close() + logger.debug("Insert query success!") + # If failed - count the failure and add the error to list of errors. + except Exception as e: + logger.error(e) + transaction.rollback() + raise Exception("Couldn't fetch answer using the given query.") + + @staticmethod + def update_query(queryStr): + try: + nativeIceDb = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = nativeIceDb + cur = dbConn.cursor() + logger.debug("Query: " + queryStr) + cur.execute(queryStr) + dbConn.commit() + dbConn.close() + logger.debug("Update query success!") + # If failed - count the failure and add the error to list of errors. + except Exception as e: + logger.error(e) + transaction.rollback() + raise Exception("Couldn't fetch answer using the given query.") + + @staticmethod + def select_where_email(queryColumnName, queryTableName, email): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "select %s from %s WHERE Email = '%s';" % ( + queryColumnName, queryTableName, email) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + logger.debug("Query result: " + str(result)) + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_where_email FAILED " + raise Exception(errorMsg, "select_where_email") + raise + + @staticmethod + def select_from(queryColumnName, queryTableName, fetchNum): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + cur = dbConn.cursor() + queryStr = "select %s from %s;" % (queryColumnName, queryTableName) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + if (fetchNum == 0): + result = str(cur.fetchall()) + elif (fetchNum == 1): + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + logger.debug("Query result: " + str(result)) + return result + # If failed - count the failure and add the error to list of errors. + except Exception as e: + errorMsg = "select_from FAILED " + str(e) + raise Exception(errorMsg, "select_from") + + @staticmethod + def select_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + cur = dbConn.cursor() + queryStr = "select %s from %s Where %s = '%s';" % ( + queryColumnName, queryTableName, whereParametrType, whereParametrValue) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + if (fetchNum == 0): + result = list(cur.fetchall()) + elif (fetchNum == 1): + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + logger.debug("Query result: " + str(result)) + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_where FAILED " + raise Exception(errorMsg, "select_where") + + @staticmethod + def select_where_order_by_desc(queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by): + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + cur = dbConn.cursor() + queryStr = "select %s from %s Where %s = '%s' order by %s desc limit 1;" \ + % (queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + result = str(cur.fetchall()) + result = DBGeneral.list_format(result) + dbConn.close() + return result + + @staticmethod + def select_where_dict(queryColumnName, queryTableName, whereParametrType): + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + cur = dbConn.cursor() + x = "" + count = 0 + for key, val in whereParametrType.items(): + x += "%s='%s'" % (key, val) + if len(whereParametrType.items()) - count > 1: + x += ' and ' + count += 1 + queryStr = "select %s from %s Where %s;" \ + % (queryColumnName, queryTableName, x) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + result = str(cur.fetchall()) + result = DBGeneral.list_format(result) + dbConn.close() + return result + + @staticmethod + def select_where_not_and_order_by_desc(queryColumnName, queryTableName, whereParametrType, + whereParametrValue, parametrTypeAnd, parametrAnd, order_by): + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + cur = dbConn.cursor() + queryStr = "select %s from %s Where %s = '%s' and %s != '%s' order by %s desc limit 1;" \ + % (queryColumnName, queryTableName, whereParametrType, whereParametrValue, + parametrTypeAnd, parametrAnd, order_by) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + result = str(cur.fetchall()) + result = DBGeneral.list_format(result) + dbConn.close() + return result + + @staticmethod + def select_where_and(queryColumnName, queryTableName, whereParametrType, whereParametrValue, + parametrTypeAnd, parametrAnd, fetchNum): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "select %s from %s Where %s = '%s' and %s = '%s';" % ( + queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + if (fetchNum == 0): + result = str(cur.fetchall()) + result = DBGeneral.list_format(result) + elif (fetchNum == 1): + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + logger.debug("Query result: " + str(result)) + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_where_and FAILED " + raise Exception(errorMsg, "select_where_and") + + @staticmethod + def select_where_is_bigger(queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd, fetchNum): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "select %s from %s Where %s = '%s' and %s > %s;" % ( + queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + if (fetchNum == 0): + result = cur.fetchall() + elif (fetchNum == 1): + result = cur.fetchone() + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_where_is_bigger FAILED " + raise Exception(errorMsg, "select_where_is_bigger") + + @staticmethod + def update_where(queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "UPDATE %s SET %s = '%s' Where %s = '%s';" % ( + queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue) + cur.execute(queryStr) + dbConn.commit() + logger.debug("Query : " + queryStr) + # If failed - count the failure and add the error to list of errors. + except Exception as e: + errorMsg = "Could not Update User" + logger.debug(e) + raise Exception(errorMsg, "Update") + finally: + dbConn.close() + + @staticmethod + def update_by_query(queryStr): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + dbConn.commit() + dbConn.close() + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "Could not Update User" + raise Exception(errorMsg, "Update") + + @staticmethod + def update_where_and(queryTableName, setParametrType, parametrValue, changeToValue, whereParametrType, setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "UPDATE %s SET %s = '%s', %s = '%s' Where %s = '%s' and %s = '%s';" % ( + queryTableName, setParametrType, changeToValue, setParametrType2, setParametrValue2, whereParametrType, parametrValue, whereParametrType2, whereParametrValue2) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + dbConn.commit() + dbConn.close() + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "Could not Update User" + raise Exception(errorMsg, "Update") + + @staticmethod + def list_format(un_listed): + un_listed = un_listed[1:-1] + un_listed = un_listed.replace("',), ('", "|||") + un_listed = un_listed.replace("(u'", "") # Format list + un_listed = un_listed[1:-1].replace("('", "") # Format list + un_listed = un_listed.replace("',)", "") # Format list + listed = un_listed[1:-2].split("|||") + return listed + + @staticmethod + def get_vendors_list(): + # Select approved vendors from db. + vendors_list = DBGeneral.select_where( + "name", "ice_vendor", "public", "TRUE", 0) + return vendors_list diff --git a/services/database/db_user.py b/services/database/db_user.py new file mode 100644 index 0000000..d347dd2 --- /dev/null +++ b/services/database/db_user.py @@ -0,0 +1,417 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import time + +from django.conf import settings +import psycopg2 + +from services.constants import Constants +from services.database.db_bridge import DBBridge +from services.database.db_general import DBGeneral +from services.database.db_virtual_function import DBVirtualFunction +from services.frontend.base_actions.wait import Wait +from services.logging_service import LoggingServiceFactory +from services.session import session + + +logger = LoggingServiceFactory.get_logger() + +class DBUser: + + @staticmethod + def get_activation_url(email): + # Fetch one user ID. + uuid = DBUser.select_user_uuid(email) + # Fetch one user ID. + index = DBGeneral.select_where_email("id", "auth_user", email) + activation_token = DBGeneral.select_where( + "activation_token", "ice_custom_user", "user_ptr_id", index, 1) + # / activate /:userID /:token + activationUrl = settings.ICE_PORTAL_URL + '/#/activate/' + \ + str(uuid) + '/' + str(activation_token) + logger.debug("activationUrl :" + activationUrl) + return activationUrl + + @staticmethod + def get_contact_signup_url(invite_token, uuid, email, fullName, phoneNum, companyName): + companyId = DBGeneral.select_where( + "uuid", "ice_vendor", "name", companyName, 1) + signUpURLforContact = settings.ICE_PORTAL_URL + "#/signUp?invitation=" + invite_token + \ + "&email=" + email + "&full_name=" + fullName + \ + "&phone_number=" + phoneNum + "&company=" + companyId + logger.debug("SignUpURLforContact :" + signUpURLforContact) + return signUpURLforContact + + @staticmethod + def select_invitation_token(queryColumnName, queryTableName, whereParametrType, whereParametrValue, email, fetchNum): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "select %s from %s Where %s = '%s' and email = '%s' ;" % ( + queryColumnName, queryTableName, whereParametrType, whereParametrValue, email) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + if (fetchNum == 0): + result = str(cur.fetchall()) + elif (fetchNum == 1): + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + if result == None: + errorMsg = "select_where_pr_state FAILED " + logger.error(errorMsg) + raise + logger.debug("Query result: " + str(result)) + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_where FAILED " + raise Exception(errorMsg, "select_where") + + @staticmethod + def get_el_name(vfName): + try: + # Fetch one AT&T user ID. + engagement_id = DBVirtualFunction.select_eng_uuid(vfName) + engagement_manual_id = DBGeneral.select_where( + "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + reviewer_id = DBGeneral.select_where( + "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + engLeadFullName = DBGeneral.select_where_and( + "full_name", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1) + return engLeadFullName + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "get_el_name FAILED " + raise Exception(errorMsg, "get_el_name") + + @staticmethod + def get_email_by_full_name(fullname): + # try: + query_str = "select email from ice_user_profile where full_name = '%s';" % ( + fullname) + user_email = DBGeneral.select_query(query_str) + return user_email +# except: # If failed - count the failure and add the error to list of errors. +# errorMsg = "get_email_by_full_name FAILED " +# raise Exception(errorMsg, "get_el_name") + + @staticmethod + def select_recent_vf_of_user(user_uuid, fetchNum): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "SELECT vf_id FROM public.ice_recent_engagement where user_uuid = '%s' order by last_update desc limit 20;" % ( + user_uuid) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + if (fetchNum == 0): + result = str(cur.fetchall()) + elif (fetchNum == 1): + result = str(cur.fetchone()) + if(result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif(result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + dbConn.close() + logger.debug("Query result: " + str(result)) + return result + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_where FAILED " + raise Exception(errorMsg, "select_where") + + @staticmethod + def select_el_email(vfName): + try: + # Fetch one AT&T user ID. + engagement_id = DBVirtualFunction.select_eng_uuid(vfName) + engagement_manual_id = DBGeneral.select_where( + "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + reviewer_id = DBGeneral.select_where( + "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + engLeadEmail = DBGeneral.select_where_and( + "email", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1) + return engLeadEmail + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_el_email FAILED " + raise Exception(errorMsg, "select_el_email") + + @staticmethod + def select_user_native_id(email): + try: + # Fetch one AT&T user ID. + engLeadId = DBUser.select_user_profile_property(email, "id") + return engLeadId + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_user_native_id FAILED " + raise Exception(errorMsg, "select_user_native_id") + + @staticmethod + def select_personal_next_step(email): + user_id = DBUser.select_user_native_id(email) + return DBGeneral.select_where("uuid", "ice_next_step", "owner_id", user_id, 1) + + @staticmethod + def select_pr_email(vfName): + try: + # Fetch one AT&T user ID. + engagement_id = DBVirtualFunction.select_eng_uuid(vfName) + engagement_manual_id = DBGeneral.select_where( + "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + reviewer_id = DBGeneral.select_where( + "peer_reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + engLeadEmail = DBGeneral.select_where( + "email", "ice_user_profile", "id", reviewer_id, 1) + return engLeadEmail + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_el_email FAILED " + raise Exception(errorMsg, "select_el_email") + + @staticmethod + def get_notification_id_by_email(userEmail): + uuid = DBGeneral.select_where_email( + "id", "ice_user_profile", userEmail) + notifIDs = DBGeneral.select_where( + "uuid", "ice_notification", "user_id", uuid, 0) + return notifIDs + + @staticmethod + def get_not_seen_notifications_number_by_email(user_email, is_negative=False): + user_id = DBGeneral.select_where_email( + "id", Constants.DBConstants.IceTables.USER_PROFILE, user_email) + notifications_number = DBGeneral.select_where_and( + Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1) + if is_negative: + counter = 0 + while notifications_number != "0" and counter <= Constants.Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER: + notifications_number = DBGeneral.select_where_and( + Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1) + time.sleep(1) + counter += 1 + return notifications_number + + @staticmethod + def get_eng_lead_email_per_enguuid(enguuid): + reviewer_id = DBGeneral.select_where( + "reviewer_id", Constants.DBConstants.IceTables.ENGAGEMENT, "uuid", enguuid, 1) + engLeadEmail = DBGeneral.select_where( + "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", reviewer_id, 1) + return engLeadEmail + + @staticmethod + def select_all_user_engagements(engLeadID): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "select COUNT(*) from ice_engagement_engagement_team Where iceuserprofile_id = %s and (select engagement_stage from public.ice_engagement where uuid = engagement_id LIMIT 1) != 'Archived';" % ( + engLeadID) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + result = cur.fetchall() + dbConn.close() + logger.debug("Query result: " + str(result)) + logger.debug(result[0][0]) + return result[0][0] + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_user_engagements_by_stage FAILED " + raise Exception(errorMsg, "select_user_engagements_by_stage") + + @staticmethod + def select_user_engagements_by_stage(stage, engLeadID): + try: + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection('em_db')) + dbConn = dbConn + cur = dbConn.cursor() + queryStr = "select count(*) from ice_engagement INNER JOIN ice_engagement_engagement_team ON ice_engagement_engagement_team.engagement_id= ice_engagement.uuid Where (ice_engagement.engagement_stage = '%s') and (ice_engagement_engagement_team.iceuserprofile_id = %s );" % ( + stage, engLeadID) + logger.debug("Query : " + queryStr) + cur.execute(queryStr) + result = cur.fetchall() + dbConn.close() + logger.debug("Query result: " + str(result)) + logger.debug(result[0][0]) + return result[0][0] + # If failed - count the failure and add the error to list of errors. + except: + errorMsg = "select_user_engagements_by_stage FAILED " + raise Exception(errorMsg, "select_user_engagements_by_stage") + + @staticmethod + def set_new_temp_password(email): + encodePass = DBGeneral.select_where_email( + "password", "auth_user", Constants.Users.Admin.EMAIL) + # Fetch one user ID. + index = DBGeneral.select_where_email("id", "auth_user", email) + DBGeneral.update_where( + "ice_custom_user", "temp_password", encodePass, "user_ptr_id", index) + + @staticmethod + def set_password_to_default(email): + encodePass = DBGeneral.select_where_email( + "password", "auth_user", Constants.Users.Admin.EMAIL) + DBGeneral.update_where( + "auth_user", "password", encodePass, "email", email) + + @staticmethod + def select_el_not_in_engagement(el_name, pr_name): + query_str = "select full_name from ice_user_profile where role_id = 2 and full_name != '%s' and full_name != '%s';" % ( + el_name, pr_name) + new_user = DBGeneral.select_query(query_str) + if new_user == 'None': + new_user = DBUser.update_to_el_not_in_engagement() + return new_user + + @staticmethod + def select_user_uuid(email): + user_uuid = DBUser.select_user_profile_property(email, "uuid") + return user_uuid + + @staticmethod + def select_access_key(email): + access_key = DBUser.select_user_profile_property(email, "rgwa_access_key") + return access_key + + @staticmethod + def select_secret_key(email): + secret_key = DBUser.select_user_profile_property(email, "rgwa_secret_key") + return secret_key + + @staticmethod + def update_to_el_not_in_engagement(): + query_str = "select uuid from ice_user_profile where role_id = 1 ;" + user_uuid = DBGeneral.select_query(query_str) + updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name = 'el_for_test' WHERE uuid = '%s' ;" % ( + user_uuid) + DBGeneral.update_query(updatequery) + updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE full_name = '%s' ;" % ( + 'el_for_test') + DBGeneral.update_query(updatequery) + return 'el_for_test' + + @staticmethod + def rollback_for_el_not_in_engagement(): + query_str = "select uuid from ice_user_profile where full_name = 'el_for_test';" + user_uuid = DBGeneral.select_query(query_str) + fullName = DBBridge.helper_rand_string("randomString") + updatequery = "UPDATE ice_user_profile SET role_id=1,full_name = '%s' WHERE uuid = '%s' ;" % ( + fullName, user_uuid) + DBGeneral.update_query(updatequery) + + @staticmethod + def set_engagement_peer_reviewer(engagement_uuid, email): + user_uuid = DBUser.select_user_uuid(email) + update_query = "UPDATE ice_user_profile SET role_id=2 WHERE uuid = '%s';" % user_uuid + DBGeneral.update_query(update_query) + + user_id = DBGeneral.select_query( + "SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid) + update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s WHERE uuid = '%s';" % ( + user_id, engagement_uuid) + DBGeneral.update_query(update_query) + + @staticmethod + def select_user_profile_property(user_email, property_name): + return DBGeneral.select_where(property_name, "ice_user_profile", "email", user_email, 1) + + @staticmethod + def validate_user_profile_settings_in_db(user_email, checked): + Wait.page_has_loaded() + regular_email_updates = DBUser.select_user_profile_property( + user_email, 'regular_email_updates') + DBBridge.helper_internal_assert(regular_email_updates, checked) + email_updates_on_every_notification = \ + DBUser.select_user_profile_property( + user_email, 'email_updates_on_every_notification') + DBBridge.helper_internal_assert( + email_updates_on_every_notification, checked) + email_updates_daily_digest = DBUser.select_user_profile_property( + user_email, 'email_updates_daily_digest') + DBBridge.helper_internal_assert( + email_updates_daily_digest, not checked) + + @staticmethod + def retrieve_admin_ssh_from_db(): + ssh_key = DBGeneral.select_where( + 'ssh_public_key', Constants.DBConstants.IceTables.USER_PROFILE, + 'email', Constants.Users.Admin.EMAIL, 1) + return ssh_key + + @staticmethod + def get_access_key(user_uuid): + counter = 0 + access_key = DBGeneral.select_where( + "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) + while access_key == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER: + time.sleep(session.wait_until_time_pause) + logger.debug( + "rgwa_access_key are not ready yet, trying again (%s of 20)" % counter) + access_key = DBGeneral.select_where( + "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) + counter += 1 + return access_key + + @staticmethod + def get_access_secret(user_uuid): + counter = 0 + access_secret = DBGeneral.select_where( + "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) + while access_secret == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER: + time.sleep(session.wait_until_time_pause) + logger.debug( + "rgwa_secret_key are not ready yet, trying again (%s of 100)" % counter) + access_secret = DBGeneral.select_where( + "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) + + counter += 1 + return access_secret diff --git a/services/database/db_virtual_function.py b/services/database/db_virtual_function.py new file mode 100644 index 0000000..143bca2 --- /dev/null +++ b/services/database/db_virtual_function.py @@ -0,0 +1,227 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import uuid + +from django.conf import settings +import psycopg2 + +from services.constants import Constants +from services.database.db_bridge import DBBridge +from services.database.db_general import DBGeneral +from services.logging_service import LoggingServiceFactory + + +logger = LoggingServiceFactory.get_logger() + +class DBVirtualFunction: + + @staticmethod + def insert_ecomp_release(uuid, name, ui_visibility="TRUE"): + try: + queryTableName = "ice_ecomp_release" + # Connect to General 'default'. + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection("em_db")) + dbConn = dbConn + cur = dbConn.cursor() + logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE) + except Exception as e: + errorMsg = "Failed to create connection to General: " + str(e) + raise Exception(errorMsg) + try: + logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE) + # Create INSERT query. + queryStr = "INSERT INTO %s (""uuid, name, weight, ui_visibility"") VALUES ('%s', '%s', '%s', '%s');" % ( + queryTableName, uuid, name, 0, ui_visibility) + logger.debug("Query: " + queryStr) + cur.execute(queryStr) # Execute query. + dbConn.commit() + logger.debug("Test results are in General now.") + except Exception as e: + errorMsg = "Failed to insert ECOMP release to General:" + str(e) + raise Exception(errorMsg) + dbConn.close() + + @staticmethod + def delete_ecomp_release(uuid, name): + try: + queryTableName = "ice_ecomp_release" + # Connect to General 'default'. + dbConn = psycopg2.connect( + DBGeneral.return_db_native_connection("em_db")) + dbConn = dbConn + cur = dbConn.cursor() + except Exception as e: + errorMsg = "Failed to create connection to DBGeneral.because :" + \ + str(e) + raise Exception(errorMsg) + try: + # Create INSERT query. + queryStr = "DELETE FROM %s WHERE uuid = '%s';" % ( + queryTableName, uuid) + logger.debug("Query: " + queryStr) + cur.execute(queryStr) # Execute query. + dbConn.commit() + logger.debug("Test results are in General now.") + except Exception as e: + errorMsg = "Failed to delete ECOMP release from General . because :" + \ + str(e) + raise Exception(errorMsg) + raise + dbConn.close() + + @staticmethod + def select_next_steps_ids(engagement_uuid): + ice_next_steps = DBGeneral.select_where( + "uuid", "ice_next_step", "engagement_id", engagement_uuid, 0) + return ice_next_steps + + @staticmethod + def select_next_steps_uuids_by_stage(engagement_uuid, engagement_stage): + query = "SELECT uuid FROM %s WHERE engagement_id='%s' AND engagement_stage='%s' ORDER BY position;" % ( + Constants.DBConstants.IceTables.NEXT_STEP, engagement_uuid, engagement_stage) + return DBGeneral.select_query(query, "list", 0) + + @staticmethod + def update_next_step_position(next_step_uuid, new_index): + DBGeneral.update_where( + "ice_next_step", "position", new_index, "uuid", next_step_uuid) + + @staticmethod + def select_next_step_description(next_step_uuid): + return DBGeneral.select_where("description", "ice_next_step", "uuid", next_step_uuid, 1) + + @staticmethod + def select_eng_uuid(vf_name): + return DBGeneral.select_where("engagement_id", "ice_vf", "name", vf_name, 1) + + @staticmethod + def select_engagment_uuid_by_vf_name(vfName): + engagement_id = DBGeneral.select_where( + "engagement_id", "ice_vf", "name", vfName, 1) + engagement_manual_id = DBGeneral.select_where( + "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + enguuid = DBGeneral.select_where( + "uuid", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + return enguuid + + @staticmethod + def select_vf_version_by_vf_name(vfName): + queryStr = "SELECT version FROM ice_vf WHERE name= '%s';" % vfName + version_name = str(DBGeneral.select_query(queryStr)) + return version_name + + @staticmethod + def select_vf_name_by_vf_version(version_name): + queryofname = "SELECT name FROM ice_vf WHERE version= '%s';" % version_name + vfNameDb = str(DBGeneral.select_query(queryofname)) + return vfNameDb + + @staticmethod + def return_expected_steps(engagement_uuid, stage, user_email): + steps_uuids = DBVirtualFunction.select_next_steps_uuids_by_stage( + engagement_uuid, stage) + personal_step_uuid = DBBridge.select_personal_next_step(user_email) + DBVirtualFunction.update_next_step_position(personal_step_uuid, 1) + steps_uuids.insert(0, personal_step_uuid) + return steps_uuids + + @staticmethod + def get_engagement(): + """Use this function instead of creating a new engagement where no need to""" + queryStr = "SELECT DISTINCT ice_engagement.uuid, engagement_manual_id, ice_vf.name, ice_user_profile.full_name, \ + ice_user_profile.email, reviewer_table.full_name, reviewer_table.email, \ + ice_deployment_target.version, ice_ecomp_release.name \ + FROM ice_engagement LEFT JOIN ice_vf ON engagement_id = ice_engagement.uuid \ + LEFT JOIN ice_user_profile reviewer_table ON reviewer_table.id = ice_engagement.reviewer_id \ + LEFT JOIN ice_user_profile ON ice_user_profile.id = ice_engagement.peer_reviewer_id \ + LEFT JOIN ice_deployment_target ON ice_deployment_target.uuid = ice_vf.deployment_target_id \ + LEFT JOIN ice_ecomp_release ON ice_ecomp_release.uuid = ice_vf.ecomp_release_id \ + WHERE ice_user_profile.id IS NOT NULL LIMIT 1;" + list_of_values = DBGeneral.select_query(queryStr, return_type="list") + list_of_keys = ["engagement_uuid", "engagement_manual_id", "vfName", "pr_name", + "pr_email", "el_name", "el_email", "target_aic", "ecomp_release"] + return dict(zip(list_of_keys, list_of_values)) + + @staticmethod + def insert_aic_version(ui_visibility="TRUE"): + new_aic_version = { + "uuid": str(uuid.uuid4()), "name": "AIC", "version": DBBridge.helper_rand_string("randomNumber", 2), "ui_visibility": ui_visibility, "weight": 0} + queryStr = "INSERT INTO public.ice_deployment_target( \ + uuid, name, version, ui_visibility, weight) \ + VALUES ('%s', '%s', '%s', '%s', %s);" % (new_aic_version['uuid'], new_aic_version['name'], new_aic_version['version'], new_aic_version['ui_visibility'], new_aic_version['weight']) + DBGeneral.insert_query(queryStr) + return new_aic_version + + @staticmethod + def delete_aic_version(aic_uuid): + DBGeneral.insert_query( + "DELETE FROM public.ice_deployment_target WHERE uuid='%s';" % aic_uuid) + + @staticmethod + def change_aic_version_weight(new_weight, old_weight): + DBGeneral.insert_query( + "UPDATE public.ice_deployment_target SET weight=%s WHERE weight=%s" % (new_weight, old_weight)) + + @staticmethod + def change_ecomp_release_weight(new_weight, old_weight): + DBGeneral.insert_query( + "UPDATE public.ice_ecomp_release SET weight=%s WHERE weight=%s" % (new_weight, old_weight)) + + @staticmethod + def select_aic_version_uuid(aic_version): + return DBGeneral.select_where("uuid", "ice_deployment_target", "version", aic_version, 1) + + @staticmethod + def select_ecomp_release_uuid(ecomp_release): + return DBGeneral.select_where("uuid", "ice_ecomp_release", "name", ecomp_release, 1) + + @staticmethod + def add_admin_to_eng_team(eng_uuid): + admin_db_id = DBGeneral.select_where( + 'id', Constants.DBConstants.IceTables.USER_PROFILE, 'email', Constants.Users.Admin.EMAIL, 1) + queryStr = "INSERT INTO public.ice_engagement_engagement_team(engagement_id, iceuserprofile_id) VALUES ('%s', '%s');" % ( + eng_uuid, admin_db_id) + logger.debug("add_admin_to_eng_team Query: %s" % queryStr) + DBGeneral.insert_query(queryStr) + + @staticmethod + def remove_engagement_from_recent(vf_uuid): + DBGeneral.insert_query( + "DELETE FROM %s WHERE vf_id='%s'" % (Constants.DBConstants.IceTables.RECENT, vf_uuid)) -- cgit 1.2.3-korg