diff options
Diffstat (limited to 'services/database')
-rw-r--r-- | services/database/__init__.py | 38 | ||||
-rw-r--r-- | services/database/db_bridge.py | 65 | ||||
-rw-r--r-- | services/database/db_checklist.py | 526 | ||||
-rw-r--r-- | services/database/db_cms.py | 278 | ||||
-rwxr-xr-x | services/database/db_general.py | 485 | ||||
-rw-r--r-- | services/database/db_user.py | 515 | ||||
-rw-r--r-- | services/database/db_virtual_function.py | 286 |
7 files changed, 0 insertions, 2193 deletions
diff --git a/services/database/__init__.py b/services/database/__init__.py deleted file mode 100644 index 32b601a..0000000 --- a/services/database/__init__.py +++ /dev/null @@ -1,38 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. diff --git a/services/database/db_bridge.py b/services/database/db_bridge.py deleted file mode 100644 index 1eb79fa..0000000 --- a/services/database/db_bridge.py +++ /dev/null @@ -1,65 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - - -class DBBridge: - - """ - This class helps to use functions inside classes - with circular import (dependencies). - Use this class only when there is circular - import in one of the DB services. - """ - - @staticmethod - def select_personal_next_step(user_email): - """select_personal_next_step: Originally """ +\ - """can be found under DBUser class.""" - from services.database.db_user import DBUser - return DBUser.select_personal_next_step(user_email) - - @staticmethod - def helper_rand_string(type, num=""): - from services.helper import Helper - return Helper.rand_string(type, num) - - @staticmethod - def helper_internal_assert(arg1, arg2): - from services.helper import Helper - return Helper.internal_assert(arg1, arg2) diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py deleted file mode 100644 index 0f8fd6e..0000000 --- a/services/database/db_checklist.py +++ /dev/null @@ -1,526 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import time -from uuid import uuid4 - -from django.utils import timezone -import psycopg2 - -from services.constants import Constants -from services.database.db_general import DBGeneral -from services.logging_service import LoggingServiceFactory -from services.session import session - - -logger = LoggingServiceFactory.get_logger() - - -class DBChecklist: - - @staticmethod - def select_where_approval_state( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = \ - "select %s from %s " % (queryColumnName, queryTableName) +\ - "Where %s = '%s'" % (whereParametrType, whereParametrValue) +\ - " and state = 'approval';" - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = str(cur.fetchall()) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - logger.debug("Query result: " + str(result)) - if result is None: - errorMsg = "select_where_approval_state FAILED " - logger.error(errorMsg) - raise - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where_approval_state FAILED " - raise Exception(errorMsg, "select_where_approval_state FAILED") - - @staticmethod - def select_where_pr_state( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = \ - "select %s from %s " % (queryColumnName, queryTableName) +\ - "Where %s = '%s' and " % ( - whereParametrType, whereParametrValue) +\ - "state = 'peer_review';" - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = str(cur.fetchall()) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - if result is None: - errorMsg = "select_where_pr_state FAILED " - logger.error(errorMsg) - raise - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where FAILED " - raise Exception(errorMsg, "select_where") - - @staticmethod - def select_where_cl_not_archive( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = \ - "select %s from %s " % (queryColumnName, queryTableName) +\ - "Where %s = '%s'" % (whereParametrType, whereParametrValue) +\ - "and state != 'archive';" - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = str(cur.fetchall()) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where FAILED " - raise Exception(errorMsg, "select_where") - - @staticmethod - def select_native_where( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s';" % ( - queryColumnName, queryTableName, whereParametrType, - whereParametrValue) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = str(cur.fetchall()) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where FAILED " - raise Exception(errorMsg, "select_where") - - @staticmethod - def update_checklist_to_review_state(queryTableName): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "UPDATE ice_checklist SET state='review' Where " +\ - "name= '%s' and state= 'pending';" % ( - queryTableName) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - dbConn.commit() - dbConn.close() - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "Could not Update User" - raise Exception(errorMsg, "Update") - - @staticmethod - def update_all_decisions_to_approve(whereParametrValue): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "UPDATE ice_checklist_decision SET " +\ - "review_value='approved' , peer_review_value='approved' " +\ - "Where checklist_id = '%s';" % ( - whereParametrValue) - logger.debug(queryStr) - cur.execute(queryStr) - dbConn.commit() - logger.debug("Query : " + queryStr) - # If failed - count the failure and add the error to list of errors. - except Exception as e: - errorMsg = "Could not Update User" - logger.debug(e) - raise Exception(errorMsg, "Update") - finally: - dbConn.close() - - @staticmethod - def is_archive(checklistName): - try: - result = False - # Fetch all AT&T user ID. - checklist_ids = DBGeneral.select_where( - "uuid", "ice_checklist", "name", checklistName, 0) -# checklist_ids = DBGeneral.list_format(checklist_ids) - for checklist_id in checklist_ids: # Second Example - if isinstance(checklist_id, tuple): - checklist_id = checklist_id[0] - state = DBGeneral.select_where( - "state", "ice_checklist", "uuid", checklist_id, 1) - if state == "archive": - result = True - break - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "is_archive FAILED " - raise Exception(errorMsg, "is_archive") - - @staticmethod - def get_pr_email(checklistUuid): - try: - # Fetch one AT&T user ID. - owner_id = DBChecklist.select_where_pr_state( - "owner_id", "ice_checklist", "uuid", checklistUuid, 1) - engLeadEmail = DBGeneral.select_where( - "email", "ice_user_profile", "id", owner_id, 1) - logger.debug("get_pr_email = " + engLeadEmail) - return engLeadEmail - # If failed - count the failure and add the error to list of errors. - except Exception as e: - errorMsg = "get_pr_email FAILED " + str(e) - raise Exception(errorMsg, "get_pr_email") - - @staticmethod - def get_admin_email(checklistUuid): - try: - # Fetch one AT&T user ID. - owner_id = DBChecklist.select_where_approval_state( - "owner_id", "ice_checklist", "uuid", checklistUuid, 1) - engLeadEmail = DBGeneral.select_where( - "email", "ice_user_profile", "id", owner_id, 1) - logger.debug("get_admin_email = " + engLeadEmail) - return engLeadEmail - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "get_admin_email FAILED " - raise Exception(errorMsg, "get_admin_email") - - @staticmethod - def get_owner_email(checklistUuid): - try: - # Fetch one AT&T user ID. - owner_id = DBChecklist.select_native_where( - "owner_id", "ice_checklist", "uuid", checklistUuid, 1) - engLeadEmail = DBGeneral.select_where( - "email", "ice_user_profile", "id", owner_id, 1) - logger.debug("getPreeReviewerEngLeadEmail = " + engLeadEmail) - return engLeadEmail - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "get_admin_email FAILED " - raise Exception(errorMsg, "get_owner_email") - - @staticmethod - def update_decisions(checklistUuid, checklistName): - checklistTempid = DBGeneral.select_where( - "template_id", "ice_checklist", "name", checklistName, 1) - checklistLineItems = DBGeneral.select_where_and( - "uuid", - "ice_checklist_line_item", - "line_type", - "auto", - "template_id", - checklistTempid, - 0) - for lineItem in checklistLineItems: - setParametrType2 = "peer_review_value" - setParametrValue2 = "approved" - whereParametrType2 = "lineitem_id" - whereParametrValue2 = lineItem - DBGeneral.update_where_and( - "ice_checklist_decision", - "review_value", - checklistUuid, - "approved", - "checklist_id", - setParametrType2, - setParametrValue2, - whereParametrType2, - whereParametrValue2) - - @staticmethod - def checkChecklistIsUpdated(): - query = "select uuid from ice_checklist_section where template_id " +\ - "in (select template_id from ice_checklist_template where " +\ - "name='{template_name}') and name='{section_name}'".format( - template_name=Constants.Dashboard.LeftPanel. - EditChecklistTemplate.HEAT, section_name=Constants. - Dashboard.LeftPanel.EditChecklistTemplate.HEAT) - return DBGeneral.select_query(query) - - @staticmethod - def fetchEngByVfName(vfName): - # Fetch one AT&T user ID. - return DBGeneral.select_where( - "engagement_id", "ice_vf", "name", vfName, 1) - - @staticmethod - def fetchEngManIdByEngUuid(engagement_id): - return DBGeneral.select_where( - "engagement_manual_id", - "ice_engagement", - "uuid", - engagement_id, - 1) - - @staticmethod - def fetchChecklistByName(checklistName): - query = "select uuid from ice_checklist where " +\ - "name='{cl_name}'".format( - cl_name=checklistName) - return DBGeneral.select_query(query) - - @staticmethod - def create_default_heat_teampleate(): - template_query = "INSERT INTO public.ice_checklist_template(uuid, " +\ - "name, category, version, create_time)"\ - "VALUES ('%s', '%s', '%s', '%s', '%s');" % ( - str(uuid4()), 'Editing Heat', 'first category', '1', - timezone.now()) - DBGeneral.insert_query(template_query) - template_id = DBGeneral.select_query( - "SELECT uuid FROM public.ice_checklist_template where " + - "name = 'Editing Heat'") - # SECTIONS - section1_query = "INSERT INTO public.ice_checklist_section(uuid, " +\ - "name, weight, description, validation_instructions, " +\ - "create_time, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % ( - str(uuid4()), 'External References', - '1', 'section descripyion', 'valid instructions', - timezone.now(), template_id) - DBGeneral.insert_query(section1_query) - section1_id = DBGeneral.select_query( - ("""SELECT uuid FROM public.ice_checklist_section """ + - """where name = 'External References' """ + - """and template_id = '{s}'""").format( - s=template_id)) - section2_query = "INSERT INTO public.ice_checklist_section(uuid, " +\ - "name, weight, description, validation_instructions, " +\ - "create_time, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % ( - str(uuid4()), 'Parameter Specification', - '2', 'section descripyion', 'valid instructions', - timezone.now(), template_id) - DBGeneral.insert_query(section2_query) - section2_id = DBGeneral.select_query( - ("""SELECT uuid FROM public.ice_checklist_section """ + - """where name = """ + - """'Parameter Specification' and template_id = '{s}'""").format( - s=template_id)) - # Line items - line_item1 = \ - "INSERT INTO public.ice_checklist_line_item(uuid, " +\ - "name, weight, description, line_type, validation_instructions," +\ - "create_time,section_id, template_id) "\ - "VALUES ('%s', '%s', " % (str(uuid4()), 'Normal references') +\ - "'%s', " % '1' +\ - "'%s'," % 'Numeric parameters should include ' +\ - 'range and/or allowed values.' +\ - " '%s'," % 'manual', +\ - "'%s'" % 'Here are some useful tips ' +\ - 'for how to validate this item ' +\ - 'in the most awesome way:<br><br><ul><li>Here is my ' +\ - 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +\ - 'Here is my awesome tip 3</li></ul>' +\ - ", '%s'" % timezone.now() +\ - ", '%s'," % section1_id +\ - " '%s');" % template_id - DBGeneral.insert_query(line_item1) - line_item2 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\ - "name, weight, description, line_type, validation_instructions," +\ - "create_time, section_id, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % ( - str(uuid4()), 'String parameters', '2', - 'Numeric parameters should include range ' + - 'and/or allowed values.', 'auto', - 'Here are some useful tips for how to validate this item ' + - 'in the most awesome way:<br><br><ul><li>Here is my ' + - 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' + - 'Here is my awesome tip 3</li></ul>', timezone.now(), - section2_id, template_id) - DBGeneral.insert_query(line_item2) - line_item3 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\ - "name, weight, description, line_type, validation_instructions," +\ - "create_time,section_id, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', " +\ - "'%s', '%s', '%s');" % ( - str(uuid4()), 'Numeric parameters', '3', - 'Numeric parameters should include range and/or ' + - 'allowed values.', 'manual', - 'Here are some useful tips for how to validate this item ' + - 'in the most awesome way:<br><br><ul><li>Here is my ' + - 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' + - 'Here is my awesome tip 3</li></ul>', timezone.now(), - section2_id, template_id) - DBGeneral.insert_query(line_item3) - line_item4 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\ - "name, weight, description, line_type, " +\ - "validation_instructions,create_time, section_id, " +\ - "template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', " +\ - "'%s', '%s');" % ( - str(uuid4()), 'VF image', '2', - 'Numeric parameters should include range and/or ' + - 'allowed values.', 'auto', - 'Here are some useful tips for how to validate this ' + - 'item in the most awesome way:<br><br><ul><li>Here is ' + - 'my awesome tip 1</li><li>Here is my awesome tip 2' + - '</li><li>Here is my awesome tip 3</li></ul>', - timezone.now(), section1_id, template_id) - DBGeneral.insert_query(line_item4) - line_item5 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\ - "name, weight, description, line_type, validation_instructions," +\ - "create_time,section_id, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s'," +\ - " '%s', '%s');" % (str( - uuid4()), 'Parameters', '1', - 'Numeric parameters should include range ' + - 'and/or allowed values.', 'auto', - 'Here are some useful tips for how to validate this item ' + - 'in the most awesome way:<br><br><ul><li>Here is my awesome ' + - 'tip 1</li><li>Here is my awesome tip 2</li><li>Here is my ' + - 'awesome tip 3</li></ul>', timezone.now(), section2_id, - template_id) - DBGeneral.insert_query(line_item5) - - @staticmethod - def create_editing_cl_template_if_not_exist(): - template_id = DBGeneral.select_query( - ("""SELECT uuid FROM public.ice_checklist_template """ + - """where name = '{s}'""").format( - s=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT)) - if template_id == 'None': - DBChecklist.create_default_heat_teampleate() - session.createTemplatecount = True - - @staticmethod - def state_changed(identify_field, field_value, expected_state): - get_state = DBGeneral.select_where_order_by_desc( - "state", Constants.DBConstants.IceTables.CHECKLIST, - identify_field, field_value, "create_time")[0] - counter = 0 - while get_state != expected_state and \ - counter <= Constants.DBConstants.RETRIES_NUMBER: - time.sleep(session.wait_until_time_pause_long) - logger.debug( - "Checklist state not changed yet ," + - "expecting state: %s, current result: %s (attempt %s of %s)" % - (expected_state, get_state, counter, - Constants.DBConstants.RETRIES_NUMBER)) - counter += 1 - get_state = DBGeneral.select_where_order_by_desc( - "state", Constants.DBConstants.IceTables.CHECKLIST, - identify_field, field_value, "create_time")[0] - - if get_state == expected_state: - logger.debug("Checklist state was successfully changed into: " + - expected_state + ", and was verified over the DB") - return expected_state - raise Exception( - "Expected checklist state never arrived " + - expected_state, - get_state) - - @staticmethod - def get_recent_checklist_uuid(name): - required_uuid = DBGeneral.select_where_not_and_order_by_desc( - 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name, - 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time') - return required_uuid diff --git a/services/database/db_cms.py b/services/database/db_cms.py deleted file mode 100644 index 288121a..0000000 --- a/services/database/db_cms.py +++ /dev/null @@ -1,278 +0,0 @@ -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import psycopg2 -from wheel.signatures import assertTrue - -from services.database.db_general import DBGeneral -from services.helper import Helper -from services.logging_service import LoggingServiceFactory - - -logger = LoggingServiceFactory.get_logger() - - -class DBCMS: - - @staticmethod - def insert_query(queryStr): - try: - nativeIceDb = psycopg2.connect( - DBGeneral.return_db_native_connection('cms_db')) - dbConn = nativeIceDb - cur = dbConn.cursor() - logger.debug("Query: " + queryStr) - cur.execute(queryStr) - dbConn.commit() - dbConn.close() - logger.debug("Insert query success!") - # If failed - count the failure and add the error to list of errors. - except BaseException: - raise Exception("Couldn't fetch answer using the given query.") - - @staticmethod - def update_query(queryStr): - try: - nativeIceDb = psycopg2.connect( - DBGeneral.return_db_native_connection('cms_db')) - dbConn = nativeIceDb - cur = dbConn.cursor() - logger.debug("Query: " + queryStr) - cur.execute(queryStr) - dbConn.commit() - dbConn.close() - logger.debug("Update query success!") - # If failed - count the failure and add the error to list of errors. - except BaseException: - raise Exception("Couldn't fetch answer using the given query.") - - @staticmethod - def select_query(queryStr): - try: - nativeIceDb = psycopg2.connect( - DBGeneral.return_db_native_connection('cms_db')) - dbConn = nativeIceDb - cur = dbConn.cursor() - logger.debug("Query: " + queryStr) - cur.execute(queryStr) - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - raise Exception("Couldn't fetch answer using the given query.") - - @staticmethod - def get_cms_category_id(categoryName): - logger.debug("Get DBCMS category id for name: " + categoryName) - queryStr = "SELECT id FROM public.blog_blogcategory WHERE " +\ - "title = '%s' LIMIT 1;" % (categoryName) - logger.debug("Query : " + queryStr) - result = DBCMS.select_query(queryStr) - return result - - @staticmethod - def insert_cms_new_post(title, description, categoryName): - logger.debug("Insert new post : " + title) - queryStr = "INSERT INTO public.blog_blogpost" \ - "(comments_count, keywords_string, rating_count, rating_sum, " +\ - "rating_average, title, slug, _meta_title, description, " +\ - "gen_description, created, updated, status, publish_date, " +\ - "expiry_date, short_url, in_sitemap, content, allow_comments, " +\ - "featured_image, site_id, user_id) "\ - "VALUES (0, '', 0, 0, 0, " +\ - "'%s', '%s-slug', " % (title, title) +\ - "'', '%s', true, " % description +\ - "current_timestamp - interval '1 day', current_timestamp - " +\ - "interval '2 day', 2, current_timestamp - interval '1 day', " +\ - "NULL, '', true, '<p>%s</p>', true, '', 1, 1);" % description - logger.debug("Query : " + queryStr) - DBCMS.insert_query(queryStr) - post_id = DBCMS.get_last_added_post_id() - categoryId = DBCMS.get_cms_category_id(categoryName) - DBCMS.add_category_to_post(post_id, categoryId) - return post_id - - @staticmethod - def get_last_added_post_id(): - logger.debug("Get the id of the post inserted") - queryStr = "select MAX(id) FROM public.blog_blogpost;" - logger.debug("Query : " + queryStr) - result = DBCMS.select_query(queryStr) - return result - - @staticmethod - def update_days(xdays, title): - logger.debug("Get the id of the post inserted") - queryStr = "UPDATE public.blog_blogpost SET " +\ - "created=current_timestamp - interval '%s day' " % xdays +\ - "WHERE title='%s';" % title - logger.debug("Query : " + queryStr) - result = DBCMS.update_query(queryStr) - return result - - @staticmethod - def add_category_to_post(postId, categoryId): - logger.debug("bind category into inserted post: " + postId) - queryStr = "INSERT INTO public.blog_blogpost_categories" +\ - "(blogpost_id, blogcategory_id) " +\ - "VALUES (%s, %s);" % (postId, categoryId) - logger.debug("Query : " + queryStr) - DBCMS.insert_query(queryStr) - - @staticmethod - def get_documentation_page_id(): - logger.debug("Retrive id of documentation page: ") - queryStr = "SELECT id FROM public.pages_page WHERE " +\ - "title = 'Documentation' LIMIT 1;" - logger.debug("Query : " + queryStr) - result = DBCMS.select_query(queryStr) - return result - - @staticmethod - def get_last_inserted_page_id(): - logger.debug("Retrive id of last page inserted: ") - queryStr = "select MAX(id) FROM public.pages_page;" - logger.debug("Query : " + queryStr) - result = DBCMS.select_query(queryStr) - return result - - @staticmethod - def delete_old_tips_of_the_day(): - logger.debug("Delete all posts ") - queryStr = "DELETE FROM public.blog_blogpost_categories WHERE id>0;" - logger.debug("Query : " + queryStr) - DBCMS.insert_query(queryStr) - queryStr = "DELETE FROM public.blog_blogpost WHERE id>0;;" - logger.debug("Query : " + queryStr) - DBCMS.insert_query(queryStr) - - @staticmethod - def insert_page(title, content, parent_id=None): - logger.debug("Retrive id of documentation page: ") - if parent_id is None: - parent_id = DBCMS.get_documentation_page_id() - queryStr = "INSERT INTO public.pages_page(" \ - "keywords_string, title, slug, _meta_title, description, " +\ - "gen_description, created, updated, status, publish_date, " +\ - "expiry_date, short_url, in_sitemap, _order, in_menus, titles, " +\ - "content_model, login_required, parent_id, site_id)" \ - "VALUES ('', " +\ - "'%s', '%s-slug'" % (title, title) +\ - ", '', '%s', true, " % content +\ - "current_timestamp - interval '1 day', current_timestamp " +\ - "- interval '1 day', 2, current_timestamp - interval '1 day', " +\ - "NULL, '', true, 0, '1,2,3', " +\ - "'%s', 'richtextpage', " % title +\ - "true, %s, 1);" % parent_id - logger.debug("Query : " + queryStr) - DBCMS.insert_query(queryStr) - - createdPageId = DBCMS.get_last_inserted_page_id() - logger.debug( - "Bind the page with the rich text content related to this page") - queryStr = "INSERT INTO public.pages_richtextpage(page_ptr_id, " +\ - "content) VALUES (%s, '<p>%s</p>');" % ( - createdPageId, content) - logger.debug("Query : " + queryStr) - DBCMS.insert_query(queryStr) - return createdPageId - - @staticmethod - def create_faq(): - title = "title_FAQ" + Helper.rand_string("randomString") - description = "description_FAQ_" + Helper.rand_string("randomString") - DBCMS.delete_old_tips_of_the_day() - postId = DBCMS.insert_cms_new_post(title, description, "FAQ") - assertTrue(len(postId) > 0 and not None) - return title, description - - @staticmethod - def create_news(): - title = "title_News" + Helper.rand_string("randomString") - description = "description_News" + Helper.rand_string("randomString") - postId = DBCMS.insert_cms_new_post(title, description, "News") - assertTrue(len(postId) > 0 and not None) - return title, description - - @staticmethod - def create_announcement(): - title = "title_Announcement_" + Helper.rand_string("randomString") - description = "description_Announcement_" + \ - Helper.rand_string("randomString") - postId = DBCMS.insert_cms_new_post(title, description, "Announcement") - assertTrue(len(postId) > 0 and not None) - return title, description - - @staticmethod - def create_page(parent_id=None): - title = "title_Of_Page_" + Helper.rand_string("randomString") - description = "description_Of_Page_" + \ - Helper.rand_string("randomString") - createdPageId = DBCMS.insert_page(title, description) - assertTrue(len(createdPageId) > 0 and not None) - return title, description - - @staticmethod - def update_X_days_back_post(title, xdays): - logger.debug("Get the id of the post inserted") - queryStr = "UPDATE blog_blogpost SET created = current_timestamp" +\ - " - interval '%s day', " % xdays +\ - "publish_date=current_timestamp - " +\ - "interval '%s day' WHERE title= '%s' ;" % (xdays, title) - logger.debug("Query : " + queryStr) - DBCMS.update_query(queryStr) - - @staticmethod - def create_announcements(x): - listOfTitleAnDescriptions = [] - for _ in range(x): - # print x ->str - title = "title_Announcement_" + Helper.rand_string("randomString") - description = "description_Announcement_" + \ - Helper.rand_string("randomString") - postId = DBCMS.insert_cms_new_post( - title, description, "Announcement") - assertTrue(len(postId) > 0 and not None) - xList = [title, description] - listOfTitleAnDescriptions.append(xList) - return listOfTitleAnDescriptions diff --git a/services/database/db_general.py b/services/database/db_general.py deleted file mode 100755 index 2c83fb0..0000000 --- a/services/database/db_general.py +++ /dev/null @@ -1,485 +0,0 @@ -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -from datetime import datetime -import sqlite3 - -from django.conf import settings -from django.db import transaction -import psycopg2 - -from services.logging_service import LoggingServiceFactory - - -logger = LoggingServiceFactory.get_logger() - - -class DBGeneral: - - @staticmethod - # desigredDB: Use 'default' for CI General and 'em_db' for EM General - # (according to settings.DATABASES). - def return_db_native_connection(desigredDB): - dbConnectionStr = "dbname='" + str( - settings.SINGLETONE_DB[desigredDB]['NAME']) + \ - "' user='" + str(settings.SINGLETONE_DB[desigredDB]['USER']) + \ - "' host='" + str(settings.SINGLETONE_DB[desigredDB]['HOST']) + \ - "' password='" + str( - settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \ - "' port='" + \ - str(settings.SINGLETONE_DB[desigredDB]['PORT']) + "'" - return dbConnectionStr - - @staticmethod - def insert_results( - testType, - testFeature, - testResult, - testName, - testDuration, - notes=" "): - try: - if settings.DATABASE_TYPE == 'sqlite': - dbfile = str(settings.DATABASES['default']['TEST_NAME']) - dbConn = sqlite3.connect(dbfile) - cur = dbConn.cursor() - else: - # Connect to General 'default'. - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection("default")) - dbConn = dbConn - cur = dbConn.cursor() - except Exception as e: - errorMsg = "Failed to create connection to General." + str(e) - raise Exception(errorMsg) - try: # Create INSERT query. - if settings.DATABASE_TYPE == 'sqlite': - query_str = 'INSERT INTO ice_test_results ' +\ - '(testType, testFeature, testResult, testName, notes,'\ - 'create_time, build_id, duration) VALUES ' +\ - '(?, ?, ?, ?, ?, ?, ?, ?);' - else: - query_str = 'INSERT INTO ice_test_results ("testType", ' +\ - '"testFeature", "testResult", "testName", notes,'\ - 'create_time, build_id, duration) VALUES ' +\ - '(%s, %s, %s, %s, %s, %s, %s, %s);' - cur.execute(query_str, (testType, testFeature, testResult, - testName, notes, - str(datetime.now()), - settings.ICE_BUILD_REPORT_NUM, - testDuration)) - dbConn.commit() - logger.debug("Test result in DB - " + testResult) - except Exception as e: - logger.error(e) - errorMsg = "Failed to insert results to DB." + str(e) - raise Exception(errorMsg) - dbConn.close() - - @staticmethod - def select_query(queryStr, return_type="str", fetch_num=1): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - logger.debug("Query: " + queryStr) - cur.execute(queryStr) - if return_type == "str": - if fetch_num == 1: - result = str(cur.fetchone()) - else: - result = str(cur.fetchall()) - if result != 'None': - # formatting strings e.g uuid - if(result.find("',)") != -1): - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - if return_type == "list": - if fetch_num == 1: - result = list(cur.fetchone()) - else: - result = [item[0] for item in cur.fetchall()] - dbConn.close() - logger.debug("Query result: " + str(result)) - return result - except BaseException: - raise Exception("Couldn't fetch answer using the given query.") - - @staticmethod - def insert_query(queryStr): - try: - nativeIceDb = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = nativeIceDb - cur = dbConn.cursor() - logger.debug("Query: " + queryStr) - cur.execute(queryStr) - dbConn.commit() - dbConn.close() - logger.debug("Insert query success!") - # If failed - count the failure and add the error to list of errors. - except Exception as e: - logger.error(e) - transaction.rollback() - raise Exception("Couldn't fetch answer using the given query.") - - @staticmethod - def update_query(queryStr): - try: - nativeIceDb = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = nativeIceDb - cur = dbConn.cursor() - logger.debug("Query: " + queryStr) - cur.execute(queryStr) - dbConn.commit() - dbConn.close() - logger.debug("Update query success!") - # If failed - count the failure and add the error to list of errors. - except Exception as e: - logger.error(e) - transaction.rollback() - raise Exception("Couldn't fetch answer using the given query.") - - @staticmethod - def select_where_email(queryColumnName, queryTableName, email): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "select %s from %s WHERE Email = '%s';" % ( - queryColumnName, queryTableName, email) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where_email FAILED " - raise Exception(errorMsg, "select_where_email") - raise - - @staticmethod - def select_from(queryColumnName, queryTableName, fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - cur = dbConn.cursor() - queryStr = "select %s from %s;" % (queryColumnName, queryTableName) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = str(cur.fetchall()) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except Exception as e: - errorMsg = "select_from FAILED " + str(e) - raise Exception(errorMsg, "select_from") - - @staticmethod - def select_where( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s';" % ( - queryColumnName, queryTableName, whereParametrType, - whereParametrValue) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = list(cur.fetchall()) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where FAILED " - raise Exception(errorMsg, "select_where") - - @staticmethod - def select_where_order_by_desc( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - order_by): - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - cur = dbConn.cursor() - queryStr = \ - "select %s from %s " % (queryColumnName, queryTableName,) +\ - "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\ - "order by %s desc limit 1;" % order_by - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - result = str(cur.fetchall()) - result = DBGeneral.list_format(result) - dbConn.close() - return result - - @staticmethod - def select_where_dict(queryColumnName, queryTableName, whereParametrType): - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - cur = dbConn.cursor() - x = "" - count = 0 - for key, val in whereParametrType.items(): - x += "%s='%s'" % (key, val) - if len(whereParametrType.items()) - count > 1: - x += ' and ' - count += 1 - queryStr = "select %s from %s Where %s;" \ - % (queryColumnName, queryTableName, x) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - result = str(cur.fetchall()) - result = DBGeneral.list_format(result) - dbConn.close() - return result - - @staticmethod - def select_where_not_and_order_by_desc( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - parametrTypeAnd, - parametrAnd, - order_by): - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - cur = dbConn.cursor() - queryStr = \ - "select %s from %s " % (queryColumnName, queryTableName) +\ - "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\ - "and %s != '%s' " % (parametrTypeAnd, parametrAnd) +\ - "order by %s desc limit 1;" % order_by - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - result = str(cur.fetchall()) - result = DBGeneral.list_format(result) - dbConn.close() - return result - - @staticmethod - def select_where_and( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - parametrTypeAnd, - parametrAnd, - fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s' and %s = '%s';" % ( - queryColumnName, queryTableName, whereParametrType, - whereParametrValue, parametrTypeAnd, parametrAnd) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = str(cur.fetchall()) - result = DBGeneral.list_format(result) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where_and FAILED " - raise Exception(errorMsg, "select_where_and") - - @staticmethod - def select_where_is_bigger( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - parametrTypeAnd, - parametrAnd, - fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s' and %s > %s;" % ( - queryColumnName, queryTableName, whereParametrType, - whereParametrValue, parametrTypeAnd, parametrAnd) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = cur.fetchall() - elif (fetchNum == 1): - result = cur.fetchone() - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where_is_bigger FAILED " - raise Exception(errorMsg, "select_where_is_bigger") - - @staticmethod - def update_where( - queryTableName, - setParametrType, - setparametrValue, - whereParametrType, - whereParametrValue): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "UPDATE %s SET %s = '%s' Where %s = '%s';" % ( - queryTableName, setParametrType, setparametrValue, - whereParametrType, whereParametrValue) - cur.execute(queryStr) - dbConn.commit() - logger.debug("Query : " + queryStr) - # If failed - count the failure and add the error to list of errors. - except Exception as e: - errorMsg = "Could not Update User" - logger.debug(e) - raise Exception(errorMsg, "Update") - finally: - dbConn.close() - - @staticmethod - def update_by_query(queryStr): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - dbConn.commit() - dbConn.close() - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "Could not Update User" - raise Exception(errorMsg, "Update") - - @staticmethod - def update_where_and( - queryTableName, - setParametrType, - parametrValue, - changeToValue, - whereParametrType, - setParametrType2, - setParametrValue2, - whereParametrType2, - whereParametrValue2): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "UPDATE %s SET " % queryTableName +\ - "%s = '%s', " % (setParametrType, changeToValue) +\ - "%s = '%s' Where " % (setParametrType2, setParametrValue2) +\ - "%s = '%s' " % (whereParametrType, parametrValue) +\ - "and %s = '%s';" % (whereParametrType2, whereParametrValue2) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - dbConn.commit() - dbConn.close() - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "Could not Update User" - raise Exception(errorMsg, "Update") - - @staticmethod - def list_format(un_listed): - un_listed = un_listed[1:-1] - un_listed = un_listed.replace("',), ('", "|||") - un_listed = un_listed.replace("(u'", "") # Format list - un_listed = un_listed[1:-1].replace("('", "") # Format list - un_listed = un_listed.replace("',)", "") # Format list - listed = un_listed[1:-2].split("|||") - return listed - - @staticmethod - def get_vendors_list(): - # Select approved vendors from db. - vendors_list = DBGeneral.select_where( - "name", "ice_vendor", "public", "TRUE", 0) - return vendors_list diff --git a/services/database/db_user.py b/services/database/db_user.py deleted file mode 100644 index 10d02ff..0000000 --- a/services/database/db_user.py +++ /dev/null @@ -1,515 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import time - -from django.conf import settings -import psycopg2 - -from services.constants import Constants -from services.database.db_bridge import DBBridge -from services.database.db_general import DBGeneral -from services.database.db_virtual_function import DBVirtualFunction -from services.frontend.base_actions.wait import Wait -from services.logging_service import LoggingServiceFactory -from services.session import session - - -logger = LoggingServiceFactory.get_logger() - - -class DBUser: - - @staticmethod - def get_activation_url(email): - # Fetch one user ID. - uuid = DBUser.select_user_uuid(email) - # Fetch one user ID. - index = DBGeneral.select_where_email("id", "auth_user", email) - activation_token = DBGeneral.select_where( - "activation_token", "ice_custom_user", "user_ptr_id", index, 1) - # / activate /:userID /:token - activationUrl = settings.ICE_PORTAL_URL + '/#/activate/' + \ - str(uuid) + '/' + str(activation_token) - logger.debug("activationUrl :" + activationUrl) - return activationUrl - - @staticmethod - def get_contact_signup_url( - invite_token, - uuid, - email, - fullName, - phoneNum, - companyName): - companyId = DBGeneral.select_where( - "uuid", "ice_vendor", "name", companyName, 1) - signUpURLforContact = settings.ICE_PORTAL_URL + \ - "#/signUp?invitation=" + invite_token + \ - "&email=" + email + "&full_name=" + fullName + \ - "&phone_number=" + phoneNum + "&company=" + companyId - logger.debug("SignUpURLforContact :" + signUpURLforContact) - return signUpURLforContact - - @staticmethod - def select_invitation_token( - queryColumnName, - queryTableName, - whereParametrType, - whereParametrValue, - email, - fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = \ - "select %s from %s Where %s = '%s' and email = '%s' ;" % ( - queryColumnName, queryTableName, whereParametrType, - whereParametrValue, email) - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = str(cur.fetchall()) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - if result is None: - errorMsg = "select_where_pr_state FAILED " - logger.error(errorMsg) - raise - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where FAILED " - raise Exception(errorMsg, "select_where") - - @staticmethod - def get_el_name(vfName): - try: - # Fetch one AT&T user ID. - engagement_id = DBVirtualFunction.select_eng_uuid(vfName) - engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", - engagement_id, 1) - reviewer_id = DBGeneral.select_where( - "reviewer_id", - "ice_engagement", - "engagement_manual_id", - engagement_manual_id, - 1) - engLeadFullName = DBGeneral.select_where_and( - "full_name", "ice_user_profile", "id", reviewer_id, - "role_id", "2", 1) - return engLeadFullName - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "get_el_name FAILED " - raise Exception(errorMsg, "get_el_name") - - @staticmethod - def get_email_by_full_name(fullname): - # try: - query_str = "select email from ice_user_profile where " +\ - "full_name = '%s';" % (fullname) - user_email = DBGeneral.select_query(query_str) - return user_email - - @staticmethod - def select_recent_vf_of_user(user_uuid, fetchNum): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "SELECT vf_id FROM public.ice_recent_engagement " +\ - "where user_uuid = '%s' order by last_update " % user_uuid +\ - "desc limit 20;" - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - if (fetchNum == 0): - result = str(cur.fetchall()) - elif (fetchNum == 1): - result = str(cur.fetchone()) - if(result.find("',)") != -1): # formatting strings e.g uuid - result = result.partition('\'')[-1].rpartition('\'')[0] - elif(result.find(",)") != -1): # formatting ints e.g id - result = result.partition('(')[-1].rpartition(',')[0] - dbConn.close() - logger.debug("Query result: " + str(result)) - return result - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_where FAILED " - raise Exception(errorMsg, "select_where") - - @staticmethod - def select_el_email(vfName): - try: - # Fetch one AT&T user ID. - engagement_id = DBVirtualFunction.select_eng_uuid(vfName) - engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", - engagement_id, 1) - reviewer_id = DBGeneral.select_where( - "reviewer_id", - "ice_engagement", - "engagement_manual_id", - engagement_manual_id, - 1) - engLeadEmail = DBGeneral.select_where_and( - "email", "ice_user_profile", "id", reviewer_id, "role_id", - "2", 1) - return engLeadEmail - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_el_email FAILED " - raise Exception(errorMsg, "select_el_email") - - @staticmethod - def select_user_native_id(email): - try: - # Fetch one AT&T user ID. - engLeadId = DBUser.select_user_profile_property(email, "id") - return engLeadId - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_user_native_id FAILED " - raise Exception(errorMsg, "select_user_native_id") - - @staticmethod - def select_personal_next_step(email): - user_id = DBUser.select_user_native_id(email) - return DBGeneral.select_where( - "uuid", "ice_next_step", "owner_id", user_id, 1) - - @staticmethod - def select_pr_email(vfName): - try: - # Fetch one AT&T user ID. - engagement_id = DBVirtualFunction.select_eng_uuid(vfName) - engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", - engagement_id, 1) - reviewer_id = DBGeneral.select_where( - "peer_reviewer_id", - "ice_engagement", - "engagement_manual_id", - engagement_manual_id, - 1) - engLeadEmail = DBGeneral.select_where( - "email", "ice_user_profile", "id", reviewer_id, 1) - return engLeadEmail - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_el_email FAILED " - raise Exception(errorMsg, "select_el_email") - - @staticmethod - def get_notification_id_by_email(userEmail): - uuid = DBGeneral.select_where_email( - "id", "ice_user_profile", userEmail) - notifIDs = DBGeneral.select_where( - "uuid", "ice_notification", "user_id", uuid, 0) - return notifIDs - - @staticmethod - def get_not_seen_notifications_number_by_email( - user_email, is_negative=False): - user_id = DBGeneral.select_where_email( - "id", Constants.DBConstants.IceTables.USER_PROFILE, user_email) - notifications_number = DBGeneral.select_where_and( - Constants.DBConstants.Queries.COUNT, - Constants.DBConstants.IceTables.NOTIFICATION, - "user_id", - user_id, - "is_read", - "False", - 1) - if is_negative: - counter = 0 - while notifications_number != "0" and counter <= Constants.\ - Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER: - notifications_number = DBGeneral.select_where_and( - Constants.DBConstants.Queries.COUNT, - Constants.DBConstants.IceTables.NOTIFICATION, - "user_id", - user_id, - "is_read", - "False", - 1) - time.sleep(1) - counter += 1 - return notifications_number - - @staticmethod - def get_eng_lead_email_per_enguuid(enguuid): - reviewer_id = DBGeneral.select_where( - "reviewer_id", - Constants.DBConstants.IceTables.ENGAGEMENT, - "uuid", - enguuid, - 1) - engLeadEmail = DBGeneral.select_where( - "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", - reviewer_id, 1) - return engLeadEmail - - @staticmethod - def select_all_user_engagements(engLeadID): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "select COUNT(*) from ice_engagement_engagement_team" +\ - " Where iceuserprofile_id = %s" % engLeadID +\ - " and (select " +\ - "engagement_stage from public.ice_engagement " +\ - "where uuid = engagement_id LIMIT 1) != 'Archived';" - - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - result = cur.fetchall() - dbConn.close() - logger.debug("Query result: " + str(result)) - logger.debug(result[0][0]) - return result[0][0] - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_user_engagements_by_stage FAILED " - raise Exception(errorMsg, "select_user_engagements_by_stage") - - @staticmethod - def select_user_engagements_by_stage(stage, engLeadID): - try: - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection('em_db')) - dbConn = dbConn - cur = dbConn.cursor() - queryStr = "select count(*) from ice_engagement INNER JOIN " +\ - "ice_engagement_engagement_team ON " +\ - "ice_engagement_engagement_team.engagement_id= " +\ - "ice_engagement.uuid Where " +\ - "(ice_engagement.engagement_stage " +\ - "= '%s') and " % stage +\ - "(ice_engagement_engagement_team.iceuserprofile_id = " +\ - "%s );" % engLeadID - logger.debug("Query : " + queryStr) - cur.execute(queryStr) - result = cur.fetchall() - dbConn.close() - logger.debug("Query result: " + str(result)) - logger.debug(result[0][0]) - return result[0][0] - # If failed - count the failure and add the error to list of errors. - except BaseException: - errorMsg = "select_user_engagements_by_stage FAILED " - raise Exception(errorMsg, "select_user_engagements_by_stage") - - @staticmethod - def set_new_temp_password(email): - encodePass = DBGeneral.select_where_email( - "password", "auth_user", Constants.Users.Admin.EMAIL) - # Fetch one user ID. - index = DBGeneral.select_where_email("id", "auth_user", email) - DBGeneral.update_where( - "ice_custom_user", - "temp_password", - encodePass, - "user_ptr_id", - index) - - @staticmethod - def set_password_to_default(email): - encodePass = DBGeneral.select_where_email( - "password", "auth_user", Constants.Users.Admin.EMAIL) - DBGeneral.update_where( - "auth_user", "password", encodePass, "email", email) - - @staticmethod - def select_el_not_in_engagement(el_name, pr_name): - query_str = "select full_name from ice_user_profile where " +\ - "role_id = 2 and full_name != '%s' and full_name != '%s';" % ( - el_name, pr_name) - new_user = DBGeneral.select_query(query_str) - if new_user == 'None': - new_user = DBUser.update_to_el_not_in_engagement() - return new_user - - @staticmethod - def select_user_uuid(email): - user_uuid = DBUser.select_user_profile_property(email, "uuid") - return user_uuid - - @staticmethod - def select_access_key(email): - access_key = DBUser.select_user_profile_property( - email, "rgwa_access_key") - return access_key - - @staticmethod - def select_secret_key(email): - secret_key = DBUser.select_user_profile_property( - email, "rgwa_secret_key") - return secret_key - - @staticmethod - def update_to_el_not_in_engagement(): - query_str = "select uuid from ice_user_profile where role_id = 1 ;" - user_uuid = DBGeneral.select_query(query_str) - updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name" +\ - " = 'el_for_test' WHERE uuid = '%s' ;" % ( - user_uuid) - DBGeneral.update_query(updatequery) - updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE " +\ - "full_name = '%s' ;" % ( - 'el_for_test') - DBGeneral.update_query(updatequery) - return 'el_for_test' - - @staticmethod - def rollback_for_el_not_in_engagement(): - query_str = "select uuid from ice_user_profile where full_name = " +\ - "'el_for_test';" - user_uuid = DBGeneral.select_query(query_str) - fullName = DBBridge.helper_rand_string("randomString") - updatequery = "UPDATE ice_user_profile SET role_id=1,full_name " +\ - "= '%s' WHERE uuid = '%s' ;" % (fullName, user_uuid) - DBGeneral.update_query(updatequery) - - @staticmethod - def set_engagement_peer_reviewer(engagement_uuid, email): - user_uuid = DBUser.select_user_uuid(email) - update_query = "UPDATE ice_user_profile SET role_id=2 WHERE " +\ - "uuid = '%s';" % user_uuid - DBGeneral.update_query(update_query) - - user_id = DBGeneral.select_query( - "SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid) - update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s " +\ - "WHERE uuid = '%s';" % ( - user_id, engagement_uuid) - DBGeneral.update_query(update_query) - - @staticmethod - def select_user_profile_property(user_email, property_name): - return DBGeneral.select_where( - property_name, - "ice_user_profile", - "email", - user_email, - 1) - - @staticmethod - def validate_user_profile_settings_in_db(user_email, checked): - Wait.page_has_loaded() - regular_email_updates = DBUser.select_user_profile_property( - user_email, 'regular_email_updates') - DBBridge.helper_internal_assert(regular_email_updates, checked) - email_updates_on_every_notification = \ - DBUser.select_user_profile_property( - user_email, 'email_updates_on_every_notification') - DBBridge.helper_internal_assert( - email_updates_on_every_notification, checked) - email_updates_daily_digest = DBUser.select_user_profile_property( - user_email, 'email_updates_daily_digest') - DBBridge.helper_internal_assert( - email_updates_daily_digest, not checked) - - @staticmethod - def retrieve_admin_ssh_from_db(): - ssh_key = DBGeneral.select_where( - 'ssh_public_key', Constants.DBConstants.IceTables.USER_PROFILE, - 'email', Constants.Users.Admin.EMAIL, 1) - return ssh_key - - @staticmethod - def get_access_key(user_uuid): - counter = 0 - access_key = DBGeneral.select_where( - "rgwa_access_key", - Constants.DBConstants.IceTables.USER_PROFILE, - "uuid", - user_uuid, - 1) - while access_key == "None" and counter <= \ - Constants.RGWAConstants.RETRIES_NUMBER: - time.sleep(session.wait_until_time_pause) - logger.debug( - "rgwa_access_key are not ready yet, trying again (%s of 20)" % - counter) - access_key = DBGeneral.select_where( - "rgwa_access_key", - Constants.DBConstants.IceTables.USER_PROFILE, - "uuid", - user_uuid, - 1) - counter += 1 - return access_key - - @staticmethod - def get_access_secret(user_uuid): - counter = 0 - access_secret = DBGeneral.select_where( - "rgwa_secret_key", - Constants.DBConstants.IceTables.USER_PROFILE, - "uuid", - user_uuid, - 1) - while access_secret == "None" and counter <= Constants.\ - RGWAConstants.RETRIES_NUMBER: - time.sleep(session.wait_until_time_pause) - logger.debug( - "rgwa_secret_key are not ready yet, trying again (%s of 100)" % - counter) - access_secret = DBGeneral.select_where( - "rgwa_secret_key", - Constants.DBConstants.IceTables.USER_PROFILE, - "uuid", - user_uuid, - 1) - - counter += 1 - return access_secret diff --git a/services/database/db_virtual_function.py b/services/database/db_virtual_function.py deleted file mode 100644 index f61d1b7..0000000 --- a/services/database/db_virtual_function.py +++ /dev/null @@ -1,286 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import uuid - -from django.conf import settings -import psycopg2 - -from services.constants import Constants -from services.database.db_bridge import DBBridge -from services.database.db_general import DBGeneral -from services.logging_service import LoggingServiceFactory - - -logger = LoggingServiceFactory.get_logger() - - -class DBVirtualFunction: - - @staticmethod - def insert_ecomp_release(uuid, name, ui_visibility="TRUE"): - try: - queryTableName = "ice_ecomp_release" - # Connect to General 'default'. - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection("em_db")) - dbConn = dbConn - cur = dbConn.cursor() - logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE) - except Exception as e: - errorMsg = "Failed to create connection to General: " + str(e) - raise Exception(errorMsg) - try: - logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE) - # Create INSERT query. - queryStr = "INSERT INTO %s " % queryTableName +\ - "(""uuid, name, weight, ui_visibility"") VALUES " +\ - "('%s', '%s', " % (uuid, name) +\ - "'%s', '%s');" % (0, ui_visibility) - logger.debug("Query: " + queryStr) - cur.execute(queryStr) # Execute query. - dbConn.commit() - logger.debug("Test results are in General now.") - except Exception as e: - errorMsg = "Failed to insert ECOMP release to General:" + str(e) - raise Exception(errorMsg) - dbConn.close() - - @staticmethod - def delete_ecomp_release(uuid, name): - try: - queryTableName = "ice_ecomp_release" - # Connect to General 'default'. - dbConn = psycopg2.connect( - DBGeneral.return_db_native_connection("em_db")) - dbConn = dbConn - cur = dbConn.cursor() - except Exception as e: - errorMsg = "Failed to create connection to DBGeneral.because :" + \ - str(e) - raise Exception(errorMsg) - try: - # Create INSERT query. - queryStr = "DELETE FROM %s WHERE uuid = '%s';" % ( - queryTableName, uuid) - logger.debug("Query: " + queryStr) - cur.execute(queryStr) # Execute query. - dbConn.commit() - logger.debug("Test results are in General now.") - except Exception as e: - errorMsg = "Failed to delete ECOMP release from General ." +\ - " because :" + \ - str(e) - raise Exception(errorMsg) - raise - dbConn.close() - - @staticmethod - def select_next_steps_ids(engagement_uuid): - ice_next_steps = DBGeneral.select_where( - "uuid", "ice_next_step", "engagement_id", engagement_uuid, 0) - return ice_next_steps - - @staticmethod - def select_next_steps_uuids_by_stage(engagement_uuid, engagement_stage): - query = "SELECT uuid FROM %s WHERE " % ( - Constants.DBConstants.IceTables.NEXT_STEP) + "engagement_id=" +\ - "'%s' AND engagement_stage='%s' ORDER BY position;" % ( - engagement_uuid, engagement_stage) - return DBGeneral.select_query(query, "list", 0) - - @staticmethod - def update_next_step_position(next_step_uuid, new_index): - DBGeneral.update_where( - "ice_next_step", "position", new_index, "uuid", next_step_uuid) - - @staticmethod - def select_next_step_description(next_step_uuid): - return DBGeneral.select_where( - "description", - "ice_next_step", - "uuid", - next_step_uuid, - 1) - - @staticmethod - def select_eng_uuid(vf_name): - return DBGeneral.select_where( - "engagement_id", "ice_vf", "name", vf_name, 1) - - @staticmethod - def select_engagment_uuid_by_vf_name(vfName): - engagement_id = DBGeneral.select_where( - "engagement_id", "ice_vf", "name", vfName, 1) - engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) - enguuid = DBGeneral.select_where( - "uuid", - "ice_engagement", - "engagement_manual_id", - engagement_manual_id, - 1) - return enguuid - - @staticmethod - def select_vf_version_by_vf_name(vfName): - queryStr = "SELECT version FROM ice_vf WHERE name= '%s';" % vfName - version_name = str(DBGeneral.select_query(queryStr)) - return version_name - - @staticmethod - def select_vf_name_by_vf_version(version_name): - queryofname = "SELECT name FROM ice_vf WHERE " +\ - "version= '%s';" % version_name - vfNameDb = str(DBGeneral.select_query(queryofname)) - return vfNameDb - - @staticmethod - def return_expected_steps(engagement_uuid, stage, user_email): - steps_uuids = DBVirtualFunction.select_next_steps_uuids_by_stage( - engagement_uuid, stage) - personal_step_uuid = DBBridge.select_personal_next_step(user_email) - DBVirtualFunction.update_next_step_position(personal_step_uuid, 1) - steps_uuids.insert(0, personal_step_uuid) - return steps_uuids - - @staticmethod - def get_engagement(): - """Use this function instead of creating a new """ +\ - """engagement where no need to""" - queryStr = "SELECT DISTINCT ice_engagement.uuid, " +\ - "engagement_manual_id, ice_vf.name, ice_user_profile.full_name, \ - ice_user_profile.email, reviewer_table.full_name, " +\ - "reviewer_table.email, \ - ice_deployment_target.version, ice_ecomp_release.name \ - FROM ice_engagement LEFT JOIN ice_vf ON engagement_id " +\ - "= ice_engagement.uuid \ - LEFT JOIN ice_user_profile reviewer_table ON " +\ - "reviewer_table.id = ice_engagement.reviewer_id \ - LEFT JOIN ice_user_profile ON ice_user_profile.id = " +\ - "ice_engagement.peer_reviewer_id \ - LEFT JOIN ice_deployment_target ON " +\ - "ice_deployment_target.uuid = " +\ - "ice_vf.deployment_target_id \ - LEFT JOIN ice_ecomp_release ON " +\ - "ice_ecomp_release.uuid = ice_vf.ecomp_release_id \ - WHERE ice_user_profile.id IS NOT NULL LIMIT 1;" - list_of_values = DBGeneral.select_query(queryStr, return_type="list") - list_of_keys = [ - "engagement_uuid", - "engagement_manual_id", - "vfName", - "pr_name", - "pr_email", - "el_name", - "el_email", - "target_aic", - "ecomp_release"] - return dict(zip(list_of_keys, list_of_values)) - - @staticmethod - def insert_aic_version(ui_visibility="TRUE"): - new_aic_version = { - "uuid": str( - uuid.uuid4()), - "name": "AIC", - "version": DBBridge.helper_rand_string( - "randomNumber", - 2), - "ui_visibility": ui_visibility, - "weight": 0} - queryStr = "INSERT INTO public.ice_deployment_target( \ - uuid, name, version, ui_visibility, weight) \ - VALUES " +\ - "('%s', '%s', '%s', '%s', %s);" % ( - new_aic_version['uuid'], - new_aic_version['name'], - new_aic_version['version'], - new_aic_version['ui_visibility'], - new_aic_version['weight']) - DBGeneral.insert_query(queryStr) - return new_aic_version - - @staticmethod - def delete_aic_version(aic_uuid): - DBGeneral.insert_query( - "DELETE FROM public.ice_deployment_target WHERE uuid='%s';" % - aic_uuid) - - @staticmethod - def change_aic_version_weight(new_weight, old_weight): - DBGeneral.insert_query( - "UPDATE public.ice_deployment_target " + - "SET weight=%s " % new_weight + - "WHERE weight=%s" % old_weight) - - @staticmethod - def change_ecomp_release_weight(new_weight, old_weight): - DBGeneral.insert_query( - "UPDATE public.ice_ecomp_release SET weight=%s WHERE weight=%s" % - (new_weight, old_weight)) - - @staticmethod - def select_aic_version_uuid(aic_version): - return DBGeneral.select_where( - "uuid", "ice_deployment_target", "version", aic_version, 1) - - @staticmethod - def select_ecomp_release_uuid(ecomp_release): - return DBGeneral.select_where( - "uuid", "ice_ecomp_release", "name", ecomp_release, 1) - - @staticmethod - def add_admin_to_eng_team(eng_uuid): - admin_db_id = DBGeneral.select_where( - 'id', - Constants.DBConstants.IceTables.USER_PROFILE, - 'email', - Constants.Users.Admin.EMAIL, - 1) - queryStr = "INSERT INTO public.ice_engagement_engagement_team" +\ - "(engagement_id, iceuserprofile_id) VALUES ('%s', '%s');" % ( - eng_uuid, admin_db_id) - logger.debug("add_admin_to_eng_team Query: %s" % queryStr) - DBGeneral.insert_query(queryStr) - - @staticmethod - def remove_engagement_from_recent(vf_uuid): - DBGeneral.insert_query( - "DELETE FROM %s WHERE vf_id='%s'" % (Constants.DBConstants. - IceTables.RECENT, vf_uuid)) |